improve multicluster resource controller

Signed-off-by: hongming <talonwan@yunify.com>
This commit is contained in:
hongming
2020-06-15 14:09:32 +08:00
parent 61d827db54
commit 4fcaa78b45
51 changed files with 3583 additions and 381 deletions

View File

@@ -23,6 +23,8 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
@@ -33,8 +35,11 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
iamv1alpha2 "kubesphere.io/kubesphere/pkg/apis/iam/v1alpha2"
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
iamv1alpha2informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/iam/v1alpha2"
iamv1alpha2listers "kubesphere.io/kubesphere/pkg/client/listers/iam/v1alpha2"
"kubesphere.io/kubesphere/pkg/constants"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"
)
@@ -43,18 +48,18 @@ const (
// SuccessSynced is used as part of the Event 'reason' when a Foo is synced
successSynced = "Synced"
// is synced successfully
messageResourceSynced = "GlobalRoleBinding synced successfully"
controllerName = "globalrolebinding-controller"
federatedClusterRoleBindingKind = "FederatedClusterRoleBinding"
federatedResourceVersion = "types.kubefed.io/v1beta1"
federatedResourceAPIPath = "/apis/types.kubefed.io/v1beta1/federatedclusterrolebindings"
messageResourceSynced = "GlobalRoleBinding synced successfully"
controllerName = "globalrolebinding-controller"
)
type Controller struct {
k8sClient kubernetes.Interface
informer iamv1alpha2informers.GlobalRoleBindingInformer
lister iamv1alpha2listers.GlobalRoleBindingLister
synced cache.InformerSynced
k8sClient kubernetes.Interface
ksClient kubesphere.Interface
globalRoleBindingInformer iamv1alpha2informers.GlobalRoleBindingInformer
globalRoleBindingLister iamv1alpha2listers.GlobalRoleBindingLister
globalRoleBindingSynced cache.InformerSynced
fedGlobalRoleBindingCache cache.Store
fedGlobalRoleBindingCacheController cache.Controller
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
@@ -67,7 +72,8 @@ type Controller struct {
multiClusterEnabled bool
}
func NewController(k8sClient kubernetes.Interface, globalRoleBindingInformer iamv1alpha2informers.GlobalRoleBindingInformer, multiClusterEnabled bool) *Controller {
func NewController(k8sClient kubernetes.Interface, ksClient kubesphere.Interface, globalRoleBindingInformer iamv1alpha2informers.GlobalRoleBindingInformer,
fedGlobalRoleBindingCache cache.Store, fedGlobalRoleBindingCacheController cache.Controller, multiClusterEnabled bool) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
@@ -78,13 +84,16 @@ func NewController(k8sClient kubernetes.Interface, globalRoleBindingInformer iam
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
ctl := &Controller{
k8sClient: k8sClient,
informer: globalRoleBindingInformer,
lister: globalRoleBindingInformer.Lister(),
synced: globalRoleBindingInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterRoleBinding"),
recorder: recorder,
multiClusterEnabled: multiClusterEnabled,
k8sClient: k8sClient,
ksClient: ksClient,
globalRoleBindingInformer: globalRoleBindingInformer,
globalRoleBindingLister: globalRoleBindingInformer.Lister(),
globalRoleBindingSynced: globalRoleBindingInformer.Informer().HasSynced,
fedGlobalRoleBindingCache: fedGlobalRoleBindingCache,
fedGlobalRoleBindingCacheController: fedGlobalRoleBindingCacheController,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "GlobalRoleBinding"),
recorder: recorder,
multiClusterEnabled: multiClusterEnabled,
}
klog.Info("Setting up event handlers")
globalRoleBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -101,14 +110,19 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
//init client
// Start the informer factories to begin populating the informer caches
klog.Info("Starting User controller")
klog.Info("Starting GlobalRoleBinding controller")
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.synced); !ok {
synced := make([]cache.InformerSynced, 0)
synced = append(synced, c.globalRoleBindingSynced)
if c.multiClusterEnabled {
synced = append(synced, c.fedGlobalRoleBindingCacheController.HasSynced)
}
if ok := cache.WaitForCacheSync(stopCh, synced...); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
@@ -197,12 +211,12 @@ func (c *Controller) processNextWorkItem() bool {
// with the current status of the resource.
func (c *Controller) reconcile(key string) error {
globalRoleBinding, err := c.lister.Get(key)
globalRoleBinding, err := c.globalRoleBindingLister.Get(key)
if err != nil {
// The user may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("clusterrolebinding '%s' in work queue no longer exists", key))
utilruntime.HandleError(fmt.Errorf("globalrolebinding '%s' in work queue no longer exists", key))
return nil
}
klog.Error(err)
@@ -216,6 +230,13 @@ func (c *Controller) reconcile(key string) error {
}
}
if c.multiClusterEnabled {
if err = c.multiClusterSync(globalRoleBinding); err != nil {
klog.Error(err)
return err
}
}
c.recorder.Event(globalRoleBinding, corev1.EventTypeNormal, successSynced, messageResourceSynced)
return nil
}
@@ -224,89 +245,181 @@ func (c *Controller) Start(stopCh <-chan struct{}) error {
return c.Run(4, stopCh)
}
func (c *Controller) relateToClusterAdmin(globalRoleBinding *iamv1alpha2.GlobalRoleBinding) error {
func (c *Controller) multiClusterSync(globalRoleBinding *iamv1alpha2.GlobalRoleBinding) error {
if c.multiClusterEnabled {
federatedClusterRoleBinding := &iamv1alpha2.FederatedClusterRoleBinding{
TypeMeta: metav1.TypeMeta{
Kind: federatedClusterRoleBindingKind,
APIVersion: federatedResourceVersion,
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("fed-%s", globalRoleBinding.Name),
},
Spec: iamv1alpha2.FederatedClusterRoleBindingSpec{
Template: iamv1alpha2.Template{
Subjects: ensureSubjectAPIVersionIsValid(globalRoleBinding.Subjects),
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: iamv1alpha2.ResourceKindClusterRole,
Name: iamv1alpha2.ClusterAdmin,
},
},
Placement: iamv1alpha2.Placement{
ClusterSelector: iamv1alpha2.ClusterSelector{},
},
},
}
err := controllerutil.SetControllerReference(globalRoleBinding, federatedClusterRoleBinding, scheme.Scheme)
if err != nil {
return err
}
data, err := json.Marshal(federatedClusterRoleBinding)
if err != nil {
return err
}
cli := c.k8sClient.(*kubernetes.Clientset)
err = cli.RESTClient().Post().
AbsPath(federatedResourceAPIPath).
Body(data).
Do().Error()
if err != nil {
if errors.IsAlreadyExists(err) {
return nil
}
return err
}
} else {
clusterRoleBinding := &rbacv1.ClusterRoleBinding{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("fed-%s", globalRoleBinding.Name),
},
Subjects: ensureSubjectAPIVersionIsValid(globalRoleBinding.Subjects),
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: iamv1alpha2.ResourceKindClusterRole,
Name: iamv1alpha2.ClusterAdmin,
},
}
err := controllerutil.SetControllerReference(globalRoleBinding, clusterRoleBinding, scheme.Scheme)
if err != nil {
return err
}
_, err = c.k8sClient.RbacV1().ClusterRoleBindings().Create(clusterRoleBinding)
if err != nil {
if errors.IsAlreadyExists(err) {
return nil
}
return err
}
if err := c.ensureNotControlledByKubefed(globalRoleBinding); err != nil {
klog.Error(err)
return err
}
obj, exist, err := c.fedGlobalRoleBindingCache.GetByKey(globalRoleBinding.Name)
if !exist {
return c.createFederatedGlobalRoleBinding(globalRoleBinding)
}
if err != nil {
klog.Error(err)
return err
}
var federatedGlobalRoleBinding iamv1alpha2.FederatedRoleBinding
err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.(*unstructured.Unstructured).Object, &federatedGlobalRoleBinding)
if err != nil {
klog.Error(err)
return err
}
if !reflect.DeepEqual(federatedGlobalRoleBinding.Spec.Template.Subjects, globalRoleBinding.Subjects) ||
!reflect.DeepEqual(federatedGlobalRoleBinding.Spec.Template.RoleRef, globalRoleBinding.RoleRef) ||
!reflect.DeepEqual(federatedGlobalRoleBinding.Spec.Template.Labels, globalRoleBinding.Labels) ||
!reflect.DeepEqual(federatedGlobalRoleBinding.Spec.Template.Annotations, globalRoleBinding.Annotations) {
federatedGlobalRoleBinding.Spec.Template.Subjects = globalRoleBinding.Subjects
federatedGlobalRoleBinding.Spec.Template.RoleRef = globalRoleBinding.RoleRef
federatedGlobalRoleBinding.Spec.Template.Annotations = globalRoleBinding.Annotations
federatedGlobalRoleBinding.Spec.Template.Labels = globalRoleBinding.Labels
return c.updateFederatedGlobalRoleBinding(&federatedGlobalRoleBinding)
}
return nil
}
func (c *Controller) relateToClusterAdmin(globalRoleBinding *iamv1alpha2.GlobalRoleBinding) error {
username := findExpectUsername(globalRoleBinding)
// unexpected
if username == "" {
return nil
}
clusterRoleBinding := &rbacv1.ClusterRoleBinding{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", username, iamv1alpha2.ClusterAdmin),
},
Subjects: ensureSubjectAPIVersionIsValid(globalRoleBinding.Subjects),
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: iamv1alpha2.ResourceKindClusterRole,
Name: iamv1alpha2.ClusterAdmin,
},
}
err := controllerutil.SetControllerReference(globalRoleBinding, clusterRoleBinding, scheme.Scheme)
if err != nil {
return err
}
_, err = c.k8sClient.RbacV1().ClusterRoleBindings().Create(clusterRoleBinding)
if err != nil {
if errors.IsAlreadyExists(err) {
return nil
}
return err
}
return nil
}
func findExpectUsername(globalRoleBinding *iamv1alpha2.GlobalRoleBinding) string {
for _, subject := range globalRoleBinding.Subjects {
if subject.Kind == iamv1alpha2.ResourceKindUser {
return subject.Name
}
}
return ""
}
func (c *Controller) createFederatedGlobalRoleBinding(globalRoleBinding *iamv1alpha2.GlobalRoleBinding) error {
federatedGlobalRoleBinding := &iamv1alpha2.FederatedRoleBinding{
TypeMeta: metav1.TypeMeta{
Kind: iamv1alpha2.FedGlobalRoleBindingKind,
APIVersion: iamv1alpha2.FedGlobalRoleBindingResource.Group + "/" + iamv1alpha2.FedGlobalRoleBindingResource.Version,
},
ObjectMeta: metav1.ObjectMeta{
Name: globalRoleBinding.Name,
},
Spec: iamv1alpha2.FederatedRoleBindingSpec{
Template: iamv1alpha2.RoleBindingTemplate{
ObjectMeta: metav1.ObjectMeta{
Labels: globalRoleBinding.Labels,
Annotations: globalRoleBinding.Annotations,
},
Subjects: globalRoleBinding.Subjects,
RoleRef: globalRoleBinding.RoleRef,
},
Placement: iamv1alpha2.Placement{
ClusterSelector: iamv1alpha2.ClusterSelector{},
},
},
}
err := controllerutil.SetControllerReference(globalRoleBinding, federatedGlobalRoleBinding, scheme.Scheme)
if err != nil {
return err
}
data, err := json.Marshal(federatedGlobalRoleBinding)
if err != nil {
return err
}
cli := c.k8sClient.(*kubernetes.Clientset)
err = cli.RESTClient().Post().
AbsPath(fmt.Sprintf("/apis/%s/%s/%s", iamv1alpha2.FedGlobalRoleBindingResource.Group,
iamv1alpha2.FedGlobalRoleBindingResource.Version, iamv1alpha2.FedGlobalRoleBindingResource.Name)).
Body(data).
Do().Error()
if err != nil {
if errors.IsAlreadyExists(err) {
return nil
}
return err
}
return nil
}
func (c *Controller) updateFederatedGlobalRoleBinding(federatedGlobalRoleBinding *iamv1alpha2.FederatedRoleBinding) error {
data, err := json.Marshal(federatedGlobalRoleBinding)
if err != nil {
return err
}
cli := c.k8sClient.(*kubernetes.Clientset)
err = cli.RESTClient().Put().
AbsPath(fmt.Sprintf("/apis/%s/%s/%s/%s", iamv1alpha2.FedGlobalRoleBindingResource.Group,
iamv1alpha2.FedGlobalRoleBindingResource.Version, iamv1alpha2.FedGlobalRoleBindingResource.Name,
federatedGlobalRoleBinding.Name)).
Body(data).
Do().Error()
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return nil
}
func (c *Controller) ensureNotControlledByKubefed(globalRoleBinding *iamv1alpha2.GlobalRoleBinding) error {
if globalRoleBinding.Labels[constants.KubefedManagedLabel] != "false" {
if globalRoleBinding.Labels == nil {
globalRoleBinding.Labels = make(map[string]string, 0)
}
globalRoleBinding = globalRoleBinding.DeepCopy()
globalRoleBinding.Labels[constants.KubefedManagedLabel] = "false"
_, err := c.ksClient.IamV1alpha2().GlobalRoleBindings().Update(globalRoleBinding)
if err != nil {
klog.Error(err)
}
}
return nil
}