@@ -37,7 +37,9 @@ import (
|
||||
iamv1alpha2 "kubesphere.io/kubesphere/pkg/apis/iam/v1alpha2"
|
||||
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
|
||||
iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2"
|
||||
tenantv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/tenant/v1alpha2"
|
||||
iamv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2"
|
||||
tenantv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/tenant/v1alpha2"
|
||||
"kubesphere.io/kubesphere/pkg/constants"
|
||||
"reflect"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
@@ -60,6 +62,10 @@ type Controller struct {
|
||||
workspaceRoleBindingSynced cache.InformerSynced
|
||||
fedWorkspaceRoleBindingCache cache.Store
|
||||
fedWorkspaceRoleBindingCacheController cache.Controller
|
||||
workspaceTemplateInformer tenantv1alpha2informers.WorkspaceTemplateInformer
|
||||
workspaceTemplateLister tenantv1alpha2listers.WorkspaceTemplateLister
|
||||
workspaceTemplateSynced cache.InformerSynced
|
||||
multiClusterEnabled bool
|
||||
// workqueue is a rate limited work queue. This is used to queue work to be
|
||||
// processed instead of performing it as soon as a change happens. This
|
||||
// means we can ensure we only process a fixed amount of resources at a
|
||||
@@ -72,7 +78,7 @@ type Controller struct {
|
||||
}
|
||||
|
||||
func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface, workspaceRoleBindingInformer iamv1alpha2informers.WorkspaceRoleBindingInformer,
|
||||
fedWorkspaceRoleBindingCache cache.Store, fedWorkspaceRoleBindingCacheController cache.Controller) *Controller {
|
||||
fedWorkspaceRoleBindingCache cache.Store, fedWorkspaceRoleBindingCacheController cache.Controller, workspaceTemplateInformer tenantv1alpha2informers.WorkspaceTemplateInformer, multiClusterEnabled bool) *Controller {
|
||||
// Create event broadcaster
|
||||
// Add sample-controller types to the default Kubernetes Scheme so Events can be
|
||||
// logged for sample-controller types.
|
||||
@@ -90,16 +96,20 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
|
||||
workspaceRoleBindingSynced: workspaceRoleBindingInformer.Informer().HasSynced,
|
||||
fedWorkspaceRoleBindingCache: fedWorkspaceRoleBindingCache,
|
||||
fedWorkspaceRoleBindingCacheController: fedWorkspaceRoleBindingCacheController,
|
||||
workspaceTemplateInformer: workspaceTemplateInformer,
|
||||
workspaceTemplateLister: workspaceTemplateInformer.Lister(),
|
||||
workspaceTemplateSynced: workspaceTemplateInformer.Informer().HasSynced,
|
||||
multiClusterEnabled: multiClusterEnabled,
|
||||
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "WorkspaceRoleBinding"),
|
||||
recorder: recorder,
|
||||
}
|
||||
klog.Info("Setting up event handlers")
|
||||
workspaceRoleBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: ctl.enqueueClusterRoleBinding,
|
||||
AddFunc: ctl.enqueueWorkspaceRoleBinding,
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
ctl.enqueueClusterRoleBinding(new)
|
||||
ctl.enqueueWorkspaceRoleBinding(new)
|
||||
},
|
||||
DeleteFunc: ctl.enqueueClusterRoleBinding,
|
||||
DeleteFunc: ctl.enqueueWorkspaceRoleBinding,
|
||||
})
|
||||
return ctl
|
||||
}
|
||||
@@ -114,7 +124,13 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
// Wait for the caches to be synced before starting workers
|
||||
klog.Info("Waiting for informer caches to sync")
|
||||
|
||||
if ok := cache.WaitForCacheSync(stopCh, c.workspaceRoleBindingSynced, c.fedWorkspaceRoleBindingCacheController.HasSynced); !ok {
|
||||
synced := make([]cache.InformerSynced, 0)
|
||||
synced = append(synced, c.workspaceRoleBindingSynced, c.workspaceTemplateSynced)
|
||||
if c.multiClusterEnabled {
|
||||
synced = append(synced, c.fedWorkspaceRoleBindingCacheController.HasSynced)
|
||||
}
|
||||
|
||||
if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
|
||||
return fmt.Errorf("failed to wait for caches to sync")
|
||||
}
|
||||
|
||||
@@ -130,7 +146,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) enqueueClusterRoleBinding(obj interface{}) {
|
||||
func (c *Controller) enqueueWorkspaceRoleBinding(obj interface{}) {
|
||||
var key string
|
||||
var err error
|
||||
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
|
||||
@@ -215,11 +231,18 @@ func (c *Controller) reconcile(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.multiClusterSync(workspaceRoleBinding); err != nil {
|
||||
if err = c.bindWorkspace(workspaceRoleBinding); err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if c.multiClusterEnabled {
|
||||
if err = c.multiClusterSync(workspaceRoleBinding); err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c.recorder.Event(workspaceRoleBinding, corev1.EventTypeNormal, successSynced, messageResourceSynced)
|
||||
return nil
|
||||
}
|
||||
@@ -228,6 +251,40 @@ func (c *Controller) Start(stopCh <-chan struct{}) error {
|
||||
return c.Run(4, stopCh)
|
||||
}
|
||||
|
||||
func (c *Controller) bindWorkspace(workspaceRoleBinding *iamv1alpha2.WorkspaceRoleBinding) error {
|
||||
|
||||
workspaceName := workspaceRoleBinding.Labels[constants.WorkspaceLabelKey]
|
||||
|
||||
if workspaceName == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
workspace, err := c.workspaceTemplateLister.Get(workspaceName)
|
||||
|
||||
if err != nil {
|
||||
// skip if workspace not found
|
||||
if errors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if !metav1.IsControlledBy(workspaceRoleBinding, workspace) {
|
||||
workspaceRoleBinding.OwnerReferences = nil
|
||||
if err := controllerutil.SetControllerReference(workspace, workspaceRoleBinding, scheme.Scheme); err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
_, err = c.ksClient.IamV1alpha2().WorkspaceRoleBindings().Update(workspaceRoleBinding)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) multiClusterSync(workspaceRoleBinding *iamv1alpha2.WorkspaceRoleBinding) error {
|
||||
|
||||
if err := c.ensureNotControlledByKubefed(workspaceRoleBinding); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user