Files
kubesphere/cmd/controller-manager/app/server.go
live77 81db894741 add --controllers option in ks-controller-manager
imple controller enable/disable check logic
add unittest for selective controller enable/disable
move all controllers init code to a single place
2021-12-05 16:06:08 +08:00

214 lines
7.0 KiB
Go

/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"context"
"fmt"
"os"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog"
"k8s.io/klog/klogr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"kubesphere.io/kubesphere/cmd/controller-manager/app/options"
"kubesphere.io/kubesphere/pkg/apis"
controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
"kubesphere.io/kubesphere/pkg/controller/network/webhooks"
"kubesphere.io/kubesphere/pkg/controller/quota"
"kubesphere.io/kubesphere/pkg/controller/user"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"kubesphere.io/kubesphere/pkg/utils/metrics"
"kubesphere.io/kubesphere/pkg/utils/term"
"kubesphere.io/kubesphere/pkg/version"
)
func NewControllerManagerCommand() *cobra.Command {
s := options.NewKubeSphereControllerManagerOptions()
conf, err := controllerconfig.TryLoadFromDisk()
if err == nil {
// make sure LeaderElection is not nil
s = &options.KubeSphereControllerManagerOptions{
KubernetesOptions: conf.KubernetesOptions,
DevopsOptions: conf.DevopsOptions,
S3Options: conf.S3Options,
AuthenticationOptions: conf.AuthenticationOptions,
LdapOptions: conf.LdapOptions,
OpenPitrixOptions: conf.OpenPitrixOptions,
NetworkOptions: conf.NetworkOptions,
MultiClusterOptions: conf.MultiClusterOptions,
ServiceMeshOptions: conf.ServiceMeshOptions,
GatewayOptions: conf.GatewayOptions,
LeaderElection: s.LeaderElection,
LeaderElect: s.LeaderElect,
WebhookCertDir: s.WebhookCertDir,
}
} else {
klog.Fatal("Failed to load configuration from disk", err)
}
cmd := &cobra.Command{
Use: "controller-manager",
Long: `KubeSphere controller manager is a daemon that`,
Run: func(cmd *cobra.Command, args []string) {
if errs := s.Validate(allControllers); len(errs) != 0 {
klog.Error(utilerrors.NewAggregate(errs))
os.Exit(1)
}
if err = run(s, signals.SetupSignalHandler()); err != nil {
klog.Error(err)
os.Exit(1)
}
},
SilenceUsage: true,
}
fs := cmd.Flags()
namedFlagSets := s.Flags(allControllers)
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
usageFmt := "Usage:\n %s\n"
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
_, _ = fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
})
versionCmd := &cobra.Command{
Use: "version",
Short: "Print the version of KubeSphere controller-manager",
Run: func(cmd *cobra.Command, args []string) {
cmd.Println(version.Get())
},
}
cmd.AddCommand(versionCmd)
return cmd
}
func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) error {
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
if err != nil {
klog.Errorf("Failed to create kubernetes clientset %v", err)
return err
}
if s.S3Options != nil && len(s.S3Options.Endpoint) != 0 {
_, err = s3.NewS3Client(s.S3Options)
if err != nil {
return fmt.Errorf("failed to connect to s3, please check s3 service status, error: %v", err)
}
}
informerFactory := informers.NewInformerFactories(
kubernetesClient.Kubernetes(),
kubernetesClient.KubeSphere(),
kubernetesClient.Istio(),
kubernetesClient.Snapshot(),
kubernetesClient.ApiExtensions(),
kubernetesClient.Prometheus())
mgrOptions := manager.Options{
CertDir: s.WebhookCertDir,
Port: 8443,
}
if s.LeaderElect {
mgrOptions = manager.Options{
CertDir: s.WebhookCertDir,
Port: 8443,
LeaderElection: s.LeaderElect,
LeaderElectionNamespace: "kubesphere-system",
LeaderElectionID: "ks-controller-manager-leader-election",
LeaseDuration: &s.LeaderElection.LeaseDuration,
RetryPeriod: &s.LeaderElection.RetryPeriod,
RenewDeadline: &s.LeaderElection.RenewDeadline,
}
}
klog.V(0).Info("setting up manager")
ctrl.SetLogger(klogr.New())
// Use 8443 instead of 443 cause we need root permission to bind port 443
mgr, err := manager.New(kubernetesClient.Config(), mgrOptions)
if err != nil {
klog.Fatalf("unable to set up overall controller manager: %v", err)
}
if err = apis.AddToScheme(mgr.GetScheme()); err != nil {
klog.Fatalf("unable add APIs to scheme: %v", err)
}
// register common meta types into schemas.
metav1.AddToGroupVersion(mgr.GetScheme(), metav1.SchemeGroupVersion)
// TODO(jeff): refactor config with CRD
// install all controllers
if err = addAllControllers(mgr,
kubernetesClient,
informerFactory,
s,
ctx.Done()); err != nil {
klog.Fatalf("unable to register controllers to the manager: %v", err)
}
// Start cache data after all informer is registered
klog.V(0).Info("Starting cache resource from apiserver...")
informerFactory.Start(ctx.Done())
// Setup webhooks
klog.V(2).Info("setting up webhook server")
hookServer := mgr.GetWebhookServer()
klog.V(2).Info("registering webhooks to the webhook server")
hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}})
hookServer.Register("/validate-network-kubesphere-io-v1alpha1", &webhook.Admission{Handler: &webhooks.ValidatingHandler{C: mgr.GetClient()}})
hookServer.Register("/mutate-network-kubesphere-io-v1alpha1", &webhook.Admission{Handler: &webhooks.MutatingHandler{C: mgr.GetClient()}})
resourceQuotaAdmission, err := quota.NewResourceQuotaAdmission(mgr.GetClient(), mgr.GetScheme())
if err != nil {
klog.Fatalf("unable to create resource quota admission: %v", err)
}
hookServer.Register("/validate-quota-kubesphere-io-v1alpha2", &webhook.Admission{Handler: resourceQuotaAdmission})
klog.V(2).Info("registering metrics to the webhook server")
// Add an extra metric endpoint, so we can use the the same metric definition with ks-apiserver
// /kapis/metrics is independent of controller-manager's built-in /metrics
mgr.AddMetricsExtraHandler("/kapis/metrics", metrics.Handler())
klog.V(0).Info("Starting the controllers.")
if err = mgr.Start(ctx); err != nil {
klog.Fatalf("unable to run the manager: %v", err)
}
return nil
}