From 81db894741f7ef679cc0adada30ad369827c1a21 Mon Sep 17 00:00:00 2001 From: live77 <535157110@qq.com> Date: Wed, 1 Dec 2021 23:38:30 +0800 Subject: [PATCH] add --controllers option in ks-controller-manager imple controller enable/disable check logic add unittest for selective controller enable/disable move all controllers init code to a single place --- cmd/controller-manager/app/controllers.go | 617 +++++++++++++----- cmd/controller-manager/app/options/options.go | 72 +- .../app/options/options_test.go | 66 ++ cmd/controller-manager/app/server.go | 173 +---- .../helmcategory/helm_category_controller.go | 18 +- .../helmrepo/helm_repo_controller.go | 22 +- .../quota/resourcequota_controller.go | 24 +- 7 files changed, 618 insertions(+), 374 deletions(-) create mode 100644 cmd/controller-manager/app/options/options_test.go diff --git a/cmd/controller-manager/app/controllers.go b/cmd/controller-manager/app/controllers.go index 27b7d48db..35f049cb8 100644 --- a/cmd/controller-manager/app/controllers.go +++ b/cmd/controller-manager/app/controllers.go @@ -17,16 +17,38 @@ limitations under the License. 
package app import ( + "fmt" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/klog" + "kubesphere.io/kubesphere/cmd/controller-manager/app/options" + "kubesphere.io/kubesphere/pkg/controller/application" + "kubesphere.io/kubesphere/pkg/controller/helm" + "kubesphere.io/kubesphere/pkg/controller/namespace" + "kubesphere.io/kubesphere/pkg/controller/openpitrix/helmapplication" + "kubesphere.io/kubesphere/pkg/controller/openpitrix/helmcategory" + "kubesphere.io/kubesphere/pkg/controller/openpitrix/helmrelease" + "kubesphere.io/kubesphere/pkg/controller/openpitrix/helmrepo" + "kubesphere.io/kubesphere/pkg/controller/quota" + "kubesphere.io/kubesphere/pkg/controller/serviceaccount" + "kubesphere.io/kubesphere/pkg/controller/user" + "kubesphere.io/kubesphere/pkg/controller/workspace" + "kubesphere.io/kubesphere/pkg/controller/workspacerole" + "kubesphere.io/kubesphere/pkg/controller/workspacerolebinding" + "kubesphere.io/kubesphere/pkg/controller/workspacetemplate" + "kubesphere.io/kubesphere/pkg/models/kubeconfig" + "kubesphere.io/kubesphere/pkg/simple/client/devops" + "kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins" + ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap" + "kubesphere.io/kubesphere/pkg/simple/client/s3" + ctrl "sigs.k8s.io/controller-runtime" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/kubefed/pkg/controller/util" "kubesphere.io/kubesphere/pkg/controller/storage/snapshotclass" - "kubesphere.io/kubesphere/pkg/apiserver/authentication" - iamv1alpha2 "kubesphere.io/api/iam/v1alpha2" "kubesphere.io/kubesphere/pkg/controller/certificatesigningrequest" @@ -46,198 +68,471 @@ import ( "kubesphere.io/kubesphere/pkg/controller/storage/capability" "kubesphere.io/kubesphere/pkg/controller/virtualservice" "kubesphere.io/kubesphere/pkg/informers" - "kubesphere.io/kubesphere/pkg/simple/client/devops" 
"kubesphere.io/kubesphere/pkg/simple/client/k8s" - ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap" - "kubesphere.io/kubesphere/pkg/simple/client/multicluster" - "kubesphere.io/kubesphere/pkg/simple/client/network" ippoolclient "kubesphere.io/kubesphere/pkg/simple/client/network/ippool" - "kubesphere.io/kubesphere/pkg/simple/client/s3" ) -func addControllers( - mgr manager.Manager, - client k8s.Client, - informerFactory informers.InformerFactory, - devopsClient devops.Interface, - s3Client s3.Interface, - ldapClient ldapclient.Interface, - options *k8s.KubernetesOptions, - authenticationOptions *authentication.Options, - multiClusterOptions *multicluster.Options, - networkOptions *network.Options, - serviceMeshEnabled bool, - kubectlImage string, - stopCh <-chan struct{}) error { +var allControllers = []string { + "user", + "workspacetemplate", + "workspace", + "workspacerole", + "workspacerolebinding", + "namespace", + "helmrepo", + "helmcategory", + "helmapplication", + "helmapplicationversion", + "helmrelease", + "helm", + + "application", + "serviceaccount", + "resourcequota", + + "virtualservice", + "destinationrule", + "job", + "storagecapability", + "volumesnapshot", + "loginrecord", + "cluster", + "nsnp", + "ippool", + "csr", + + "clusterrolebinding", + + "fedglobalrolecache", + "globalrole", + "fedglobalrolebindingcache", + "globalrolebinding", + + "groupbinding", + "group", + + "notification", +} + +// setup all available controllers one by one +func addAllControllers(mgr manager.Manager, client k8s.Client, informerFactory informers.InformerFactory, + cmOptions *options.KubeSphereControllerManagerOptions, + stopCh <-chan struct{}) error { + var err error + + //////////////////////////////////// + // begin init necessary informers + //////////////////////////////////// kubernetesInformer := informerFactory.KubernetesSharedInformerFactory() istioInformer := informerFactory.IstioSharedInformerFactory() kubesphereInformer := 
informerFactory.KubeSphereSharedInformerFactory() + //////////////////////////////////// + // end informers + //////////////////////////////////// - multiClusterEnabled := multiClusterOptions.Enable + //////////////////////////////////// + // begin init necessary clients + //////////////////////////////////// + kubeconfigClient := kubeconfig.NewOperator(client.Kubernetes(), + informerFactory.KubernetesSharedInformerFactory().Core().V1().ConfigMaps().Lister(), + client.Config()) - var vsController, drController manager.Runnable + var devopsClient devops.Interface + if cmOptions.DevopsOptions != nil && len(cmOptions.DevopsOptions.Host) != 0 { + devopsClient, err = jenkins.NewDevopsClient(cmOptions.DevopsOptions) + if err != nil { + return fmt.Errorf("failed to connect jenkins, please check jenkins status, error: %v", err) + } + } + + var ldapClient ldapclient.Interface + // when there is no ldapOption, we set ldapClient as nil, which means we don't need to sync user info into ldap. + if cmOptions.LdapOptions != nil && len(cmOptions.LdapOptions.Host) != 0 { + if cmOptions.LdapOptions.Host == ldapclient.FAKE_HOST { // for debug only + ldapClient = ldapclient.NewSimpleLdap() + } else { + ldapClient, err = ldapclient.NewLdapClient(cmOptions.LdapOptions, stopCh) + if err != nil { + return fmt.Errorf("failed to connect to ldap service, please check ldap status, error: %v", err) + } + } + } else { + klog.Warning("ks-controller-manager starts without ldap provided, it will not sync user into ldap") + } + //////////////////////////////////// + // end init clients + //////////////////////////////////// + + //////////////////////////////////////////////////////// + // begin init controller and add to manager one by one + //////////////////////////////////////////////////////// + + // "user" controller + if cmOptions.IsControllerEnabled("user") { + userController := &user.Reconciler{ + MultiClusterEnabled: cmOptions.MultiClusterOptions.Enable, + MaxConcurrentReconciles: 4, + 
LdapClient: ldapClient, + DevopsClient: devopsClient, + KubeconfigClient: kubeconfigClient, + AuthenticationOptions: cmOptions.AuthenticationOptions, + } + addControllerWithSetup(mgr,"user", userController) + } + + // "workspacetemplate" controller + if cmOptions.IsControllerEnabled("workspacetemplate") { + workspaceTemplateReconciler := &workspacetemplate.Reconciler{MultiClusterEnabled: cmOptions.MultiClusterOptions.Enable} + addControllerWithSetup(mgr,"workspacetemplate", workspaceTemplateReconciler) + } + + // "workspace" controller + if cmOptions.IsControllerEnabled("workspace") { + workspaceReconciler := &workspace.Reconciler{} + addControllerWithSetup(mgr,"workspace", workspaceReconciler) + } + + // "workspacerole" controller + if cmOptions.IsControllerEnabled("workspacerole") { + workspaceRoleReconciler := &workspacerole.Reconciler{MultiClusterEnabled: cmOptions.MultiClusterOptions.Enable} + addControllerWithSetup(mgr,"workspacerole", workspaceRoleReconciler) + } + + // "workspacerolebinding" controller + if cmOptions.IsControllerEnabled("workspacerolebinding") { + workspaceRoleBindingReconciler := &workspacerolebinding.Reconciler{MultiClusterEnabled: cmOptions.MultiClusterOptions.Enable} + addControllerWithSetup(mgr,"workspacerolebinding", workspaceRoleBindingReconciler) + } + + // "namespace" controller + if cmOptions.IsControllerEnabled("namespace") { + namespaceReconciler := &namespace.Reconciler{} + addControllerWithSetup(mgr,"namespace", namespaceReconciler) + } + + // "helmrepo" controller + if cmOptions.IsControllerEnabled("helmrepo") { + helmRepoReconciler := &helmrepo.ReconcileHelmRepo{} + addControllerWithSetup(mgr, "helmrepo", helmRepoReconciler) + } + + // "helmcategory" controller + if cmOptions.IsControllerEnabled("helmcategory") { + helmCategoryReconciler := &helmcategory.ReconcileHelmCategory{} + addControllerWithSetup(mgr, "helmcategory", helmCategoryReconciler) + } + + var opS3Client s3.Interface + if 
!cmOptions.OpenPitrixOptions.AppStoreConfIsEmpty() { + opS3Client, err = s3.NewS3Client(cmOptions.OpenPitrixOptions.S3Options) + if err != nil { + klog.Fatalf("failed to connect to s3, please check openpitrix s3 service status, error: %v", err) + } + + // "helmapplication" controller + if cmOptions.IsControllerEnabled("helmapplication") { + reconcileHelmApp := (&helmapplication.ReconcileHelmApplication{}) + addControllerWithSetup(mgr, "helmapplication", reconcileHelmApp) + } + + // "helmapplicationversion" controller + if cmOptions.IsControllerEnabled("helmapplicationversion") { + reconcileHelmAppVersion := (&helmapplication.ReconcileHelmApplicationVersion{}) + addControllerWithSetup(mgr, "helmapplicationversion",reconcileHelmAppVersion) + } + } + + // "helmrelease" controller + if cmOptions.IsControllerEnabled("helmrelease") { + reconcileHelmRelease := &helmrelease.ReconcileHelmRelease{ + // nil interface is valid value. + StorageClient: opS3Client, + KsFactory: informerFactory.KubeSphereSharedInformerFactory(), + MultiClusterEnable: cmOptions.MultiClusterOptions.Enable, + WaitTime: cmOptions.OpenPitrixOptions.ReleaseControllerOptions.WaitTime, + MaxConcurrent: cmOptions.OpenPitrixOptions.ReleaseControllerOptions.MaxConcurrent, + StopChan: stopCh, + } + addControllerWithSetup(mgr, "helmrelease",reconcileHelmRelease) + } + + // "helm" controller + if cmOptions.IsControllerEnabled("helm") { + if !cmOptions.GatewayOptions.IsEmpty() { + helmReconciler := &helm.Reconciler{GatewayOptions: cmOptions.GatewayOptions} + addControllerWithSetup(mgr, "helm",helmReconciler) + } + } + + // "application" controller + if cmOptions.IsControllerEnabled("application") { + selector, _ := labels.Parse(cmOptions.ApplicationSelector) + applicationReconciler := &application.ApplicationReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Mapper: mgr.GetRESTMapper(), + ApplicationSelector: selector, + } + addControllerWithSetup(mgr, "application",applicationReconciler) + } + 
+	// "serviceaccount" controller
+	if cmOptions.IsControllerEnabled("serviceaccount") {
+		saReconciler := &serviceaccount.Reconciler{}
+		addControllerWithSetup(mgr, "serviceaccount",saReconciler)
+	}
+
+	// "resourcequota" controller
+	if cmOptions.IsControllerEnabled("resourcequota") {
+		resourceQuotaReconciler := &quota.Reconciler{
+			MaxConcurrentReconciles: quota.DefaultMaxConcurrentReconciles,
+			ResyncPeriod:            quota.DefaultResyncPeriod,
+			InformerFactory:         informerFactory.KubernetesSharedInformerFactory(),
+		}
+		addControllerWithSetup(mgr, "resourcequota", resourceQuotaReconciler)
+	}
+
+	serviceMeshEnabled := cmOptions.ServiceMeshOptions != nil && len(cmOptions.ServiceMeshOptions.IstioPilotHost) != 0
 	if serviceMeshEnabled {
-		vsController = virtualservice.NewVirtualServiceController(kubernetesInformer.Core().V1().Services(),
-			istioInformer.Networking().V1alpha3().VirtualServices(),
-			istioInformer.Networking().V1alpha3().DestinationRules(),
-			kubesphereInformer.Servicemesh().V1alpha2().Strategies(),
+		// "virtualservice" controller
+		if cmOptions.IsControllerEnabled("virtualservice") {
+			vsController := virtualservice.NewVirtualServiceController(kubernetesInformer.Core().V1().Services(),
+				istioInformer.Networking().V1alpha3().VirtualServices(),
+				istioInformer.Networking().V1alpha3().DestinationRules(),
+				kubesphereInformer.Servicemesh().V1alpha2().Strategies(),
+				client.Kubernetes(),
+				client.Istio(),
+				client.KubeSphere())
+			addController(mgr, "virtualservice", vsController)
+		}
+
+		// "destinationrule" controller
+		if cmOptions.IsControllerEnabled("destinationrule") {
+			drController := destinationrule.NewDestinationRuleController(kubernetesInformer.Apps().V1().Deployments(),
+				istioInformer.Networking().V1alpha3().DestinationRules(),
+				kubernetesInformer.Core().V1().Services(),
+				kubesphereInformer.Servicemesh().V1alpha2().ServicePolicies(),
+				client.Kubernetes(),
+				client.Istio(),
+				client.KubeSphere())
+			addController(mgr, "destinationrule", drController)
+		}
+ } + + // "job" controller + if cmOptions.IsControllerEnabled("job") { + jobController := job.NewJobController(kubernetesInformer.Batch().V1().Jobs(), client.Kubernetes()) + addController(mgr, "job", jobController) + } + + // "storagecapability" controller + if cmOptions.IsControllerEnabled("storagecapability") { + storageCapabilityController := capability.NewController( + client.Kubernetes().StorageV1().StorageClasses(), + kubernetesInformer.Storage().V1().StorageClasses(), + kubernetesInformer.Storage().V1().CSIDrivers(), + ) + addController(mgr, "storagecapability", storageCapabilityController) + } + + // "volumesnapshot" controller + if cmOptions.IsControllerEnabled("volumesnapshot") { + volumeSnapshotController := snapshotclass.NewController( + kubernetesInformer.Storage().V1().StorageClasses(), + client.Snapshot().SnapshotV1().VolumeSnapshotClasses(), + informerFactory.SnapshotSharedInformerFactory().Snapshot().V1().VolumeSnapshotClasses(), + ) + addController(mgr, "volumesnapshot", volumeSnapshotController) + } + + // "loginrecord" controller + if cmOptions.IsControllerEnabled("loginrecord") { + loginRecordController := loginrecord.NewLoginRecordController( client.Kubernetes(), - client.Istio(), - client.KubeSphere()) - - drController = destinationrule.NewDestinationRuleController(kubernetesInformer.Apps().V1().Deployments(), - istioInformer.Networking().V1alpha3().DestinationRules(), - kubernetesInformer.Core().V1().Services(), - kubesphereInformer.Servicemesh().V1alpha2().ServicePolicies(), - client.Kubernetes(), - client.Istio(), - client.KubeSphere()) + client.KubeSphere(), + kubesphereInformer.Iam().V1alpha2().LoginRecords(), + kubesphereInformer.Iam().V1alpha2().Users(), + cmOptions.AuthenticationOptions.LoginHistoryRetentionPeriod, + cmOptions.AuthenticationOptions.LoginHistoryMaximumEntries) + addController(mgr, "loginrecord", loginRecordController) } - jobController := job.NewJobController(kubernetesInformer.Batch().V1().Jobs(), 
client.Kubernetes()) + // "csr" controller + if cmOptions.IsControllerEnabled("csr") { + csrController := certificatesigningrequest.NewController(client.Kubernetes(), + kubernetesInformer.Certificates().V1().CertificateSigningRequests(), + kubernetesInformer.Core().V1().ConfigMaps(), client.Config()) + addController(mgr, "csr", csrController) + } - storageCapabilityController := capability.NewController( - client.Kubernetes().StorageV1().StorageClasses(), - kubernetesInformer.Storage().V1().StorageClasses(), - kubernetesInformer.Storage().V1().CSIDrivers(), - ) + // "clusterrolebinding" controller + if cmOptions.IsControllerEnabled("clusterrolebinding") { + clusterRoleBindingController := clusterrolebinding.NewController(client.Kubernetes(), + kubernetesInformer.Rbac().V1().ClusterRoleBindings(), + kubernetesInformer.Apps().V1().Deployments(), + kubernetesInformer.Core().V1().Pods(), + kubesphereInformer.Iam().V1alpha2().Users(), + cmOptions.AuthenticationOptions.KubectlImage) + addController(mgr, "clusterrolebinding", clusterRoleBindingController) + } - volumeSnapshotController := snapshotclass.NewController( - kubernetesInformer.Storage().V1().StorageClasses(), - client.Snapshot().SnapshotV1().VolumeSnapshotClasses(), - informerFactory.SnapshotSharedInformerFactory().Snapshot().V1().VolumeSnapshotClasses(), - ) - - var fedGlobalRoleBindingCache, fedGlobalRoleCache cache.Store - var fedGlobalRoleBindingCacheController, fedGlobalRoleCacheController cache.Controller - - if multiClusterEnabled { - fedGlobalRoleClient, err := util.NewResourceClient(client.Config(), &iamv1alpha2.FedGlobalRoleResource) - if err != nil { - klog.Error(err) - return err + // "fedglobalrolecache" controller + var fedGlobalRoleCache cache.Store + var fedGlobalRoleCacheController cache.Controller + if cmOptions.IsControllerEnabled("fedglobalrolecache") { + if cmOptions.MultiClusterOptions.Enable { + fedGlobalRoleClient, err := util.NewResourceClient(client.Config(), 
&iamv1alpha2.FedGlobalRoleResource) + if err != nil { + klog.Fatalf("Unable to create FedGlobalRole controller: %v", err) + } + fedGlobalRoleCache, fedGlobalRoleCacheController = util.NewResourceInformer(fedGlobalRoleClient, "", + &iamv1alpha2.FedGlobalRoleResource, func(object runtimeclient.Object) {}) + go fedGlobalRoleCacheController.Run(stopCh) + addSuccessfullyControllers.Insert("fedglobalrolecache") } - fedGlobalRoleBindingClient, err := util.NewResourceClient(client.Config(), &iamv1alpha2.FedGlobalRoleBindingResource) - if err != nil { - klog.Error(err) - return err + } + + // "globalrole" controller + if cmOptions.IsControllerEnabled("globalrole") { + if cmOptions.MultiClusterOptions.Enable { + globalRoleController := globalrole.NewController(client.Kubernetes(), client.KubeSphere(), + kubesphereInformer.Iam().V1alpha2().GlobalRoles(), fedGlobalRoleCache, fedGlobalRoleCacheController) + addController(mgr, "globalrole", globalRoleController) } - - fedGlobalRoleCache, fedGlobalRoleCacheController = util.NewResourceInformer(fedGlobalRoleClient, "", &iamv1alpha2.FedGlobalRoleResource, func(object runtimeclient.Object) {}) - fedGlobalRoleBindingCache, fedGlobalRoleBindingCacheController = util.NewResourceInformer(fedGlobalRoleBindingClient, "", &iamv1alpha2.FedGlobalRoleBindingResource, func(object runtimeclient.Object) {}) - - go fedGlobalRoleCacheController.Run(stopCh) - go fedGlobalRoleBindingCacheController.Run(stopCh) } - loginRecordController := loginrecord.NewLoginRecordController( - client.Kubernetes(), - client.KubeSphere(), - kubesphereInformer.Iam().V1alpha2().LoginRecords(), - kubesphereInformer.Iam().V1alpha2().Users(), - authenticationOptions.LoginHistoryRetentionPeriod, - authenticationOptions.LoginHistoryMaximumEntries) - - csrController := certificatesigningrequest.NewController(client.Kubernetes(), - kubernetesInformer.Certificates().V1().CertificateSigningRequests(), - kubernetesInformer.Core().V1().ConfigMaps(), client.Config()) - - 
clusterRoleBindingController := clusterrolebinding.NewController(client.Kubernetes(), - kubernetesInformer.Rbac().V1().ClusterRoleBindings(), - kubernetesInformer.Apps().V1().Deployments(), - kubernetesInformer.Core().V1().Pods(), - kubesphereInformer.Iam().V1alpha2().Users(), - kubectlImage) - - globalRoleController := globalrole.NewController(client.Kubernetes(), client.KubeSphere(), - kubesphereInformer.Iam().V1alpha2().GlobalRoles(), fedGlobalRoleCache, fedGlobalRoleCacheController) - - globalRoleBindingController := globalrolebinding.NewController(client.Kubernetes(), client.KubeSphere(), - kubesphereInformer.Iam().V1alpha2().GlobalRoleBindings(), - fedGlobalRoleBindingCache, fedGlobalRoleBindingCacheController, - multiClusterEnabled) - - groupBindingController := groupbinding.NewController(client.Kubernetes(), client.KubeSphere(), - kubesphereInformer.Iam().V1alpha2().GroupBindings(), - kubesphereInformer.Types().V1beta1().FederatedGroupBindings(), - multiClusterEnabled) - - groupController := group.NewController(client.Kubernetes(), client.KubeSphere(), - kubesphereInformer.Iam().V1alpha2().Groups(), - kubesphereInformer.Types().V1beta1().FederatedGroups(), - multiClusterEnabled) - - var clusterController manager.Runnable - if multiClusterEnabled { - clusterController = cluster.NewClusterController( - client.Kubernetes(), - client.Config(), - kubesphereInformer.Cluster().V1alpha1().Clusters(), - client.KubeSphere().ClusterV1alpha1().Clusters(), - multiClusterOptions.ClusterControllerResyncPeriod, - multiClusterOptions.HostClusterName) - } - - var nsnpController manager.Runnable - if networkOptions.EnableNetworkPolicy { - nsnpProvider, err := provider.NewNsNetworkPolicyProvider(client.Kubernetes(), kubernetesInformer.Networking().V1().NetworkPolicies()) - if err != nil { - return err + // "fedglobalrolebindingcache" controller + var fedGlobalRoleBindingCache cache.Store + var fedGlobalRoleBindingCacheController cache.Controller + if 
cmOptions.IsControllerEnabled("fedglobalrolebindingcache") { + if cmOptions.MultiClusterOptions.Enable { + fedGlobalRoleBindingClient, err := util.NewResourceClient(client.Config(), &iamv1alpha2.FedGlobalRoleBindingResource) + if err != nil { + klog.Fatalf("Unable to create FedGlobalRoleBinding controller: %v", err) + } + fedGlobalRoleBindingCache, fedGlobalRoleBindingCacheController = util.NewResourceInformer(fedGlobalRoleBindingClient, "", + &iamv1alpha2.FedGlobalRoleBindingResource, func(object runtimeclient.Object) {}) + go fedGlobalRoleBindingCacheController.Run(stopCh) + addSuccessfullyControllers.Insert("fedglobalrolebindingcache") } - - nsnpController = nsnetworkpolicy.NewNSNetworkPolicyController(client.Kubernetes(), - client.KubeSphere().NetworkV1alpha1(), - kubesphereInformer.Network().V1alpha1().NamespaceNetworkPolicies(), - kubernetesInformer.Core().V1().Services(), - kubernetesInformer.Core().V1().Nodes(), - kubesphereInformer.Tenant().V1alpha1().Workspaces(), - kubernetesInformer.Core().V1().Namespaces(), nsnpProvider, networkOptions.NSNPOptions) } - var ippoolController manager.Runnable - ippoolProvider := ippoolclient.NewProvider(kubernetesInformer, client.KubeSphere(), client.Kubernetes(), networkOptions.IPPoolType, options) - if ippoolProvider != nil { - ippoolController = ippool.NewIPPoolController(kubesphereInformer, kubernetesInformer, client.Kubernetes(), client.KubeSphere(), ippoolProvider) + // "globalrolebinding" controller + if cmOptions.IsControllerEnabled("globalrolebinding") { + globalRoleBindingController := globalrolebinding.NewController(client.Kubernetes(), client.KubeSphere(), + kubesphereInformer.Iam().V1alpha2().GlobalRoleBindings(), + fedGlobalRoleBindingCache, fedGlobalRoleBindingCacheController, + cmOptions.MultiClusterOptions.Enable) + addController(mgr, "globalrolebinding", globalRoleBindingController) } - controllers := map[string]manager.Runnable{ - "virtualservice-controller": vsController, - 
"destinationrule-controller": drController, - "job-controller": jobController, - "storagecapability-controller": storageCapabilityController, - "volumesnapshot-controller": volumeSnapshotController, - "loginrecord-controller": loginRecordController, - "cluster-controller": clusterController, - "nsnp-controller": nsnpController, - "csr-controller": csrController, - "clusterrolebinding-controller": clusterRoleBindingController, - "globalrolebinding-controller": globalRoleBindingController, - "ippool-controller": ippoolController, - "groupbinding-controller": groupBindingController, - "group-controller": groupController, + // "groupbinding" controller + if cmOptions.IsControllerEnabled("groupbinding") { + groupBindingController := groupbinding.NewController(client.Kubernetes(), client.KubeSphere(), + kubesphereInformer.Iam().V1alpha2().GroupBindings(), + kubesphereInformer.Types().V1beta1().FederatedGroupBindings(), + cmOptions.MultiClusterOptions.Enable) + addController(mgr, "groupbinding", groupBindingController) } - if multiClusterEnabled { - controllers["globalrole-controller"] = globalRoleController - notificationController, err := notification.NewController(client.Kubernetes(), mgr.GetClient(), mgr.GetCache()) - if err != nil { - return err + // "group" controller + if cmOptions.IsControllerEnabled("group") { + groupController := group.NewController(client.Kubernetes(), client.KubeSphere(), + kubesphereInformer.Iam().V1alpha2().Groups(), + kubesphereInformer.Types().V1beta1().FederatedGroups(), + cmOptions.MultiClusterOptions.Enable) + addController(mgr, "group", groupController) + } + + // "cluster" controller + if cmOptions.IsControllerEnabled("cluster") { + if cmOptions.MultiClusterOptions.Enable { + clusterController := cluster.NewClusterController( + client.Kubernetes(), + client.Config(), + kubesphereInformer.Cluster().V1alpha1().Clusters(), + client.KubeSphere().ClusterV1alpha1().Clusters(), + cmOptions.MultiClusterOptions.ClusterControllerResyncPeriod, + 
cmOptions.MultiClusterOptions.HostClusterName) + addController(mgr, "cluster", clusterController) } - controllers["notification-controller"] = notificationController } - for name, ctrl := range controllers { - if ctrl == nil { - klog.V(4).Infof("%s is not going to run due to dependent component disabled.", name) - continue - } + // "nsnp" controller + if cmOptions.IsControllerEnabled("nsnp") { + if cmOptions.NetworkOptions.EnableNetworkPolicy { + nsnpProvider, err := provider.NewNsNetworkPolicyProvider(client.Kubernetes(), kubernetesInformer.Networking().V1().NetworkPolicies()) + if err != nil { + klog.Fatalf("Unable to create NSNetworkPolicy controller: %v", err) + } - if err := mgr.Add(ctrl); err != nil { - klog.Error(err, "add controller to manager failed", "name", name) - return err + nsnpController := nsnetworkpolicy.NewNSNetworkPolicyController(client.Kubernetes(), + client.KubeSphere().NetworkV1alpha1(), + kubesphereInformer.Network().V1alpha1().NamespaceNetworkPolicies(), + kubernetesInformer.Core().V1().Services(), + kubernetesInformer.Core().V1().Nodes(), + kubesphereInformer.Tenant().V1alpha1().Workspaces(), + kubernetesInformer.Core().V1().Namespaces(), nsnpProvider, cmOptions.NetworkOptions.NSNPOptions) + addController(mgr, "nsnp", nsnpController) + } + } + + // "ippool" controller + if cmOptions.IsControllerEnabled("ippool") { + ippoolProvider := ippoolclient.NewProvider(kubernetesInformer, client.KubeSphere(), client.Kubernetes(), + cmOptions.NetworkOptions.IPPoolType, cmOptions.KubernetesOptions) + if ippoolProvider != nil { + ippoolController := ippool.NewIPPoolController(kubesphereInformer, kubernetesInformer, client.Kubernetes(), + client.KubeSphere(), ippoolProvider) + addController(mgr, "ippool", ippoolController) + } + } + + // "notification" controller + if cmOptions.IsControllerEnabled("notification") { + if cmOptions.MultiClusterOptions.Enable { + notificationController, err := notification.NewController(client.Kubernetes(), 
mgr.GetClient(), mgr.GetCache()) + if err != nil { + klog.Fatalf("Unable to create Notification controller: %v", err) + } + addController(mgr, "notification", notificationController) + } + } + + // log all controllers process result + for _, name := range allControllers { + if cmOptions.IsControllerEnabled(name) { + if addSuccessfullyControllers.Has(name) { + klog.Infof("%s controller is enabled and added successfully.", name) + } else { + klog.Infof("%s controller is enabled but is not going to run due to its dependent component being disabled.", name) + } + } else { + klog.Infof("%s controller is disabled by controller selectors.", name) } } return nil } + + +var addSuccessfullyControllers = sets.NewString() + +type setupableController interface { + SetupWithManager(mgr ctrl.Manager) error +} + +func addControllerWithSetup(mgr manager.Manager, name string, controller setupableController) { + if err := controller.SetupWithManager(mgr); err != nil { + klog.Fatalf("Unable to create %v controller: %v", name, err) + } + addSuccessfullyControllers.Insert(name) +} + +func addController(mgr manager.Manager, name string, controller manager.Runnable) { + if err := mgr.Add(controller); err != nil { + klog.Fatalf("Unable to create %v controller: %v", name, err) + } + addSuccessfullyControllers.Insert(name) +} diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 239fbb806..afe88566e 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -18,6 +18,8 @@ package options import ( "flag" + "fmt" + "k8s.io/apimachinery/pkg/util/sets" "strings" "time" @@ -64,6 +66,16 @@ type KubeSphereControllerManagerOptions struct { // "kubesphere.io/creator=" means reconcile applications with this label key // "!kubesphere.io/creator" means exclude applications with this key ApplicationSelector string + + // ControllerSelectors is the list of controller selectors to enable or 
disable controller. + // '*' means "all enabled by default controllers" + // 'foo' means "enable 'foo'" + // '-foo' means "disable 'foo'" + // first item for a particular name wins. + // e.g. '-foo,foo' means "disable foo", 'foo,-foo' means "enable foo" + // * has the lowest priority. + // e.g. *,-foo, means "disable 'foo'" + ControllerSelectors []string } func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions { @@ -86,12 +98,13 @@ func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions LeaderElect: false, WebhookCertDir: "", ApplicationSelector: "", + ControllerSelectors: []string{"*"}, } return s } -func (s *KubeSphereControllerManagerOptions) Flags() cliflag.NamedFlagSets { +func (s *KubeSphereControllerManagerOptions) Flags(allControllerNameSelectors []string) cliflag.NamedFlagSets { fss := cliflag.NamedFlagSets{} s.KubernetesOptions.AddFlags(fss.FlagSet("kubernetes"), s.KubernetesOptions) @@ -120,6 +133,10 @@ func (s *KubeSphereControllerManagerOptions) Flags() cliflag.NamedFlagSets { gfs.StringVar(&s.ApplicationSelector, "application-selector", s.ApplicationSelector, ""+ "Only reconcile application(sigs.k8s.io/application) objects match given selector, this could avoid conflicts with "+ "other projects built on top of sig-application. Default behavior is to reconcile all of application objects.") + gfs.StringSliceVar(&s.ControllerSelectors, "controllers", []string{"*"}, fmt.Sprintf(""+ + "A list of controllers to enable. 
'*' enables all on-by-default controllers, 'foo' enables the controller "+
+		"named 'foo', '-foo' disables the controller named 'foo'.\nAll controllers: %s",
+		strings.Join(allControllerNameSelectors, ", ")))
 
 	kfs := fss.FlagSet("klog")
 	local := flag.NewFlagSet("klog", flag.ExitOnError)
