We really like the Kubernetes philosophy of treating the entire system as a control system: the system constantly tries to drive its current state toward a desired state.
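That control loop can be sketched as a small, self-contained Go toy. The `state` struct and `reconcile` function below are our own names for illustration, not actual Kubernetes types:

```go
package main

import "fmt"

// state is a toy model of some piece of cluster state,
// e.g. the number of replicas of a workload.
type state struct {
	replicas int
}

// reconcile is one iteration of the control loop: observe the current
// state, compare it to the desired state, and take one step toward it.
func reconcile(current, desired state) state {
	switch {
	case current.replicas < desired.replicas:
		current.replicas++ // e.g. start a pod
	case current.replicas > desired.replicas:
		current.replicas-- // e.g. kill a pod
	}
	return current
}

func main() {
	current, desired := state{replicas: 0}, state{replicas: 3}
	// Real controllers run this loop forever; here we just loop until converged.
	for current != desired {
		current = reconcile(current, desired)
		fmt.Println("current replicas:", current.replicas)
	}
}
```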
Controllers
```go
jm.jobStore.Store, jm.jobController = framework.NewInformer(
	&cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			// Direct call to the API server, using the job client
			return jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(options)
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			// Direct call to the API server, using the job client
			return jm.kubeClient.Batch().Jobs(api.NamespaceAll).Watch(options)
		},
	},
	// ...
	framework.ResourceEventHandlerFuncs{
		AddFunc:    jm.enqueueController,
		UpdateFunc: ...,
		DeleteFunc: jm.enqueueController,
	},
)
```
You feed it a list and watch interface to the API server. The Informer automagically syncs the upstream data to a downstream store and even offers you some handy event hooks.
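The sync-plus-hooks behavior is easy to model without any Kubernetes dependencies. The toy `toyInformer` below (our own type, not the client library's) applies events from a watch stream to a local store and fires the registered hooks, which is the essence of what the Informer does for you:

```go
package main

import "fmt"

// event is a toy watch event: a kind ("ADDED" or "DELETED"),
// a key, and the object payload.
type event struct {
	kind string
	key  string
	obj  string
}

// toyInformer keeps a downstream store in sync with incoming events
// and dispatches to optional event hooks.
type toyInformer struct {
	store    map[string]string // downstream cache, keyed by object key
	onAdd    func(key string)
	onDelete func(key string)
}

func (i *toyInformer) handle(e event) {
	switch e.kind {
	case "ADDED":
		i.store[e.key] = e.obj
		if i.onAdd != nil {
			i.onAdd(e.key)
		}
	case "DELETED":
		delete(i.store, e.key)
		if i.onDelete != nil {
			i.onDelete(e.key)
		}
	}
}

func main() {
	inf := &toyInformer{store: map[string]string{}}
	inf.onAdd = func(key string) { fmt.Println("add hook:", key) }
	inf.onDelete = func(key string) { fmt.Println("delete hook:", key) }

	for _, e := range []event{
		{"ADDED", "default/job-1", "job-1 spec"},
		{"ADDED", "default/job-2", "job-2 spec"},
		{"DELETED", "default/job-1", ""},
	} {
		inf.handle(e)
	}
	fmt.Println("store size:", len(inf.store)) // prints "store size: 1"
}
```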
```go
// source simulates an apiserver object endpoint.
// This will be the resource for the Reflector.
source := framework.NewFakeControllerSource()

// This will hold the downstream state, as we know it.
downstream := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)

// This will hold incoming changes. Note how we pass downstream in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
// This will be the store for the Reflector.
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream)

// Let's do threadsafe output to get predictable test results.
deletionCounter := make(chan string, 1000)

// Configure the controller with a source, the FIFO queue and a Process function.
cfg := &framework.Config{
	Queue:            fifo,
	ListerWatcher:    source,
	ObjectType:       &api.Pod{},
	FullResyncPeriod: time.Millisecond * 100,
	RetryOnError:     false,

	// Let's implement a simple controller that just deletes
	// everything that comes in.
	Process: func(obj interface{}) error {
		// Obj is from the Pop method of the Queue we make above.
		newest := obj.(cache.Deltas).Newest()
		if newest.Type != cache.Deleted {
			// Update our downstream store, then delete the object
			// from the source so a Deleted delta comes back around.
			if err := downstream.Add(newest.Object); err != nil {
				return err
			}
			source.Delete(newest.Object.(runtime.Object))
		} else {
			// Update our downstream store.
			if err := downstream.Delete(newest.Object); err != nil {
				return err
			}
			// fifo's KeyOf is easiest, because it handles
			// DeletedFinalStateUnknown markers.
			key, err := fifo.KeyOf(newest.Object)
			if err != nil {
				return err
			}
			// Report this deletion.
			deletionCounter <- key
		}
		return nil
	},
}

// Create the controller and run it until we close stop.
stop := make(chan struct{})
defer close(stop)
go framework.New(cfg).Run(stop)

// Let's add a few objects to the source.
testIDs := []string{"a-hello", "b-controller", "c-framework"}
for _, name := range testIDs {
	// Note that these pods are not valid -- the fake source doesn't
	// call validation or anything.
	source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
}

// Let's wait for the controller to process the things we just added.
outputSet := sets.String{}
for i := 0; i < len(testIDs); i++ {
	outputSet.Insert(<-deletionCounter)
}

for _, key := range outputSet.List() {
	fmt.Println(key)
}
```
On the Type field of events
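In the client code of that era, each item popped from the DeltaFIFO is a list of deltas, and every delta carries a `Type` alongside the object. To the best of our reading, the types are `Added`, `Updated`, `Deleted`, and `Sync`; the sketch below redeclares them locally so it is self-contained, mirroring the library's `cache.Delta`:

```go
package main

import "fmt"

// DeltaType mirrors cache.DeltaType from the Kubernetes client code
// (redeclared here so the snippet stands alone).
type DeltaType string

const (
	Added   DeltaType = "Added"   // object was created upstream
	Updated DeltaType = "Updated" // object was modified upstream
	Deleted DeltaType = "Deleted" // object was removed upstream
	Sync    DeltaType = "Sync"    // emitted during a periodic full resync
)

// Delta pairs a change type with the object it applies to,
// as popped from a DeltaFIFO.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

func main() {
	d := Delta{Type: Added, Object: "pod a-hello"}
	fmt.Println(d.Type, d.Object) // prints "Added pod a-hello"
}
```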
Reflectors
As the source comment puts it, a Reflector "watches a specified resource and causes all changes to be reflected in the given store".
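That list-then-watch behavior can be sketched as a toy (all names here are ours, not the client library's): an initial List seeds the store with the current upstream state, and a watch stream keeps it in sync afterwards.

```go
package main

import "fmt"

// store is the downstream cache the reflector keeps in sync.
type store map[string]string

// reflectInto seeds the store from a List result, then applies watch
// events until the channel closes. Each event is a [key, value] pair;
// an empty value marks a deletion in this toy model.
func reflectInto(list map[string]string, watch <-chan [2]string, s store) {
	// Step 1: List, to capture the current upstream state.
	for k, v := range list {
		s[k] = v
	}
	// Step 2: Watch, to reflect every later change into the store.
	for ev := range watch {
		key, val := ev[0], ev[1]
		if val == "" {
			delete(s, key)
		} else {
			s[key] = val
		}
	}
}

func main() {
	upstream := map[string]string{"default/a": "v1"}
	watch := make(chan [2]string, 2)
	watch <- [2]string{"default/b", "v1"} // an add arrives after the List
	watch <- [2]string{"default/a", ""}   // then a delete
	close(watch)

	s := store{}
	reflectInto(upstream, watch, s)
	fmt.Println(len(s), s["default/b"]) // prints "1 v1"
}
```

The real Reflector also handles resource versions and re-listing after a dropped watch, which this sketch leaves out.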