add --controllers option in ks-controller-manager

imple controller enable/disable check logic
add unittest for selective controller enable/disable
move all controllers init code to a single place
This commit is contained in:
live77
2021-12-01 23:38:30 +08:00
parent 6ed02d3059
commit 81db894741
7 changed files with 618 additions and 374 deletions

View File

@@ -17,16 +17,38 @@ limitations under the License.
package app
import (
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"kubesphere.io/kubesphere/cmd/controller-manager/app/options"
"kubesphere.io/kubesphere/pkg/controller/application"
"kubesphere.io/kubesphere/pkg/controller/helm"
"kubesphere.io/kubesphere/pkg/controller/namespace"
"kubesphere.io/kubesphere/pkg/controller/openpitrix/helmapplication"
"kubesphere.io/kubesphere/pkg/controller/openpitrix/helmcategory"
"kubesphere.io/kubesphere/pkg/controller/openpitrix/helmrelease"
"kubesphere.io/kubesphere/pkg/controller/openpitrix/helmrepo"
"kubesphere.io/kubesphere/pkg/controller/quota"
"kubesphere.io/kubesphere/pkg/controller/serviceaccount"
"kubesphere.io/kubesphere/pkg/controller/user"
"kubesphere.io/kubesphere/pkg/controller/workspace"
"kubesphere.io/kubesphere/pkg/controller/workspacerole"
"kubesphere.io/kubesphere/pkg/controller/workspacerolebinding"
"kubesphere.io/kubesphere/pkg/controller/workspacetemplate"
"kubesphere.io/kubesphere/pkg/models/kubeconfig"
"kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
ctrl "sigs.k8s.io/controller-runtime"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/kubefed/pkg/controller/util"
"kubesphere.io/kubesphere/pkg/controller/storage/snapshotclass"
"kubesphere.io/kubesphere/pkg/apiserver/authentication"
iamv1alpha2 "kubesphere.io/api/iam/v1alpha2"
"kubesphere.io/kubesphere/pkg/controller/certificatesigningrequest"
@@ -46,198 +68,471 @@ import (
"kubesphere.io/kubesphere/pkg/controller/storage/capability"
"kubesphere.io/kubesphere/pkg/controller/virtualservice"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap"
"kubesphere.io/kubesphere/pkg/simple/client/multicluster"
"kubesphere.io/kubesphere/pkg/simple/client/network"
ippoolclient "kubesphere.io/kubesphere/pkg/simple/client/network/ippool"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
)
func addControllers(
mgr manager.Manager,
client k8s.Client,
informerFactory informers.InformerFactory,
devopsClient devops.Interface,
s3Client s3.Interface,
ldapClient ldapclient.Interface,
options *k8s.KubernetesOptions,
authenticationOptions *authentication.Options,
multiClusterOptions *multicluster.Options,
networkOptions *network.Options,
serviceMeshEnabled bool,
kubectlImage string,
stopCh <-chan struct{}) error {
var allControllers = []string {
"user",
"workspacetemplate",
"workspace",
"workspacerole",
"workspacerolebinding",
"namespace",
"helmrepo",
"helmcategory",
"helmapplication",
"helmapplicationversion",
"helmrelease",
"helm",
"application",
"serviceaccount",
"resourcequota",
"virtualservice",
"destinationrule",
"job",
"storagecapability",
"volumesnapshot",
"loginrecord",
"cluster",
"nsnp",
"ippool",
"csr",
"clusterrolebinding",
"fedglobalrolecache",
"globalrole",
"fedglobalrolebindingcache",
"globalrolebinding",
"groupbinding",
"group",
"notification",
}
// setup all available controllers one by one
func addAllControllers(mgr manager.Manager, client k8s.Client, informerFactory informers.InformerFactory,
cmOptions *options.KubeSphereControllerManagerOptions,
stopCh <-chan struct{}) error {
var err error
////////////////////////////////////
// begin init necessary informers
////////////////////////////////////
kubernetesInformer := informerFactory.KubernetesSharedInformerFactory()
istioInformer := informerFactory.IstioSharedInformerFactory()
kubesphereInformer := informerFactory.KubeSphereSharedInformerFactory()
////////////////////////////////////
// end informers
////////////////////////////////////
multiClusterEnabled := multiClusterOptions.Enable
////////////////////////////////////
// begin init necessary clients
////////////////////////////////////
kubeconfigClient := kubeconfig.NewOperator(client.Kubernetes(),
informerFactory.KubernetesSharedInformerFactory().Core().V1().ConfigMaps().Lister(),
client.Config())
var vsController, drController manager.Runnable
var devopsClient devops.Interface
if cmOptions.DevopsOptions != nil && len(cmOptions.DevopsOptions.Host) != 0 {
devopsClient, err = jenkins.NewDevopsClient(cmOptions.DevopsOptions)
if err != nil {
return fmt.Errorf("failed to connect jenkins, please check jenkins status, error: %v", err)
}
}
var ldapClient ldapclient.Interface
// when there is no ldapOption, we set ldapClient as nil, which means we don't need to sync user info into ldap.
if cmOptions.LdapOptions != nil && len(cmOptions.LdapOptions.Host) != 0 {
if cmOptions.LdapOptions.Host == ldapclient.FAKE_HOST { // for debug only
ldapClient = ldapclient.NewSimpleLdap()
} else {
ldapClient, err = ldapclient.NewLdapClient(cmOptions.LdapOptions, stopCh)
if err != nil {
return fmt.Errorf("failed to connect to ldap service, please check ldap status, error: %v", err)
}
}
} else {
klog.Warning("ks-controller-manager starts without ldap provided, it will not sync user into ldap")
}
////////////////////////////////////
// end init clients
////////////////////////////////////
////////////////////////////////////////////////////////
// begin init controller and add to manager one by one
////////////////////////////////////////////////////////
// "user" controller
if cmOptions.IsControllerEnabled("user") {
userController := &user.Reconciler{
MultiClusterEnabled: cmOptions.MultiClusterOptions.Enable,
MaxConcurrentReconciles: 4,
LdapClient: ldapClient,
DevopsClient: devopsClient,
KubeconfigClient: kubeconfigClient,
AuthenticationOptions: cmOptions.AuthenticationOptions,
}
addControllerWithSetup(mgr,"user", userController)
}
// "workspacetemplate" controller
if cmOptions.IsControllerEnabled("workspacetemplate") {
workspaceTemplateReconciler := &workspacetemplate.Reconciler{MultiClusterEnabled: cmOptions.MultiClusterOptions.Enable}
addControllerWithSetup(mgr,"workspacetemplate", workspaceTemplateReconciler)
}
// "workspace" controller
if cmOptions.IsControllerEnabled("workspace") {
workspaceReconciler := &workspace.Reconciler{}
addControllerWithSetup(mgr,"workspace", workspaceReconciler)
}
// "workspacerole" controller
if cmOptions.IsControllerEnabled("workspacerole") {
workspaceRoleReconciler := &workspacerole.Reconciler{MultiClusterEnabled: cmOptions.MultiClusterOptions.Enable}
addControllerWithSetup(mgr,"workspacerole", workspaceRoleReconciler)
}
// "workspacerolebinding" controller
if cmOptions.IsControllerEnabled("workspacerolebinding") {
workspaceRoleBindingReconciler := &workspacerolebinding.Reconciler{MultiClusterEnabled: cmOptions.MultiClusterOptions.Enable}
addControllerWithSetup(mgr,"workspacerolebinding", workspaceRoleBindingReconciler)
}
// "namespace" controller
if cmOptions.IsControllerEnabled("namespace") {
namespaceReconciler := &namespace.Reconciler{}
addControllerWithSetup(mgr,"namespace", namespaceReconciler)
}
// "helmrepo" controller
if cmOptions.IsControllerEnabled("helmrepo") {
helmRepoReconciler := &helmrepo.ReconcileHelmRepo{}
addControllerWithSetup(mgr, "helmrepo", helmRepoReconciler)
}
// "helmcategory" controller
if cmOptions.IsControllerEnabled("helmcategory") {
helmCategoryReconciler := &helmcategory.ReconcileHelmCategory{}
addControllerWithSetup(mgr, "helmcategory", helmCategoryReconciler)
}
var opS3Client s3.Interface
if !cmOptions.OpenPitrixOptions.AppStoreConfIsEmpty() {
opS3Client, err = s3.NewS3Client(cmOptions.OpenPitrixOptions.S3Options)
if err != nil {
klog.Fatalf("failed to connect to s3, please check openpitrix s3 service status, error: %v", err)
}
// "helmapplication" controller
if cmOptions.IsControllerEnabled("helmapplication") {
reconcileHelmApp := (&helmapplication.ReconcileHelmApplication{})
addControllerWithSetup(mgr, "helmapplication", reconcileHelmApp)
}
// "helmapplicationversion" controller
if cmOptions.IsControllerEnabled("helmapplicationversion") {
reconcileHelmAppVersion := (&helmapplication.ReconcileHelmApplicationVersion{})
addControllerWithSetup(mgr, "helmapplicationversion",reconcileHelmAppVersion)
}
}
// "helmrelease" controller
if cmOptions.IsControllerEnabled("helmrelease") {
reconcileHelmRelease := &helmrelease.ReconcileHelmRelease{
// nil interface is valid value.
StorageClient: opS3Client,
KsFactory: informerFactory.KubeSphereSharedInformerFactory(),
MultiClusterEnable: cmOptions.MultiClusterOptions.Enable,
WaitTime: cmOptions.OpenPitrixOptions.ReleaseControllerOptions.WaitTime,
MaxConcurrent: cmOptions.OpenPitrixOptions.ReleaseControllerOptions.MaxConcurrent,
StopChan: stopCh,
}
addControllerWithSetup(mgr, "helmrelease",reconcileHelmRelease)
}
// "helm" controller
if cmOptions.IsControllerEnabled("helm") {
if !cmOptions.GatewayOptions.IsEmpty() {
helmReconciler := &helm.Reconciler{GatewayOptions: cmOptions.GatewayOptions}
addControllerWithSetup(mgr, "helm",helmReconciler)
}
}
// "application" controller
if cmOptions.IsControllerEnabled("application") {
selector, _ := labels.Parse(cmOptions.ApplicationSelector)
applicationReconciler := &application.ApplicationReconciler{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Mapper: mgr.GetRESTMapper(),
ApplicationSelector: selector,
}
addControllerWithSetup(mgr, "application",applicationReconciler)
}
// "serviceaccount" controller
if cmOptions.IsControllerEnabled("serviceaccount") {
saReconciler := &serviceaccount.Reconciler{}
addControllerWithSetup(mgr, "serviceaccount",saReconciler)
}
// "resourcequota" controller
if cmOptions.IsControllerEnabled("resourcequota") {
resourceQuotaReconciler := &quota.Reconciler{
MaxConcurrentReconciles: quota.DefaultMaxConcurrentReconciles,
ResyncPeriod: quota.DefaultResyncPeriod,
InformerFactory: informerFactory.KubernetesSharedInformerFactory(),
}
addControllerWithSetup(mgr, "resourcequota", resourceQuotaReconciler)
}
serviceMeshEnabled := cmOptions.ServiceMeshOptions != nil && len(cmOptions.ServiceMeshOptions.IstioPilotHost) != 0
if serviceMeshEnabled {
vsController = virtualservice.NewVirtualServiceController(kubernetesInformer.Core().V1().Services(),
istioInformer.Networking().V1alpha3().VirtualServices(),
istioInformer.Networking().V1alpha3().DestinationRules(),
kubesphereInformer.Servicemesh().V1alpha2().Strategies(),
// "virtualservice" controller
if cmOptions.IsControllerEnabled("virtualservice") {
vsController := virtualservice.NewVirtualServiceController(kubernetesInformer.Core().V1().Services(),
istioInformer.Networking().V1alpha3().VirtualServices(),
istioInformer.Networking().V1alpha3().DestinationRules(),
kubesphereInformer.Servicemesh().V1alpha2().Strategies(),
client.Kubernetes(),
client.Istio(),
client.KubeSphere())
addController(mgr, "virtualservice", vsController)
}
// "destinationrule" controller
if cmOptions.IsControllerEnabled("destinationrule") {
drController := destinationrule.NewDestinationRuleController(kubernetesInformer.Apps().V1().Deployments(),
istioInformer.Networking().V1alpha3().DestinationRules(),
kubernetesInformer.Core().V1().Services(),
kubesphereInformer.Servicemesh().V1alpha2().ServicePolicies(),
client.Kubernetes(),
client.Istio(),
client.KubeSphere())
addController(mgr, "destinationrule", drController)
}
}
// "job" controller
if cmOptions.IsControllerEnabled("job") {
jobController := job.NewJobController(kubernetesInformer.Batch().V1().Jobs(), client.Kubernetes())
addController(mgr, "job", jobController)
}
// "storagecapability" controller
if cmOptions.IsControllerEnabled("storagecapability") {
storageCapabilityController := capability.NewController(
client.Kubernetes().StorageV1().StorageClasses(),
kubernetesInformer.Storage().V1().StorageClasses(),
kubernetesInformer.Storage().V1().CSIDrivers(),
)
addController(mgr, "storagecapability", storageCapabilityController)
}
// "volumesnapshot" controller
if cmOptions.IsControllerEnabled("volumesnapshot") {
volumeSnapshotController := snapshotclass.NewController(
kubernetesInformer.Storage().V1().StorageClasses(),
client.Snapshot().SnapshotV1().VolumeSnapshotClasses(),
informerFactory.SnapshotSharedInformerFactory().Snapshot().V1().VolumeSnapshotClasses(),
)
addController(mgr, "volumesnapshot", volumeSnapshotController)
}
// "loginrecord" controller
if cmOptions.IsControllerEnabled("loginrecord") {
loginRecordController := loginrecord.NewLoginRecordController(
client.Kubernetes(),
client.Istio(),
client.KubeSphere())
drController = destinationrule.NewDestinationRuleController(kubernetesInformer.Apps().V1().Deployments(),
istioInformer.Networking().V1alpha3().DestinationRules(),
kubernetesInformer.Core().V1().Services(),
kubesphereInformer.Servicemesh().V1alpha2().ServicePolicies(),
client.Kubernetes(),
client.Istio(),
client.KubeSphere())
client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().LoginRecords(),
kubesphereInformer.Iam().V1alpha2().Users(),
cmOptions.AuthenticationOptions.LoginHistoryRetentionPeriod,
cmOptions.AuthenticationOptions.LoginHistoryMaximumEntries)
addController(mgr, "loginrecord", loginRecordController)
}
jobController := job.NewJobController(kubernetesInformer.Batch().V1().Jobs(), client.Kubernetes())
// "csr" controller
if cmOptions.IsControllerEnabled("csr") {
csrController := certificatesigningrequest.NewController(client.Kubernetes(),
kubernetesInformer.Certificates().V1().CertificateSigningRequests(),
kubernetesInformer.Core().V1().ConfigMaps(), client.Config())
addController(mgr, "csr", csrController)
}
storageCapabilityController := capability.NewController(
client.Kubernetes().StorageV1().StorageClasses(),
kubernetesInformer.Storage().V1().StorageClasses(),
kubernetesInformer.Storage().V1().CSIDrivers(),
)
// "clusterrolebinding" controller
if cmOptions.IsControllerEnabled("clusterrolebinding") {
clusterRoleBindingController := clusterrolebinding.NewController(client.Kubernetes(),
kubernetesInformer.Rbac().V1().ClusterRoleBindings(),
kubernetesInformer.Apps().V1().Deployments(),
kubernetesInformer.Core().V1().Pods(),
kubesphereInformer.Iam().V1alpha2().Users(),
cmOptions.AuthenticationOptions.KubectlImage)
addController(mgr, "clusterrolebinding", clusterRoleBindingController)
}
volumeSnapshotController := snapshotclass.NewController(
kubernetesInformer.Storage().V1().StorageClasses(),
client.Snapshot().SnapshotV1().VolumeSnapshotClasses(),
informerFactory.SnapshotSharedInformerFactory().Snapshot().V1().VolumeSnapshotClasses(),
)
var fedGlobalRoleBindingCache, fedGlobalRoleCache cache.Store
var fedGlobalRoleBindingCacheController, fedGlobalRoleCacheController cache.Controller
if multiClusterEnabled {
fedGlobalRoleClient, err := util.NewResourceClient(client.Config(), &iamv1alpha2.FedGlobalRoleResource)
if err != nil {
klog.Error(err)
return err
// "fedglobalrolecache" controller
var fedGlobalRoleCache cache.Store
var fedGlobalRoleCacheController cache.Controller
if cmOptions.IsControllerEnabled("fedglobalrolecache") {
if cmOptions.MultiClusterOptions.Enable {
fedGlobalRoleClient, err := util.NewResourceClient(client.Config(), &iamv1alpha2.FedGlobalRoleResource)
if err != nil {
klog.Fatalf("Unable to create FedGlobalRole controller: %v", err)
}
fedGlobalRoleCache, fedGlobalRoleCacheController = util.NewResourceInformer(fedGlobalRoleClient, "",
&iamv1alpha2.FedGlobalRoleResource, func(object runtimeclient.Object) {})
go fedGlobalRoleCacheController.Run(stopCh)
addSuccessfullyControllers.Insert("fedglobalrolecache")
}
fedGlobalRoleBindingClient, err := util.NewResourceClient(client.Config(), &iamv1alpha2.FedGlobalRoleBindingResource)
if err != nil {
klog.Error(err)
return err
}
// "globalrole" controller
if cmOptions.IsControllerEnabled("globalrole") {
if cmOptions.MultiClusterOptions.Enable {
globalRoleController := globalrole.NewController(client.Kubernetes(), client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().GlobalRoles(), fedGlobalRoleCache, fedGlobalRoleCacheController)
addController(mgr, "globalrole", globalRoleController)
}
fedGlobalRoleCache, fedGlobalRoleCacheController = util.NewResourceInformer(fedGlobalRoleClient, "", &iamv1alpha2.FedGlobalRoleResource, func(object runtimeclient.Object) {})
fedGlobalRoleBindingCache, fedGlobalRoleBindingCacheController = util.NewResourceInformer(fedGlobalRoleBindingClient, "", &iamv1alpha2.FedGlobalRoleBindingResource, func(object runtimeclient.Object) {})
go fedGlobalRoleCacheController.Run(stopCh)
go fedGlobalRoleBindingCacheController.Run(stopCh)
}
loginRecordController := loginrecord.NewLoginRecordController(
client.Kubernetes(),
client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().LoginRecords(),
kubesphereInformer.Iam().V1alpha2().Users(),
authenticationOptions.LoginHistoryRetentionPeriod,
authenticationOptions.LoginHistoryMaximumEntries)
csrController := certificatesigningrequest.NewController(client.Kubernetes(),
kubernetesInformer.Certificates().V1().CertificateSigningRequests(),
kubernetesInformer.Core().V1().ConfigMaps(), client.Config())
clusterRoleBindingController := clusterrolebinding.NewController(client.Kubernetes(),
kubernetesInformer.Rbac().V1().ClusterRoleBindings(),
kubernetesInformer.Apps().V1().Deployments(),
kubernetesInformer.Core().V1().Pods(),
kubesphereInformer.Iam().V1alpha2().Users(),
kubectlImage)
globalRoleController := globalrole.NewController(client.Kubernetes(), client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().GlobalRoles(), fedGlobalRoleCache, fedGlobalRoleCacheController)
globalRoleBindingController := globalrolebinding.NewController(client.Kubernetes(), client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().GlobalRoleBindings(),
fedGlobalRoleBindingCache, fedGlobalRoleBindingCacheController,
multiClusterEnabled)
groupBindingController := groupbinding.NewController(client.Kubernetes(), client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().GroupBindings(),
kubesphereInformer.Types().V1beta1().FederatedGroupBindings(),
multiClusterEnabled)
groupController := group.NewController(client.Kubernetes(), client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().Groups(),
kubesphereInformer.Types().V1beta1().FederatedGroups(),
multiClusterEnabled)
var clusterController manager.Runnable
if multiClusterEnabled {
clusterController = cluster.NewClusterController(
client.Kubernetes(),
client.Config(),
kubesphereInformer.Cluster().V1alpha1().Clusters(),
client.KubeSphere().ClusterV1alpha1().Clusters(),
multiClusterOptions.ClusterControllerResyncPeriod,
multiClusterOptions.HostClusterName)
}
var nsnpController manager.Runnable
if networkOptions.EnableNetworkPolicy {
nsnpProvider, err := provider.NewNsNetworkPolicyProvider(client.Kubernetes(), kubernetesInformer.Networking().V1().NetworkPolicies())
if err != nil {
return err
// "fedglobalrolebindingcache" controller
var fedGlobalRoleBindingCache cache.Store
var fedGlobalRoleBindingCacheController cache.Controller
if cmOptions.IsControllerEnabled("fedglobalrolebindingcache") {
if cmOptions.MultiClusterOptions.Enable {
fedGlobalRoleBindingClient, err := util.NewResourceClient(client.Config(), &iamv1alpha2.FedGlobalRoleBindingResource)
if err != nil {
klog.Fatalf("Unable to create FedGlobalRoleBinding controller: %v", err)
}
fedGlobalRoleBindingCache, fedGlobalRoleBindingCacheController = util.NewResourceInformer(fedGlobalRoleBindingClient, "",
&iamv1alpha2.FedGlobalRoleBindingResource, func(object runtimeclient.Object) {})
go fedGlobalRoleBindingCacheController.Run(stopCh)
addSuccessfullyControllers.Insert("fedglobalrolebindingcache")
}
nsnpController = nsnetworkpolicy.NewNSNetworkPolicyController(client.Kubernetes(),
client.KubeSphere().NetworkV1alpha1(),
kubesphereInformer.Network().V1alpha1().NamespaceNetworkPolicies(),
kubernetesInformer.Core().V1().Services(),
kubernetesInformer.Core().V1().Nodes(),
kubesphereInformer.Tenant().V1alpha1().Workspaces(),
kubernetesInformer.Core().V1().Namespaces(), nsnpProvider, networkOptions.NSNPOptions)
}
var ippoolController manager.Runnable
ippoolProvider := ippoolclient.NewProvider(kubernetesInformer, client.KubeSphere(), client.Kubernetes(), networkOptions.IPPoolType, options)
if ippoolProvider != nil {
ippoolController = ippool.NewIPPoolController(kubesphereInformer, kubernetesInformer, client.Kubernetes(), client.KubeSphere(), ippoolProvider)
// "globalrolebinding" controller
if cmOptions.IsControllerEnabled("globalrolebinding") {
globalRoleBindingController := globalrolebinding.NewController(client.Kubernetes(), client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().GlobalRoleBindings(),
fedGlobalRoleBindingCache, fedGlobalRoleBindingCacheController,
cmOptions.MultiClusterOptions.Enable)
addController(mgr, "globalrolebinding", globalRoleBindingController)
}
controllers := map[string]manager.Runnable{
"virtualservice-controller": vsController,
"destinationrule-controller": drController,
"job-controller": jobController,
"storagecapability-controller": storageCapabilityController,
"volumesnapshot-controller": volumeSnapshotController,
"loginrecord-controller": loginRecordController,
"cluster-controller": clusterController,
"nsnp-controller": nsnpController,
"csr-controller": csrController,
"clusterrolebinding-controller": clusterRoleBindingController,
"globalrolebinding-controller": globalRoleBindingController,
"ippool-controller": ippoolController,
"groupbinding-controller": groupBindingController,
"group-controller": groupController,
// "groupbinding" controller
if cmOptions.IsControllerEnabled("groupbinding") {
groupBindingController := groupbinding.NewController(client.Kubernetes(), client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().GroupBindings(),
kubesphereInformer.Types().V1beta1().FederatedGroupBindings(),
cmOptions.MultiClusterOptions.Enable)
addController(mgr, "groupbinding", groupBindingController)
}
if multiClusterEnabled {
controllers["globalrole-controller"] = globalRoleController
notificationController, err := notification.NewController(client.Kubernetes(), mgr.GetClient(), mgr.GetCache())
if err != nil {
return err
// "group" controller
if cmOptions.IsControllerEnabled("group") {
groupController := group.NewController(client.Kubernetes(), client.KubeSphere(),
kubesphereInformer.Iam().V1alpha2().Groups(),
kubesphereInformer.Types().V1beta1().FederatedGroups(),
cmOptions.MultiClusterOptions.Enable)
addController(mgr, "group", groupController)
}
// "cluster" controller
if cmOptions.IsControllerEnabled("cluster") {
if cmOptions.MultiClusterOptions.Enable {
clusterController := cluster.NewClusterController(
client.Kubernetes(),
client.Config(),
kubesphereInformer.Cluster().V1alpha1().Clusters(),
client.KubeSphere().ClusterV1alpha1().Clusters(),
cmOptions.MultiClusterOptions.ClusterControllerResyncPeriod,
cmOptions.MultiClusterOptions.HostClusterName)
addController(mgr, "cluster", clusterController)
}
controllers["notification-controller"] = notificationController
}
for name, ctrl := range controllers {
if ctrl == nil {
klog.V(4).Infof("%s is not going to run due to dependent component disabled.", name)
continue
}
// "nsnp" controller
if cmOptions.IsControllerEnabled("nsnp") {
if cmOptions.NetworkOptions.EnableNetworkPolicy {
nsnpProvider, err := provider.NewNsNetworkPolicyProvider(client.Kubernetes(), kubernetesInformer.Networking().V1().NetworkPolicies())
if err != nil {
klog.Fatalf("Unable to create NSNetworkPolicy controller: %v", err)
}
if err := mgr.Add(ctrl); err != nil {
klog.Error(err, "add controller to manager failed", "name", name)
return err
nsnpController := nsnetworkpolicy.NewNSNetworkPolicyController(client.Kubernetes(),
client.KubeSphere().NetworkV1alpha1(),
kubesphereInformer.Network().V1alpha1().NamespaceNetworkPolicies(),
kubernetesInformer.Core().V1().Services(),
kubernetesInformer.Core().V1().Nodes(),
kubesphereInformer.Tenant().V1alpha1().Workspaces(),
kubernetesInformer.Core().V1().Namespaces(), nsnpProvider, cmOptions.NetworkOptions.NSNPOptions)
addController(mgr, "nsnp", nsnpController)
}
}
// "ippool" controller
if cmOptions.IsControllerEnabled("ippool") {
ippoolProvider := ippoolclient.NewProvider(kubernetesInformer, client.KubeSphere(), client.Kubernetes(),
cmOptions.NetworkOptions.IPPoolType, cmOptions.KubernetesOptions)
if ippoolProvider != nil {
ippoolController := ippool.NewIPPoolController(kubesphereInformer, kubernetesInformer, client.Kubernetes(),
client.KubeSphere(), ippoolProvider)
addController(mgr, "ippool", ippoolController)
}
}
// "notification" controller
if cmOptions.IsControllerEnabled("notification") {
if cmOptions.MultiClusterOptions.Enable {
notificationController, err := notification.NewController(client.Kubernetes(), mgr.GetClient(), mgr.GetCache())
if err != nil {
klog.Fatalf("Unable to create Notification controller: %v", err)
}
addController(mgr, "notification", notificationController)
}
}
// log all controllers process result
for _, name := range allControllers {
if cmOptions.IsControllerEnabled(name) {
if addSuccessfullyControllers.Has(name) {
klog.Infof("%s controller is enabled and added successfully.", name)
} else {
klog.Infof("%s controller is enabled but is not going to run due to its dependent component being disabled.", name)
}
} else {
klog.Infof("%s controller is disabled by controller selectors.", name)
}
}
return nil
}
var addSuccessfullyControllers = sets.NewString()
type setupableController interface {
SetupWithManager(mgr ctrl.Manager) error
}
func addControllerWithSetup(mgr manager.Manager, name string, controller setupableController) {
if err := controller.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create %v controller: %v", name, err)
}
addSuccessfullyControllers.Insert(name)
}
func addController(mgr manager.Manager, name string, controller manager.Runnable) {
if err := mgr.Add(controller); err != nil {
klog.Fatalf("Unable to create %v controller: %v", name, err)
}
addSuccessfullyControllers.Insert(name)
}

View File

@@ -18,6 +18,8 @@ package options
import (
"flag"
"fmt"
"k8s.io/apimachinery/pkg/util/sets"
"strings"
"time"
@@ -64,6 +66,16 @@ type KubeSphereControllerManagerOptions struct {
// "kubesphere.io/creator=" means reconcile applications with this label key
// "!kubesphere.io/creator" means exclude applications with this key
ApplicationSelector string
// ControllerSelectors is the list of controller selectors to enable or disable controller.
// '*' means "all enabled by default controllers"
// 'foo' means "enable 'foo'"
// '-foo' means "disable 'foo'"
// first item for a particular name wins.
// e.g. '-foo,foo' means "disable foo", 'foo,-foo' means "enable foo"
// * has the lowest priority.
// e.g. *,-foo, means "disable 'foo'"
ControllerSelectors []string
}
func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions {
@@ -86,12 +98,13 @@ func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions
LeaderElect: false,
WebhookCertDir: "",
ApplicationSelector: "",
ControllerSelectors: []string{"*"},
}
return s
}
func (s *KubeSphereControllerManagerOptions) Flags() cliflag.NamedFlagSets {
func (s *KubeSphereControllerManagerOptions) Flags(allControllerNameSelectors []string) cliflag.NamedFlagSets {
fss := cliflag.NamedFlagSets{}
s.KubernetesOptions.AddFlags(fss.FlagSet("kubernetes"), s.KubernetesOptions)
@@ -120,6 +133,10 @@ func (s *KubeSphereControllerManagerOptions) Flags() cliflag.NamedFlagSets {
gfs.StringVar(&s.ApplicationSelector, "application-selector", s.ApplicationSelector, ""+
"Only reconcile application(sigs.k8s.io/application) objects match given selector, this could avoid conflicts with "+
"other projects built on top of sig-application. Default behavior is to reconcile all of application objects.")
gfs.StringSliceVar(&s.ControllerSelectors, "controllers", []string{"*"}, fmt.Sprintf(""+
"A list of controllers to enable. '*' enables all on-by-default controllers, 'foo' enables the controller "+
"named 'foo', '-foo' disables the controller named 'foo'.\nAll controllers: %s",
strings.Join(allControllerNameSelectors, ", ")))
kfs := fss.FlagSet("klog")
local := flag.NewFlagSet("klog", flag.ExitOnError)
@@ -132,26 +149,58 @@ func (s *KubeSphereControllerManagerOptions) Flags() cliflag.NamedFlagSets {
return fss
}
func (s *KubeSphereControllerManagerOptions) Validate() []error {
// Validate Options and Genetic Options
func (o *KubeSphereControllerManagerOptions) Validate(allControllerNameSelectors []string) []error {
var errs []error
errs = append(errs, s.DevopsOptions.Validate()...)
errs = append(errs, s.KubernetesOptions.Validate()...)
errs = append(errs, s.S3Options.Validate()...)
errs = append(errs, s.OpenPitrixOptions.Validate()...)
errs = append(errs, s.NetworkOptions.Validate()...)
errs = append(errs, s.LdapOptions.Validate()...)
errs = append(errs, s.MultiClusterOptions.Validate()...)
errs = append(errs, o.DevopsOptions.Validate()...)
errs = append(errs, o.KubernetesOptions.Validate()...)
errs = append(errs, o.S3Options.Validate()...)
errs = append(errs, o.OpenPitrixOptions.Validate()...)
errs = append(errs, o.NetworkOptions.Validate()...)
errs = append(errs, o.LdapOptions.Validate()...)
errs = append(errs, o.MultiClusterOptions.Validate()...)
if len(s.ApplicationSelector) != 0 {
_, err := labels.Parse(s.ApplicationSelector)
// genetic option: application-selector
if len(o.ApplicationSelector) != 0 {
_, err := labels.Parse(o.ApplicationSelector)
if err != nil {
errs = append(errs, err)
}
}
// genetic option: controllers, check all selectors are valid
allControllersNameSet := sets.NewString(allControllerNameSelectors...)
for _, selector := range o.ControllerSelectors {
if selector == "*" {
continue
}
selector = strings.TrimPrefix(selector, "-")
if !allControllersNameSet.Has(selector) {
errs = append(errs, fmt.Errorf("%q is not in the list of known controllers", selector))
}
}
return errs
}
// IsControllerEnabled check if a specified controller enabled or not.
func (o *KubeSphereControllerManagerOptions) IsControllerEnabled(name string) bool {
hasStar := false
for _, ctrl := range o.ControllerSelectors {
if ctrl == name {
return true
}
if ctrl == "-" + name {
return false
}
if ctrl == "*" {
hasStar = true
}
}
return hasStar
}
func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderelection.LeaderElectionConfig, fs *pflag.FlagSet) {
fs.DurationVar(&l.LeaseDuration, "leader-elect-lease-duration", l.LeaseDuration, ""+
"The duration that non-leader candidates will wait after observing a leadership "+
@@ -167,3 +216,4 @@ func (s *KubeSphereControllerManagerOptions) bindLeaderElectionFlags(l *leaderel
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
}

View File

@@ -0,0 +1,66 @@
package options
import (
"github.com/stretchr/testify/assert"
"testing"
)
// ref: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/controller-manager/app/helper_test.go
func TestIsControllerEnabled(t *testing.T) {
testcases := []struct {
name string
controllerName string
controllerFlags []string
expected bool
}{
{
name: "on by name",
controllerName: "bravo",
controllerFlags: []string{"alpha", "bravo", "-charlie"},
expected: true,
},
{
name: "off by name",
controllerName: "charlie",
controllerFlags: []string{"alpha", "bravo", "-charlie"},
expected: false,
},
{
name: "on by default",
controllerName: "alpha",
controllerFlags: []string{"*"},
expected: true,
},
{
name: "on by star, not off by name",
controllerName: "alpha",
controllerFlags: []string{"*", "-charlie"},
expected: true,
},
{
name: "off by name with star",
controllerName: "charlie",
controllerFlags: []string{"*", "-charlie"},
expected: false,
},
{
name: "off then on",
controllerName: "alpha",
controllerFlags: []string{"-alpha", "alpha"},
expected: false,
},
{
name: "on then off",
controllerName: "alpha",
controllerFlags: []string{"alpha", "-alpha"},
expected: true,
},
}
for _, tc := range testcases {
option := NewKubeSphereControllerManagerOptions()
option.ControllerSelectors = tc.controllerFlags
actual := option.IsControllerEnabled(tc.controllerName)
assert.Equal(t, tc.expected, actual, "%v: expected %v, got %v", tc.name, tc.expected, actual)
}
}

View File

@@ -21,11 +21,8 @@ import (
"fmt"
"os"
"kubesphere.io/kubesphere/pkg/models/kubeconfig"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog"
@@ -38,26 +35,11 @@ import (
"kubesphere.io/kubesphere/cmd/controller-manager/app/options"
"kubesphere.io/kubesphere/pkg/apis"
controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
"kubesphere.io/kubesphere/pkg/controller/application"
"kubesphere.io/kubesphere/pkg/controller/helm"
"kubesphere.io/kubesphere/pkg/controller/namespace"
"kubesphere.io/kubesphere/pkg/controller/network/webhooks"
"kubesphere.io/kubesphere/pkg/controller/openpitrix/helmapplication"
"kubesphere.io/kubesphere/pkg/controller/openpitrix/helmcategory"
"kubesphere.io/kubesphere/pkg/controller/openpitrix/helmrelease"
"kubesphere.io/kubesphere/pkg/controller/openpitrix/helmrepo"
"kubesphere.io/kubesphere/pkg/controller/quota"
"kubesphere.io/kubesphere/pkg/controller/serviceaccount"
"kubesphere.io/kubesphere/pkg/controller/user"
"kubesphere.io/kubesphere/pkg/controller/workspace"
"kubesphere.io/kubesphere/pkg/controller/workspacerole"
"kubesphere.io/kubesphere/pkg/controller/workspacerolebinding"
"kubesphere.io/kubesphere/pkg/controller/workspacetemplate"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/simple/client/devops"
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
ldapclient "kubesphere.io/kubesphere/pkg/simple/client/ldap"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"kubesphere.io/kubesphere/pkg/utils/metrics"
"kubesphere.io/kubesphere/pkg/utils/term"
@@ -92,7 +74,7 @@ func NewControllerManagerCommand() *cobra.Command {
Use: "controller-manager",
Long: `KubeSphere controller manager is a daemon that`,
Run: func(cmd *cobra.Command, args []string) {
if errs := s.Validate(); len(errs) != 0 {
if errs := s.Validate(allControllers); len(errs) != 0 {
klog.Error(utilerrors.NewAggregate(errs))
os.Exit(1)
}
@@ -106,7 +88,7 @@ func NewControllerManagerCommand() *cobra.Command {
}
fs := cmd.Flags()
namedFlagSets := s.Flags()
namedFlagSets := s.Flags(allControllers)
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
@@ -140,32 +122,8 @@ func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) err
return err
}
var devopsClient devops.Interface
if s.DevopsOptions != nil && len(s.DevopsOptions.Host) != 0 {
devopsClient, err = jenkins.NewDevopsClient(s.DevopsOptions)
if err != nil {
return fmt.Errorf("failed to connect jenkins, please check jenkins status, error: %v", err)
}
}
var ldapClient ldapclient.Interface
// when there is no ldapOption, we set ldapClient as nil, which means we don't need to sync user info into ldap.
if s.LdapOptions != nil && len(s.LdapOptions.Host) != 0 {
if s.LdapOptions.Host == ldapclient.FAKE_HOST { // for debug only
ldapClient = ldapclient.NewSimpleLdap()
} else {
ldapClient, err = ldapclient.NewLdapClient(s.LdapOptions, ctx.Done())
if err != nil {
return fmt.Errorf("failed to connect to ldap service, please check ldap status, error: %v", err)
}
}
} else {
klog.Warning("ks-controller-manager starts without ldap provided, it will not sync user into ldap")
}
var s3Client s3.Interface
if s.S3Options != nil && len(s.S3Options.Endpoint) != 0 {
s3Client, err = s3.NewS3Client(s.S3Options)
_, err = s3.NewS3Client(s.S3Options)
if err != nil {
return fmt.Errorf("failed to connect to s3, please check s3 service status, error: %v", err)
}
@@ -212,130 +170,13 @@ func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) err
// register common meta types into schemas.
metav1.AddToGroupVersion(mgr.GetScheme(), metav1.SchemeGroupVersion)
kubeconfigClient := kubeconfig.NewOperator(kubernetesClient.Kubernetes(),
informerFactory.KubernetesSharedInformerFactory().Core().V1().ConfigMaps().Lister(),
kubernetesClient.Config())
userController := user.Reconciler{
MultiClusterEnabled: s.MultiClusterOptions.Enable,
MaxConcurrentReconciles: 4,
LdapClient: ldapClient,
DevopsClient: devopsClient,
KubeconfigClient: kubeconfigClient,
AuthenticationOptions: s.AuthenticationOptions,
}
if err = userController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create user controller: %v", err)
}
workspaceTemplateReconciler := &workspacetemplate.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable}
if err = workspaceTemplateReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create workspace template controller: %v", err)
}
workspaceReconciler := &workspace.Reconciler{}
if err = workspaceReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create workspace controller: %v", err)
}
workspaceRoleReconciler := &workspacerole.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable}
if err = workspaceRoleReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create workspace role controller: %v", err)
}
workspaceRoleBindingReconciler := &workspacerolebinding.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable}
if err = workspaceRoleBindingReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create workspace role binding controller: %v", err)
}
namespaceReconciler := &namespace.Reconciler{}
if err = namespaceReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create namespace controller: %v", err)
}
err = helmrepo.Add(mgr)
if err != nil {
klog.Fatal("Unable to create helm repo controller")
}
err = helmcategory.Add(mgr)
if err != nil {
klog.Fatal("Unable to create helm category controller")
}
var opS3Client s3.Interface
if !s.OpenPitrixOptions.AppStoreConfIsEmpty() {
opS3Client, err = s3.NewS3Client(s.OpenPitrixOptions.S3Options)
if err != nil {
klog.Fatalf("failed to connect to s3, please check openpitrix s3 service status, error: %v", err)
}
err = (&helmapplication.ReconcileHelmApplication{}).SetupWithManager(mgr)
if err != nil {
klog.Fatalf("Unable to create helm application controller, error: %s", err)
}
err = (&helmapplication.ReconcileHelmApplicationVersion{}).SetupWithManager(mgr)
if err != nil {
klog.Fatalf("Unable to create helm application version controller, error: %s ", err)
}
}
err = (&helmrelease.ReconcileHelmRelease{
// nil interface is valid value.
StorageClient: opS3Client,
KsFactory: informerFactory.KubeSphereSharedInformerFactory(),
MultiClusterEnable: s.MultiClusterOptions.Enable,
WaitTime: s.OpenPitrixOptions.ReleaseControllerOptions.WaitTime,
MaxConcurrent: s.OpenPitrixOptions.ReleaseControllerOptions.MaxConcurrent,
StopChan: ctx.Done(),
}).SetupWithManager(mgr)
if err != nil {
klog.Fatalf("Unable to create helm release controller, error: %s", err)
}
selector, _ := labels.Parse(s.ApplicationSelector)
applicationReconciler := &application.ApplicationReconciler{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Mapper: mgr.GetRESTMapper(),
ApplicationSelector: selector,
}
if err = applicationReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create application controller: %v", err)
}
saReconciler := &serviceaccount.Reconciler{}
if err = saReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create ServiceAccount controller: %v", err)
}
resourceQuotaReconciler := quota.Reconciler{}
if err := resourceQuotaReconciler.SetupWithManager(mgr, quota.DefaultMaxConcurrentReconciles, quota.DefaultResyncPeriod, informerFactory.KubernetesSharedInformerFactory()); err != nil {
klog.Fatalf("Unable to create ResourceQuota controller: %v", err)
}
if !s.GatewayOptions.IsEmpty() {
helmReconciler := helm.Reconciler{GatewayOptions: s.GatewayOptions}
if err := helmReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create helm controller: %v", err)
}
}
// TODO(jeff): refactor config with CRD
servicemeshEnabled := s.ServiceMeshOptions != nil && len(s.ServiceMeshOptions.IstioPilotHost) != 0
if err = addControllers(mgr,
// install all controllers
if err = addAllControllers(mgr,
kubernetesClient,
informerFactory,
devopsClient,
s3Client,
ldapClient,
s.KubernetesOptions,
s.AuthenticationOptions,
s.MultiClusterOptions,
s.NetworkOptions,
servicemeshEnabled,
s.AuthenticationOptions.KubectlImage, ctx.Done()); err != nil {
s,
ctx.Done()); err != nil {
klog.Fatalf("unable to register controllers to the manager: %v", err)
}