fix workspacetemplate patch API not working

Signed-off-by: hongming <talonwan@yunify.com>
This commit is contained in:
hongming
2020-08-04 00:03:51 +08:00
parent fd4790a64f
commit e8c1acdcf3
19 changed files with 136 additions and 475 deletions

View File

@@ -94,11 +94,11 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
}
klog.Info("Setting up event handlers")
globalRoleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctl.enqueueClusterRole,
AddFunc: ctl.enqueueGlobalRole,
UpdateFunc: func(old, new interface{}) {
ctl.enqueueClusterRole(new)
ctl.enqueueGlobalRole(new)
},
DeleteFunc: ctl.enqueueClusterRole,
DeleteFunc: ctl.enqueueGlobalRole,
})
return ctl
}
@@ -129,7 +129,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return nil
}
func (c *Controller) enqueueClusterRole(obj interface{}) {
func (c *Controller) enqueueGlobalRole(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {

View File

@@ -293,14 +293,6 @@ func (r *ReconcileNamespace) initRoles(namespace *corev1.Namespace) error {
return nil
}
func (r *ReconcileNamespace) resetNamespaceOwner(namespace *corev1.Namespace) error {
namespace = namespace.DeepCopy()
delete(namespace.Annotations, constants.CreatorAnnotationKey)
err := r.Update(context.Background(), namespace)
klog.V(4).Infof("update namespace after creator has been deleted")
return err
}
func (r *ReconcileNamespace) initCreatorRoleBinding(namespace *corev1.Namespace) error {
creator := namespace.Annotations[constants.CreatorAnnotationKey]
if creator == "" {
@@ -308,21 +300,14 @@ func (r *ReconcileNamespace) initCreatorRoleBinding(namespace *corev1.Namespace)
}
var user iamv1alpha2.User
err := r.Get(context.Background(), types.NamespacedName{Name: creator}, &user)
if err != nil {
// skip if user has been deleted
if err := r.Get(context.Background(), types.NamespacedName{Name: creator}, &user); err != nil {
if errors.IsNotFound(err) {
return r.resetNamespaceOwner(namespace)
return nil
}
klog.Error(err)
return err
}
// skip if user has been deleted
if !user.DeletionTimestamp.IsZero() {
return r.resetNamespaceOwner(namespace)
}
creatorRoleBinding := &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", creator, iamv1alpha2.NamespaceAdmin),
@@ -342,8 +327,8 @@ func (r *ReconcileNamespace) initCreatorRoleBinding(namespace *corev1.Namespace)
},
},
}
err = r.Client.Create(context.Background(), creatorRoleBinding)
if err != nil {
if err := r.Client.Create(context.Background(), creatorRoleBinding); err != nil {
if errors.IsAlreadyExists(err) {
return nil
}

View File

@@ -105,11 +105,11 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
}
klog.Info("Setting up event handlers")
workspaceRoleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctl.enqueueClusterRole,
AddFunc: ctl.enqueueWorkspaceRole,
UpdateFunc: func(old, new interface{}) {
ctl.enqueueClusterRole(new)
ctl.enqueueWorkspaceRole(new)
},
DeleteFunc: ctl.enqueueClusterRole,
DeleteFunc: ctl.enqueueWorkspaceRole,
})
return ctl
}
@@ -146,7 +146,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return nil
}
func (c *Controller) enqueueClusterRole(obj interface{}) {
func (c *Controller) enqueueWorkspaceRole(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {

View File

@@ -18,15 +18,12 @@ package workspacetemplate
import (
"bytes"
"encoding/json"
"fmt"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/util/yaml"
@@ -40,13 +37,16 @@ import (
iamv1alpha2 "kubesphere.io/kubesphere/pkg/apis/iam/v1alpha2"
tenantv1alpha1 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha1"
tenantv1alpha2 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha2"
typesv1beta1 "kubesphere.io/kubesphere/pkg/apis/types/v1beta1"
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2"
tenantv1alpha1informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/tenant/v1alpha1"
tenantv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/tenant/v1alpha2"
typesv1beta1informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/types/v1beta1"
iamv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2"
tenantv1alpha1listers "kubesphere.io/kubesphere/pkg/client/listers/tenant/v1alpha1"
tenantv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/tenant/v1alpha2"
typesv1beta1listers "kubesphere.io/kubesphere/pkg/client/listers/types/v1beta1"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"
@@ -61,23 +61,24 @@ const (
)
type Controller struct {
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
workspaceTemplateInformer tenantv1alpha2informers.WorkspaceTemplateInformer
workspaceTemplateLister tenantv1alpha2listers.WorkspaceTemplateLister
workspaceTemplateSynced cache.InformerSynced
workspaceRoleInformer iamv1alpha2informers.WorkspaceRoleInformer
workspaceRoleLister iamv1alpha2listers.WorkspaceRoleLister
workspaceRoleSynced cache.InformerSynced
roleBaseInformer iamv1alpha2informers.RoleBaseInformer
roleBaseLister iamv1alpha2listers.RoleBaseLister
roleBaseSynced cache.InformerSynced
workspaceInformer tenantv1alpha1informers.WorkspaceInformer
workspaceLister tenantv1alpha1listers.WorkspaceLister
workspaceSynced cache.InformerSynced
fedWorkspaceCache cache.Store
fedWorkspaceCacheController cache.Controller
multiClusterEnabled bool
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
workspaceTemplateInformer tenantv1alpha2informers.WorkspaceTemplateInformer
workspaceTemplateLister tenantv1alpha2listers.WorkspaceTemplateLister
workspaceTemplateSynced cache.InformerSynced
workspaceRoleInformer iamv1alpha2informers.WorkspaceRoleInformer
workspaceRoleLister iamv1alpha2listers.WorkspaceRoleLister
workspaceRoleSynced cache.InformerSynced
roleBaseInformer iamv1alpha2informers.RoleBaseInformer
roleBaseLister iamv1alpha2listers.RoleBaseLister
roleBaseSynced cache.InformerSynced
workspaceInformer tenantv1alpha1informers.WorkspaceInformer
workspaceLister tenantv1alpha1listers.WorkspaceLister
workspaceSynced cache.InformerSynced
federatedWorkspaceInformer typesv1beta1informers.FederatedWorkspaceInformer
federatedWorkspaceLister typesv1beta1listers.FederatedWorkspaceLister
federatedWorkspaceSynced cache.InformerSynced
multiClusterEnabled bool
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
@@ -89,9 +90,13 @@ type Controller struct {
recorder record.EventRecorder
}
func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface, workspaceTemplateInformer tenantv1alpha2informers.WorkspaceTemplateInformer,
workspaceInformer tenantv1alpha1informers.WorkspaceInformer, roleBaseInformer iamv1alpha2informers.RoleBaseInformer, workspaceRoleInformer iamv1alpha2informers.WorkspaceRoleInformer,
fedWorkspaceCache cache.Store, fedWorkspaceCacheController cache.Controller, multiClusterEnabled bool) *Controller {
func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface,
workspaceTemplateInformer tenantv1alpha2informers.WorkspaceTemplateInformer,
workspaceInformer tenantv1alpha1informers.WorkspaceInformer,
roleBaseInformer iamv1alpha2informers.RoleBaseInformer,
workspaceRoleInformer iamv1alpha2informers.WorkspaceRoleInformer,
federatedWorkspaceInformer typesv1beta1informers.FederatedWorkspaceInformer,
multiClusterEnabled bool) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
@@ -102,33 +107,34 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
ctl := &Controller{
k8sClient: k8sClient,
ksClient: ksClient,
workspaceTemplateInformer: workspaceTemplateInformer,
workspaceTemplateLister: workspaceTemplateInformer.Lister(),
workspaceTemplateSynced: workspaceTemplateInformer.Informer().HasSynced,
workspaceInformer: workspaceInformer,
workspaceLister: workspaceInformer.Lister(),
workspaceSynced: workspaceInformer.Informer().HasSynced,
workspaceRoleInformer: workspaceRoleInformer,
workspaceRoleLister: workspaceRoleInformer.Lister(),
workspaceRoleSynced: workspaceRoleInformer.Informer().HasSynced,
roleBaseInformer: roleBaseInformer,
roleBaseLister: roleBaseInformer.Lister(),
roleBaseSynced: roleBaseInformer.Informer().HasSynced,
fedWorkspaceCache: fedWorkspaceCache,
fedWorkspaceCacheController: fedWorkspaceCacheController,
multiClusterEnabled: multiClusterEnabled,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "WorkspaceTemplate"),
recorder: recorder,
k8sClient: k8sClient,
ksClient: ksClient,
workspaceTemplateInformer: workspaceTemplateInformer,
workspaceTemplateLister: workspaceTemplateInformer.Lister(),
workspaceTemplateSynced: workspaceTemplateInformer.Informer().HasSynced,
workspaceInformer: workspaceInformer,
workspaceLister: workspaceInformer.Lister(),
workspaceSynced: workspaceInformer.Informer().HasSynced,
workspaceRoleInformer: workspaceRoleInformer,
workspaceRoleLister: workspaceRoleInformer.Lister(),
workspaceRoleSynced: workspaceRoleInformer.Informer().HasSynced,
roleBaseInformer: roleBaseInformer,
roleBaseLister: roleBaseInformer.Lister(),
roleBaseSynced: roleBaseInformer.Informer().HasSynced,
federatedWorkspaceInformer: federatedWorkspaceInformer,
federatedWorkspaceLister: federatedWorkspaceInformer.Lister(),
federatedWorkspaceSynced: federatedWorkspaceInformer.Informer().HasSynced,
multiClusterEnabled: multiClusterEnabled,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "WorkspaceTemplate"),
recorder: recorder,
}
klog.Info("Setting up event handlers")
workspaceTemplateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctl.enqueueClusterRole,
AddFunc: ctl.enqueueWorkspaceTemplate,
UpdateFunc: func(old, new interface{}) {
ctl.enqueueClusterRole(new)
ctl.enqueueWorkspaceTemplate(new)
},
DeleteFunc: ctl.enqueueClusterRole,
DeleteFunc: ctl.enqueueWorkspaceTemplate,
})
return ctl
}
@@ -146,7 +152,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
synced := make([]cache.InformerSynced, 0)
synced = append(synced, c.workspaceTemplateSynced, c.workspaceSynced, c.workspaceRoleSynced, c.roleBaseSynced)
if c.multiClusterEnabled {
synced = append(synced, c.fedWorkspaceCacheController.HasSynced)
synced = append(synced, c.federatedWorkspaceSynced)
}
if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
return fmt.Errorf("failed to wait for caches to sync")
@@ -164,7 +170,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return nil
}
func (c *Controller) enqueueClusterRole(obj interface{}) {
func (c *Controller) enqueueWorkspaceTemplate(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
@@ -236,7 +242,6 @@ func (c *Controller) processNextWorkItem() bool {
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) reconcile(key string) error {
workspaceTemplate, err := c.workspaceTemplateLister.Get(key)
if err != nil {
// The user may no longer exist, in which case we stop
@@ -280,91 +285,45 @@ func (c *Controller) Start(stopCh <-chan struct{}) error {
}
func (c *Controller) multiClusterSync(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
obj, exist, err := c.fedWorkspaceCache.GetByKey(workspaceTemplate.Name)
if !exist {
return c.createFederatedWorkspace(workspaceTemplate)
}
// multi cluster environment, synchronize workspaces with kubefed
federatedWorkspace, err := c.federatedWorkspaceLister.Get(workspaceTemplate.Name)
if err != nil {
// create federatedworkspace if not found
if errors.IsNotFound(err) {
return c.createFederatedWorkspace(workspaceTemplate)
}
klog.Error(err)
return err
}
var fedWorkspace tenantv1alpha2.FederatedWorkspace
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.(*unstructured.Unstructured).Object, &fedWorkspace); err != nil {
klog.Error(err)
return err
}
if !reflect.DeepEqual(fedWorkspace.Spec, workspaceTemplate.Spec) {
fedWorkspace.Spec = workspaceTemplate.Spec
return c.updateFederatedWorkspace(&fedWorkspace)
// update spec
if !reflect.DeepEqual(federatedWorkspace.Spec, workspaceTemplate.Spec) {
federatedWorkspace.Spec = workspaceTemplate.Spec
if err := c.updateFederatedWorkspace(federatedWorkspace); err != nil {
klog.Error(err)
return err
}
}
return nil
}
func (c *Controller) createFederatedWorkspace(workspaceTemplate *tenantv1alpha2.WorkspaceTemplate) error {
federatedWorkspace := &tenantv1alpha2.FederatedWorkspace{
TypeMeta: metav1.TypeMeta{
Kind: tenantv1alpha2.FedWorkspaceKind,
APIVersion: tenantv1alpha2.FedWorkspaceResource.Group + "/" + tenantv1alpha2.FedWorkspaceResource.Version,
},
federatedWorkspace := &typesv1beta1.FederatedWorkspace{
ObjectMeta: metav1.ObjectMeta{
Name: workspaceTemplate.Name,
},
Spec: workspaceTemplate.Spec,
}
err := controllerutil.SetControllerReference(workspaceTemplate, federatedWorkspace, scheme.Scheme)
if err != nil {
if err := controllerutil.SetControllerReference(workspaceTemplate, federatedWorkspace, scheme.Scheme); err != nil {
return err
}
data, err := json.Marshal(federatedWorkspace)
if err != nil {
return err
}
cli := c.k8sClient.(*kubernetes.Clientset)
err = cli.RESTClient().Post().
AbsPath(fmt.Sprintf("/apis/%s/%s/%s", tenantv1alpha2.FedWorkspaceResource.Group,
tenantv1alpha2.FedWorkspaceResource.Version, tenantv1alpha2.FedWorkspaceResource.Name)).
Body(data).
Do().Error()
if err != nil {
if _, err := c.ksClient.TypesV1beta1().FederatedWorkspaces().Create(federatedWorkspace); err != nil {
if errors.IsAlreadyExists(err) {
return nil
}
return err
}
return nil
}
func (c *Controller) updateFederatedWorkspace(fedWorkspace *tenantv1alpha2.FederatedWorkspace) error {
data, err := json.Marshal(fedWorkspace)
if err != nil {
return err
}
cli := c.k8sClient.(*kubernetes.Clientset)
err = cli.RESTClient().Put().
AbsPath(fmt.Sprintf("/apis/%s/%s/%s/%s", tenantv1alpha2.FedWorkspaceResource.Group,
tenantv1alpha2.FedWorkspaceResource.Version, tenantv1alpha2.FedWorkspaceResource.Name,
fedWorkspace.Name)).
Body(data).
Do().Error()
if err != nil {
if errors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
@@ -539,3 +498,12 @@ func (r *Controller) initManagerRoleBinding(workspace *tenantv1alpha2.WorkspaceT
return nil
}
func (c *Controller) updateFederatedWorkspace(workspace *typesv1beta1.FederatedWorkspace) error {
_, err := c.ksClient.TypesV1beta1().FederatedWorkspaces().Update(workspace)
if err != nil {
klog.Error(err)
return err
}
return nil
}