Merge pull request #3161 from RolandMa1986/feat-group-fedsync

enable multicluster sync for groups
This commit is contained in:
KubeSphere CI Bot
2020-12-09 15:17:43 +08:00
committed by GitHub
25 changed files with 2158 additions and 54 deletions

View File

@@ -18,6 +18,7 @@ package group
import (
"fmt"
"reflect"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -33,11 +34,16 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
iam1alpha2 "kubesphere.io/kubesphere/pkg/apis/iam/v1alpha2"
fedv1beta1types "kubesphere.io/kubesphere/pkg/apis/types/v1beta1"
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2"
fedv1beta1informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/types/v1beta1"
iamv1alpha1listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2"
fedv1beta1lister "kubesphere.io/kubesphere/pkg/client/listers/types/v1beta1"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/controller/utils/controller"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
const (
@@ -49,16 +55,21 @@ const (
type Controller struct {
controller.BaseController
scheme *runtime.Scheme
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
groupInformer iamv1alpha2informers.GroupInformer
groupLister iamv1alpha1listers.GroupLister
recorder record.EventRecorder
scheme *runtime.Scheme
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
groupInformer iamv1alpha2informers.GroupInformer
groupLister iamv1alpha1listers.GroupLister
recorder record.EventRecorder
federatedGroupInformer fedv1beta1informers.FederatedGroupInformer
federatedGroupLister fedv1beta1lister.FederatedGroupLister
multiClusterEnabled bool
}
// NewController creates Group Controller instance
func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface, groupInformer iamv1alpha2informers.GroupInformer) *Controller {
func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface, groupInformer iamv1alpha2informers.GroupInformer,
federatedGroupInformer fedv1beta1informers.FederatedGroupInformer,
multiClusterEnabled bool) *Controller {
klog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
@@ -70,12 +81,20 @@ func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface
Synced: []cache.InformerSynced{groupInformer.Informer().HasSynced},
Name: controllerName,
},
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}),
k8sClient: k8sClient,
ksClient: ksClient,
groupInformer: groupInformer,
groupLister: groupInformer.Lister(),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}),
k8sClient: k8sClient,
ksClient: ksClient,
groupInformer: groupInformer,
groupLister: groupInformer.Lister(),
federatedGroupInformer: federatedGroupInformer,
federatedGroupLister: federatedGroupInformer.Lister(),
multiClusterEnabled: multiClusterEnabled,
}
if ctl.multiClusterEnabled {
ctl.Synced = append(ctl.Synced, ctl.federatedGroupInformer.Informer().HasSynced)
}
ctl.Handler = ctl.reconcile
klog.Info("Setting up event handlers")
@@ -106,10 +125,27 @@ func (c *Controller) reconcile(key string) error {
return err
}
if group.ObjectMeta.DeletionTimestamp.IsZero() {
var g *iam1alpha2.Group
if !sliceutil.HasString(group.Finalizers, finalizer) {
group.ObjectMeta.Finalizers = append(group.ObjectMeta.Finalizers, finalizer)
g = group.DeepCopy()
g.ObjectMeta.Finalizers = append(g.ObjectMeta.Finalizers, finalizer)
}
if group, err = c.ksClient.IamV1alpha2().Groups().Update(group); err != nil {
if c.multiClusterEnabled {
// Ensure not controlled by Kubefed
if group.Labels == nil || group.Labels[constants.KubefedManagedLabel] != "false" {
if g == nil {
g = group.DeepCopy()
}
if g.Labels == nil {
g.Labels = make(map[string]string, 0)
}
g.Labels[constants.KubefedManagedLabel] = "false"
}
}
if g != nil {
if _, err = c.ksClient.IamV1alpha2().Groups().Update(g); err != nil {
return err
}
// Skip reconcile when group is updated.
@@ -139,6 +175,14 @@ func (c *Controller) reconcile(key string) error {
return nil
}
// synchronization through kubefed-controller when multi cluster is enabled
if c.multiClusterEnabled {
if err = c.multiClusterSync(group); err != nil {
klog.Error(err)
return err
}
}
c.recorder.Event(group, corev1.EventTypeNormal, successSynced, messageResourceSynced)
return nil
}
@@ -192,3 +236,54 @@ func (c *Controller) deleteRoleBindings(group *iam1alpha2.Group) error {
return nil
}
func (c *Controller) multiClusterSync(group *iam1alpha2.Group) error {
obj, err := c.federatedGroupLister.Get(group.Name)
if err != nil {
if errors.IsNotFound(err) {
return c.createFederatedGroup(group)
}
klog.Error(err)
return err
}
if !reflect.DeepEqual(obj.Spec.Template.Labels, group.Labels) {
obj.Spec.Template.Labels = group.Labels
if _, err = c.ksClient.TypesV1beta1().FederatedGroups().Update(obj); err != nil {
return err
}
}
return nil
}
func (c *Controller) createFederatedGroup(group *iam1alpha2.Group) error {
federatedGroup := &fedv1beta1types.FederatedGroup{
ObjectMeta: metav1.ObjectMeta{
Name: group.Name,
},
Spec: fedv1beta1types.FederatedGroupSpec{
Template: fedv1beta1types.GroupTemplate{
ObjectMeta: metav1.ObjectMeta{
Labels: group.Labels,
},
Spec: group.Spec,
},
Placement: fedv1beta1types.GenericPlacementFields{
ClusterSelector: &metav1.LabelSelector{},
},
},
}
// must bind group lifecycle
err := controllerutil.SetControllerReference(group, federatedGroup, scheme.Scheme)
if err != nil {
return err
}
if _, err = c.ksClient.TypesV1beta1().FederatedGroups().Create(federatedGroup); err != nil {
return err
}
return nil
}