samplecontroller cache.SharedIndexInformer NewSharedInformerFactory 参考  Accessing Kubernetes CRDs from the client-go package DeltaFIFO watch updateIndices 深入浅出 kubernetes 之 WorkQueue 详解 kubernetes controller 和 CRD 具体组件分析 深入k8s:Informer使用及其源码分析 client-go 中的 informer 源码分析

root@ubuntu:~/sample-controller# go mod  vendor
go: downloading k8s.io/code-generator v0.0.0-20210701054009-d874928e3dc5
go: downloading github.com/evanphx/json-patch v4.11.0+incompatible
go: downloading github.com/pkg/errors v0.9.1
go: downloading github.com/go-openapi/jsonreference v0.19.5
go: downloading github.com/go-openapi/swag v0.19.14
go: downloading github.com/go-openapi/jsonpointer v0.19.5
go: downloading github.com/mailru/easyjson v0.7.6
go: downloading github.com/josharian/intern v1.0.0
root@ubuntu:~/sample-controller# bash hack/update-codegen.sh 
Generating deepcopy funcs
Generating clientset for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/generated/clientset
Generating listers for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/generated/listers
Generating informers for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/generated/informers
root@ubuntu:~/sample-controller# 

  

root@ubuntu:~/sample-controller/artifacts/examples# kubectl get crds
NAME                                                  CREATED AT
bgpconfigurations.crd.projectcalico.org               2021-07-01T09:21:58Z
bgppeers.crd.projectcalico.org                        2021-07-01T09:21:58Z
blockaffinities.crd.projectcalico.org                 2021-07-01T09:21:58Z
clusterinformations.crd.projectcalico.org             2021-07-01T09:21:58Z
commands.bus.volcano.sh                               2021-07-05T07:47:45Z
felixconfigurations.crd.projectcalico.org             2021-07-01T09:21:58Z
foos.samplecontroller.k8s.io                          2021-07-06T03:20:25Z
globalnetworkpolicies.crd.projectcalico.org           2021-07-01T09:21:58Z
globalnetworksets.crd.projectcalico.org               2021-07-01T09:21:58Z
hostendpoints.crd.projectcalico.org                   2021-07-01T09:21:58Z
ipamblocks.crd.projectcalico.org                      2021-07-01T09:21:58Z
ipamconfigs.crd.projectcalico.org                     2021-07-01T09:21:58Z
ipamhandles.crd.projectcalico.org                     2021-07-01T09:21:58Z
ippools.crd.projectcalico.org                         2021-07-01T09:21:58Z
jobs.batch.volcano.sh                                 2021-07-05T07:47:45Z
kubecontrollersconfigurations.crd.projectcalico.org   2021-07-01T09:21:58Z
networkpolicies.crd.projectcalico.org                 2021-07-01T09:21:58Z
networksets.crd.projectcalico.org                     2021-07-01T09:21:58Z
podgroups.scheduling.incubator.k8s.io                 2021-07-05T06:53:38Z
podgroups.scheduling.sigs.dev                         2021-07-05T06:53:38Z
podgroups.scheduling.volcano.sh                       2021-07-05T07:47:45Z
queues.scheduling.incubator.k8s.io                    2021-07-05T06:53:38Z
queues.scheduling.sigs.dev                            2021-07-05T06:53:38Z
queues.scheduling.volcano.sh                          2021-07-05T07:47:45Z
root@ubuntu:~/sample-controller/artifacts/examples# ls
crd-status-subresource.yaml  crd.yaml  example-foo.yaml
root@ubuntu:~/sample-controller/artifacts/examples# cat crd.yaml 
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: foos.samplecontroller.k8s.io
  # for more information on the below annotation, please see
  # https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/2337-k8s.io-group-protection/README.md
  annotations:
    "api-approved.kubernetes.io": "unapproved, experimental-only; please get an approval from Kubernetes API reviewers if you're trying to develop a CRD in the *.k8s.io or *.kubernetes.io groups"
spec:
  group: samplecontroller.k8s.io
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        # schema used for validation
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                deploymentName:
                  type: string
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 10
            status:
              type: object
              properties:
                availableReplicas:
                  type: integer
  names:
    kind: Foo
    plural: foos
  scope: Namespaced
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get crds | grep foos
foos.samplecontroller.k8s.io                          2021-07-06T03:20:25Z
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get crds | grep foos
foos.samplecontroller.k8s.io                          2021-07-06T03:20:25Z
root@ubuntu:~/sample-controller/artifacts/examples# kubectl create -f  example-foo.yaml
foo.samplecontroller.k8s.io/example-foo created
root@ubuntu:~/sample-controller/artifacts/examples# cat example-foo.yaml 
apiVersion: samplecontroller.k8s.io/v1alpha1
kind: Foo
metadata:
  name: example-foo
spec:
  deploymentName: example-foo
  replicas: 1
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get Foo
NAME          AGE
example-foo   2m2s
root@ubuntu:~/sample-controller/artifacts/examples# kubectl describe  example-foo 
error: the server doesn't have a resource type "example-foo"
root@ubuntu:~/sample-controller/artifacts/examples# kubectl describe  Foo
Name:         example-foo
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  samplecontroller.k8s.io/v1alpha1
Kind:         Foo
Metadata:
  Creation Timestamp:  2021-07-06T03:23:49Z
  Generation:          1
  Managed Fields:
    API Version:  samplecontroller.k8s.io/v1alpha1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        .:
        f:deploymentName:
        f:replicas:
    Manager:         kubectl
    Operation:       Update
    Time:            2021-07-06T03:23:49Z
  Resource Version:  1111619
  Self Link:         /apis/samplecontroller.k8s.io/v1alpha1/namespaces/default/foos/example-foo
  UID:               b91554b5-58fa-4b08-a011-edaf992b60de
Spec:
  Deployment Name:  example-foo
  Replicas:         1
Events:             <none>
root@ubuntu:~/sample-controller/artifacts/examples# cat example-foo.yaml
apiVersion: samplecontroller.k8s.io/v1alpha1
kind: Foo
metadata:
  name: example-foo
spec:
  deploymentName: example-foo
  replicas: 1
root@ubuntu:~/sample-controller/artifacts/examples# 

运行

./sample-controller -kubeconfig=$HOME/.kube/config
之前
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get deployments
No resources found in default namespace.
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get deployments
No resources found in default namespace.

运行

./sample-controller -kubeconfig=$HOME/.kube/config
root@ubuntu:~/sample-controller# ./sample-controller -kubeconfig=$HOME/.kube/config
I0706 11:36:14.120260   61579 controller.go:115] Setting up event handlers
I0706 11:36:14.120399   61579 controller.go:156] Starting Foo controller
I0706 11:36:14.120412   61579 controller.go:159] Waiting for informer caches to sync
I0706 11:36:14.220565   61579 controller.go:164] Starting workers
I0706 11:36:14.220596   61579 controller.go:170] Started workers
I0706 11:36:14.246385   61579 controller.go:228] Successfully synced 'default/example-foo'
I0706 11:36:14.246489   61579 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
I0706 11:36:14.250982   61579 controller.go:228] Successfully synced 'default/example-foo'
I0706 11:36:14.251023   61579 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
I0706 11:36:14.256740   61579 controller.go:228] Successfully synced 'default/example-foo'
I0706 11:36:14.256772   61579 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
I0706 11:36:14.276593   61579 controller.go:228] Successfully synced 'default/example-foo'
I0706 11:36:14.276643   61579 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"

运行

./sample-controller -kubeconfig=$HOME/.kube/config
之后
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get deployments
NAME          READY   UP-TO-DATE   AVAILABLE   AGE
example-foo   0/1     1            0           5s
root@ubuntu:~/sample-controller/artifacts/examples# 

io  port

root@ubuntu:~# netstat -pan | grep sample
tcp        0      0 10.10.16.82:34178       10.10.16.249:6443       ESTABLISHED 56698/./sample-cont 
root@ubuntu:~# ip a sh | grep 10.10.16.249
root@ubuntu:~# telnet 10.10.16.249 6443 
Trying 10.10.16.249...
Connected to 10.10.16.249.
Escape character is '^]'.
^CConnection closed by foreign host.
root@ubuntu:~# sp -elf | grep 56698
sp: command not found
root@ubuntu:~# ps  -elf | grep 56698
0 S root     56698 54998  0  80   0 - 517136 futex_ 11:49 pts/0   00:00:00 ./sample-controller -kubeconfig=/root/.kube/config
0 S root     60412 56743  0  80   0 -  1096 pipe_w 11:53 pts/1    00:00:00 grep 56698
root@ubuntu:~# 
10.10.16.249 是 keepalived 的 VIP


[root@centos7 ~]# ip a sh enp125s0f0
2: enp125s0f0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
    link/ether b0:08:75:5f:b8:5b brd ff:ff:ff:ff:ff:ff
    inet 10.10.16.251/24 scope global enp125s0f0
       valid_lft forever preferred_lft forever
    inet 10.10.16.249/32 scope global enp125s0f0
       valid_lft forever preferred_lft forever
[root@centos7 ~]# netstat -pan | grep 6449
[root@centos7 ~]# netstat -pan | grep 6443
tcp        0      0 10.10.16.249:54830      10.10.16.249:6443       ESTABLISHED 112750/kube-control 
tcp        0      0 10.10.16.249:41322      10.10.16.249:6443       ESTABLISHED 112750/kube-control 
tcp        0      0 10.10.16.249:41328      10.10.16.249:6443       ESTABLISHED 112872/kube-schedul 
tcp        0      1 10.10.16.251:48116      10.10.18.46:6443        SYN_SENT    25931/haproxy       
tcp        0      0 10.10.16.249:38122      10.10.16.249:6443       ESTABLISHED 20496/kubelet       
tcp        0      0 10.10.16.249:58900      10.10.16.249:6443       ESTABLISHED 21108/kube-proxy    
tcp        0      0 10.10.16.249:41330      10.10.16.249:6443       ESTABLISHED 112872/kube-schedul 
tcp6       0      0 :::6443                 :::*                    LISTEN      21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.81:17642       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.82:34178       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.82:46763       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.249:54830      ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 ::1:43226               ::1:6443                ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.251:57395      ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.81:3112        ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.82:45391       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.251:56994      ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.82:43286       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.244.129.129:50662    ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.47:59912       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.82:52248       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.47:59914       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.249:41328      ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.82:38136       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.82:50596       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.81:30491       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.249:41322      ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.82:44579       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 ::1:6443                ::1:43226               ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.249:38122      ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.81:40659       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.82:39354       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.249:41330      ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.47:46764       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.81:60932       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.82:43290       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.47:41240       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.81:46054       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.47:40538       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.249:58900      ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.251:6443       10.10.16.81:61069       ESTABLISHED 21302/kube-apiserve 
tcp6       0      0 10.10.16.249:6443       10.10.16.47:33230       ESTABLISHED 21302/kube-apiserve 
[root@centos7 ~]# 

也就是说,sample-controller 不直接访问 etcd,而是通过 apiserver 访问。

root@ubuntu:~/sample-controller/artifacts/examples# kubectl get deployments
NAME          READY   UP-TO-DATE   AVAILABLE   AGE
example-foo   0/1     1            0           5s
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get pods
NAME                           READY   STATUS    RESTARTS   AGE
example-foo-54dc4db9fc-9t7br   1/1     Running   0          6m23s
test-job-default-nginx-0       1/1     Running   0          18h
test-job-default-nginx-1       1/1     Running   0          18h
test-job-default-nginx-2       1/1     Running   0          18h
test-job-default-nginx-3       1/1     Running   0          18h
test-job-default-nginx-4       1/1     Running   0          18h
test-job-default-nginx-5       1/1     Running   0          18h
root@ubuntu:~/sample-controller/artifacts/examples# kubectl describe pods  example-foo-54dc4db9fc-9t7br
Name:         example-foo-54dc4db9fc-9t7br
Namespace:    default
Priority:     0
Node:         bogon/10.10.16.81    (注:Pod 运行在节点 bogon 上,而 controller 运行在 ubuntu 主机上)
Start Time:   Tue, 06 Jul 2021 11:36:14 +0800
Labels:       app=nginx
              controller=example-foo
              pod-template-hash=54dc4db9fc
Annotations:  cni.projectcalico.org/podIP: 10.244.29.17/32
              cni.projectcalico.org/podIPs: 10.244.29.17/32
Status:       Running
IP:           10.244.29.17
IPs:
  IP:           10.244.29.17
Controlled By:  ReplicaSet/example-foo-54dc4db9fc
Containers:
  nginx:
    Container ID:   docker://83a4cf68e72bea51805126a8b1f4d2760aae38228a9123f2fffff730508fffce
    Image:          nginx:latest
    Image ID:       docker-pullable://nginx@sha256:47ae43cdfc7064d28800bc42e79a429540c7c80168e8c8952778c0d5af1c09db
    Port:           <none>
    Host Port:      <none>
    State:          Running
      Started:      Tue, 06 Jul 2021 11:37:31 +0800
    Ready:          True
    Restart Count:  0
    Environment:    <none>
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-cfr6q (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             True 
  ContainersReady   True 
  PodScheduled      True 
Volumes:
  default-token-cfr6q:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-cfr6q
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type    Reason     Age        From               Message
  ----    ------     ----       ----               -------
  Normal  Scheduled  <unknown>  default-scheduler  Successfully assigned default/example-foo-54dc4db9fc-9t7br to bogon
  Normal  Pulling    6m45s      kubelet, bogon     Pulling image "nginx:latest"
  Normal  Pulled     5m31s      kubelet, bogon     Successfully pulled image "nginx:latest"
  Normal  Created    5m30s      kubelet, bogon     Created container nginx
  Normal  Started    5m30s      kubelet, bogon     Started container nginx
root@ubuntu:~/sample-controller/artifacts/examples#

nginx

// newDeployment builds the Deployment that backs the given Foo resource.
// The Deployment is named after foo.Spec.DeploymentName, is placed in the
// Foo's namespace, and carries a controller OwnerReference pointing back at
// the Foo (the original comment notes that handleObject uses this reference
// to find the owning Foo).
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
        // One label set is shared by the selector and the pod template so
        // that the Deployment selects exactly the pods it creates.
        podLabels := map[string]string{
                "app":        "nginx",
                "controller": foo.Name,
        }

        meta := metav1.ObjectMeta{
                Name:      foo.Spec.DeploymentName,
                Namespace: foo.Namespace,
                OwnerReferences: []metav1.OwnerReference{
                        *metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
                },
        }

        // Pod template: a single nginx container.
        podTemplate := corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: podLabels},
                Spec: corev1.PodSpec{
                        Containers: []corev1.Container{
                                {Name: "nginx", Image: "nginx:latest"},
                        },
                },
        }

        spec := appsv1.DeploymentSpec{
                Replicas: foo.Spec.Replicas,
                Selector: &metav1.LabelSelector{MatchLabels: podLabels},
                Template: podTemplate,
        }

        return &appsv1.Deployment{ObjectMeta: meta, Spec: spec}
}
root@ubuntu:/opt/gopath/src/github.com/go-delve/delve# ps -elf | grep sample
0 S root     20518 11022  0  80   0 -  1096 pipe_w 11:59 pts/2    00:00:00 grep sample
0 S root     61579 29030  0  80   0 - 590852 futex_ 11:36 pts/0   00:00:00 ./sample-controller -kubeconfig=/root/.kube/config
root@ubuntu:/opt/gopath/src/github.com/go-delve/delve# dlv attach  61579
Type 'help' for list of commands.
(dlv) b newDeployment
Breakpoint 1 (enabled) set at 0xeea018 for main.newDeployment() /root/sample-controller/controller.go:391
(dlv) r
Command failed: cannot restart process Delve did not create
(dlv) c
> main.newDeployment() /root/sample-controller/controller.go:391 (hits goroutine(135):1 total:1) (PC: 0xeea018)
Warning: debugging optimized function
   386: }
   387:
   388: // newDeployment creates a new Deployment for a Foo resource. It also sets
   389: // the appropriate OwnerReferences on the resource so handleObject can discover
   390: // the Foo resource that 'owns' it.
=> 391: func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
   392:         labels := map[string]string{
   393:                 "app":        "nginx",
   394:                 "controller": foo.Name,
   395:         }
   396:         return &appsv1.Deployment{
(dlv) bt
 0  0x0000000000eea018 in main.newDeployment
    at /root/sample-controller/controller.go:391
 1  0x0000000000ee9580 in main.(*Controller).syncHandler
    at /root/sample-controller/controller.go:277
 2  0x0000000000eeae38 in main.(*Controller).processNextWorkItem.func1
    at /root/sample-controller/controller.go:220
 3  0x0000000000ee8e70 in main.(*Controller).processNextWorkItem
    at /root/sample-controller/controller.go:230
 4  0x0000000000ee8e00 in main.(*Controller).runWorker
    at /root/sample-controller/controller.go:181
 5  0x0000000000eeb260 in main.(*Controller).runWorker-fm
    at /root/sample-controller/controller.go:180
 6  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 7  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 8  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 9  0x00000000006b7d80 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
10  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) quit

  controller.Run(2, stopCh)

// Run waits for the Deployment and Foo informer caches to sync, launches
// `threadiness` worker goroutines, and then blocks until a receive on stopCh
// completes (normally when the channel is closed).
//
// It returns an error only when the cache sync does not succeed. On every
// return path the workqueue is shut down first, and then
// utilruntime.HandleCrash runs (deferred calls execute in reverse order).
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
        defer utilruntime.HandleCrash()
        defer c.workqueue.ShutDown()

        klog.Info("Starting Foo controller")

        // Workers must not start until both caches report synced.
        klog.Info("Waiting for informer caches to sync")
        synced := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced)
        if !synced {
                return fmt.Errorf("failed to wait for caches to sync")
        }

        klog.Info("Starting workers")
        // Each worker re-invokes runWorker via wait.Until with a one-second
        // period, and is tied to stopCh.
        for remaining := threadiness; remaining > 0; remaining-- {
                go wait.Until(c.runWorker, time.Second, stopCh)
        }
        klog.Info("Started workers")

        // Park here until shutdown is requested.
        <-stopCh
        klog.Info("Shutting down workers")

        return nil
}

// runWorker keeps invoking processNextWorkItem to read and process messages
// from the workqueue, and returns as soon as processNextWorkItem reports
// false.
func (c *Controller) runWorker() {
        for {
                if !c.processNextWorkItem() {
                        return
                }
        }
}

NewInformerFunc

pkg/generated/informers/externalversions/internalinterfaces/factory_interfaces.go

// NewInformerFunc is the constructor signature for an informer: it takes a
// versioned.Interface client and a time.Duration and returns a
// cache.SharedIndexInformer.
// NOTE(review): the duration is presumably the resync period — confirm against callers.
type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer

// SharedInformerFactory is a small interface that allows adding an informer
// without creating an import cycle.
type SharedInformerFactory interface {
        // Start takes a receive-only stop channel.
        // NOTE(review): presumably starts the registered informers and ties
        // their lifetime to stopCh — confirm in the implementation.
        Start(stopCh <-chan struct{})
        // InformerFor returns a cache.SharedIndexInformer for obj; newFunc is
        // the constructor supplied by the caller.
        // NOTE(review): presumably newFunc is only invoked when no informer
        // for obj's type exists yet — confirm in the implementation.
        InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

pkg/generated/informers/externalversions/factory.go

// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
        // Embeds the method set of internalinterfaces.SharedInformerFactory.
        internalinterfaces.SharedInformerFactory
        // ForResource returns a GenericInformer for the given
        // group/version/resource, or an error.
        ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
        // WaitForCacheSync returns a bool per reflect.Type.
        // NOTE(review): presumably reports, per informer type, whether its
        // cache synced before stopCh was closed — confirm in the implementation.
        WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

        // Samplecontroller returns the samplecontroller.Interface entry point.
        Samplecontroller() samplecontroller.Interface
}

kube-batch 有多个 informer 对象

client/informers/externalversions/scheduling/v1alpha2/queue.go:37:      Informer() cache.SharedIndexInformer
client/informers/externalversions/scheduling/v1alpha2/queue.go:49:func NewQueueInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
client/informers/externalversions/scheduling/v1alpha2/queue.go:56:func NewFilteredQueueInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
client/informers/externalversions/scheduling/v1alpha2/queue.go:78:func (f *queueInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
client/informers/externalversions/scheduling/v1alpha2/queue.go:82:func (f *queueInformer) Informer() cache.SharedIndexInformer {
client/informers/externalversions/scheduling/v1alpha2/podgroup.go:37:   Informer() cache.SharedIndexInformer
client/informers/externalversions/scheduling/v1alpha2/podgroup.go:50:func NewPodGroupInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
client/informers/externalversions/scheduling/v1alpha2/podgroup.go:57:func NewFilteredPodGroupInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
client/informers/externalversions/scheduling/v1alpha2/podgroup.go:79:func (f *podGroupInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
client/informers/externalversions/scheduling/v1alpha2/podgroup.go:83:func (f *podGroupInformer) Informer() cache.SharedIndexInformer {

NewSharedInformerFactory

import (
        "flag"
        "time"

        kubeinformers "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/klog/v2"
        // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
        // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

        clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
        informers "k8s.io/sample-controller/pkg/generated/informers/externalversions"
        "k8s.io/sample-controller/pkg/signals"
)
// main wires up and runs the sample controller: it parses flags, builds the
// Kubernetes and sample clientsets from the kubeconfig/master URL, creates
// both shared informer factories, constructs the controller, starts the
// informers, and finally runs the controller with two workers until the stop
// channel fires. Any setup or run error is fatal.
func main() {
        klog.InitFlags(nil)
        flag.Parse()

        // The signal handler supplies the stop channel so that the first
        // shutdown signal is handled gracefully.
        stopCh := signals.SetupSignalHandler()

        restConfig, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
        if err != nil {
                klog.Fatalf("Error building kubeconfig: %s", err.Error())
        }

        kubeClient, err := kubernetes.NewForConfig(restConfig)
        if err != nil {
                klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
        }

        exampleClient, err := clientset.NewForConfig(restConfig)
        if err != nil {
                klog.Fatalf("Error building example clientset: %s", err.Error())
        }

        // Both factories are created with the same 30-second duration argument.
        const resyncPeriod = 30 * time.Second
        kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
        exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, resyncPeriod)

        // The informers must be requested from the factories before Start is
        // called below.
        deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
        fooInformer := exampleInformerFactory.Samplecontroller().V1alpha1().Foos()
        controller := NewController(kubeClient, exampleClient, deploymentInformer, fooInformer)

        // Per the original note: Start is non-blocking and runs all registered
        // informers in a dedicated goroutine, so no extra `go` is needed here.
        kubeInformerFactory.Start(stopCh)
        exampleInformerFactory.Start(stopCh)

        if runErr := controller.Run(2, stopCh); runErr != nil {
                klog.Fatalf("Error running controller: %s", runErr.Error())
        }
}
root@ubuntu:~/sample-controller# ls  pkg/generated/informers/externalversions/
factory.go  generic.go  internalinterfaces  samplecontroller
root@ubuntu:~/sample-controller# 

ls  pkg/generated/informers/externalversions/

// NewSharedInformerFactoryWithOptions constructs a SharedInformerFactory for
// the given client and default resync duration, then applies each supplied
// option in order. The factory starts out covering v1.NamespaceAll with empty
// informer, started-informer and custom-resync maps; every option receives
// the current factory and its return value replaces it.
func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
        result := new(sharedInformerFactory)
        result.client = client
        result.namespace = v1.NamespaceAll
        result.defaultResync = defaultResync
        result.informers = make(map[reflect.Type]cache.SharedIndexInformer)
        result.startedInformers = make(map[reflect.Type]bool)
        result.customResync = make(map[reflect.Type]time.Duration)

        // Thread the factory through the options in the order they were given.
        for idx := range options {
                result = options[idx](result)
        }

        return result
}
package main

import (
    "k8s.io/client-go/informers"
    )
// main sketches how a shared informer factory is used: build the factory,
// request the Pod and Service informers from it, keep their listers and
// HasSynced functions, then start the factory.
// NOTE(review): client, ResyncPeriod, stopChannel, dc and the lister/synced
// variables are not declared in this excerpt; they are assumed to exist in
// the surrounding program.
func main() {
    // client is the kube-apiserver client (data is pulled from the API server);
    // ResyncPeriod is the resync interval, discussed in more detail later.
    sharedInformers := informers.NewSharedInformerFactory(client, ResyncPeriod)
    // Watch Pod resources.
    podInformer := sharedInformers.Core().V1().Pods()
    // Watch Service resources.
    servicesInformer := sharedInformers.Core().V1().Services()
    podLister = podInformer.Lister()
    servicesLister = servicesInformer.Lister()
    // Use the Service informer declared above; the excerpt previously named an
    // identifier (sInformer) that is never defined.
    sListerSynced = servicesInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced
    // Start the informers of every resource requested from the factory.
    sharedInformers.Start(stopChannel)
}

vi pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go

root@ubuntu:~/sample-controller# dlv exec  ./sample-controller --  -kubeconfig=$HOME/.kube/config
Type 'help' for list of commands.
(dlv) b NewFilteredFooInformer
Breakpoint 1 (enabled) set at 0xe3be48 for k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.NewFilteredFooInformer() ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:58
(dlv) r
Process restarted with PID 30188
(dlv) bt
0  0x0000ffff8b6b51c0 in ???
   at ?:-1
   error: input/output error
(truncated)
(dlv) r
Process restarted with PID 30468
(dlv) b NewFilteredFooInformer
Command failed: Breakpoint exists at /root/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:58 at e3be48
(dlv) c
> k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.NewFilteredFooInformer() ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:58 (hits goroutine(1):1 total:1) (PC: 0xe3be48)
Warning: debugging optimized function
    53: }
    54:
    55: // NewFilteredFooInformer constructs a new informer for Foo type.
    56: // Always prefer using an informer factory to get a shared informer instead of getting an independent
    57: // one. This reduces memory footprint and number of connections to the server.
=>  58: func NewFilteredFooInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    59:         return cache.NewSharedIndexInformer(
    60:                 &cache.ListWatch{
    61:                         ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
    62:                                 if tweakListOptions != nil {
    63:                                         tweakListOptions(&options)
(dlv) bt
0  0x0000000000e3be48 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.NewFilteredFooInformer
   at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:58
1  0x0000000000e3c744 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.(*fooInformer).defaultInformer
   at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:81
2  0x0000000000e3c744 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.(*fooInformer).defaultInformer-fm
   at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:80
3  0x0000000000ee6880 in k8s.io/sample-controller/pkg/generated/informers/externalversions.(*sharedInformerFactory).InformerFor
   at ./pkg/generated/informers/externalversions/factory.go:162
4  0x0000000000e3c100 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.(*fooInformer).Informer
   at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:85
5  0x0000000000e3c170 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.(*fooInformer).Lister
   at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:89
6  0x0000000000ee8020 in main.NewController
   at ./controller.go:109
7  0x0000000000eea680 in main.main
   at ./main.go:65
8  0x0000000000044aa4 in runtime.main
   at /usr/local/go/src/runtime/proc.go:203
9  0x00000000000718dc in runtime.goexit
   at /usr/local/go/src/runtime/asm_arm64.s:1148

samplecontroller
cache.SharedIndexInformer
NewSharedInformerFactory
参考  Accessing Kubernetes CRDs from the client-go package
DeltaFIFO
watch
updateIndices
深入浅出 kubernetes 之 WorkQueue 详解
kubernets controller 和 CRD 具体组件分析
深入k8s:Informer使用及其源码分析
client-go 中的 informer 源码分析

(dlv) b foo.go:82
Command failed: Location "foo.go:82" ambiguous: /root/sample-controller/pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/foo.go,
/root/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go,
/root/sample-controller/pkg/generated/listers/samplecontroller/v1alpha1/foo.go… (dlv)
root@ubuntu:~/sample-controller# find ./ -name foo.go
./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go
./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/foo.go
./pkg/generated/listers/samplecontroller/v1alpha1/foo.go

InformerFor

产生一个 samplecontrollerv1alpha1.Foo{} 对象,并连同 f.defaultInformer 一起传给 InformerFor:

    return f.factory.InformerFor(&samplecontrollerv1alpha1.Foo{}, f.defaultInformer)

generated/informers/externalversions/factory.go:147:func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {

register.go 注册struct

参考  Accessing Kubernetes CRDs from the client-go package

package v1alpha1

import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"

        samplecontroller "k8s.io/sample-controller/pkg/apis/samplecontroller"
)

// SchemeGroupVersion is the group/version pair (the sample-controller group
// name at version "v1alpha1") under which this package's objects are registered.
var SchemeGroupVersion = schema.GroupVersion{
        Group:   samplecontroller.GroupName,
        Version: "v1alpha1",
}

// Kind qualifies a bare kind name with this package's API group and returns
// the resulting GroupKind.
func Kind(kind string) schema.GroupKind {
        qualified := SchemeGroupVersion.WithKind(kind)
        return qualified.GroupKind()
}

// Resource qualifies a bare resource name with this package's API group and
// returns the resulting GroupResource.
func Resource(resource string) schema.GroupResource {
        qualified := SchemeGroupVersion.WithResource(resource)
        return qualified.GroupResource()
}

// SchemeBuilder is the scheme builder for this package, seeded with
// addKnownTypes.
var SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)

// AddToScheme registers this API group and version with a scheme; it is the
// AddToScheme method of SchemeBuilder exposed as a package-level function.
var AddToScheme = SchemeBuilder.AddToScheme

// addKnownTypes registers Foo and FooList with the scheme under
// SchemeGroupVersion, then adds that group/version to the scheme through
// metav1.AddToGroupVersion. It always returns nil.
func addKnownTypes(scheme *runtime.Scheme) error {
        known := []runtime.Object{
                &Foo{},
                &FooList{},
        }
        scheme.AddKnownTypes(SchemeGroupVersion, known...)
        metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
        return nil
}

kube batch

package v1alpha2

import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
)

var (
        SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
        AddToScheme   = SchemeBuilder.AddToScheme
)

const (
        // GroupName is the group name used in this package.
        GroupName = "scheduling.sigs.dev"

        // GroupVersion is the version of scheduling group
        GroupVersion = "v1alpha2"
)

// SchemeGroupVersion is the group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion}

// Resource qualifies a bare resource name with this package's API group and
// returns the resulting GroupResource.
func Resource(resource string) schema.GroupResource {
        qualified := SchemeGroupVersion.WithResource(resource)
        return qualified.GroupResource()
}

// addKnownTypes registers PodGroup, PodGroupList, Queue and QueueList with
// the supplied scheme under SchemeGroupVersion, then adds that group/version
// to the scheme through metav1.AddToGroupVersion. It always returns nil.
func addKnownTypes(scheme *runtime.Scheme) error {
        known := []runtime.Object{
                &PodGroup{},
                &PodGroupList{},
                &Queue{},
                &QueueList{},
        }
        scheme.AddKnownTypes(SchemeGroupVersion, known...)

        metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
        return nil
}

通过 AddEventHandler 注册 Add / Update / Delete 事件处理函数

        // Excerpt: for each resource below, obtain its informer from a shared
        // informer factory and register the cache's add/update/delete callbacks.

        // PodDisruptionBudgets (policy/v1beta1) -> sc.AddPDB / UpdatePDB / DeletePDB.
        sc.pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
        sc.pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddPDB,
                UpdateFunc: sc.UpdatePDB,
                DeleteFunc: sc.DeletePDB,
        })

        // PriorityClasses (scheduling/v1beta1) -> sc.AddPriorityClass / Update / Delete.
        sc.pcInformer = informerFactory.Scheduling().V1beta1().PriorityClasses()
        sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddPriorityClass,
                UpdateFunc: sc.UpdatePriorityClass,
                DeleteFunc: sc.DeletePriorityClass,
        })

        // Separate factory built from sc.kbclient, with 0 passed as the resync
        // argument; the PodGroup and Queue informers below all come from it.
        kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0)
        // create informer for PodGroup(v1alpha1) information
        sc.podGroupInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().PodGroups()
        sc.podGroupInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddPodGroupAlpha1,
                UpdateFunc: sc.UpdatePodGroupAlpha1,
                DeleteFunc: sc.DeletePodGroupAlpha1,
        })

        // create informer for PodGroup(v1alpha2) information
        sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
        sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddPodGroupAlpha2,
                UpdateFunc: sc.UpdatePodGroupAlpha2,
                DeleteFunc: sc.DeletePodGroupAlpha2,
        })

        // create informer for Queue(v1alpha1) information
        sc.queueInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().Queues()
        sc.queueInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddQueuev1alpha1,
                UpdateFunc: sc.UpdateQueuev1alpha1,
                DeleteFunc: sc.DeleteQueuev1alpha1,
        })

        // create informer for Queue(v1alpha2) information
        sc.queueInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().Queues()
        sc.queueInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddQueuev1alpha2,
                UpdateFunc: sc.UpdateQueuev1alpha2,
                DeleteFunc: sc.DeleteQueuev1alpha2,
        })

 samplecontroller
cache.SharedIndexInformer
NewSharedInformerFactory
参考  Accessing Kubernetes CRDs from the client-go package
DeltaFIFO
watch
updateIndices
深入浅出 kubernetes 之 WorkQueue 详解
kubernets controller 和 CRD 具体组件分析
深入k8s:Informer使用及其源码分析
client-go 中的 informer 源码分析

samplecontroller
cache.SharedIndexInformer
NewSharedInformerFactory
参考  Accessing Kubernetes CRDs from the client-go package
DeltaFIFO
watch
updateIndices
深入浅出 kubernetes 之 WorkQueue 详解
kubernets controller 和 CRD 具体组件分析
深入k8s:Informer使用及其源码分析
client-go 中的 informer 源码分析

1-2
   err := r.store.Add(event.Object)
   cache.(*DeltaFIFO).Add()
3-4-5
从
(*DeltaFIFO).Pop
存入一个map
cache.(*threadSafeMap).Add
6-7
c.workqueue.Add
   338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
=> 341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
   344:         }
   345:         c.workqueue.Add(key)
   346: }
8-9
workqueue.(*Type).Get

DeltaFIFO

// DeltaFIFO (abridged excerpt; "..." marks elided fields) is the first-in
// first-out queue of deltas described in these notes.
type DeltaFIFO struct {
...
    // items maps an object's key to the list of deltas recorded for that object.
    items map[string]Deltas
    // queue holds the object keys.
    queue []string
...
}

// Delta is a single change: the kind of change plus the object it concerns.
type Delta struct {
    Type   DeltaType
    Object interface{}
}

// Deltas is the list of changes recorded for one object.
type Deltas []Delta


// DeltaType names the kind of change a Delta represents.
type DeltaType string

// Change type definition
const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    Sync DeltaType = "Sync"
)

其中 queue 存储的是 Object 的 ID(key),而 items 存储的是以该 ID 为 key 的 Object 事件列表(Deltas)。

可以把它想象成这样一个数据结构:左边是 key,右边是一个数组,数组中的每个元素(Delta)都由 Type 和 Object 组成。

samplecontroller
cache.SharedIndexInformer
NewSharedInformerFactory
参考  Accessing Kubernetes CRDs from the client-go package
DeltaFIFO
watch
updateIndices
深入浅出 kubernetes 之 WorkQueue 详解
kubernets controller 和 CRD 具体组件分析
深入k8s:Informer使用及其源码分析
client-go 中的 informer 源码分析

DeltaFIFO 顾名思义,是存放 Delta 数据的先入先出队列,相当于一个数据中转站,将数据从一个地方转移到另一个地方。

watchHandler

// watchHandler drains events from w until the result channel is closed, stopCh
// fires, or errc delivers an error. Added/Modified/Deleted events are applied
// to r.store, and after each handled event *resourceVersion and the
// reflector's last-sync resource version are advanced to that of the event's
// object. Events whose object fails the expected-type or expected-GVK check,
// or whose metadata cannot be read, are reported and skipped.
//
// Returns errorStopRequested when stopCh fires, the error received on errc,
// the API error carried by a watch.Error event, a "very short watch" error if
// the channel closed in under one second without any event, and nil otherwise.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0
    defer w.Stop() // stop the watch when this function returns

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested // stop was requested through stopCh
        case err := <-errc: // error channel
            return err
        case event, ok := <-w.ResultChan(): // receive the next event from the watch's result channel
            if !ok { // the result channel was closed
                break loop // leave the for loop
            }
            if event.Type == watch.Error { // the event itself is an error
                return apierrors.FromObject(event.Object)
            }
            if r.expectedType != nil { // the reflector was configured with an expected object type
                // Check that the event object's Go type matches the expected type.
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
            if r.expectedGVK != nil {
                // Check that the event object's GroupVersionKind matches the expected one.
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type { // apply add/update/delete to the store (the DeltaFIFO, per these notes) by event type
            case watch.Added: // creation event
                err := r.store.Add(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                err := r.store.Delete(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark: // nothing is written to the store; only the resourceVersion bookkeeping below runs
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            // Reached even when the store call above reported an error.
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    // Only reached when the result channel was closed.
    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}
 
// watchHandler drains events from w until the result channel is closed, stopCh
// fires, or errc delivers an error. Added/Modified/Deleted events are applied
// to r.store, and after each handled event *resourceVersion and the
// reflector's last-sync resource version are advanced to that of the event's
// object. Events whose object fails the expected-type or expected-GVK check,
// or whose metadata cannot be read, are reported and skipped.
//
// Returns errorStopRequested when stopCh fires, the error received on errc,
// the API error carried by a watch.Error event, a "very short watch" error if
// the channel closed in under one second without any event, and nil otherwise.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0
    defer w.Stop() // stop the watch when this function returns

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested // stop was requested through stopCh
        case err := <-errc: // error channel
            return err
        case event, ok := <-w.ResultChan(): // receive the next event from the watch's result channel
            if !ok { // the result channel was closed
                break loop // leave the for loop
            }
            if event.Type == watch.Error { // the event itself is an error
                return apierrors.FromObject(event.Object)
            }
            if r.expectedType != nil { // the reflector was configured with an expected object type
                // Check that the event object's Go type matches the expected type.
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
            if r.expectedGVK != nil {
                // Check that the event object's GroupVersionKind matches the expected one.
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type { // apply add/update/delete to the store (the DeltaFIFO, per these notes) by event type
            case watch.Added: // creation event
                err := r.store.Add(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                err := r.store.Delete(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark: // nothing is written to the store; only the resourceVersion bookkeeping below runs
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            // Reached even when the store call above reported an error.
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    // Only reached when the result channel was closed.
    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}
 

samplecontroller
cache.SharedIndexInformer
NewSharedInformerFactory
参考  Accessing Kubernetes CRDs from the client-go package
DeltaFIFO
watch
updateIndices
深入浅出 kubernetes 之 WorkQueue 详解
kubernets controller 和 CRD 具体组件分析
深入k8s:Informer使用及其源码分析
client-go 中的 informer 源码分析

root@ubuntu:~/sample-controller# dlv exec ./sample-controller --  --kubeconfig=$HOME/.kube/config 
Type 'help' for list of commands.
(dlv) b reflector.go:348
Breakpoint 1 (enabled) set at 0xe1e51c for k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348
(dlv) c
I0709 15:21:03.739440   63308 controller.go:115] Setting up event handlers
I0709 15:21:03.739604   63308 controller.go:156] Starting Foo controller
I0709 15:21:03.739636   63308 controller.go:159] Waiting for informer caches to sync
> k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348 (hits goroutine(102):1 total:1) (PC: 0xe1e51c)
Warning: debugging optimized function
   343:                 if err != nil {
   344:                         return fmt.Errorf("unable to understand list result %#v: %v", list, err)
   345:                 }
   346:                 resourceVersion = listMetaInterface.GetResourceVersion()
   347:                 initTrace.Step("Resource version extracted")
=> 348:                 items, err := meta.ExtractList(list)
   349:                 if err != nil {
   350:                         return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
   351:                 }
   352:                 initTrace.Step("Objects extracted")
   353:                 if err := r.syncWith(items, resourceVersion); err != nil {
(dlv) bt
0  0x0000000000e1e51c in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348
1  0x0000000000e154a4 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:360
2  0x0000000000e1dbac in k8s.io/client-go/tools/cache.(*Reflector).Run.func1
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:221
3  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
4  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
5  0x0000000000e14fc8 in k8s.io/client-go/tools/cache.(*Reflector).Run
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:220
6  0x0000000000e20a48 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:218
7  0x00000000006b8ce8 in k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:56
8  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
9  0x00000000000718dc in runtime.goexit
   at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) p list
k8s.io/apimachinery/pkg/runtime.Object(*k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooList) *{
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""},
        ListMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta {
                SelfLink: "/apis/samplecontroller.k8s.io/v1alpha1/foos",
                ResourceVersion: "1852450",
                Continue: "",
                RemainingItemCount: *int64 nil,},
        Items: []k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo len: 1, cap: 1, [
                (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo")(0x4000436000),
        ],}
(dlv) n
> k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:352 (PC: 0xe1e60c)
Warning: debugging optimized function
   347:                 initTrace.Step("Resource version extracted")
   348:                 items, err := meta.ExtractList(list)
   349:                 if err != nil {
   350:                         return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
   351:                 }
=> 352:                 initTrace.Step("Objects extracted")
   353:                 if err := r.syncWith(items, resourceVersion); err != nil {
   354:                         return fmt.Errorf("unable to sync list result: %v", err)
   355:                 }
   356:                 initTrace.Step("SyncWith done")
   357:                 r.setLastSyncResourceVersion(resourceVersion)
(dlv) p items
[]k8s.io/apimachinery/pkg/runtime.Object len: 1, cap: 1, [
        *k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo {
                TypeMeta: (*"k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta")(0x4000436000),
                ObjectMeta: (*"k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta")(0x4000436020),
                Spec: (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec")(0x4000436118),
                Status: (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus")(0x4000436130),},
]
(dlv)  p items[0].TypeMeta
k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {
        Kind: "Foo",
        APIVersion: "samplecontroller.k8s.io/v1alpha1",}
(dlv) p items[0].ObjectMeta
k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
        Name: "example-foo",
        GenerateName: "",
        Namespace: "default",
        SelfLink: "/apis/samplecontroller.k8s.io/v1alpha1/namespaces/default/foos/e...+10 more",
        UID: "cadf9f8e-4355-453e-b249-4ed92abccc16",
        ResourceVersion: "1852450",
        Generation: 5,
        CreationTimestamp: k8s.io/apimachinery/pkg/apis/meta/v1.Time {
                Time: (*time.Time)(0x4000436088),},
        DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
        DeletionGracePeriodSeconds: *int64 nil,
        Labels: map[string]string nil,
        Annotations: map[string]string nil,
        OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil,
        Finalizers: []string len: 0, cap: 0, nil,
        ClusterName: "",
        ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 2, cap: 2, [
                (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x400032e3c0),
                (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x400032e420),
        ],}
(dlv)  p items[0].Spec
k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec {
        DeploymentName: "example-foo",
        Replicas: *1,}
(dlv)  p items[0].Status
k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus {AvailableReplicas: 1}
(dlv) 
(dlv) s
I0709 15:25:24.036509   63308 controller.go:164] Starting workers
I0709 15:25:24.036601   63308 controller.go:170] Started workers
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:464 (PC: 0xe16070)
Warning: debugging optimized function
   459: func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
   460:         eventCount := 0
   461:
   462:         // Stopping the watcher should be idempotent and if we return from this function there's no way
   463:         // we're coming back in with the same watch interface.
=> 464:         defer w.Stop()
   465:
   466: loop:
   467:         for {
   468:                 select {
   469:                 case <-stopCh:
(dlv) bt
0  0x0000000000e16070 in k8s.io/client-go/tools/cache.(*Reflector).watchHandler
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:464
1  0x0000000000e15824 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:428
2  0x0000000000e1dbac in k8s.io/client-go/tools/cache.(*Reflector).Run.func1
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:221
3  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
4  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
5  0x0000000000e14fc8 in k8s.io/client-go/tools/cache.(*Reflector).Run
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:220
6  0x0000000000e20a48 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:218
7  0x00000000006b8ce8 in k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:56
8  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
9  0x00000000000718dc in runtime.goexit
   at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) 
// watchHandler drains events from w until the result channel is closed, stopCh
// fires, or errc delivers an error. Added/Modified/Deleted events are applied
// to r.store, and after each handled event *resourceVersion and the
// reflector's last-sync resource version are advanced to that of the event's
// object. Events whose object fails the expected-type or expected-GVK check,
// or whose metadata cannot be read, are reported and skipped.
//
// Returns errorStopRequested when stopCh fires, the error received on errc,
// the API error carried by a watch.Error event, a "very short watch" error if
// the channel closed in under one second without any event, and nil otherwise.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0
    defer w.Stop() // stop the watch when this function returns

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested // stop was requested through stopCh
        case err := <-errc: // error channel
            return err
        case event, ok := <-w.ResultChan(): // receive the next event from the watch's result channel
            if !ok { // the result channel was closed
                break loop // leave the for loop
            }
            if event.Type == watch.Error { // the event itself is an error
                return apierrors.FromObject(event.Object)
            }
            if r.expectedType != nil { // the reflector was configured with an expected object type
                // Check that the event object's Go type matches the expected type.
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
            if r.expectedGVK != nil {
                // Check that the event object's GroupVersionKind matches the expected one.
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type { // apply add/update/delete to the store (the DeltaFIFO, per these notes) by event type
            case watch.Added: // creation event
                err := r.store.Add(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                err := r.store.Delete(event.Object) // hand the object to the store
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark: // nothing is written to the store; only the resourceVersion bookkeeping below runs
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            // Reached even when the store call above reported an error.
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    // Only reached when the result channel was closed.
    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}
 

watch

客户端发起 watch 请求(request.Watch)时,会生成一个 watch 对象(StreamWatcher),并启动 receive goroutine。
 
// NewStreamWatcher creates a StreamWatcher from the given decoder.
//
// It stores d as the watcher's source, allocates an unbuffered result
// channel, and launches sw.receive in its own goroutine before returning.
// NOTE(review): receive is not shown in this excerpt; presumably it decodes
// events from source and sends them on result — confirm against the
// StreamWatcher implementation.
func NewStreamWatcher(d Decoder) *StreamWatcher {
        sw := &StreamWatcher{
                source: d,
                // It's easy for a consumer to add buffering via an extra
                // goroutine/channel, but impossible for them to remove it,
                // so nonbuffered is better.
                result: make(chan Event),
        }
        // Start receive concurrently; the constructor does not wait for it.
        go sw.receive()
        return sw
}

samplecontroller
cache.SharedIndexInformer
NewSharedInformerFactory
参考  Accessing Kubernetes CRDs from the client-go package
DeltaFIFO
watch
updateIndices
深入浅出 kubernetes 之 WorkQueue 详解
kubernets controller 和 CRD 具体组件分析
深入k8s:Informer使用及其源码分析
client-go 中的 informer 源码分析

 触发断点

root@ubuntu:~/sample-controller/artifacts/examples# kubectl apply  -f example-foo.yaml 
foo.samplecontroller.k8s.io/example-foo created
root@ubuntu:~/sample-controller/artifacts/examples# cd .
(dlv) b reflector.go:500
(dlv) 
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(91):2 total:3) (PC: 0xe16408)
Warning: debugging optimized function
   495:                                 continue
   496:                         }
   497:                         newResourceVersion := meta.GetResourceVersion()
   498:                         switch event.Type {
   499:                         case watch.Added:
=> 500:                                 err := r.store.Add(event.Object)
   501:                                 if err != nil {
   502:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
   503:                                 }
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
(dlv) bt
0  0x0000000000e16408 in k8s.io/client-go/tools/cache.(*Reflector).watchHandler
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500
1  0x0000000000e15824 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:428
2  0x0000000000e1dbac in k8s.io/client-go/tools/cache.(*Reflector).Run.func1
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:221
3  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
4  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
5  0x0000000000e14fc8 in k8s.io/client-go/tools/cache.(*Reflector).Run
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:220
6  0x0000000000e20a48 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:218
7  0x00000000006b8ce8 in k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:56
8  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
9  0x00000000000718dc in runtime.goexit
   at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) p r.store
k8s.io/client-go/tools/cache.Store(*k8s.io/client-go/tools/cache.DeltaFIFO) *{
        lock: sync.RWMutex {
                w: (*sync.Mutex)(0x400053c000),
                writerSem: 0,
                readerSem: 0,
                readerCount: 0,
                readerWait: 0,},
        cond: sync.Cond {
                noCopy: sync.noCopy {},
                L: sync.Locker(*sync.RWMutex) ...,
                notify: (*sync.notifyList)(0x400053c028),
                checker: 274883395656,},
        items: map[string]k8s.io/client-go/tools/cache.Deltas [],
        queue: []string len: 0, cap: 2, [],
        populated: true,
        initialPopulationCount: 0,
        keyFunc: k8s.io/client-go/tools/cache.MetaNamespaceKeyFunc,
        knownObjects: k8s.io/client-go/tools/cache.KeyListerGetter(*k8s.io/client-go/tools/cache.cache) *{
                cacheStorage: k8s.io/client-go/tools/cache.ThreadSafeStore(*k8s.io/client-go/tools/cache.threadSafeMap) ...,
                keyFunc: k8s.io/client-go/tools/cache.DeletionHandlingMetaNamespaceKeyFunc,},
        closed: false,
        emitDeltaTypeReplaced: true,}
(dlv)  p w
k8s.io/apimachinery/pkg/watch.Interface(*k8s.io/apimachinery/pkg/watch.StreamWatcher) *{
        Mutex: sync.Mutex {state: 0, sema: 0},
        source: k8s.io/apimachinery/pkg/watch.Decoder(*k8s.io/client-go/rest/watch.Decoder) *{
                decoder: k8s.io/apimachinery/pkg/runtime/serializer/streaming.Decoder(*k8s.io/apimachinery/pkg/runtime/serializer/streaming.decoder) ...,
                embeddedDecoder: k8s.io/apimachinery/pkg/runtime.Decoder(k8s.io/apimachinery/pkg/runtime.WithoutVersionDecoder) *(*"k8s.io/apimachinery/pkg/runtime.Decoder")(0x40003f6010),},
        reporter: k8s.io/apimachinery/pkg/watch.Reporter(*k8s.io/apimachinery/pkg/api/errors.ErrorReporter) *{
                code: 500,
                verb: "GET",
                reason: "ClientWatchDecoding",},
        result: chan k8s.io/apimachinery/pkg/watch.Event {
                qcount: 0,
                dataqsiz: 0,
                buf: *[0]k8s.io/apimachinery/pkg/watch.Event [],
                elemsize: 32,
                closed: 0,
                elemtype: *runtime._type {size: 32, ptrdata: 32, hash: 107209836, tflag: tflagUncommon|tflagExtraStar|tflagNamed (7), align: 8, fieldAlign: 8, kind: 25, equal: type..eq.k8s.io/apimachinery/pkg/watch.Event, gcdata: *9, str: 70259, ptrToThis: 1793344},
                sendx: 0,
                recvx: 0,
                recvq: waitq<k8s.io/apimachinery/pkg/watch.Event> {
                        first: *sudog<k8s.io/apimachinery/pkg/watch.Event> nil,
                        last: *sudog<k8s.io/apimachinery/pkg/watch.Event> nil,},
                sendq: waitq<k8s.io/apimachinery/pkg/watch.Event> {
                        first: *sudog<k8s.io/apimachinery/pkg/watch.Event> nil,
                        last: *sudog<k8s.io/apimachinery/pkg/watch.Event> nil,},
                lock: runtime.mutex {key: 0},},
        done: chan struct {} {
                qcount: 0,
                dataqsiz: 0,
                buf: *[0]struct struct {} [],
                elemsize: 0,
                closed: 0,
                elemtype: *runtime._type {size: 0, ptrdata: 0, hash: 670477339, tflag: tflagExtraStar|tflagRegularMemory (10), align: 1, fieldAlign: 1, kind: 25, equal: runtime.memequal0, gcdata: *1, str: 47516, ptrToThis: 653632},
                sendx: 0,
                recvx: 0,
                recvq: waitq<struct {}> {
                        first: *sudog<struct {}> nil,
                        last: *sudog<struct {}> nil,},
                sendq: waitq<struct {}> {
                        first: *sudog<struct {}> nil,
                        last: *sudog<struct {}> nil,},
                lock: runtime.mutex {key: 0},},}
(dlv) 
(dlv) p event
Command failed: could not find symbol value for event
(dlv) p *event
Command failed: could not find symbol value for event
(dlv) p event.Object
Command failed: could not find symbol value for event
(dlv) n
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:522 (PC: 0xe16444)
Warning: debugging optimized function
   517:                         case watch.Bookmark:
   518:                                 // A `Bookmark` means watch has synced here, just update the resourceVersion
   519:                         default:
   520:                                 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
   521:                         }
=> 522:                         *resourceVersion = newResourceVersion
   523:                         r.setLastSyncResourceVersion(newResourceVersion)
   524:                         if rvu, ok := r.store.(ResourceVersionUpdater); ok {
   525:                                 rvu.UpdateResourceVersion(newResourceVersion)
   526:                         }
   527:                         eventCount++
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(91):3 total:5) (PC: 0xe16408)
Warning: debugging optimized function
   495:                                 continue
   496:                         }
   497:                         newResourceVersion := meta.GetResourceVersion()
   498:                         switch event.Type {
   499:                         case watch.Added:
=> 500:                                 err := r.store.Add(event.Object)
   501:                                 if err != nil {
   502:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
   503:                                 }
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
(dlv) s
> k8s.io/client-go/tools/cache.(*DeltaFIFO).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/delta_fifo.go:277 (PC: 0xe0f8a8)
Warning: debugging optimized function
   272:         return f.populated && f.initialPopulationCount == 0
   273: }
   274:
   275: // Add inserts an item, and puts it in the queue. The item is only enqueued
   276: // if it doesn't already exist in the set.
=> 277: func (f *DeltaFIFO) Add(obj interface{}) error {
   278:         f.lock.Lock()
   279:         defer f.lock.Unlock()
   280:         f.populated = true
   281:         return f.queueActionLocked(Added, obj)
   282: }
(dlv) n
(dlv) b reflector.go:509

触发delete

root@ubuntu:~/sample-controller/artifacts/examples# kubectl delete -f example-foo.yaml 
foo.samplecontroller.k8s.io "example-foo" deleted
root@ubuntu:~/sample-controller/artifacts/examples# 
I0709 18:45:53.458531   63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(102):1 total:1) (PC: 0xe16674)
Warning: debugging optimized function
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
   506:                                 if err != nil {
   507:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
   508:                                 }
=> 509:                         case watch.Deleted:
   510:                                 // TODO: Will any consumers need access to the "last known
   511:                                 // state", which is passed in event.Object? If so, may need
   512:                                 // to change this.
   513:                                 err := r.store.Delete(event.Object)
   514:                                 if err != nil {
(dlv) bt
0  0x0000000000e16674 in k8s.io/client-go/tools/cache.(*Reflector).watchHandler
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509
1  0x0000000000e15824 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:428
2  0x0000000000e1dbac in k8s.io/client-go/tools/cache.(*Reflector).Run.func1
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:221
3  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
4  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
5  0x0000000000e14fc8 in k8s.io/client-go/tools/cache.(*Reflector).Run
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:220
6  0x0000000000e20a48 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm
   at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:218
7  0x00000000006b8ce8 in k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:56
8  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
   at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
9  0x00000000000718dc in runtime.goexit
   at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(91):1 total:2) (PC: 0xe16674)
Warning: debugging optimized function
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
   506:                                 if err != nil {
   507:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
   508:                                 }
=> 509:                         case watch.Deleted:
   510:                                 // TODO: Will any consumers need access to the "last known
   511:                                 // state", which is passed in event.Object? If so, may need
   512:                                 // to change this.
   513:                                 err := r.store.Delete(event.Object)
   514:                                 if err != nil {
(dlv) p event.Object
Command failed: could not find symbol value for event
(dlv) c

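图中 3->4->5 的流程(对应下文调用栈:controller.processLoop -> DeltaFIFO.Pop -> sharedIndexInformer.HandleDeltas -> cache.Add -> threadSafeMap.Add)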

samplecontroller
cache.SharedIndexInformer
NewSharedInformerFactory
参考  Accessing Kubernetes CRDs from the client-go package
DeltaFIFO
watch
updateIndices
深入浅出 kubernetes 之 WorkQueue 详解
kubernets controller 和 CRD 具体组件分析
深入k8s:Informer使用及其源码分析
client-go 中的 informer 源码分析

(dlv) c
E0709 18:53:48.900792   63308 controller.go:257] foo 'default/example-foo' in work queue no longer exists
I0709 18:53:48.900868   63308 controller.go:228] Successfully synced 'default/example-foo'
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(102):3 total:6) (PC: 0xe16408)
Warning: debugging optimized function
   495:                                 continue
   496:                         }
   497:                         newResourceVersion := meta.GetResourceVersion()
   498:                         switch event.Type {
   499:                         case watch.Added:
=> 500:                                 err := r.store.Add(event.Object)
   501:                                 if err != nil {
   502:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
   503:                                 }
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
(dlv) b thread_safe_store.go:68
Command failed: could not find statement at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:68, please use a line with a statement
(dlv) b thread_safe_store.Add
Command failed: location "thread_safe_store.Add" not found
(dlv) b ThreadSafeStore.Add
Command failed: location "ThreadSafeStore.Add" not found
(dlv) b thread_safe_store.go:73
Breakpoint 5 (enabled) set at 0xe1b6b8 for k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73
(dlv) c
> k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (hits goroutine(49):1 total:1) (PC: 0xe1b6b8)
Warning: debugging optimized function
    68:         indexers Indexers
    69:         // indices maps a name to an Index
    70:         indices Indices
    71: }
    72:
=>  73: func (c *threadSafeMap) Add(key string, obj interface{}) {
    74:         c.lock.Lock()
    75:         defer c.lock.Unlock()
    76:         oldObject := c.items[key]
    77:         c.items[key] = obj
    78:         c.updateIndices(oldObject, obj, key)
(dlv) bt
 0  0x0000000000e1b6b8 in k8s.io/client-go/tools/cache.(*threadSafeMap).Add
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73
 1  0x0000000000e1ad30 in k8s.io/client-go/tools/cache.(*cache).Add
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/store.go:155
 2  0x0000000000e191bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:557
 3  0x0000000000e20b80 in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:527
 4  0x0000000000e1150c in k8s.io/client-go/tools/cache.(*DeltaFIFO).Pop
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/delta_fifo.go:539
 5  0x0000000000e0f354 in k8s.io/client-go/tools/cache.(*controller).processLoop
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:183
 6  0x0000000000e20a90 in k8s.io/client-go/tools/cache.(*controller).processLoop-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:181
 7  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 8  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 9  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
10  0x0000000000e0f0e8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
11  0x0000000000e0f0e8 in k8s.io/client-go/tools/cache.(*controller).Run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:154
12  0x0000000000e180bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:410
13  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) 
(dlv) s
> k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:74 (PC: 0xe1b6d0)
Warning: debugging optimized function
    69:         // indices maps a name to an Index
    70:         indices Indices
    71: }
    72:
    73: func (c *threadSafeMap) Add(key string, obj interface{}) {
=>  74:         c.lock.Lock()
    75:         defer c.lock.Unlock()
    76:         oldObject := c.items[key]
    77:         c.items[key] = obj
    78:         c.updateIndices(oldObject, obj, key)
    79: }
(dlv) p key
"default/example-foo"
(dlv) c.items
Command failed: command not available
(dlv) p c.items
map[string]interface {} []
(dlv) p c
*k8s.io/client-go/tools/cache.threadSafeMap {
        lock: sync.RWMutex {
                w: (*sync.Mutex)(0x40001fd350),
                writerSem: 0,
                readerSem: 0,
                readerCount: 0,
                readerWait: 0,},
        items: map[string]interface {} [],
        indexers: k8s.io/client-go/tools/cache.Indexers [
                "namespace": k8s.io/client-go/tools/cache.MetaNamespaceIndexFunc, 
        ],
        indices: k8s.io/client-go/tools/cache.Indices [
                "namespace": [], 
        ],}
(dlv) n
W0709 18:58:10.290090   63308 reflector.go:441] pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167: watch of *v1.Deployment ended with: an error on the server ("unable to decode an event from the watch stream: http2: client connection lost") has prevented the request from succeeding
W0709 18:58:10.290092   63308 reflector.go:441] pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167: watch of *v1alpha1.Foo ended with: an error on the server ("unable to decode an event from the watch stream: http2: client connection lost") has prevented the request from succeeding
> k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:75 (PC: 0xe1b6e0)
Warning: debugging optimized function
    70:         indices Indices
    71: }
    72:
    73: func (c *threadSafeMap) Add(key string, obj interface{}) {
    74:         c.lock.Lock()
=>  75:         defer c.lock.Unlock()
    76:         oldObject := c.items[key]
    77:         c.items[key] = obj
    78:         c.updateIndices(oldObject, obj, key)
    79: }
    80:
(dlv) n
> k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:76 (PC: 0xe1b6fc)
Warning: debugging optimized function
    71: }
    72:
    73: func (c *threadSafeMap) Add(key string, obj interface{}) {
    74:         c.lock.Lock()
    75:         defer c.lock.Unlock()
=>  76:         oldObject := c.items[key]
    77:         c.items[key] = obj
    78:         c.updateIndices(oldObject, obj, key)
    79: }
    80:
    81: func (c *threadSafeMap) Update(key string, obj interface{}) {
(dlv) n
> k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:77 (PC: 0xe1b738)
Warning: debugging optimized function
    72:
    73: func (c *threadSafeMap) Add(key string, obj interface{}) {
    74:         c.lock.Lock()
    75:         defer c.lock.Unlock()
    76:         oldObject := c.items[key]
=>  77:         c.items[key] = obj
    78:         c.updateIndices(oldObject, obj, key)
    79: }
    80:
    81: func (c *threadSafeMap) Update(key string, obj interface{}) {
    82:         c.lock.Lock()
(dlv) p obj
interface {}(*k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo) *{
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""},
        ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
                Name: "example-foo",
                GenerateName: "",
                Namespace: "default",
                SelfLink: "/apis/samplecontroller.k8s.io/v1alpha1/namespaces/default/foos/e...+10 more",
                UID: "1917188d-93f5-4f67-8a52-d07366704dde",
                ResourceVersion: "1889422",
                Generation: 1,
                CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x4000d22308),
                DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
                DeletionGracePeriodSeconds: *int64 nil,
                Labels: map[string]string nil,
                Annotations: map[string]string [...],
                OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil,
                Finalizers: []string len: 0, cap: 0, nil,
                ClusterName: "",
                ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 1, cap: 1, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x4000190240),
                ],},
        Spec: k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec {
                DeploymentName: "example-foo",
                Replicas: *1,},
        Status: k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus {AvailableReplicas: 0},}
(dlv) 
(dlv) bt
 0  0x0000000000e1b738 in k8s.io/client-go/tools/cache.(*threadSafeMap).Add
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:77
 1  0x0000000000e1ad30 in k8s.io/client-go/tools/cache.(*cache).Add
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/store.go:155
 2  0x0000000000e191bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:557
 3  0x0000000000e20b80 in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:527
 4  0x0000000000e1150c in k8s.io/client-go/tools/cache.(*DeltaFIFO).Pop
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/delta_fifo.go:539
 5  0x0000000000e0f354 in k8s.io/client-go/tools/cache.(*controller).processLoop
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:183
 6  0x0000000000e20a90 in k8s.io/client-go/tools/cache.(*controller).processLoop-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:181
 7  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 8  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 9  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
10  0x0000000000e0f0e8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
11  0x0000000000e0f0e8 in k8s.io/client-go/tools/cache.(*controller).Run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:154
12  0x0000000000e180bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:410
13  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) 

 删除

> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(102):3 total:5) (PC: 0xe16674)
Warning: debugging optimized function
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
   506:                                 if err != nil {
   507:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
   508:                                 }
=> 509:                         case watch.Deleted:
   510:                                 // TODO: Will any consumers need access to the "last known
   511:                                 // state", which is passed in event.Object? If so, may need
   512:                                 // to change this.
   513:                                 err := r.store.Delete(event.Object)
   514:                                 if err != nil {
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(91):3 total:6) (PC: 0xe16674)
Warning: debugging optimized function
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):8 total:10) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) c
E0709 19:07:41.884046   63308 controller.go:257] foo 'default/example-foo' in work queue no longer exists
I0709 19:07:41.884085   63308 controller.go:228] Successfully synced 'default/example-foo'

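图中 6->7 的流程(对应下文调用栈:processorListener.run -> ResourceEventHandlerFuncs.OnAdd -> Controller.enqueueFoo,将 namespace/name 形式的 key 放入 work queue)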

(dlv) b enqueueFoo
Breakpoint 6 (enabled) set at 0xee9948 for main.(*Controller).enqueueFoo() ./controller.go:338
(dlv) c
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):1 total:1) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) bt
 0  0x0000000000ee9948 in main.(*Controller).enqueueFoo
    at ./controller.go:338
 1  0x0000000000eeb1d0 in main.(*Controller).enqueueFoo-fm
    at ./controller.go:338
 2  0x0000000000e20dc0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnAdd
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:231
 3  0x0000000000e1f2a0 in k8s.io/client-go/tools/cache.(*processorListener).run.func1
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:777
 4  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 5  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 6  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 7  0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
 8  0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771
 9  0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765
10  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
11  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) b AddAfter
Command failed: Location "AddAfter" ambiguous: k8s.io/client-go/util/workqueue.(*delayingType).AddAfter, k8s.io/client-go/util/workqueue.(*rateLimitingType).AddAfter, k8s.io/client-go/util/workqueue.rateLimitingType.AddAfter…
(dlv) b k8s.io/client-go/util/workqueue.(*delayingType).AddAfter
Breakpoint 7 (enabled) set at 0xc24328 for k8s.io/client-go/util/workqueue.(*delayingType).AddAfter() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/delaying_queue.go:160
(dlv) c
I0709 19:03:25.953479   63308 trace.go:205] Trace[1225511528]: "Reflector ListAndWatch" name:pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167 (09-Jul-2021 18:58:12.969) (total time: 243729ms):
Trace[1225511528]: [4m3.72932702s] [4m3.72932702s] END
E0709 19:03:25.953605   63308 reflector.go:138] pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167: Failed to watch *v1alpha1.Foo: failed to list *v1alpha1.Foo: Get "https://10.10.16.249:6443/apis/samplecontroller.k8s.io/v1alpha1/foos?resourceVersion=1889422": net/http: TLS handshake timeout
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):2 total:2) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) btt
Command failed: command not available
(dlv) bt
 0  0x0000000000ee9948 in main.(*Controller).enqueueFoo
    at ./controller.go:338
 1  0x0000000000eeac70 in main.NewController.func1
    at ./controller.go:120
 2  0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238
 3  0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775
 4  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 5  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 6  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 7  0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
 8  0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771
 9  0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765
10  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
11  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) c
I0709 19:03:25.953631   63308 trace.go:205] Trace[1852186258]: "Reflector ListAndWatch" name:pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167 (09-Jul-2021 18:58:12.970) (total time: 312982ms):
Trace[1852186258]: [5m12.982727975s] [5m12.982727975s] END
E0709 19:03:38.269089   63308 reflector.go:138] pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167: Failed to watch *v1.Deployment: failed to list *v1.Deployment: Get "https://10.10.16.249:6443/apis/apps/v1/deployments?resourceVersion=1889388": net/http: TLS handshake timeout
> k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348 (hits goroutine(102):4 total:5) (PC: 0xe1e51c)
Warning: debugging optimized function
   343:                 if err != nil {
   344:                         return fmt.Errorf("unable to understand list result %#v: %v", list, err)
   345:                 }
   346:                 resourceVersion = listMetaInterface.GetResourceVersion()
   347:                 initTrace.Step("Resource version extracted")
=> 348:                 items, err := meta.ExtractList(list)
   349:                 if err != nil {
   350:                         return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
   351:                 }
   352:                 initTrace.Step("Objects extracted")
   353:                 if err := r.syncWith(items, resourceVersion); err != nil {
(dlv) c
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):3 total:3) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) bt
 0  0x0000000000ee9948 in main.(*Controller).enqueueFoo
    at ./controller.go:338
 1  0x0000000000eeac70 in main.NewController.func1
    at ./controller.go:120
 2  0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238
 3  0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775
 4  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 5  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 6  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 7  0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
 8  0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771
 9  0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765
10  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
11  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:459 (hits goroutine(102):6 total:11) (PC: 0xe16048)
Warning: debugging optimized function
   454:         }
   455:         return r.store.Replace(found, resourceVersion)
   456: }
   457:
   458: // watchHandler watches w and keeps *resourceVersion up to date.
=> 459: func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
   460:         eventCount := 0
   461:
   462:         // Stopping the watcher should be idempotent and if we return from this function there's no way
   463:         // we're coming back in with the same watch interface.
   464:         defer w.Stop()
(dlv) c
I0709 19:03:59.868255   63308 controller.go:228] Successfully synced 'default/example-foo'
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):4 total:4) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) bt
 0  0x0000000000ee9948 in main.(*Controller).enqueueFoo
    at ./controller.go:338
 1  0x0000000000eeac70 in main.NewController.func1
    at ./controller.go:120
 2  0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238
 3  0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775
 4  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 5  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 6  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 7  0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
 8  0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771
 9  0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765
10  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
11  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) c
I0709 19:04:13.265775   63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
> k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348 (hits goroutine(91):2 total:6) (PC: 0xe1e51c)
Warning: debugging optimized function
   343:                 if err != nil {
   344:                         return fmt.Errorf("unable to understand list result %#v: %v", list, err)
   345:                 }
   346:                 resourceVersion = listMetaInterface.GetResourceVersion()
   347:                 initTrace.Step("Resource version extracted")
=> 348:                 items, err := meta.ExtractList(list)
   349:                 if err != nil {
   350:                         return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
   351:                 }
   352:                 initTrace.Step("Objects extracted")
   353:                 if err := r.syncWith(items, resourceVersion); err != nil {
(dlv) c
I0709 19:04:21.857750   63308 trace.go:205] Trace[443632888]: "Reflector ListAndWatch" name:pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167 (09-Jul-2021 19:03:47.178) (total time: 34678ms):
Trace[443632888]: ---"Objects listed" 26089ms (19:04:00.268)
Trace[443632888]: ---"Objects extracted" 8589ms (19:04:00.857)
Trace[443632888]: [34.678769477s] [34.678769477s] END
> k8s.io/client-go/util/workqueue.(*delayingType).AddAfter() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/delaying_queue.go:160 (hits goroutine(123):1 total:1) (PC: 0xc24328)
Warning: debugging optimized function
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:459 (hits goroutine(91):6 total:12) (PC: 0xe16048)
Warning: debugging optimized function
> k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (hits goroutine(48):1 total:2) (PC: 0xe1b6b8)
Warning: debugging optimized function
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):5 total:5) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) bt
 0  0x0000000000ee9948 in main.(*Controller).enqueueFoo
    at ./controller.go:338
 1  0x0000000000eeac70 in main.NewController.func1
    at ./controller.go:120
 2  0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238
 3  0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775
 4  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 5  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 6  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 7  0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
 8  0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771
 9  0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765
10  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
11  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) c
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(130):1 total:6) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) c
E0709 19:04:30.134547   63308 controller.go:233] error syncing 'default/example-foo': deployments.apps "example-foo" already exists, requeuing
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(130):2 total:7) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) bt
 0  0x0000000000ee9948 in main.(*Controller).enqueueFoo
    at ./controller.go:338
 1  0x0000000000ee9d60 in main.(*Controller).handleObject
    at ./controller.go:383
 2  0x0000000000eeacf8 in main.NewController.func2
    at ./controller.go:139
 3  0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238
 4  0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775
 5  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 6  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 7  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 8  0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
 9  0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771
10  0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765
11  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
12  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) c
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):6 total:8) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) bt
 0  0x0000000000ee9948 in main.(*Controller).enqueueFoo
    at ./controller.go:338
 1  0x0000000000eeac70 in main.NewController.func1
    at ./controller.go:120
 2  0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238
 3  0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775
 4  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 5  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 6  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 7  0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
 8  0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771
 9  0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765
10  0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73
11  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) 
(dlv) c
I0709 19:11:07.684833   63308 controller.go:228] Successfully synced 'default/example-foo'
I0709 19:11:07.685614   63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
I0709 19:11:07.690807   63308 controller.go:228] Successfully synced 'default/example-foo'
I0709 19:11:07.690870   63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(102):4 total:7) (PC: 0xe16674)
Warning: debugging optimized function
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
   506:                                 if err != nil {
   507:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
   508:                                 }
=> 509:                         case watch.Deleted:
   510:                                 // TODO: Will any consumers need access to the "last known
   511:                                 // state", which is passed in event.Object? If so, may need
   512:                                 // to change this.
   513:                                 err := r.store.Delete(event.Object)
   514:                                 if err != nil {
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(91):4 total:8) (PC: 0xe16674)
Warning: debugging optimized function
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
   506:                                 if err != nil {
   507:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
   508:                                 }
=> 509:                         case watch.Deleted:
   510:                                 // TODO: Will any consumers need access to the "last known
   511:                                 // state", which is passed in event.Object? If so, may need
   512:                                 // to change this.
   513:                                 err := r.store.Delete(event.Object)
   514:                                 if err != nil {
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(102):5 total:9) (PC: 0xe16408)
Warning: debugging optimized function
   495:                                 continue
   496:                         }
   497:                         newResourceVersion := meta.GetResourceVersion()
   498:                         switch event.Type {
   499:                         case watch.Added:
=> 500:                                 err := r.store.Add(event.Object)
   501:                                 if err != nil {
   502:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
   503:                                 }
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
(dlv) c
> k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (hits goroutine(49):3 total:5) (PC: 0xe1b6b8)
Warning: debugging optimized function
    68:         indexers Indexers
    69:         // indices maps a name to an Index
    70:         indices Indices
    71: }
    72:
=>  73: func (c *threadSafeMap) Add(key string, obj interface{}) {
    74:         c.lock.Lock()
    75:         defer c.lock.Unlock()
    76:         oldObject := c.items[key]
    77:         c.items[key] = obj
    78:         c.updateIndices(oldObject, obj, key)
(dlv) c
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):15 total:22) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) s
> main.(*Controller).enqueueFoo() ./controller.go:341 (PC: 0xee9954)
Warning: debugging optimized function
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
   338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
=> 341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
   344:         }
   345:         c.workqueue.Add(key)
   346: }
(dlv) n
> main.(*Controller).enqueueFoo() ./controller.go:345 (PC: 0xee9994)
Warning: debugging optimized function
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
   344:         }
=> 345:         c.workqueue.Add(key)
   346: }
   347:
   348: // handleObject will take any resource implementing metav1.Object and attempt
   349: // to find the Foo resource that 'owns' it. It does this by looking at the
   350: // objects metadata.ownerReferences field for an appropriate OwnerReference.
(dlv) s
> k8s.io/client-go/util/workqueue.(*rateLimitingType).Add() <autogenerated>:1 (PC: 0xc26e88)
Warning: debugging optimized function
(dlv) n
> main.(*Controller).enqueueFoo() ./controller.go:346 (PC: 0xee99c4)
Warning: debugging optimized function
Values returned:

   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
   344:         }
   345:         c.workqueue.Add(key)
=> 346: }
   347:
   348: // handleObject will take any resource implementing metav1.Object and attempt
   349: // to find the Foo resource that 'owns' it. It does this by looking at the
   350: // objects metadata.ownerReferences field for an appropriate OwnerReference.
   351: // It then enqueues that Foo resource to be processed. If the object does not
(dlv) p c.workqueue
k8s.io/client-go/util/workqueue.RateLimitingInterface(*k8s.io/client-go/util/workqueue.rateLimitingType) *{
        DelayingInterface: k8s.io/client-go/util/workqueue.DelayingInterface(*k8s.io/client-go/util/workqueue.delayingType) *{
                Interface: k8s.io/client-go/util/workqueue.Interface(*k8s.io/client-go/util/workqueue.Type) ...,
                clock: k8s.io/apimachinery/pkg/util/clock.Clock(k8s.io/apimachinery/pkg/util/clock.RealClock) *(*"k8s.io/apimachinery/pkg/util/clock.Clock")(0x40003c3390),
                stopCh: chan struct {} {
                        qcount: 0,
                        dataqsiz: 0,
                        buf: *[0]struct struct {} [],
                        elemsize: 0,
                        closed: 0,
                        elemtype: *runtime._type {size: 0, ptrdata: 0, hash: 670477339, tflag: tflagExtraStar|tflagRegularMemory (10), align: 1, fieldAlign: 1, kind: 25, equal: runtime.memequal0, gcdata: *1, str: 47516, ptrToThis: 653632},
                        sendx: 0,
                        recvx: 0,
                        recvq: waitq<struct {}> {
                                first: *(*sudog<struct {}>)(0x4000423320),
                                last: *(*sudog<struct {}>)(0x4000423320),},
                        sendq: waitq<struct {}> {
                                first: *sudog<struct {}> nil,
                                last: *sudog<struct {}> nil,},
                        lock: runtime.mutex {key: 0},},
                stopOnce: (*sync.Once)(0x40003c33a8),
                heartbeat: k8s.io/apimachinery/pkg/util/clock.Ticker(*k8s.io/apimachinery/pkg/util/clock.realTicker) ...,
                waitingForAddCh: chan *k8s.io/client-go/util/workqueue.waitFor {
                        qcount: 0,
                        dataqsiz: 1000,
                        buf: *[1000]*k8s.io/client-go/util/workqueue.waitFor [
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                ...+936 more
                        ],
                        elemsize: 8,
                        closed: 0,
                        elemtype: *runtime._type {size: 8, ptrdata: 8, hash: 3317760887, tflag: tflagRegularMemory (8), align: 8, fieldAlign: 8, kind: 54, equal: runtime.memequal64, gcdata: *1, str: 144019, ptrToThis: 0},
                        sendx: 0,
                        recvx: 0,
                        recvq: waitq<*k8s.io/client-go/util/workqueue.waitFor> {
                                first: *(*"sudog<*k8s.io/client-go/util/workqueue.waitFor>")(0x4000423200),
                                last: *(*"sudog<*k8s.io/client-go/util/workqueue.waitFor>")(0x4000423200),},
                        sendq: waitq<*k8s.io/client-go/util/workqueue.waitFor> {
                                first: *sudog<*k8s.io/client-go/util/workqueue.waitFor> nil,
                                last: *sudog<*k8s.io/client-go/util/workqueue.waitFor> nil,},
                        lock: runtime.mutex {key: 0},},
                metrics: k8s.io/client-go/util/workqueue.retryMetrics(*k8s.io/client-go/util/workqueue.defaultRetryMetrics) ...,},
        rateLimiter: k8s.io/client-go/util/workqueue.RateLimiter(*k8s.io/client-go/util/workqueue.MaxOfRateLimiter) *{
                limiters: []k8s.io/client-go/util/workqueue.RateLimiter len: 2, cap: 2, [
                        ...,
                        ...,
                ],},}
(dlv) 
workqueue
(dlv) c
I0709 19:11:07.684833   63308 controller.go:228] Successfully synced 'default/example-foo'
I0709 19:11:07.685614   63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
I0709 19:11:07.690807   63308 controller.go:228] Successfully synced 'default/example-foo'
I0709 19:11:07.690870   63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(102):4 total:7) (PC: 0xe16674)
Warning: debugging optimized function
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
   506:                                 if err != nil {
   507:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
   508:                                 }
=> 509:                         case watch.Deleted:
   510:                                 // TODO: Will any consumers need access to the "last known
   511:                                 // state", which is passed in event.Object? If so, may need
   512:                                 // to change this.
   513:                                 err := r.store.Delete(event.Object)
   514:                                 if err != nil {
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(91):4 total:8) (PC: 0xe16674)
Warning: debugging optimized function
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
   506:                                 if err != nil {
   507:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
   508:                                 }
=> 509:                         case watch.Deleted:
   510:                                 // TODO: Will any consumers need access to the "last known
   511:                                 // state", which is passed in event.Object? If so, may need
   512:                                 // to change this.
   513:                                 err := r.store.Delete(event.Object)
   514:                                 if err != nil {
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(102):5 total:9) (PC: 0xe16408)
Warning: debugging optimized function
   495:                                 continue
   496:                         }
   497:                         newResourceVersion := meta.GetResourceVersion()
   498:                         switch event.Type {
   499:                         case watch.Added:
=> 500:                                 err := r.store.Add(event.Object)
   501:                                 if err != nil {
   502:                                         utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
   503:                                 }
   504:                         case watch.Modified:
   505:                                 err := r.store.Update(event.Object)
(dlv) c
> k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (hits goroutine(49):3 total:5) (PC: 0xe1b6b8)
Warning: debugging optimized function
    68:         indexers Indexers
    69:         // indices maps a name to an Index
    70:         indices Indices
    71: }
    72:
=>  73: func (c *threadSafeMap) Add(key string, obj interface{}) {
    74:         c.lock.Lock()
    75:         defer c.lock.Unlock()
    76:         oldObject := c.items[key]
    77:         c.items[key] = obj
    78:         c.updateIndices(oldObject, obj, key)
(dlv) c
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):15 total:22) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) s
> main.(*Controller).enqueueFoo() ./controller.go:341 (PC: 0xee9954)
Warning: debugging optimized function
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
   338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
=> 341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
   344:         }
   345:         c.workqueue.Add(key)
   346: }
(dlv) n
> main.(*Controller).enqueueFoo() ./controller.go:345 (PC: 0xee9994)
Warning: debugging optimized function
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
   344:         }
=> 345:         c.workqueue.Add(key)
   346: }
   347:
   348: // handleObject will take any resource implementing metav1.Object and attempt
   349: // to find the Foo resource that 'owns' it. It does this by looking at the
   350: // objects metadata.ownerReferences field for an appropriate OwnerReference.
(dlv) s
> k8s.io/client-go/util/workqueue.(*rateLimitingType).Add() <autogenerated>:1 (PC: 0xc26e88)
Warning: debugging optimized function
(dlv) n
> main.(*Controller).enqueueFoo() ./controller.go:346 (PC: 0xee99c4)
Warning: debugging optimized function
Values returned:

   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
   344:         }
   345:         c.workqueue.Add(key)
=> 346: }
   347:
   348: // handleObject will take any resource implementing metav1.Object and attempt
   349: // to find the Foo resource that 'owns' it. It does this by looking at the
   350: // objects metadata.ownerReferences field for an appropriate OwnerReference.
   351: // It then enqueues that Foo resource to be processed. If the object does not
(dlv) p c.workqueue
k8s.io/client-go/util/workqueue.RateLimitingInterface(*k8s.io/client-go/util/workqueue.rateLimitingType) *{
        DelayingInterface: k8s.io/client-go/util/workqueue.DelayingInterface(*k8s.io/client-go/util/workqueue.delayingType) *{
                Interface: k8s.io/client-go/util/workqueue.Interface(*k8s.io/client-go/util/workqueue.Type) ...,
                clock: k8s.io/apimachinery/pkg/util/clock.Clock(k8s.io/apimachinery/pkg/util/clock.RealClock) *(*"k8s.io/apimachinery/pkg/util/clock.Clock")(0x40003c3390),
                stopCh: chan struct {} {
                        qcount: 0,
                        dataqsiz: 0,
                        buf: *[0]struct struct {} [],
                        elemsize: 0,
                        closed: 0,
                        elemtype: *runtime._type {size: 0, ptrdata: 0, hash: 670477339, tflag: tflagExtraStar|tflagRegularMemory (10), align: 1, fieldAlign: 1, kind: 25, equal: runtime.memequal0, gcdata: *1, str: 47516, ptrToThis: 653632},
                        sendx: 0,
                        recvx: 0,
                        recvq: waitq<struct {}> {
                                first: *(*sudog<struct {}>)(0x4000423320),
                                last: *(*sudog<struct {}>)(0x4000423320),},
                        sendq: waitq<struct {}> {
                                first: *sudog<struct {}> nil,
                                last: *sudog<struct {}> nil,},
                        lock: runtime.mutex {key: 0},},
                stopOnce: (*sync.Once)(0x40003c33a8),
                heartbeat: k8s.io/apimachinery/pkg/util/clock.Ticker(*k8s.io/apimachinery/pkg/util/clock.realTicker) ...,
                waitingForAddCh: chan *k8s.io/client-go/util/workqueue.waitFor {
                        qcount: 0,
                        dataqsiz: 1000,
                        buf: *[1000]*k8s.io/client-go/util/workqueue.waitFor [
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                *nil,
                                ...+936 more
                        ],
                        elemsize: 8,
                        closed: 0,
                        elemtype: *runtime._type {size: 8, ptrdata: 8, hash: 3317760887, tflag: tflagRegularMemory (8), align: 8, fieldAlign: 8, kind: 54, equal: runtime.memequal64, gcdata: *1, str: 144019, ptrToThis: 0},
                        sendx: 0,
                        recvx: 0,
                        recvq: waitq<*k8s.io/client-go/util/workqueue.waitFor> {
                                first: *(*"sudog<*k8s.io/client-go/util/workqueue.waitFor>")(0x4000423200),
                                last: *(*"sudog<*k8s.io/client-go/util/workqueue.waitFor>")(0x4000423200),},
                        sendq: waitq<*k8s.io/client-go/util/workqueue.waitFor> {
                                first: *sudog<*k8s.io/client-go/util/workqueue.waitFor> nil,
                                last: *sudog<*k8s.io/client-go/util/workqueue.waitFor> nil,},
                        lock: runtime.mutex {key: 0},},
                metrics: k8s.io/client-go/util/workqueue.retryMetrics(*k8s.io/client-go/util/workqueue.defaultRetryMetrics) ...,},
        rateLimiter: k8s.io/client-go/util/workqueue.RateLimiter(*k8s.io/client-go/util/workqueue.MaxOfRateLimiter) *{
                limiters: []k8s.io/client-go/util/workqueue.RateLimiter len: 2, cap: 2, [
                        ...,
                        ...,
                ],},}
(dlv) 

对应图中的步骤 8 -> 9:下面在 workqueue 的 Get() 处设置断点,观察 worker 从队列中取出 key 的过程。

workqueue
(dlv) b queue.go:152
Command failed: could not find statement at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/queue.go:152, please use a line with a statement
(dlv) b queue.go:147
Breakpoint 9 (enabled) set at 0xc25e48 for k8s.io/client-go/util/workqueue.(*Type).Get() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/queue.go:147
(dlv) c
> main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):16 total:23) (PC: 0xee9948)
Warning: debugging optimized function
   333: }
   334:
   335: // enqueueFoo takes a Foo resource and converts it into a namespace/name
   336: // string which is then put onto the work queue. This method should *not* be
   337: // passed resources of any type other than Foo.
=> 338: func (c *Controller) enqueueFoo(obj interface{}) {
   339:         var key string
   340:         var err error
   341:         if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
   342:                 utilruntime.HandleError(err)
   343:                 return
(dlv) c
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:459 (hits goroutine(102):7 total:14) (PC: 0xe16048)
Warning: debugging optimized function
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:459 (hits goroutine(91):7 total:14) (PC: 0xe16048)
Warning: debugging optimized function
   454:         }
   455:         return r.store.Replace(found, resourceVersion)
   456: }
   457:
   458: // watchHandler watches w and keeps *resourceVersion up to date.
=> 459: func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
   460:         eventCount := 0
   461:
   462:         // Stopping the watcher should be idempotent and if we return from this function there's no way
   463:         // we're coming back in with the same watch interface.
   464:         defer w.Stop()
(dlv) c
I0709 19:14:30.567295   63308 controller.go:228] Successfully synced 'default/example-foo'
I0709 19:14:30.567503   63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(91):5 total:10) (PC: 0xe16408)
Warning: debugging optimized function
> k8s.io/client-go/util/workqueue.(*Type).Get() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/queue.go:147 (hits goroutine(124):1 total:1) (PC: 0xc25e48)
Warning: debugging optimized function
   142: }
   143:
   144: // Get blocks until it can return an item to be processed. If shutdown = true,
   145: // the caller should end their goroutine. You must call Done with item when you
   146: // have finished processing it.
=> 147: func (q *Type) Get() (item interface{}, shutdown bool) {
   148:         q.cond.L.Lock()
   149:         defer q.cond.L.Unlock()
   150:         for len(q.queue) == 0 && !q.shuttingDown {
   151:                 q.cond.Wait()
   152:         }
(dlv) bt
 0  0x0000000000c25e48 in k8s.io/client-go/util/workqueue.(*Type).Get
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/queue.go:147
 1  0x0000000000c26a54 in k8s.io/client-go/util/workqueue.(*delayingType).Get
    at <autogenerated>:1
 2  0x0000000000c27044 in k8s.io/client-go/util/workqueue.(*rateLimitingType).Get
    at <autogenerated>:1
 3  0x0000000000ee8e5c in main.(*Controller).processNextWorkItem
    at ./controller.go:188
 4  0x0000000000ee8e00 in main.(*Controller).runWorker
    at ./controller.go:181
 5  0x0000000000eeb260 in main.(*Controller).runWorker-fm
    at ./controller.go:180
 6  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 7  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
 8  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
 9  0x00000000006b7d80 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
10  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) 

updateIndices

root@ubuntu:~/sample-controller# dlv exec ./sample-controller --  --kubeconfig=$HOME/.kube/config 
Type 'help' for list of commands.
(dlv) b updateIndices
Breakpoint 1 (enabled) set at 0xe1d248 for k8s.io/client-go/tools/cache.(*threadSafeMap).updateIndices() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:256
(dlv) c
I0709 19:30:20.770701   32518 controller.go:115] Setting up event handlers
I0709 19:30:20.770868   32518 controller.go:156] Starting Foo controller
I0709 19:30:20.770882   32518 controller.go:159] Waiting for informer caches to sync
> k8s.io/client-go/tools/cache.(*threadSafeMap).updateIndices() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:256 (hits goroutine(105):1 total:1) (PC: 0xe1d248)
Warning: debugging optimized function
   251:         return nil
   252: }
   253:
   254: // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
   255: // updateIndices must be called from a function that already has a lock on the cache
=> 256: func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
   257:         // if we got an old object, we need to remove it before we add it again
   258:         if oldObj != nil {
   259:                 c.deleteFromIndices(oldObj, key)
   260:         }
   261:         for name, indexFunc := range c.indexers {
(dlv) bt
 0  0x0000000000e1d248 in k8s.io/client-go/tools/cache.(*threadSafeMap).updateIndices
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:256
 1  0x0000000000e1b7bc in k8s.io/client-go/tools/cache.(*threadSafeMap).Add
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:78
 2  0x0000000000e1ad30 in k8s.io/client-go/tools/cache.(*cache).Add
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/store.go:155
 3  0x0000000000e191bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:557
 4  0x0000000000e20b80 in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:527
 5  0x0000000000e1150c in k8s.io/client-go/tools/cache.(*DeltaFIFO).Pop
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/delta_fifo.go:539
 6  0x0000000000e0f354 in k8s.io/client-go/tools/cache.(*controller).processLoop
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:183
 7  0x0000000000e20a90 in k8s.io/client-go/tools/cache.(*controller).processLoop-fm
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:181
 8  0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155
 9  0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156
10  0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133
11  0x0000000000e0f0e8 in k8s.io/apimachinery/pkg/util/wait.Until
    at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90
12  0x0000000000e0f0e8 in k8s.io/client-go/tools/cache.(*controller).Run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:154
13  0x0000000000e180bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run
    at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:410
14  0x00000000000718dc in runtime.goexit
    at /usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) p oldObj  
interface {} nil
(dlv) p  newObj
interface {}(*k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo) *{
        TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {
                Kind: "Foo",
                APIVersion: "samplecontroller.k8s.io/v1alpha1",},
        ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta {
                Name: "example-foo",
                GenerateName: "",
                Namespace: "default",
                SelfLink: "/apis/samplecontroller.k8s.io/v1alpha1/namespaces/default/foos/e...+10 more",
                UID: "b95d5147-f93a-4e66-af15-53f095179b56",
                ResourceVersion: "1892878",
                Generation: 2,
                CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x40004b66c8),
                DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil,
                DeletionGracePeriodSeconds: *int64 nil,
                Labels: map[string]string nil,
                Annotations: map[string]string [...],
                OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil,
                Finalizers: []string len: 0, cap: 0, nil,
                ClusterName: "",
                ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 2, cap: 2, [
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40003ae3c0),
                        (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40003ae420),
                ],},
        Spec: k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec {
                DeploymentName: "example-foo",
                Replicas: *1,},
        Status: k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus {AvailableReplicas: 0},}
(dlv) 

Indexer 使用 thread_safe_store.go 中的 threadSafeMap 存储数据。threadSafeMap 是一个线程安全并且带有索引功能的 map,数据只存放在内存中;它通过 sync.RWMutex 保护内部状态,Add 等写操作都会先加锁。

(dlv) p c.indexers
k8s.io/client-go/tools/cache.Indexers [
        "namespace": k8s.io/client-go/tools/cache.MetaNamespaceIndexFunc, 
]
(dlv) p c.indices
k8s.io/client-go/tools/cache.Indices []
(dlv) p c.items
map[string]interface {} [
        "default/example-foo": *k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo {
                TypeMeta: (*"k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta")(0x40004b6640),
                ObjectMeta: (*"k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta")(0x40004b6660),
                Spec: (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec")(0x40004b6758),
                Status: (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus")(0x40004b6770),}, 
]
(dlv) p c
*k8s.io/client-go/tools/cache.threadSafeMap {
        lock: sync.RWMutex {
                w: (*sync.Mutex)(0x400059d2c0),
                writerSem: 0,
                readerSem: 0,
                readerCount: -1073741824,
                readerWait: 0,},
        items: map[string]interface {} [
                "default/example-foo": ..., 
        ],
        indexers: k8s.io/client-go/tools/cache.Indexers [
                "namespace": k8s.io/client-go/tools/cache.MetaNamespaceIndexFunc, 
        ],
        indices: k8s.io/client-go/tools/cache.Indices [],}
(dlv) 

 dlv exec ./sample-controller --  --kubeconfig=$HOME/.kube/config 

深入浅出 kubernetes 之 WorkQueue 详解

kubernets controller 和 CRD 具体组件分析

深入k8s:Informer使用及其源码分析

client-go 中的 informer 源码分析