diff --git a/pkg/controller/group/group_controller.go b/pkg/controller/group/group_controller.go index 03c4e4694..57b9e2e96 100644 --- a/pkg/controller/group/group_controller.go +++ b/pkg/controller/group/group_controller.go @@ -18,7 +18,6 @@ package group import ( "fmt" - "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -26,7 +25,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -38,6 +36,7 @@ import ( kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned" iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2" iamv1alpha1listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2" + "kubesphere.io/kubesphere/pkg/controller/utils/controller" "kubesphere.io/kubesphere/pkg/utils/sliceutil" ) @@ -49,13 +48,12 @@ const ( ) type Controller struct { + controller.BaseController scheme *runtime.Scheme k8sClient kubernetes.Interface ksClient kubesphere.Interface groupInformer iamv1alpha2informers.GroupInformer groupLister iamv1alpha1listers.GroupLister - groupSynced cache.InformerSynced - workqueue workqueue.RateLimitingInterface recorder record.EventRecorder } @@ -66,95 +64,33 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) ctl := &Controller{ + BaseController: controller.BaseController{ + Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Group"), + Synced: []cache.InformerSynced{groupInformer.Informer().HasSynced}, + Name: controllerName, + }, + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}), k8sClient: k8sClient, ksClient: ksClient, groupInformer: groupInformer, groupLister: groupInformer.Lister(), - groupSynced: groupInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Group"), - recorder: recorder, } + ctl.Handler = ctl.reconcile + klog.Info("Setting up event handlers") groupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: ctl.enqueueGroup, + AddFunc: ctl.Enqueue, UpdateFunc: func(old, new interface{}) { - ctl.enqueueGroup(new) + ctl.Enqueue(new) }, - DeleteFunc: ctl.enqueueGroup, + DeleteFunc: ctl.Enqueue, }) return ctl } -func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer utilruntime.HandleCrash() - defer c.workqueue.ShutDown() - - klog.Info("Starting Group controller") - klog.Info("Waiting for informer caches to sync") - synced := []cache.InformerSynced{c.groupSynced} - if ok := cache.WaitForCacheSync(stopCh, synced...); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - klog.Info("Starting workers") - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - klog.Info("Started workers") - <-stopCh - klog.Info("Shutting down workers") - return nil -} - -func (c *Controller) enqueueGroup(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - utilruntime.HandleError(err) - return - } - c.workqueue.Add(key) -} - -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - err := func(obj interface{}) error { - defer c.workqueue.Done(obj) - var key string - var ok bool - - if key, ok = obj.(string); !ok { - c.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - if err := c.reconcile(key); err != nil { - c.workqueue.AddRateLimited(key) - return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) - } - c.workqueue.Forget(obj) - klog.Infof("Successfully synced %s:%s", "key", key) - return nil - }(obj) - - if err != nil { - utilruntime.HandleError(err) - return true - } - - return true +func (c *Controller) Start(stopCh <-chan struct{}) error { + return c.Run(1, stopCh) } // reconcile handles Group informer events, clear up related reource when group is being deleted. @@ -207,10 +143,6 @@ func (c *Controller) reconcile(key string) error { return nil } -func (c *Controller) Start(stopCh <-chan struct{}) error { - return c.Run(1, stopCh) -} - func (c *Controller) deleteGroupBindings(group *iam1alpha2.Group) error { // Groupbindings that created by kubeshpere will be deleted directly. diff --git a/pkg/controller/group/group_controller_test.go b/pkg/controller/group/group_controller_test.go index 1935005cd..facac217c 100644 --- a/pkg/controller/group/group_controller_test.go +++ b/pkg/controller/group/group_controller_test.go @@ -90,7 +90,6 @@ func (f *fixture) newController() (*Controller, ksinformers.SharedInformerFactor c := NewController(f.k8sclient, f.ksclient, ksinformers.Iam().V1alpha2().Groups()) - c.groupSynced = alwaysReady c.recorder = &record.FakeRecorder{} return c, ksinformers, k8sinformers @@ -113,7 +112,7 @@ func (f *fixture) runController(group string, startInformers bool, expectError b k8sI.Start(stopCh) } - err := c.reconcile(group) + err := c.Handler(group) if !expectError && err != nil { f.t.Errorf("error syncing group: %v", err) } else if expectError && err == nil { diff --git a/pkg/controller/groupbinding/groupbinding_controller.go b/pkg/controller/groupbinding/groupbinding_controller.go index 0cc050db9..ee0bc0f3f 100644 --- a/pkg/controller/groupbinding/groupbinding_controller.go +++ b/pkg/controller/groupbinding/groupbinding_controller.go @@ -18,14 +18,12 @@ package groupbinding import ( "fmt" - "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -37,6 +35,7 @@ import ( kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned" iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2" iamv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2" + "kubesphere.io/kubesphere/pkg/controller/utils/controller" "kubesphere.io/kubesphere/pkg/utils/sliceutil" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -49,13 +48,12 @@ const ( ) type Controller struct { + controller.BaseController scheme *runtime.Scheme k8sClient kubernetes.Interface ksClient kubesphere.Interface groupBindingInformer iamv1alpha2informers.GroupBindingInformer groupBindingLister iamv1alpha2listers.GroupBindingLister - groupBindingSynced cache.InformerSynced - workqueue workqueue.RateLimitingInterface recorder record.EventRecorder } @@ -67,97 +65,29 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) ctl := &Controller{ + BaseController: controller.BaseController{ + Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "GroupBinding"), + Synced: []cache.InformerSynced{groupBindingInformer.Informer().HasSynced}, + Name: controllerName, + }, k8sClient: k8sClient, ksClient: ksClient, groupBindingInformer: groupBindingInformer, groupBindingLister: groupBindingInformer.Lister(), - groupBindingSynced: groupBindingInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "GroupBinding"), recorder: recorder, } + ctl.Handler = ctl.reconcile klog.Info("Setting up event handlers") groupBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: ctl.enqueueGroupBinding, + AddFunc: ctl.Enqueue, UpdateFunc: func(old, new interface{}) { - ctl.enqueueGroupBinding(new) + ctl.Enqueue(new) }, - DeleteFunc: ctl.enqueueGroupBinding, + DeleteFunc: ctl.Enqueue, }) return ctl } -func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { - defer utilruntime.HandleCrash() - defer c.workqueue.ShutDown() - - klog.Info("Starting GroupBinding controller") - klog.Info("Waiting for informer caches to sync") - - synced := []cache.InformerSynced{c.groupBindingSynced} - - if ok := cache.WaitForCacheSync(stopCh, synced...); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - klog.Info("Starting workers") - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - klog.Info("Started workers") - <-stopCh - klog.Info("Shutting down workers") - return nil -} - -func (c *Controller) enqueueGroupBinding(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - utilruntime.HandleError(err) - return - } - c.workqueue.Add(key) -} - -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - err := func(obj interface{}) error { - defer c.workqueue.Done(obj) - var key string - var ok bool - if key, ok = obj.(string); !ok { - c.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - if err := c.reconcile(key); err != nil { - c.workqueue.AddRateLimited(key) - return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) - } - c.workqueue.Forget(obj) - klog.Infof("Successfully synced %s:%s", "key", key) - return nil - }(obj) - - if err != nil { - utilruntime.HandleError(err) - return true - } - - return true -} - // reconcile handles GroupBinding informer events, it updates user's Groups property with the current GroupBinding. func (c *Controller) reconcile(key string) error { diff --git a/pkg/controller/groupbinding/groupbinding_controller_test.go b/pkg/controller/groupbinding/groupbinding_controller_test.go index 699849a0f..91fcc5c00 100644 --- a/pkg/controller/groupbinding/groupbinding_controller_test.go +++ b/pkg/controller/groupbinding/groupbinding_controller_test.go @@ -117,7 +117,7 @@ func (f *fixture) newController() (*Controller, ksinformers.SharedInformerFactor c := NewController(f.k8sclient, f.ksclient, ksinformers.Iam().V1alpha2().GroupBindings()) - c.groupBindingSynced = alwaysReady + c.Synced = []cache.InformerSynced{alwaysReady} c.recorder = &record.FakeRecorder{} return c, ksinformers, k8sinformers diff --git a/pkg/controller/utils/controller/basecontroller.go b/pkg/controller/utils/controller/basecontroller.go new file mode 100644 index 000000000..823d02b35 --- /dev/null +++ b/pkg/controller/utils/controller/basecontroller.go @@ -0,0 +1,127 @@ +/* +Copyright 2020 KubeSphere Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "time" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +// BaseController provides a Controller template for watching a primary resources that defined as CRD. +type BaseController struct { + // Workers will wait informer caches to be synced + Synced []cache.InformerSynced + // Workqueue is a rate limited work queue. + Workqueue workqueue.RateLimitingInterface + Handler func(key string) error + MaxRetries int + Name string +} + +// Run will set up the event handlers for Primary resource, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (c *BaseController) Run(threadiness int, stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer c.Workqueue.ShutDown() + + klog.Infof("Starting controller: %s", c.Name) + klog.Infof("Waiting for informer caches to sync for: %s", c.Name) + if ok := cache.WaitForCacheSync(stopCh, c.Synced...); !ok { + return fmt.Errorf("failed to wait for caches to sync for: %s", c.Name) + } + + klog.Infof("Starting workers for: %s", c.Name) + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + klog.Infof("Started workers for: %s", c.Name) + <-stopCh + klog.Infof("Shutting down workers for: %s", c.Name) + return nil +} + +// Enqueue takes a primary resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than primary resource. +func (c *BaseController) Enqueue(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.Workqueue.Add(key) +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (c *BaseController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the Handler. +func (c *BaseController) processNextWorkItem() bool { + obj, shutdown := c.Workqueue.Get() + + if shutdown { + return false + } + err := func(obj interface{}) error { + defer c.Workqueue.Done(obj) + var key string + var ok bool + + if key, ok = obj.(string); !ok { + c.Workqueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in Workqueue but got %#v in %s", obj, c.Name)) + return nil + } + if err := c.Handler(key); err != nil { + // Put the item back on the workqueue to handle any transient errors, + // when the max retries haven't reached or there is no retry times limit. + if c.MaxRetries == 0 || c.Workqueue.NumRequeues(key) < c.MaxRetries { + c.Workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s' in %s: %s, requeuing ", key, c.Name, err.Error()) + } + klog.V(4).Infof("Dropping %s out of the queue in %s: %s", key, c.Name, err) + utilruntime.HandleError(err) + return nil + } + c.Workqueue.Forget(obj) + klog.Infof("Successfully Synced %s:%s in %s", "key", key, c.Name) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} diff --git a/pkg/controller/utils/controller/basecontroller_test.go b/pkg/controller/utils/controller/basecontroller_test.go new file mode 100644 index 000000000..ceb7234b0 --- /dev/null +++ b/pkg/controller/utils/controller/basecontroller_test.go @@ -0,0 +1,145 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +var ( + alwaysReady = func() bool { return true } + noResyncPeriodFunc = func() time.Duration { return 0 } + controllerName = "base-controler-test" +) + +type fixture struct { + t *testing.T + stopCh chan struct{} + BaseController + handleTimes int +} +type fakeObj struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` +} + +func (in *fakeObj) DeepCopyInto(out *fakeObj) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GlobalRole. +func (in *fakeObj) DeepCopy() *fakeObj { + if in == nil { + return nil + } + out := new(fakeObj) + in.DeepCopyInto(out) + return out +} + +func (in *fakeObj) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +func newFixture(t *testing.T, retryTimes int) *fixture { + f := &fixture{} + f.t = t + f.stopCh = make(chan struct{}) + f.BaseController = BaseController{ + Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Group"), + Synced: []cache.InformerSynced{alwaysReady}, + Name: controllerName, + } + f.MaxRetries = retryTimes + return f +} + +func (f *fixture) reconcile(key string) error { + f.handleTimes++ + f.t.Logf("Current key is %s", key) + f.stopCh <- struct{}{} + return nil +} + +func (f *fixture) retryreconcile(key string) error { + f.handleTimes++ + f.t.Logf("Current key is %s", key) + if f.Workqueue.NumRequeues(key) == 2 { + defer func(f *fixture) { f.stopCh <- struct{}{} }(f) + } + err := fmt.Errorf("retry times: %d", f.Workqueue.NumRequeues(key)) + return err +} + +func createFakeobj() metav1.Object { + + var obj metav1.Object + + fake := fakeObj{ + ObjectMeta: metav1.ObjectMeta{Name: "Hello"}, + TypeMeta: metav1.TypeMeta{}, + } + obj = &fake + return obj +} + +func TestDequeue(t *testing.T) { + f := newFixture(t, 0) + f.Handler = f.reconcile + + go f.Run(1, f.stopCh) + + obj := createFakeobj() + f.Enqueue(obj) + + <-f.stopCh + + if f.handleTimes != 1 { + t.Error("Failed to call the handler!") + } + +} + +func TestRetry(t *testing.T) { + f := newFixture(t, 2) + f.Handler = f.retryreconcile + + go f.Run(1, f.stopCh) + + obj := createFakeobj() + f.Enqueue(obj) + + <-f.stopCh + + if f.handleTimes != f.MaxRetries+1 { + t.Error("Failed to call the handler!") + } + +}