fix mistakenly cache federatedworkspace in non multicluster env
Signed-off-by: Jeff <zw0948@gmail.com>
This commit is contained in:
@@ -60,25 +60,20 @@ const (
|
||||
controllerName = "workspacetemplate-controller"
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
k8sClient kubernetes.Interface
|
||||
ksClient kubesphere.Interface
|
||||
workspaceTemplateInformer tenantv1alpha2informers.WorkspaceTemplateInformer
|
||||
workspaceTemplateLister tenantv1alpha2listers.WorkspaceTemplateLister
|
||||
workspaceTemplateSynced cache.InformerSynced
|
||||
workspaceRoleInformer iamv1alpha2informers.WorkspaceRoleInformer
|
||||
workspaceRoleLister iamv1alpha2listers.WorkspaceRoleLister
|
||||
workspaceRoleSynced cache.InformerSynced
|
||||
roleBaseInformer iamv1alpha2informers.RoleBaseInformer
|
||||
roleBaseLister iamv1alpha2listers.RoleBaseLister
|
||||
roleBaseSynced cache.InformerSynced
|
||||
workspaceInformer tenantv1alpha1informers.WorkspaceInformer
|
||||
workspaceLister tenantv1alpha1listers.WorkspaceLister
|
||||
workspaceSynced cache.InformerSynced
|
||||
federatedWorkspaceInformer typesv1beta1informers.FederatedWorkspaceInformer
|
||||
federatedWorkspaceLister typesv1beta1listers.FederatedWorkspaceLister
|
||||
federatedWorkspaceSynced cache.InformerSynced
|
||||
multiClusterEnabled bool
|
||||
type controller struct {
|
||||
k8sClient kubernetes.Interface
|
||||
ksClient kubesphere.Interface
|
||||
workspaceTemplateLister tenantv1alpha2listers.WorkspaceTemplateLister
|
||||
workspaceTemplateSynced cache.InformerSynced
|
||||
workspaceRoleLister iamv1alpha2listers.WorkspaceRoleLister
|
||||
workspaceRoleSynced cache.InformerSynced
|
||||
roleBaseLister iamv1alpha2listers.RoleBaseLister
|
||||
roleBaseSynced cache.InformerSynced
|
||||
workspaceLister tenantv1alpha1listers.WorkspaceLister
|
||||
workspaceSynced cache.InformerSynced
|
||||
federatedWorkspaceLister typesv1beta1listers.FederatedWorkspaceLister
|
||||
federatedWorkspaceSynced cache.InformerSynced
|
||||
multiClusterEnabled bool
|
||||
// workqueue is a rate limited work queue. This is used to queue work to be
|
||||
// processed instead of performing it as soon as a change happens. This
|
||||
// means we can ensure we only process a fixed amount of resources at a
|
||||
@@ -96,38 +91,34 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
|
||||
roleBaseInformer iamv1alpha2informers.RoleBaseInformer,
|
||||
workspaceRoleInformer iamv1alpha2informers.WorkspaceRoleInformer,
|
||||
federatedWorkspaceInformer typesv1beta1informers.FederatedWorkspaceInformer,
|
||||
multiClusterEnabled bool) *Controller {
|
||||
// Create event broadcaster
|
||||
// Add sample-controller types to the default Kubernetes Scheme so Events can be
|
||||
// logged for sample-controller types.
|
||||
multiClusterEnabled bool) *controller {
|
||||
|
||||
klog.V(4).Info("Creating event broadcaster")
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(klog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
|
||||
ctl := &Controller{
|
||||
k8sClient: k8sClient,
|
||||
ksClient: ksClient,
|
||||
workspaceTemplateInformer: workspaceTemplateInformer,
|
||||
workspaceTemplateLister: workspaceTemplateInformer.Lister(),
|
||||
workspaceTemplateSynced: workspaceTemplateInformer.Informer().HasSynced,
|
||||
workspaceInformer: workspaceInformer,
|
||||
workspaceLister: workspaceInformer.Lister(),
|
||||
workspaceSynced: workspaceInformer.Informer().HasSynced,
|
||||
workspaceRoleInformer: workspaceRoleInformer,
|
||||
workspaceRoleLister: workspaceRoleInformer.Lister(),
|
||||
workspaceRoleSynced: workspaceRoleInformer.Informer().HasSynced,
|
||||
roleBaseInformer: roleBaseInformer,
|
||||
roleBaseLister: roleBaseInformer.Lister(),
|
||||
roleBaseSynced: roleBaseInformer.Informer().HasSynced,
|
||||
federatedWorkspaceInformer: federatedWorkspaceInformer,
|
||||
federatedWorkspaceLister: federatedWorkspaceInformer.Lister(),
|
||||
federatedWorkspaceSynced: federatedWorkspaceInformer.Informer().HasSynced,
|
||||
multiClusterEnabled: multiClusterEnabled,
|
||||
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "WorkspaceTemplate"),
|
||||
recorder: recorder,
|
||||
ctl := &controller{
|
||||
k8sClient: k8sClient,
|
||||
ksClient: ksClient,
|
||||
workspaceTemplateLister: workspaceTemplateInformer.Lister(),
|
||||
workspaceTemplateSynced: workspaceTemplateInformer.Informer().HasSynced,
|
||||
workspaceLister: workspaceInformer.Lister(),
|
||||
workspaceSynced: workspaceInformer.Informer().HasSynced,
|
||||
workspaceRoleLister: workspaceRoleInformer.Lister(),
|
||||
workspaceRoleSynced: workspaceRoleInformer.Informer().HasSynced,
|
||||
roleBaseLister: roleBaseInformer.Lister(),
|
||||
roleBaseSynced: roleBaseInformer.Informer().HasSynced,
|
||||
multiClusterEnabled: multiClusterEnabled,
|
||||
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "WorkspaceTemplate"),
|
||||
recorder: recorder,
|
||||
}
|
||||
|
||||
if multiClusterEnabled {
|
||||
ctl.federatedWorkspaceLister = federatedWorkspaceInformer.Lister()
|
||||
ctl.federatedWorkspaceSynced = federatedWorkspaceInformer.Informer().HasSynced
|
||||
}
|
||||
|
||||
klog.Info("Setting up event handlers")
|
||||
workspaceTemplateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: ctl.enqueueWorkspaceTemplate,
|
||||
@@ -139,7 +130,7 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
|
||||
return ctl
|
||||
}
|
||||
|
||||
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
func (c *controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.workqueue.ShutDown()
|
||||
|
||||
@@ -170,7 +161,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) enqueueWorkspaceTemplate(obj interface{}) {
|
||||
func (c *controller) enqueueWorkspaceTemplate(obj interface{}) {
|
||||
var key string
|
||||
var err error
|
||||
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
|
||||
@@ -180,51 +171,31 @@ func (c *Controller) enqueueWorkspaceTemplate(obj interface{}) {
|
||||
c.workqueue.Add(key)
|
||||
}
|
||||
|
||||
func (c *Controller) runWorker() {
|
||||
func (c *controller) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) processNextWorkItem() bool {
|
||||
func (c *controller) processNextWorkItem() bool {
|
||||
obj, shutdown := c.workqueue.Get()
|
||||
|
||||
if shutdown {
|
||||
return false
|
||||
}
|
||||
|
||||
// We wrap this block in a func so we can defer c.workqueue.Done.
|
||||
err := func(obj interface{}) error {
|
||||
// We call Done here so the workqueue knows we have finished
|
||||
// processing this item. We also must remember to call Forget if we
|
||||
// do not want this work item being re-queued. For example, we do
|
||||
// not call Forget if a transient error occurs, instead the item is
|
||||
// put back on the workqueue and attempted again after a back-off
|
||||
// period.
|
||||
defer c.workqueue.Done(obj)
|
||||
var key string
|
||||
var ok bool
|
||||
// We expect strings to come off the workqueue. These are of the
|
||||
// form namespace/name. We do this as the delayed nature of the
|
||||
// workqueue means the items in the informer cache may actually be
|
||||
// more up to date that when the item was initially put onto the
|
||||
// workqueue.
|
||||
if key, ok = obj.(string); !ok {
|
||||
// As the item in the workqueue is actually invalid, we call
|
||||
// Forget here else we'd go into a loop of attempting to
|
||||
// process a work item that is invalid.
|
||||
c.workqueue.Forget(obj)
|
||||
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
|
||||
return nil
|
||||
}
|
||||
// Run the reconcile, passing it the namespace/name string of the
|
||||
// Foo resource to be synced.
|
||||
if err := c.reconcile(key); err != nil {
|
||||
// Put the item back on the workqueue to handle any transient errors.
|
||||
c.workqueue.AddRateLimited(key)
|
||||
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
|
||||
}
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get queued again until another change happens.
|
||||
c.workqueue.Forget(obj)
|
||||
klog.Infof("Successfully synced %s:%s", "key", key)
|
||||
return nil
|
||||
@@ -241,7 +212,7 @@ func (c *Controller) processNextWorkItem() bool {
|
||||
// syncHandler compares the actual state with the desired, and attempts to
|
||||
// converge the two. It then updates the Status block of the Foo resource
|
||||
// with the current status of the resource.
|
||||
func (c *Controller) reconcile(key string) error {
|
||||
func (c *controller) reconcile(key string) error {
|
||||
workspaceTemplate, err := c.workspaceTemplateLister.Get(key)
|
||||
if err != nil {
|
||||
// The user may no longer exist, in which case we stop
|
||||
@@ -280,11 +251,11 @@ func (c *Controller) reconcile(key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) Start(stopCh <-chan struct{}) error {
|
||||
func (c *controller) Start(stopCh <-chan struct{}) error {
|
||||
return c.Run(4, stopCh)
|
||||
}
|
||||
|
||||
func (c *Controller) multiClusterSync(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
func (c *controller) multiClusterSync(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
// multi cluster environment, synchronize workspaces with kubefed
|
||||
federatedWorkspace, err := c.federatedWorkspaceLister.Get(workspaceTemplate.Name)
|
||||
if err != nil {
|
||||
@@ -298,7 +269,7 @@ func (c *Controller) multiClusterSync(workspaceTemplate *tenantv1alpha2.Workspac
|
||||
// update spec
|
||||
if !reflect.DeepEqual(federatedWorkspace.Spec, workspaceTemplate.Spec) {
|
||||
federatedWorkspace.Spec = workspaceTemplate.Spec
|
||||
if err := c.updateFederatedWorkspace(federatedWorkspace); err != nil {
|
||||
if err = c.updateFederatedWorkspace(federatedWorkspace); err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
@@ -307,7 +278,7 @@ func (c *Controller) multiClusterSync(workspaceTemplate *tenantv1alpha2.Workspac
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) createFederatedWorkspace(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
func (c *controller) createFederatedWorkspace(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
federatedWorkspace := &typesv1beta1.FederatedWorkspace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: workspaceTemplate.Name,
|
||||
@@ -330,7 +301,7 @@ func (c *Controller) createFederatedWorkspace(workspaceTemplate *tenantv1alpha2.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) sync(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
func (c *controller) sync(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
workspace, err := c.workspaceLister.Get(workspaceTemplate.Name)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
@@ -355,7 +326,7 @@ func (c *Controller) sync(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) createWorkspace(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
func (c *controller) createWorkspace(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
workspace := &tenantv1alpha1.Workspace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: workspaceTemplate.Name,
|
||||
@@ -382,7 +353,7 @@ func (c *Controller) createWorkspace(workspaceTemplate *tenantv1alpha2.Workspace
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) updateWorkspace(workspace *tenantv1alpha1.Workspace) error {
|
||||
func (c *controller) updateWorkspace(workspace *tenantv1alpha1.Workspace) error {
|
||||
_, err := c.ksClient.TenantV1alpha1().Workspaces().Update(workspace)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
@@ -391,8 +362,8 @@ func (c *Controller) updateWorkspace(workspace *tenantv1alpha1.Workspace) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Controller) initRoles(workspace *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
roleBases, err := r.roleBaseLister.List(labels.Everything())
|
||||
func (c *controller) initRoles(workspace *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
roleBases, err := c.roleBaseLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
@@ -407,10 +378,10 @@ func (r *Controller) initRoles(workspace *tenantv1alpha2.WorkspaceTemplate) erro
|
||||
// make sure workspace label always exist
|
||||
role.Labels[tenantv1alpha1.WorkspaceLabel] = workspace.Name
|
||||
role.Name = roleName
|
||||
old, err := r.workspaceRoleLister.Get(roleName)
|
||||
old, err := c.workspaceRoleLister.Get(roleName)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
_, err = r.ksClient.IamV1alpha2().WorkspaceRoles().Create(&role)
|
||||
_, err = c.ksClient.IamV1alpha2().WorkspaceRoles().Create(&role)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
@@ -425,7 +396,7 @@ func (r *Controller) initRoles(workspace *tenantv1alpha2.WorkspaceTemplate) erro
|
||||
updated.Labels = role.Labels
|
||||
updated.Annotations = role.Annotations
|
||||
updated.Rules = role.Rules
|
||||
_, err = r.ksClient.IamV1alpha2().WorkspaceRoles().Update(updated)
|
||||
_, err = c.ksClient.IamV1alpha2().WorkspaceRoles().Update(updated)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
@@ -436,25 +407,25 @@ func (r *Controller) initRoles(workspace *tenantv1alpha2.WorkspaceTemplate) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Controller) resetWorkspaceOwner(workspace *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
func (c *controller) resetWorkspaceOwner(workspace *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
workspace = workspace.DeepCopy()
|
||||
workspace.Spec.Template.Spec.Manager = ""
|
||||
_, err := r.ksClient.TenantV1alpha2().WorkspaceTemplates().Update(workspace)
|
||||
_, err := c.ksClient.TenantV1alpha2().WorkspaceTemplates().Update(workspace)
|
||||
klog.V(4).Infof("update workspace after manager has been deleted")
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Controller) initManagerRoleBinding(workspace *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
func (c *controller) initManagerRoleBinding(workspace *tenantv1alpha2.WorkspaceTemplate) error {
|
||||
manager := workspace.Spec.Template.Spec.Manager
|
||||
if manager == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
user, err := r.ksClient.IamV1alpha2().Users().Get(manager, metav1.GetOptions{})
|
||||
user, err := c.ksClient.IamV1alpha2().Users().Get(manager, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
// skip if user has been deleted
|
||||
if errors.IsNotFound(err) {
|
||||
return r.resetWorkspaceOwner(workspace)
|
||||
return c.resetWorkspaceOwner(workspace)
|
||||
}
|
||||
klog.Error(err)
|
||||
return err
|
||||
@@ -462,7 +433,7 @@ func (r *Controller) initManagerRoleBinding(workspace *tenantv1alpha2.WorkspaceT
|
||||
|
||||
// skip if user has been deleted
|
||||
if !user.DeletionTimestamp.IsZero() {
|
||||
return r.resetWorkspaceOwner(workspace)
|
||||
return c.resetWorkspaceOwner(workspace)
|
||||
}
|
||||
|
||||
workspaceAdminRoleName := fmt.Sprintf(iamv1alpha2.WorkspaceAdminFormat, workspace.Name)
|
||||
@@ -487,7 +458,7 @@ func (r *Controller) initManagerRoleBinding(workspace *tenantv1alpha2.WorkspaceT
|
||||
},
|
||||
},
|
||||
}
|
||||
_, err = r.ksClient.IamV1alpha2().WorkspaceRoleBindings().Create(managerRoleBinding)
|
||||
_, err = c.ksClient.IamV1alpha2().WorkspaceRoleBindings().Create(managerRoleBinding)
|
||||
if err != nil {
|
||||
if errors.IsAlreadyExists(err) {
|
||||
return nil
|
||||
@@ -499,7 +470,7 @@ func (r *Controller) initManagerRoleBinding(workspace *tenantv1alpha2.WorkspaceT
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) updateFederatedWorkspace(workspace *typesv1beta1.FederatedWorkspace) error {
|
||||
func (c *controller) updateFederatedWorkspace(workspace *typesv1beta1.FederatedWorkspace) error {
|
||||
_, err := c.ksClient.TypesV1beta1().FederatedWorkspaces().Update(workspace)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
|
||||
Reference in New Issue
Block a user