@@ -132,26 +149,58 @@ func (s *KubeSphereControllerManagerOptions) Flags() cliflag.NamedFlagSets {
 	return fss
 }
 
-func (s *KubeSphereControllerManagerOptions) Validate() []error {
+// Validate Options and Generic Options
+func (o *KubeSphereControllerManagerOptions) Validate(allControllerNameSelectors []string) []error {
 	var errs []error
-	errs = append(errs, s.DevopsOptions.Validate()...)
-	errs = append(errs, s.KubernetesOptions.Validate()...)
-	errs = append(errs, s.S3Options.Validate()...)
-	errs = append(errs, s.OpenPitrixOptions.Validate()...)
-	errs = append(errs, s.NetworkOptions.Validate()...)
-	errs = append(errs, s.LdapOptions.Validate()...)
-	errs = append(errs, s.MultiClusterOptions.Validate()...)
+	errs = append(errs, o.DevopsOptions.Validate()...)
+	errs = append(errs, o.KubernetesOptions.Validate()...)
+	errs = append(errs, o.S3Options.Validate()...)
+	errs = append(errs, o.OpenPitrixOptions.Validate()...)
+	errs = append(errs, o.NetworkOptions.Validate()...)
+	errs = append(errs, o.LdapOptions.Validate()...)
+	errs = append(errs, o.MultiClusterOptions.Validate()...)
 
-	if len(s.ApplicationSelector) != 0 {
-		_, err := labels.Parse(s.ApplicationSelector)
+	// generic option: application-selector
+	if len(o.ApplicationSelector) != 0 {
+		_, err := labels.Parse(o.ApplicationSelector)
 		if err != nil {
 			errs = append(errs, err)
 		}
 	}
 
+	// generic option: controllers, check all selectors are valid
+	allControllersNameSet := sets.NewString(allControllerNameSelectors...)
+ for _, selector := range o.ControllerSelectors { + if selector == "*" { + continue + } + selector = strings.TrimPrefix(selector, "-") + if !allControllersNameSet.Has(selector) { + errs = append(errs, fmt.Errorf("%q is not in the list of known controllers", selector)) + } + } + return errs } +// IsControllerEnabled check if a specified controller enabled or not. +func (o *KubeSphereControllerManagerOptions) IsControllerEnabled(name string) bool { + hasStar := false + for _, ctrl := range o.ControllerSelectors { + if ctrl == name { + return true + } + if ctrl == "-" + name { + return false + } + if ctrl == "*" { + hasStar = true + } + } + + return hasStar +} + func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderelection.LeaderElectionConfig, fs *pflag.FlagSet) { fs.DurationVar(&l.LeaseDuration, "leader-elect-lease-duration", l.LeaseDuration, ""+ "The duration that non-leader candidates will wait after observing a leadership "+ @@ -167,3 +216,4 @@ func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderel "The duration the clients should wait between attempting acquisition and renewal "+ "of a leadership. 
This is only applicable if leader election is enabled.") } + diff --git a/cmd/controller-manager/app/options/options_test.go b/cmd/controller-manager/app/options/options_test.go new file mode 100644 index 000000000..e03796758 --- /dev/null +++ b/cmd/controller-manager/app/options/options_test.go @@ -0,0 +1,66 @@ +package options + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +// ref: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/controller-manager/app/helper_test.go +func TestIsControllerEnabled(t *testing.T) { + testcases := []struct { + name string + controllerName string + controllerFlags []string + expected bool + }{ + { + name: "on by name", + controllerName: "bravo", + controllerFlags: []string{"alpha", "bravo", "-charlie"}, + expected: true, + }, + { + name: "off by name", + controllerName: "charlie", + controllerFlags: []string{"alpha", "bravo", "-charlie"}, + expected: false, + }, + { + name: "on by default", + controllerName: "alpha", + controllerFlags: []string{"*"}, + expected: true, + }, + { + name: "on by star, not off by name", + controllerName: "alpha", + controllerFlags: []string{"*", "-charlie"}, + expected: true, + }, + { + name: "off by name with star", + controllerName: "charlie", + controllerFlags: []string{"*", "-charlie"}, + expected: false, + }, + { + name: "off then on", + controllerName: "alpha", + controllerFlags: []string{"-alpha", "alpha"}, + expected: false, + }, + { + name: "on then off", + controllerName: "alpha", + controllerFlags: []string{"alpha", "-alpha"}, + expected: true, + }, + } + + for _, tc := range testcases { + option := NewKubeSphereControllerManagerOptions() + option.ControllerSelectors = tc.controllerFlags + actual := option.IsControllerEnabled(tc.controllerName) + assert.Equal(t, tc.expected, actual, "%v: expected %v, got %v", tc.name, tc.expected, actual) + } +} diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index 
a670c9060..28cb04722 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -21,11 +21,8 @@ import ( "fmt" "os" - "kubesphere.io/kubesphere/pkg/models/kubeconfig" - "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" utilerrors "k8s.io/apimachinery/pkg/util/errors" cliflag "k8s.io/component-base/cli/flag" "k8s.io/klog" @@ -38,26 +35,11 @@ import ( "kubesphere.io/kubesphere/cmd/controller-manager/app/options" "kubesphere.io/kubesphere/pkg/apis" controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config" - "kubesphere.io/kubesphere/pkg/controller/application" - "kubesphere.io/kubesphere/pkg/controller/helm" - "kubesphere.io/kubesphere/pkg/controller/namespace" "kubesphere.io/kubesphere/pkg/controller/network/webhooks" - "kubesphere.io/kubesphere/pkg/controller/openpitrix/helmapplication" - "kubesphere.io/kubesphere/pkg/controller/openpitrix/helmcategory" - "kubesphere.io/kubesphere/pkg/controller/openpitrix/helmrelease" - "kubesphere.io/kubesphere/pkg/controller/openpitrix/helmrepo" "kubesphere.io/kubesphere/pkg/controller/quota" - "kubesphere.io/kubesphere/pkg/controller/serviceaccount" "kubesphere.io/kubesphere/pkg/controller/user" - "kubesphere.io/kubesphere/pkg/controller/workspace" - "kubesphere.io/kubesphere/pkg/controller/workspacerole" - "kubesphere.io/kubesphere/pkg/controller/workspacerolebinding" - "kubesphere.io/kubesphere/pkg/controller/workspacetemplate" "kubesphere.io/kubesphere/pkg/informers" - "kubesphere.io/kubesphere/pkg/simple/client/devops" - "kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins" "kubesphere.io/kubesphere/pkg/simple/client/k8s" - ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap" "kubesphere.io/kubesphere/pkg/simple/client/s3" "kubesphere.io/kubesphere/pkg/utils/metrics" "kubesphere.io/kubesphere/pkg/utils/term" @@ -92,7 +74,7 @@ func NewControllerManagerCommand() *cobra.Command { Use: "controller-manager", Long: 
`KubeSphere controller manager is a daemon that`, Run: func(cmd *cobra.Command, args []string) { - if errs := s.Validate(); len(errs) != 0 { + if errs := s.Validate(allControllers); len(errs) != 0 { klog.Error(utilerrors.NewAggregate(errs)) os.Exit(1) } @@ -106,7 +88,7 @@ func NewControllerManagerCommand() *cobra.Command { } fs := cmd.Flags() - namedFlagSets := s.Flags() + namedFlagSets := s.Flags(allControllers) for _, f := range namedFlagSets.FlagSets { fs.AddFlagSet(f) @@ -140,32 +122,8 @@ func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) err return err } - var devopsClient devops.Interface - if s.DevopsOptions != nil && len(s.DevopsOptions.Host) != 0 { - devopsClient, err = jenkins.NewDevopsClient(s.DevopsOptions) - if err != nil { - return fmt.Errorf("failed to connect jenkins, please check jenkins status, error: %v", err) - } - } - - var ldapClient ldapclient.Interface - // when there is no ldapOption, we set ldapClient as nil, which means we don't need to sync user info into ldap. - if s.LdapOptions != nil && len(s.LdapOptions.Host) != 0 { - if s.LdapOptions.Host == ldapclient.FAKE_HOST { // for debug only - ldapClient = ldapclient.NewSimpleLdap() - } else { - ldapClient, err = ldapclient.NewLdapClient(s.LdapOptions, ctx.Done()) - if err != nil { - return fmt.Errorf("failed to connect to ldap service, please check ldap status, error: %v", err) - } - } - } else { - klog.Warning("ks-controller-manager starts without ldap provided, it will not sync user into ldap") - } - - var s3Client s3.Interface if s.S3Options != nil && len(s.S3Options.Endpoint) != 0 { - s3Client, err = s3.NewS3Client(s.S3Options) + _, err = s3.NewS3Client(s.S3Options) if err != nil { return fmt.Errorf("failed to connect to s3, please check s3 service status, error: %v", err) } @@ -212,130 +170,13 @@ func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) err // register common meta types into schemas. 
metav1.AddToGroupVersion(mgr.GetScheme(), metav1.SchemeGroupVersion) - kubeconfigClient := kubeconfig.NewOperator(kubernetesClient.Kubernetes(), - informerFactory.KubernetesSharedInformerFactory().Core().V1().ConfigMaps().Lister(), - kubernetesClient.Config()) - userController := user.Reconciler{ - MultiClusterEnabled: s.MultiClusterOptions.Enable, - MaxConcurrentReconciles: 4, - LdapClient: ldapClient, - DevopsClient: devopsClient, - KubeconfigClient: kubeconfigClient, - AuthenticationOptions: s.AuthenticationOptions, - } - - if err = userController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create user controller: %v", err) - } - - workspaceTemplateReconciler := &workspacetemplate.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable} - if err = workspaceTemplateReconciler.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create workspace template controller: %v", err) - } - - workspaceReconciler := &workspace.Reconciler{} - if err = workspaceReconciler.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create workspace controller: %v", err) - } - - workspaceRoleReconciler := &workspacerole.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable} - if err = workspaceRoleReconciler.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create workspace role controller: %v", err) - } - - workspaceRoleBindingReconciler := &workspacerolebinding.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable} - if err = workspaceRoleBindingReconciler.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create workspace role binding controller: %v", err) - } - - namespaceReconciler := &namespace.Reconciler{} - if err = namespaceReconciler.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create namespace controller: %v", err) - } - - err = helmrepo.Add(mgr) - if err != nil { - klog.Fatal("Unable to create helm repo controller") - } - - err = helmcategory.Add(mgr) - if err != nil { - 
klog.Fatal("Unable to create helm category controller") - } - - var opS3Client s3.Interface - if !s.OpenPitrixOptions.AppStoreConfIsEmpty() { - opS3Client, err = s3.NewS3Client(s.OpenPitrixOptions.S3Options) - if err != nil { - klog.Fatalf("failed to connect to s3, please check openpitrix s3 service status, error: %v", err) - } - err = (&helmapplication.ReconcileHelmApplication{}).SetupWithManager(mgr) - if err != nil { - klog.Fatalf("Unable to create helm application controller, error: %s", err) - } - - err = (&helmapplication.ReconcileHelmApplicationVersion{}).SetupWithManager(mgr) - if err != nil { - klog.Fatalf("Unable to create helm application version controller, error: %s ", err) - } - } - - err = (&helmrelease.ReconcileHelmRelease{ - // nil interface is valid value. - StorageClient: opS3Client, - KsFactory: informerFactory.KubeSphereSharedInformerFactory(), - MultiClusterEnable: s.MultiClusterOptions.Enable, - WaitTime: s.OpenPitrixOptions.ReleaseControllerOptions.WaitTime, - MaxConcurrent: s.OpenPitrixOptions.ReleaseControllerOptions.MaxConcurrent, - StopChan: ctx.Done(), - }).SetupWithManager(mgr) - - if err != nil { - klog.Fatalf("Unable to create helm release controller, error: %s", err) - } - - selector, _ := labels.Parse(s.ApplicationSelector) - applicationReconciler := &application.ApplicationReconciler{ - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Mapper: mgr.GetRESTMapper(), - ApplicationSelector: selector, - } - if err = applicationReconciler.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create application controller: %v", err) - } - - saReconciler := &serviceaccount.Reconciler{} - if err = saReconciler.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create ServiceAccount controller: %v", err) - } - - resourceQuotaReconciler := quota.Reconciler{} - if err := resourceQuotaReconciler.SetupWithManager(mgr, quota.DefaultMaxConcurrentReconciles, quota.DefaultResyncPeriod, 
informerFactory.KubernetesSharedInformerFactory()); err != nil { - klog.Fatalf("Unable to create ResourceQuota controller: %v", err) - } - - if !s.GatewayOptions.IsEmpty() { - helmReconciler := helm.Reconciler{GatewayOptions: s.GatewayOptions} - if err := helmReconciler.SetupWithManager(mgr); err != nil { - klog.Fatalf("Unable to create helm controller: %v", err) - } - } - // TODO(jeff): refactor config with CRD - servicemeshEnabled := s.ServiceMeshOptions != nil && len(s.ServiceMeshOptions.IstioPilotHost) != 0 - if err = addControllers(mgr, + // install all controllers + if err = addAllControllers(mgr, kubernetesClient, informerFactory, - devopsClient, - s3Client, - ldapClient, - s.KubernetesOptions, - s.AuthenticationOptions, - s.MultiClusterOptions, - s.NetworkOptions, - servicemeshEnabled, - s.AuthenticationOptions.KubectlImage, ctx.Done()); err != nil { + s, + ctx.Done()); err != nil { klog.Fatalf("unable to register controllers to the manager: %v", err) } diff --git a/pkg/controller/openpitrix/helmcategory/helm_category_controller.go b/pkg/controller/openpitrix/helmcategory/helm_category_controller.go index 2de04ee8f..b3b81ff77 100644 --- a/pkg/controller/openpitrix/helmcategory/helm_category_controller.go +++ b/pkg/controller/openpitrix/helmcategory/helm_category_controller.go @@ -18,11 +18,11 @@ package helmcategory import ( "context" + ctrl "sigs.k8s.io/controller-runtime" "time" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -46,15 +46,6 @@ const ( HelmCategoryFinalizer = "helmcategories.application.kubesphere.io" ) -func Add(mgr manager.Manager) error { - return add(mgr, newReconciler(mgr)) -} - -// newReconciler returns a new reconcile.Reconciler -func newReconciler(mgr manager.Manager) reconcile.Reconciler { - return &ReconcileHelmCategory{Client: mgr.GetClient(), Scheme: mgr.GetScheme()} -} 
- // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { // Create a new controller @@ -185,11 +176,16 @@ var _ reconcile.Reconciler = &ReconcileHelmCategory{} // ReconcileWorkspace reconciles a Workspace object type ReconcileHelmCategory struct { client.Client - Scheme *runtime.Scheme + //Scheme *runtime.Scheme recorder record.EventRecorder config *rest.Config } +func (r *ReconcileHelmCategory) SetupWithManager(mgr ctrl.Manager) error { + r.Client = mgr.GetClient() + return add(mgr, r) +} + // Reconcile reads that state of the cluster for a helmcategories object and makes changes based on the state read // and what is in the helmreleases.Spec // +kubebuilder:rbac:groups=application.kubesphere.io,resources=helmcategories,verbs=get;list;watch;create;update;patch;delete diff --git a/pkg/controller/openpitrix/helmrepo/helm_repo_controller.go b/pkg/controller/openpitrix/helmrepo/helm_repo_controller.go index bca75aecd..2d5383de1 100644 --- a/pkg/controller/openpitrix/helmrepo/helm_repo_controller.go +++ b/pkg/controller/openpitrix/helmrepo/helm_repo_controller.go @@ -19,6 +19,7 @@ package helmrepo import ( "context" "math" + ctrl "sigs.k8s.io/controller-runtime" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -57,20 +58,6 @@ const ( HelmRepoFinalizer = "helmrepo.application.kubesphere.io" ) -// Add creates a new Workspace Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller -// and Start it when the Manager is Started. 
-func Add(mgr manager.Manager) error { - return add(mgr, newReconciler(mgr)) -} - -// newReconciler returns a new reconcile.Reconciler -func newReconciler(mgr manager.Manager) reconcile.Reconciler { - return &ReconcileHelmRepo{Client: mgr.GetClient(), scheme: mgr.GetScheme(), - recorder: mgr.GetEventRecorderFor("workspace-controller"), - config: mgr.GetConfig(), - } -} - // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { // Create a new controller @@ -98,6 +85,13 @@ type ReconcileHelmRepo struct { config *rest.Config } +func (r *ReconcileHelmRepo) SetupWithManager(mgr ctrl.Manager) error { + r.scheme = mgr.GetScheme() + r.recorder = mgr.GetEventRecorderFor("workspace-controller") + r.config = mgr.GetConfig() + return add(mgr, r) +} + // Reconcile reads that state of the cluster for a helmrepoes object and makes changes based on the state read // and what is in the helmreleases.Spec // +kubebuilder:rbac:groups=application.kubesphere.io,resources=helmrepos,verbs=get;list;watch;create;update;patch;delete diff --git a/pkg/controller/quota/resourcequota_controller.go b/pkg/controller/quota/resourcequota_controller.go index bdf82aaf9..36bb7c1de 100644 --- a/pkg/controller/quota/resourcequota_controller.go +++ b/pkg/controller/quota/resourcequota_controller.go @@ -70,32 +70,34 @@ type Reconciler struct { client.Client logger logr.Logger recorder record.EventRecorder - maxConcurrentReconciles int // Knows how to calculate usage registry quotav1.Registry + + MaxConcurrentReconciles int // Controls full recalculation of quota usage - resyncPeriod time.Duration + ResyncPeriod time.Duration + InformerFactory k8sinformers.SharedInformerFactory + scheme *runtime.Scheme } -func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles int, resyncPeriod time.Duration, informerFactory k8sinformers.SharedInformerFactory) error { +func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) 
error { r.logger = ctrl.Log.WithName("controllers").WithName(ControllerName) r.recorder = mgr.GetEventRecorderFor(ControllerName) r.scheme = mgr.GetScheme() - r.registry = generic.NewRegistry(install.NewQuotaConfigurationForControllers(generic.ListerFuncForResourceFunc(informerFactory.ForResource)).Evaluators()) + r.registry = generic.NewRegistry(install.NewQuotaConfigurationForControllers( + generic.ListerFuncForResourceFunc(r.InformerFactory.ForResource)).Evaluators()) if r.Client == nil { r.Client = mgr.GetClient() } - if maxConcurrentReconciles > 0 { - r.maxConcurrentReconciles = maxConcurrentReconciles - } else { - r.maxConcurrentReconciles = DefaultMaxConcurrentReconciles + if r.MaxConcurrentReconciles <= 0 { + r.MaxConcurrentReconciles = DefaultMaxConcurrentReconciles } - r.resyncPeriod = time.Duration(math.Max(float64(resyncPeriod), float64(DefaultResyncPeriod))) + r.ResyncPeriod = time.Duration(math.Max(float64(r.ResyncPeriod), float64(DefaultResyncPeriod))) c, err := ctrl.NewControllerManagedBy(mgr). Named(ControllerName). WithOptions(controller.Options{ - MaxConcurrentReconciles: r.maxConcurrentReconciles, + MaxConcurrentReconciles: r.MaxConcurrentReconciles, }). For(&quotav1alpha2.ResourceQuota{}). WithEventFilter(predicate.GenerationChangedPredicate{ @@ -206,7 +208,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } r.recorder.Event(resourceQuota, corev1.EventTypeNormal, "Synced", "Synced successfully") - return ctrl.Result{RequeueAfter: r.resyncPeriod}, nil + return ctrl.Result{RequeueAfter: r.ResyncPeriod}, nil } func (r *Reconciler) bindWorkspace(resourceQuota *quotav1alpha2.ResourceQuota) error {