
K8s Structures

Original link

We really like the Kubernetes ideology of seeing the entire system as a control system. That is, the system constantly tries to move its current state to a desired state.
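
As a toy illustration of that idea (purely illustrative Go, not Kubernetes source), a control loop keeps comparing current state to desired state and acts on the difference until the two converge:

package main

import (
	"fmt"
	"time"
)

func main() {
	desired := 3 // say, a desired replica count
	current := 0

	// The control loop: observe, compare, act, repeat.
	for current != desired {
		fmt.Printf("current=%d, desired=%d: reconciling\n", current, desired)
		current++ // "start one more replica"
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Println("converged at", current)
}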

controllers

jm.jobStore.Store, jm.jobController = framework.NewInformer(
	&cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			// Direct call to the API server, using the job client.
			return jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(options)
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			// Direct call to the API server, using the job client.
			return jm.kubeClient.Batch().Jobs(api.NamespaceAll).Watch(options)
		},
	},
	&batch.Job{},
	replicationcontroller.FullControllerResyncPeriod,
	framework.ResourceEventHandlerFuncs{
		AddFunc:    jm.enqueueController,
		UpdateFunc: jm.updateJob,
		DeleteFunc: jm.enqueueController,
	},
)

You feed it a list and watch interface to the API server. The Informer automagically syncs the upstream data to a downstream store and even offers you some handy event hooks.
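
From there the caller just runs the controller and reads the local store; roughly (a hypothetical wiring sketch, not the job controller's actual startup code):

stop := make(chan struct{})
defer close(stop)
go jm.jobController.Run(stop)

// Once the initial list has synced, reads are served from the local
// store instead of round-tripping to the API server.
for _, obj := range jm.jobStore.Store.List() {
	job := obj.(*batch.Job)
	fmt.Println("known job:", job.Name)
}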

// source simulates an apiserver object endpoint.
// This will be the resource for the Reflector.
source := framework.NewFakeControllerSource()

// This will hold the downstream state, as we know it.
downstream := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)

// This will hold incoming changes. Note how we pass downstream in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
// This will be the store for the Reflector.
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream)

// Let's do threadsafe output to get predictable test results.
deletionCounter := make(chan string, 1000)

// Configure the controller with a source, the FIFO queue and a Process function.
cfg := &framework.Config{
	Queue:            fifo,
	ListerWatcher:    source,
	ObjectType:       &api.Pod{},
	FullResyncPeriod: time.Millisecond * 100,
	RetryOnError:     false,

	// Let's implement a simple controller that just deletes
	// everything that comes in.
	Process: func(obj interface{}) error {
		// obj is from the Pop method of the Queue we made above.
		newest := obj.(cache.Deltas).Newest()

		if newest.Type != cache.Deleted {
			// Update our downstream store.
			err := downstream.Add(newest.Object)
			if err != nil {
				return err
			}

			// Delete this object.
			source.Delete(newest.Object.(runtime.Object))
		} else {
			// Update our downstream store.
			err := downstream.Delete(newest.Object)
			if err != nil {
				return err
			}

			// fifo's KeyOf is easiest, because it handles
			// DeletedFinalStateUnknown markers.
			key, err := fifo.KeyOf(newest.Object)
			if err != nil {
				return err
			}

			// Report this deletion.
			deletionCounter <- key
		}
		return nil
	},
}

// Create the controller and run it until we close stop.
stop := make(chan struct{})
defer close(stop)
go framework.New(cfg).Run(stop)

// Let's add a few objects to the source.
testIDs := []string{"a-hello", "b-controller", "c-framework"}
for _, name := range testIDs {
	// Note that these pods are not valid -- the fake source doesn't
	// call validation or anything.
	source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
}

// Let's wait for the controller to process the things we just added.
outputSet := sets.String{}
for i := 0; i < len(testIDs); i++ {
	outputSet.Insert(<-deletionCounter)
}

for _, key := range outputSet.List() {
	fmt.Println(key)
}

Regarding the Type field of these events:

reflectors

A “Reflector watches a specified resource and causes all changes to be reflected in the given store.”

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		// When resyncPeriod nanoseconds have passed.
		case <-resyncCh:
			return errorResyncRequested
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}

			// Catch watch events and mirror them into the store.
			switch event.Type {
			case watch.Added:
				r.store.Add(event.Object)
			case watch.Modified:
				r.store.Update(event.Object)
			case watch.Deleted:
				r.store.Delete(event.Object)
			}
			// (*resourceVersion bookkeeping elided)
		}
	}

	return nil
}
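
Wiring a Reflector up by hand looks roughly like this (a hedged sketch against this era's cache package; the client, namespace, and resync period are placeholder choices):

// A Reflector pumping Pod changes straight into a plain store.
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything())
r := cache.NewReflector(lw, &api.Pod{}, store, 30*time.Second)
r.Run() // list once, then watch; the watch is restarted when it closes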

types

The delta-producing operations on the queue: Add / Update / Delete / Replace / Resync.
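
In code, these are the mutating methods of the DeltaFIFO; a rough sketch (treat the exact delta semantics as illustrative):

fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream)

pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "demo"}}
fifo.Add(pod)    // queues a Delta of type Added for the pod's key
fifo.Update(pod) // ... Updated
fifo.Delete(pod) // ... Deleted

// Replace and Resync are driven by the Reflector: Replace after a full
// relist, Resync every resyncPeriod; both compute sync/delete deltas
// against the known objects (the downstream KeyLister here).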

The controller:

  • has a reference to the FIFO queue;
  • has a reference to the ListerWatcher (the upstream source in our case);
  • is responsible for consuming the FIFO queue;
  • has a process loop, which is responsible for getting the system to a desired state (sketched after these lists);
  • creates a Reflector.

The reflector:

  • has a reference to the same FIFO queue (called store internally);
  • has a reference to the same ListerWatcher;
  • lists and watches the ListerWatcher;
  • is responsible for producing the FIFO queue’s input;
  • is responsible for calling the Resync method on the FIFO queue every resyncPeriod ns.

The FIFO queue:

  • has a reference to the downstream store;
  • has a queue of Deltas for objects that were listed and watched by the Reflector.
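
For reference, the controller's process loop in this era of the codebase was essentially the following (lightly paraphrased from the framework package):

func (c *Controller) processLoop() {
	for {
		// Pop blocks until a set of Deltas is available, then hands
		// it to the user-supplied Process function.
		obj := c.config.Queue.Pop()
		if err := c.config.Process(obj); err != nil && c.config.RetryOnError {
			// This is the safe way to re-enqueue.
			c.config.Queue.AddIfNotPresent(obj)
		}
	}
}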

Informer

NewInformer returns a cache.Store and a controller for populating the store, while also providing event notifications.
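
Its shape, for reference (this era's framework package; lw and stop are assumed to be defined by the caller):

store, controller := framework.NewInformer(
	lw,             // a cache.ListerWatcher for the resource
	&api.Pod{},     // the expected object type
	30*time.Second, // full resync period
	framework.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* react to adds */ },
		UpdateFunc: func(old, cur interface{}) { /* react to updates */ },
		DeleteFunc: func(obj interface{}) { /* react to deletes */ },
	},
)
go controller.Run(stop)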