diff --git a/cmd/controller-manager/app/controllers.go b/cmd/controller-manager/app/controllers.go index aabcb9546..973bd800f 100644 --- a/cmd/controller-manager/app/controllers.go +++ b/cmd/controller-manager/app/controllers.go @@ -31,6 +31,8 @@ import ( "kubesphere.io/kubesphere/pkg/controller/devopsproject" "kubesphere.io/kubesphere/pkg/controller/globalrole" "kubesphere.io/kubesphere/pkg/controller/globalrolebinding" + "kubesphere.io/kubesphere/pkg/controller/group" + "kubesphere.io/kubesphere/pkg/controller/groupbinding" "kubesphere.io/kubesphere/pkg/controller/job" "kubesphere.io/kubesphere/pkg/controller/network/ippool" "kubesphere.io/kubesphere/pkg/controller/network/nsnetworkpolicy" @@ -258,6 +260,12 @@ func addControllers( kubesphereInformer.Types().V1beta1().FederatedWorkspaces(), multiClusterEnabled) + groupBindingController := groupbinding.NewController(client.Kubernetes(), client.KubeSphere(), + kubesphereInformer.Iam().V1alpha2().GroupBindings()) + + groupController := group.NewController(client.Kubernetes(), client.KubeSphere(), + kubesphereInformer.Iam().V1alpha2().Groups()) + var clusterController manager.Runnable if multiClusterEnabled { clusterController = cluster.NewClusterController( @@ -319,6 +327,8 @@ func addControllers( "workspacerole-controller": workspaceRoleController, "workspacerolebinding-controller": workspaceRoleBindingController, "ippool-controller": ippoolController, + "groupbinding-controller": groupBindingController, + "group-controller": groupController, } if devopsClient != nil { diff --git a/pkg/controller/group/group_controller.go b/pkg/controller/group/group_controller.go new file mode 100644 index 000000000..02886be4f --- /dev/null +++ b/pkg/controller/group/group_controller.go @@ -0,0 +1,307 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package group + +import ( + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" + iam1alpha2 "kubesphere.io/kubesphere/pkg/apis/iam/v1alpha2" + kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned" + iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2" + iamv1alpha1listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2" + "kubesphere.io/kubesphere/pkg/utils/sliceutil" +) + +const ( + // SuccessSynced is used as part of the Event 'reason' when a Foo is synced + successSynced = "Synced" + // is synced successfully + messageResourceSynced = "Group synced successfully" + controllerName = "groupbinding-controller" + finalizer = "finalizers.kubesphere.io/groups" +) + +type Controller struct { + scheme *runtime.Scheme + k8sClient kubernetes.Interface + ksClient kubesphere.Interface + groupInformer iamv1alpha2informers.GroupInformer + groupLister iamv1alpha1listers.GroupLister + groupSynced cache.InformerSynced + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder +} + +// NewController creates Group Controller instance +func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface, groupInformer iamv1alpha2informers.GroupInformer) *Controller { + // Create event broadcaster + // Add sample-controller types to the default Kubernetes Scheme so Events can be + // logged for sample-controller types. + + klog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) + ctl := &Controller{ + k8sClient: k8sClient, + ksClient: ksClient, + groupInformer: groupInformer, + groupLister: groupInformer.Lister(), + groupSynced: groupInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Group"), + recorder: recorder, + } + klog.Info("Setting up event handlers") + groupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: ctl.enqueueGroup, + UpdateFunc: func(old, new interface{}) { + ctl.enqueueGroup(new) + }, + DeleteFunc: ctl.enqueueGroup, + }) + return ctl +} + +func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer c.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + klog.Info("Starting Group controller") + + // Wait for the caches to be synced before starting workers + klog.Info("Waiting for informer caches to sync") + + synced := []cache.InformerSynced{c.groupSynced} + + if ok := cache.WaitForCacheSync(stopCh, synced...); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + klog.Info("Starting workers") + // Launch two workers to process Foo resources + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + klog.Info("Started workers") + <-stopCh + klog.Info("Shutting down workers") + return nil +} + +func (c *Controller) enqueueGroup(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.workqueue.Add(key) +} + +func (c *Controller) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *Controller) processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.workqueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the reconcile, passing it the namespace/name string of the + // Foo resource to be synced. + if err := c.reconcile(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + klog.Infof("Successfully synced %s:%s", "key", key) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Foo resource +// with the current status of the resource. +func (c *Controller) reconcile(key string) error { + + group, err := c.groupLister.Get(key) + if err != nil { + // The user may no longer exist, in which case we stop + // processing. + if errors.IsNotFound(err) { + utilruntime.HandleError(fmt.Errorf("group '%s' in work queue no longer exists", key)) + return nil + } + klog.Error(err) + return err + } + if group.ObjectMeta.DeletionTimestamp.IsZero() { + // The object is not being deleted, so if it does not have our finalizer, + // then lets add the finalizer and update the object. + if !sliceutil.HasString(group.Finalizers, finalizer) { + group.ObjectMeta.Finalizers = append(group.ObjectMeta.Finalizers, finalizer) + + if group, err = c.ksClient.IamV1alpha2().Groups().Update(group); err != nil { + return err + } + // Skip reconcile when group is updated. + return nil + } + } else { + // The object is being deleted + if sliceutil.HasString(group.ObjectMeta.Finalizers, finalizer) { + if err = c.deleteGroupBindings(group); err != nil { + klog.Error(err) + return err + } + + if err = c.deleteRoleBindings(group); err != nil { + klog.Error(err) + return err + } + + // remove our finalizer from the list and update it. + group.Finalizers = sliceutil.RemoveString(group.ObjectMeta.Finalizers, func(item string) bool { + return item == finalizer + }) + + if group, err = c.ksClient.IamV1alpha2().Groups().Update(group); err != nil { + return err + } + } + // Our finalizer has finished, so the reconciler can do nothing. + return nil + } + + c.recorder.Event(group, corev1.EventTypeNormal, successSynced, messageResourceSynced) + return nil +} + +func (c *Controller) Start(stopCh <-chan struct{}) error { + return c.Run(4, stopCh) +} + +func (c *Controller) deleteGroupBindings(group *iam1alpha2.Group) error { + + // Groupbindings that created by kubeshpere will be deleted directly. + listOptions := metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{iam1alpha2.GroupReferenceLabel: group.Name}).String(), + } + deleteOptions := metav1.NewDeleteOptions(0) + + if err := c.ksClient.IamV1alpha2().GroupBindings(). + DeleteCollection(deleteOptions, listOptions); err != nil { + klog.Error(err) + return err + } + return nil +} + +func (c *Controller) deleteRoleBindings(group *iam1alpha2.Group) error { + listOptions := metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{iam1alpha2.GroupReferenceLabel: group.Name}).String(), + } + deleteOptions := metav1.NewDeleteOptions(0) + + if err := c.ksClient.IamV1alpha2().WorkspaceRoleBindings(). + DeleteCollection(deleteOptions, listOptions); err != nil { + klog.Error(err) + return err + } + + if err := c.k8sClient.RbacV1().ClusterRoleBindings(). + DeleteCollection(deleteOptions, listOptions); err != nil { + klog.Error(err) + return err + } + + if result, err := c.k8sClient.CoreV1().Namespaces().List(metav1.ListOptions{}); err != nil { + klog.Error(err) + return err + } else { + for _, namespace := range result.Items { + if err = c.k8sClient.RbacV1().RoleBindings(namespace.Name).DeleteCollection(deleteOptions, listOptions); err != nil { + klog.Error(err) + return err + } + } + } + + return nil +} diff --git a/pkg/controller/group/group_controller_test.go b/pkg/controller/group/group_controller_test.go new file mode 100644 index 000000000..d7edc7a5b --- /dev/null +++ b/pkg/controller/group/group_controller_test.go @@ -0,0 +1,262 @@ +/* +Copyright 2019 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package group + +import ( + "reflect" + "testing" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/diff" + kubeinformers "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + v1alpha2 "kubesphere.io/kubesphere/pkg/apis/iam/v1alpha2" + "kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake" + ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + alwaysReady = func() bool { return true } + noResyncPeriodFunc = func() time.Duration { return 0 } +) + +type fixture struct { + t *testing.T + + ksclient *fake.Clientset + k8sclient *k8sfake.Clientset + // Objects to put in the store. + groupLister []*v1alpha2.Group + // Actions expected to happen on the client. + kubeactions []core.Action + actions []core.Action + // Objects from here preloaded into NewSimpleFake. + kubeobjects []runtime.Object + objects []runtime.Object +} + +func newFixture(t *testing.T) *fixture { + f := &fixture{} + f.t = t + f.objects = []runtime.Object{} + f.kubeobjects = []runtime.Object{} + return f +} + +func newGroup(name string) *v1alpha2.Group { + return &v1alpha2.Group{ + TypeMeta: metav1.TypeMeta{APIVersion: v1alpha2.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1alpha2.GroupSpec{}, + } +} + +func (f *fixture) newController() (*Controller, ksinformers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { + f.ksclient = fake.NewSimpleClientset(f.objects...) + f.k8sclient = k8sfake.NewSimpleClientset(f.kubeobjects...) + + ksinformers := ksinformers.NewSharedInformerFactory(f.ksclient, noResyncPeriodFunc()) + k8sinformers := kubeinformers.NewSharedInformerFactory(f.k8sclient, noResyncPeriodFunc()) + + for _, group := range f.groupLister { + err := ksinformers.Iam().V1alpha2().Groups().Informer().GetIndexer().Add(group) + if err != nil { + f.t.Errorf("add group:%s", err) + } + } + + c := NewController(f.k8sclient, f.ksclient, + ksinformers.Iam().V1alpha2().Groups()) + c.groupSynced = alwaysReady + c.recorder = &record.FakeRecorder{} + + return c, ksinformers, k8sinformers +} + +func (f *fixture) run(userName string) { + f.runController(userName, true, false) +} + +func (f *fixture) runExpectError(userName string) { + f.runController(userName, true, true) +} + +func (f *fixture) runController(group string, startInformers bool, expectError bool) { + c, i, k8sI := f.newController() + if startInformers { + stopCh := make(chan struct{}) + defer close(stopCh) + i.Start(stopCh) + k8sI.Start(stopCh) + } + + err := c.reconcile(group) + if !expectError && err != nil { + f.t.Errorf("error syncing group: %v", err) + } else if expectError && err == nil { + f.t.Error("expected error syncing group, got nil") + } + + actions := filterInformerActions(f.ksclient.Actions()) + for i, action := range actions { + if len(f.actions) < i+1 { + f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:]) + break + } + + expectedAction := f.actions[i] + checkAction(expectedAction, action, f.t) + } + + if len(f.actions) > len(actions) { + f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(actions), f.actions[len(actions):]) + } + + k8sActions := filterInformerActions(f.k8sclient.Actions()) + for i, action := range k8sActions { + if len(f.kubeactions) < i+1 { + f.t.Errorf("%d unexpected actions: %+v", len(k8sActions)-len(f.kubeactions), k8sActions[i:]) + break + } + + expectedAction := f.kubeactions[i] + checkAction(expectedAction, action, f.t) + } + + if len(f.kubeactions) > len(k8sActions) { + f.t.Errorf("%d additional expected actions:%+v", len(f.kubeactions)-len(k8sActions), f.kubeactions[len(k8sActions):]) + } +} + +// checkAction verifies that expected and actual actions are equal and both have +// same attached resources +func checkAction(expected, actual core.Action, t *testing.T) { + if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) { + t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual) + return + } + + if reflect.TypeOf(actual) != reflect.TypeOf(expected) { + t.Errorf("Action has wrong type. Expected: %t. Got: %t", expected, actual) + return + } + + switch a := actual.(type) { + case core.CreateActionImpl: + e, _ := expected.(core.CreateActionImpl) + expObject := e.GetObject() + object := a.GetObject() + + if !reflect.DeepEqual(expObject, object) { + t.Errorf("Action %s %s has wrong object\nDiff:\n %s", + a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object)) + } + case core.UpdateActionImpl: + e, _ := expected.(core.UpdateActionImpl) + expObject := e.GetObject() + object := a.GetObject() + expUser := expObject.(*v1alpha2.Group) + group := object.(*v1alpha2.Group) + + if !reflect.DeepEqual(expUser, group) { + t.Errorf("Action %s %s has wrong object\nDiff:\n %s", + a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expObject, object)) + } + case core.PatchActionImpl: + e, _ := expected.(core.PatchActionImpl) + expPatch := e.GetPatch() + patch := a.GetPatch() + + if !reflect.DeepEqual(expPatch, patch) { + t.Errorf("Action %s %s has wrong patch\nDiff:\n %s", + a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expPatch, patch)) + } + default: + t.Errorf("Uncaptured Action %s %s, you should explicitly add a case to capture it", + actual.GetVerb(), actual.GetResource().Resource) + } +} + +// filterInformerActions filters list and watch actions for testing resources. +// Since list and watch don't change resource state we can filter it to lower +// nose level in our tests. +func filterInformerActions(actions []core.Action) []core.Action { + var ret []core.Action + for _, action := range actions { + if !action.Matches("update", "groups") { + continue + } + ret = append(ret, action) + } + + return ret +} + +func (f *fixture) expectUpdateGroupsFinalizerAction(group *v1alpha2.Group) { + expect := group.DeepCopy() + expect.Finalizers = []string{"finalizers.kubesphere.io/groups"} + action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "groups"}, "", expect) + f.actions = append(f.actions, action) +} + +func (f *fixture) expectUpdateGroupsDeleteAction(group *v1alpha2.Group) { + expect := group.DeepCopy() + expect.Finalizers = []string{} + action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "groups"}, "", expect) + f.actions = append(f.actions, action) +} + +func getKey(group *v1alpha2.Group, t *testing.T) string { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(group) + if err != nil { + t.Errorf("Unexpected error getting key for group %v: %v", group.Name, err) + return "" + } + return key +} + +func TestDoNothing(t *testing.T) { + f := newFixture(t) + group := newGroup("test") + + f.groupLister = append(f.groupLister, group) + f.objects = append(f.objects, group) + + f.expectUpdateGroupsFinalizerAction(group) + f.run(getKey(group, t)) + + f = newFixture(t) + + deletedGroup := group.DeepCopy() + deletedGroup.Finalizers = []string{"finalizers.kubesphere.io/groups"} + now := metav1.Now() + deletedGroup.ObjectMeta.DeletionTimestamp = &now + + f.groupLister = append(f.groupLister, deletedGroup) + f.objects = append(f.objects, deletedGroup) + f.expectUpdateGroupsDeleteAction(deletedGroup) + f.run(getKey(deletedGroup, t)) +} diff --git a/pkg/controller/groupbinding/groupbinding_controller.go b/pkg/controller/groupbinding/groupbinding_controller.go new file mode 100644 index 000000000..4f7a7776c --- /dev/null +++ b/pkg/controller/groupbinding/groupbinding_controller.go @@ -0,0 +1,324 @@ +/* +Copyright 2020 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package groupbinding + +import ( + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" + iamv1alpha2 "kubesphere.io/kubesphere/pkg/apis/iam/v1alpha2" + kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned" + iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2" + iamv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2" + "kubesphere.io/kubesphere/pkg/utils/sliceutil" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + // SuccessSynced is used as part of the Event 'reason' when a Foo is synced + successSynced = "Synced" + // is synced successfully + messageResourceSynced = "GroupBinding synced successfully" + controllerName = "groupbinding-controller" + finalizer = "finalizers.kubesphere.io/groupsbindings" +) + +type Controller struct { + scheme *runtime.Scheme + k8sClient kubernetes.Interface + ksClient kubesphere.Interface + groupBindingInformer iamv1alpha2informers.GroupBindingInformer + groupBindingLister iamv1alpha2listers.GroupBindingLister + groupBindingSynced cache.InformerSynced + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder +} + +// NewController creates GroupBinding Controller instance +func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface, groupBindingInformer iamv1alpha2informers.GroupBindingInformer) *Controller { + // Create event broadcaster + // Add sample-controller types to the default Kubernetes Scheme so Events can be + // logged for sample-controller types. + + klog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) + ctl := &Controller{ + k8sClient: k8sClient, + ksClient: ksClient, + groupBindingInformer: groupBindingInformer, + groupBindingLister: groupBindingInformer.Lister(), + groupBindingSynced: groupBindingInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "GroupBinding"), + recorder: recorder, + } + klog.Info("Setting up event handlers") + groupBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: ctl.enqueueGroupBinding, + UpdateFunc: func(old, new interface{}) { + ctl.enqueueGroupBinding(new) + }, + DeleteFunc: ctl.enqueueGroupBinding, + }) + return ctl +} + +func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer c.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + klog.Info("Starting GroupBinding controller") + + // Wait for the caches to be synced before starting workers + klog.Info("Waiting for informer caches to sync") + + synced := []cache.InformerSynced{c.groupBindingSynced} + + if ok := cache.WaitForCacheSync(stopCh, synced...); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + klog.Info("Starting workers") + // Launch two workers to process Foo resources + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + klog.Info("Started workers") + <-stopCh + klog.Info("Shutting down workers") + return nil +} + +func (c *Controller) enqueueGroupBinding(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.workqueue.Add(key) +} + +func (c *Controller) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *Controller) processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.workqueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the reconcile, passing it the namespace/name string of the + // Foo resource to be synced. + if err := c.reconcile(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + klog.Infof("Successfully synced %s:%s", "key", key) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Foo resource +// with the current status of the resource. +func (c *Controller) reconcile(key string) error { + + groupBinding, err := c.groupBindingLister.Get(key) + if err != nil { + // The user may no longer exist, in which case we stop + // processing. + if errors.IsNotFound(err) { + utilruntime.HandleError(fmt.Errorf("groupbinding '%s' in work queue no longer exists", key)) + return nil + } + klog.Error(err) + return err + } + if groupBinding.ObjectMeta.DeletionTimestamp.IsZero() { + + // The object is not being deleted, so if it does not have our finalizer, + // then lets add the finalizer and update the object. + if !sliceutil.HasString(groupBinding.Finalizers, finalizer) { + groupBinding.ObjectMeta.Finalizers = append(groupBinding.ObjectMeta.Finalizers, finalizer) + if groupBinding, err = c.ksClient.IamV1alpha2().GroupBindings().Update(groupBinding); err != nil { + return err + } + // Skip reconcile when groupbinding is updated. + return nil + } + } else { + // The object is being deleted + if sliceutil.HasString(groupBinding.ObjectMeta.Finalizers, finalizer) { + if err = c.bindUser(groupBinding); err != nil { + klog.Error(err) + return err + } + + // remove our finalizer from the list and update it. + groupBinding.Finalizers = sliceutil.RemoveString(groupBinding.ObjectMeta.Finalizers, func(item string) bool { + return item == finalizer + }) + + if groupBinding, err = c.ksClient.IamV1alpha2().GroupBindings().Update(groupBinding); err != nil { + return err + } + } + // Our finalizer has finished, so the reconciler can do nothing. + return nil + } + + if err = c.bindUser(groupBinding); err != nil { + klog.Error(err) + return err + } + + c.recorder.Event(groupBinding, corev1.EventTypeNormal, successSynced, messageResourceSynced) + return nil +} + +func (c *Controller) Start(stopCh <-chan struct{}) error { + return c.Run(4, stopCh) +} + +// Udpate user's Group property. So no need to query user's groups when authorizing. +func (c *Controller) bindUser(groupBinding *iamv1alpha2.GroupBinding) error { + + users := make([]string, 0) + // Ignore the user if the user if being deleted. + for _, u := range groupBinding.Users { + if user, err := c.ksClient.IamV1alpha2().Users().Get(u, metav1.GetOptions{}); err == nil && user.ObjectMeta.DeletionTimestamp.IsZero() { + users = append(users, u) + } + } + + // Nothing to do + if len(users) == 0 { + return nil + } + + // Get all GroupBindings and check whether user exists in the Group. + listOptions := metav1.ListOptions{} + groupBindingList, err := c.ksClient.IamV1alpha2().GroupBindings().List(listOptions) + if err != nil { + klog.Error(err) + return err + } + + userGroups := make(map[string][]string) + for _, item := range groupBindingList.Items { + if item.ObjectMeta.DeletionTimestamp.IsZero() { + for _, u := range users { + if sliceutil.HasString(item.Users, u) { + if userGroups[u] == nil { + userGroups[u] = make([]string, 0) + } + userGroups[u] = append(userGroups[u], item.GroupRef.Name) + } + } + } + } + for k, v := range userGroups { + if err := c.patchUser(k, v); err != nil { + if errors.IsNotFound(err) { + klog.Infof("user %s doesn't exist any more", k) + return nil + } + klog.Error(err) + return err + } + } + return nil +} + +func (c *Controller) patchUser(userName string, groups []string) error { + if user, err := c.ksClient.IamV1alpha2().Users().Get(userName, metav1.GetOptions{}); err == nil && user.ObjectMeta.DeletionTimestamp.IsZero() { + newUser := user.DeepCopy() + newUser.Spec.Groups = groups + patch := client.MergeFrom(user) + patchData, _ := patch.Data(newUser) + if _, err := c.ksClient.IamV1alpha2().Users(). + Patch(userName, patch.Type(), patchData); err != nil { + return err + } + } else { + return err + } + return nil +} diff --git a/pkg/controller/user/user_controller.go b/pkg/controller/user/user_controller.go index 8ee3ed7eb..4409d5269 100644 --- a/pkg/controller/user/user_controller.go +++ b/pkg/controller/user/user_controller.go @@ -19,6 +19,10 @@ package user import ( "encoding/json" "fmt" + "reflect" + "strconv" + "time" + "golang.org/x/crypto/bcrypt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -49,10 +53,7 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/devops" ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap" "kubesphere.io/kubesphere/pkg/utils/sliceutil" - "reflect" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "strconv" - "time" ) const ( @@ -296,6 +297,11 @@ func (c *Controller) reconcile(key string) error { return err } + if err = c.deleteGroupBindings(user); err != nil { + klog.Error(err) + return err + } + if c.devopsClient != nil { // unassign jenkins role, unassign multiple times is allowed if err := c.unassignDevOpsAdminRole(user); err != nil { @@ -552,6 +558,22 @@ func (c *Controller) ldapSync(user *iamv1alpha2.User) error { } } +func (c *Controller) deleteGroupBindings(user *iamv1alpha2.User) error { + + // Groupbindings that created by kubeshpere will be deleted directly. + listOptions := metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{iamv1alpha2.UserReferenceLabel: user.Name}).String(), + } + deleteOptions := metav1.NewDeleteOptions(0) + + if err := c.ksClient.IamV1alpha2().GroupBindings(). + DeleteCollection(deleteOptions, listOptions); err != nil { + klog.Error(err) + return err + } + return nil +} + func (c *Controller) deleteRoleBindings(user *iamv1alpha2.User) error { listOptions := metav1.ListOptions{ LabelSelector: labels.SelectorFromSet(labels.Set{iamv1alpha2.UserReferenceLabel: user.Name}).String(),