
K8s Structures


Original article link

We really like the Kubernetes ideology of seeing the entire system as a control system. That is, the system constantly tries to move its current state to a desired state.
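
As a rough illustration of that idea (a hypothetical sketch, not actual Kubernetes code -- none of these helper functions exist under these names), the control loop boils down to:

// controlLoop is a hypothetical sketch of the pattern: observe the
// current state, compare it with the desired state, and act to close
// the gap. getDesiredState, getCurrentState and reconcile are made-up
// placeholders for illustration only.
func controlLoop(stopCh <-chan struct{}) {
  for {
    select {
    case <-stopCh:
      return
    default:
    }
    desired := getDesiredState() // e.g. the Job objects stored via the API server
    current := getCurrentState() // e.g. the pods that actually exist right now
    if !reflect.DeepEqual(current, desired) {
      reconcile(current, desired) // create/update/delete resources to converge
    }
    time.Sleep(time.Second)
  }
}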

controllers

jm.jobStore.Store, jm.jobController = framework.NewInformer(
  &cache.ListWatch{
    ListFunc: func(options api.ListOptions) (runtime.Object, error) {
      // Direct call to the API server, using the job client
      return jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(options)
    },
    WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
      // Direct call to the API server, using the job client
      return jm.kubeClient.Batch().Jobs(api.NamespaceAll).Watch(options)
    },
  },
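  // (the job object type and resync period arguments are omitted in this excerpt)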

  framework.ResourceEventHandlerFuncs{
    AddFunc: jm.enqueueController,
    UpdateFunc: func(old, cur interface{}) { /* body elided in this excerpt */ },
    DeleteFunc: jm.enqueueController,
  },
)
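
The handlers registered above only enqueue work. As a sketch (not the actual JobController source; jm.queue and the exact key function are assumptions here), such a handler typically computes a key for the object and adds it to a work queue that the controller's sync loop drains:

// Hypothetical sketch of an enqueue-style event handler.
func (jm *JobController) enqueueController(obj interface{}) {
  // DeletionHandlingMetaNamespaceKeyFunc also copes with the
  // DeletedFinalStateUnknown tombstones a store can hand us.
  key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
  if err != nil {
    glog.Errorf("couldn't get key for object %+v: %v", obj, err)
    return
  }
  jm.queue.Add(key)
}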

You feed it a list/watch interface to the API server, and the Informer automagically syncs the upstream data into a downstream store and even offers you some handy event hooks. The example below wires a fake source, a DeltaFIFO and a Process function into a controller that simply deletes everything it sees.

// source simulates an apiserver object endpoint.
// This will be the resource for the Reflector.
source := framework.NewFakeControllerSource()

// This will hold the downstream state, as we know it.
downstream := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)

// This will hold incoming changes. Note how we pass downstream in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
// This will be the store for the Reflector.
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream)

// Let's do threadsafe output to get predictable test results.
deletionCounter := make(chan string, 1000)

// Configure the controller with a source, the FIFO queue and a Process function.
cfg := &framework.Config{
  Queue:            fifo,
  ListerWatcher:    source,
  ObjectType:       &api.Pod{},
  FullResyncPeriod: time.Millisecond * 100,
  RetryOnError:     false,

  // Let's implement a simple controller that just deletes
  // everything that comes in.
  Process: func(obj interface{}) error {
    // Obj is from the Pop method of the Queue we make above.
    newest := obj.(cache.Deltas).Newest()

    if newest.Type != cache.Deleted {
      // Update our downstream store.
      err := downstream.Add(newest.Object)
      if err != nil {
        return err
      }

      // Delete this object.
      source.Delete(newest.Object.(runtime.Object))
    } else {
      // Update our downstream store.
      err := downstream.Delete(newest.Object)
      if err != nil {
        return err
      }

      // fifo's KeyOf is easiest, because it handles
      // DeletedFinalStateUnknown markers.
      key, err := fifo.KeyOf(newest.Object)
      if err != nil {
        return err
      }

      // Report this deletion.
      deletionCounter <- key
    }
    return nil
  },
}

// Create the controller and run it until we close stop.
stop := make(chan struct{})
defer close(stop)
go framework.New(cfg).Run(stop)

// Let's add a few objects to the source.
testIDs := []string{"a-hello", "b-controller", "c-framework"}
for _, name := range testIDs {
  // Note that these pods are not valid -- the fake source doesn't
  // call validation or anything.
  source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
}

// Let's wait for the controller to process the things we just added.
outputSet := sets.String{}
for i := 0; i < len(testIDs); i++ {
  outputSet.Insert(<-deletionCounter)
}

for _, key := range outputSet.List() {
  fmt.Println(key)
}
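
Because the fake pods are created without a namespace, the keys produced by MetaNamespaceKeyFunc reduce to just the pod names, so the loop above should print a-hello, b-controller and c-framework (the set's List method returns them sorted).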

On the Type field of events

reflectors

A “Reflector watches a specified resource and causes all changes to be reflected in the given store”.

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
  defer w.Stop()

  for {
    select {
    case <-stopCh:
      return errorStopRequested
    // When resyncPeriod nanoseconds have passed.
    case <-resyncCh:
      return errorResyncRequested
    case event, ok := <-w.ResultChan():
      if !ok {
        // The watch channel was closed; stop handling events.
        // (Error counting and resourceVersion bookkeeping are
        // elided in this excerpt.)
        return nil
      }

      // Catch watch events and mirror them into the store.
      switch event.Type {
      case watch.Added:
        r.store.Add(event.Object)
      case watch.Modified:
        r.store.Update(event.Object)
      case watch.Deleted:
        r.store.Delete(event.Object)
      }
    }
  }
}
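
For comparison, this is roughly how a Reflector is constructed and started with recent client-go (a sketch; clientset is assumed to be a kubernetes.Interface built elsewhere):

// Sketch: build a ListWatch for pods, create a store, and let a
// Reflector keep that store in sync with the API server.
lw := cache.NewListWatchFromClient(
  clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
r := cache.NewReflector(lw, &v1.Pod{}, store, 30*time.Second)

stop := make(chan struct{})
defer close(stop)
go r.Run(stop) // list+watch until stop is closed; changes land in store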

types

The operations the store / FIFO queue supports: add / update / delete / replace / resync.
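
These correspond to the mutating methods of the cache.Store interface (paraphrased below; the real interface also has read methods such as List, ListKeys, Get and GetByKey):

// The mutating half of the cache.Store interface (paraphrased).
type Store interface {
  Add(obj interface{}) error
  Update(obj interface{}) error
  Delete(obj interface{}) error
  // Replace substitutes the store's contents with the given list,
  // e.g. after a full re-list from the API server.
  Replace(list []interface{}, resourceVersion string) error
  // Resync is essentially a no-op for plain stores; a DeltaFIFO uses it
  // to re-emit its known state as sync deltas.
  Resync() error
  // ... read-only methods elided ...
}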

The controller: pops deltas from the queue and runs the Process function on them, keeping the downstream store up to date.

The reflector: watches the API server for the configured resource and pushes every change into its store (here, the FIFO queue).

The FIFO queue: buffers the incoming deltas until the controller gets around to processing them.

Informer

NewInformer returns a cache.Store and a controller for populating the store, while also providing event notifications.
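
A quick usage sketch with recent client-go, where cache.NewInformer plays the role of framework.NewInformer (lw is assumed to be a ListerWatcher like the one built in the reflector sketch above):

// Sketch: NewInformer hands back a Store that it keeps populated, plus
// a Controller that drives the sync loop; the handlers fire on changes.
store, controller := cache.NewInformer(
  lw,             // e.g. the pod ListWatch from the reflector sketch
  &v1.Pod{},      // expected object type
  30*time.Second, // resync period
  cache.ResourceEventHandlerFuncs{
    AddFunc:    func(obj interface{}) { fmt.Println("added") },
    UpdateFunc: func(old, cur interface{}) { fmt.Println("updated") },
    DeleteFunc: func(obj interface{}) { fmt.Println("deleted") },
  },
)

stop := make(chan struct{})
defer close(stop)
go controller.Run(stop)

// The store stays in sync with the API server; read it whenever needed.
for _, key := range store.ListKeys() {
  fmt.Println(key)
}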

