fix unable to setup webhook server under leader election (#2830)
Signed-off-by: Jeff <zw0948@gmail.com>
This commit is contained in:
@@ -17,22 +17,15 @@ limitations under the License.
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/api/core/v1"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/client-go/tools/leaderelection"
|
||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||
"k8s.io/client-go/tools/record"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/klog/klogr"
|
||||
"kubesphere.io/kubesphere/cmd/controller-manager/app/options"
|
||||
"kubesphere.io/kubesphere/pkg/apis"
|
||||
controllerconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
|
||||
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
|
||||
"kubesphere.io/kubesphere/pkg/controller/namespace"
|
||||
"kubesphere.io/kubesphere/pkg/controller/network/nsnetworkpolicy"
|
||||
"kubesphere.io/kubesphere/pkg/controller/user"
|
||||
@@ -46,11 +39,10 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/s3"
|
||||
"kubesphere.io/kubesphere/pkg/utils/term"
|
||||
"os"
|
||||
application "sigs.k8s.io/application/controllers"
|
||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook"
|
||||
|
||||
application "sigs.k8s.io/application/controllers"
|
||||
)
|
||||
|
||||
func NewControllerManagerCommand() *cobra.Command {
|
||||
@@ -72,6 +64,8 @@ func NewControllerManagerCommand() *cobra.Command {
|
||||
LeaderElect: s.LeaderElect,
|
||||
WebhookCertDir: s.WebhookCertDir,
|
||||
}
|
||||
} else {
|
||||
klog.Fatal("Failed to load configuration from disk", err)
|
||||
}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
@@ -83,7 +77,7 @@ func NewControllerManagerCommand() *cobra.Command {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = Run(s, signals.SetupSignalHandler()); err != nil {
|
||||
if err = run(s, signals.SetupSignalHandler()); err != nil {
|
||||
klog.Error(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
@@ -101,13 +95,13 @@ func NewControllerManagerCommand() *cobra.Command {
|
||||
usageFmt := "Usage:\n %s\n"
|
||||
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
|
||||
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
|
||||
_, _ = fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
|
||||
cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
|
||||
})
|
||||
return cmd
|
||||
}
|
||||
|
||||
func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{}) error {
|
||||
func run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{}) error {
|
||||
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create kubernetes clientset %v", err)
|
||||
@@ -160,126 +154,89 @@ func Run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{})
|
||||
kubernetesClient.Snapshot(),
|
||||
kubernetesClient.ApiExtensions())
|
||||
|
||||
run := func(ctx context.Context) {
|
||||
klog.V(0).Info("setting up manager")
|
||||
// Use 8443 instead of 443 cause we need root permission to bind port 443
|
||||
mgr, err := manager.New(kubernetesClient.Config(), manager.Options{CertDir: s.WebhookCertDir, Port: 8443})
|
||||
if err != nil {
|
||||
klog.Fatalf("unable to set up overall controller manager: %v", err)
|
||||
}
|
||||
|
||||
klog.V(0).Info("setting up scheme")
|
||||
if err := apis.AddToScheme(mgr.GetScheme()); err != nil {
|
||||
klog.Fatalf("unable add APIs to scheme: %v", err)
|
||||
}
|
||||
|
||||
klog.V(0).Info("Setting up controllers")
|
||||
err = workspace.Add(mgr)
|
||||
if err != nil {
|
||||
klog.Fatal("Unable to create workspace controller")
|
||||
}
|
||||
|
||||
err = namespace.Add(mgr)
|
||||
if err != nil {
|
||||
klog.Fatal("Unable to create namespace controller")
|
||||
}
|
||||
|
||||
err = (&application.ApplicationReconciler{
|
||||
Scheme: mgr.GetScheme(),
|
||||
Client: mgr.GetClient(),
|
||||
Mapper: mgr.GetRESTMapper(),
|
||||
Log: klogr.New(),
|
||||
}).SetupWithManager(mgr)
|
||||
if err != nil {
|
||||
klog.Fatal("Unable to create application controller")
|
||||
}
|
||||
|
||||
// TODO(jeff): refactor config with CRD
|
||||
servicemeshEnabled := s.ServiceMeshOptions != nil && len(s.ServiceMeshOptions.IstioPilotHost) != 0
|
||||
if err = addControllers(mgr,
|
||||
kubernetesClient,
|
||||
informerFactory,
|
||||
devopsClient,
|
||||
s3Client,
|
||||
ldapClient,
|
||||
s.AuthenticationOptions,
|
||||
openpitrixClient,
|
||||
s.MultiClusterOptions.Enable,
|
||||
s.NetworkOptions,
|
||||
servicemeshEnabled,
|
||||
s.AuthenticationOptions.KubectlImage, stopCh); err != nil {
|
||||
klog.Fatalf("unable to register controllers to the manager: %v", err)
|
||||
}
|
||||
|
||||
// Start cache data after all informer is registered
|
||||
informerFactory.Start(stopCh)
|
||||
|
||||
// Setup webhooks
|
||||
klog.Info("setting up webhook server")
|
||||
hookServer := mgr.GetWebhookServer()
|
||||
|
||||
klog.Info("registering webhooks to the webhook server")
|
||||
hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2-user", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}})
|
||||
hookServer.Register("/validate-nsnp-kubesphere-io-v1alpha1-network", &webhook.Admission{Handler: &nsnetworkpolicy.NSNPValidator{Client: mgr.GetClient()}})
|
||||
|
||||
klog.V(0).Info("Starting the controllers.")
|
||||
if err = mgr.Start(stopCh); err != nil {
|
||||
klog.Fatalf("unable to run the manager: %v", err)
|
||||
}
|
||||
|
||||
select {}
|
||||
mgrOptions := manager.Options{
|
||||
CertDir: s.WebhookCertDir,
|
||||
Port: 8443,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
<-stopCh
|
||||
cancel()
|
||||
}()
|
||||
|
||||
if !s.LeaderElect {
|
||||
run(ctx)
|
||||
return nil
|
||||
if s.LeaderElect {
|
||||
mgrOptions = manager.Options{
|
||||
CertDir: s.WebhookCertDir,
|
||||
Port: 8443,
|
||||
LeaderElection: s.LeaderElect,
|
||||
LeaderElectionNamespace: "kubesphere-system",
|
||||
LeaderElectionID: "ks-controller-manager-leader-election",
|
||||
LeaseDuration: &s.LeaderElection.LeaseDuration,
|
||||
RetryPeriod: &s.LeaderElection.RetryPeriod,
|
||||
RenewDeadline: &s.LeaderElection.RenewDeadline,
|
||||
}
|
||||
}
|
||||
|
||||
id, err := os.Hostname()
|
||||
klog.V(0).Info("setting up manager")
|
||||
|
||||
// Use 8443 instead of 443 cause we need root permission to bind port 443
|
||||
mgr, err := manager.New(kubernetesClient.Config(), mgrOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
klog.Fatalf("unable to set up overall controller manager: %v", err)
|
||||
}
|
||||
|
||||
// add a uniquifier so that two processes on the same host don't accidentally both become active
|
||||
id = id + "_" + string(uuid.NewUUID())
|
||||
|
||||
lock, err := resourcelock.New(resourcelock.LeasesResourceLock,
|
||||
"kubesphere-system",
|
||||
"ks-controller-manager",
|
||||
kubernetesClient.Kubernetes().CoreV1(),
|
||||
kubernetesClient.Kubernetes().CoordinationV1(),
|
||||
resourcelock.ResourceLockConfig{
|
||||
Identity: id,
|
||||
EventRecorder: record.NewBroadcaster().NewRecorder(scheme.Scheme, v1.EventSource{
|
||||
Component: "ks-controller-manager",
|
||||
}),
|
||||
})
|
||||
if err = apis.AddToScheme(mgr.GetScheme()); err != nil {
|
||||
klog.Fatalf("unable add APIs to scheme: %v", err)
|
||||
}
|
||||
|
||||
err = workspace.Add(mgr)
|
||||
if err != nil {
|
||||
klog.Fatalf("error creating lock: %v", err)
|
||||
klog.Fatal("Unable to create workspace controller")
|
||||
}
|
||||
|
||||
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
|
||||
Lock: lock,
|
||||
LeaseDuration: s.LeaderElection.LeaseDuration,
|
||||
RenewDeadline: s.LeaderElection.RenewDeadline,
|
||||
RetryPeriod: s.LeaderElection.RetryPeriod,
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: run,
|
||||
OnStoppedLeading: func() {
|
||||
klog.Errorf("leadership lost")
|
||||
os.Exit(0)
|
||||
},
|
||||
},
|
||||
})
|
||||
err = namespace.Add(mgr)
|
||||
if err != nil {
|
||||
klog.Fatal("Unable to create namespace controller")
|
||||
}
|
||||
|
||||
err = (&application.ApplicationReconciler{
|
||||
Scheme: mgr.GetScheme(),
|
||||
Client: mgr.GetClient(),
|
||||
Mapper: mgr.GetRESTMapper(),
|
||||
Log: klogr.New(),
|
||||
}).SetupWithManager(mgr)
|
||||
if err != nil {
|
||||
klog.Fatal("Unable to create application controller")
|
||||
}
|
||||
|
||||
// TODO(jeff): refactor config with CRD
|
||||
servicemeshEnabled := s.ServiceMeshOptions != nil && len(s.ServiceMeshOptions.IstioPilotHost) != 0
|
||||
if err = addControllers(mgr,
|
||||
kubernetesClient,
|
||||
informerFactory,
|
||||
devopsClient,
|
||||
s3Client,
|
||||
ldapClient,
|
||||
s.AuthenticationOptions,
|
||||
openpitrixClient,
|
||||
s.MultiClusterOptions.Enable,
|
||||
s.NetworkOptions,
|
||||
servicemeshEnabled,
|
||||
s.AuthenticationOptions.KubectlImage, stopCh); err != nil {
|
||||
klog.Fatalf("unable to register controllers to the manager: %v", err)
|
||||
}
|
||||
|
||||
// Start cache data after all informer is registered
|
||||
klog.V(0).Info("Starting cache resource from apiserver...")
|
||||
informerFactory.Start(stopCh)
|
||||
|
||||
// Setup webhooks
|
||||
klog.V(2).Info("setting up webhook server")
|
||||
hookServer := mgr.GetWebhookServer()
|
||||
|
||||
klog.V(2).Info("registering webhooks to the webhook server")
|
||||
hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2-user", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}})
|
||||
hookServer.Register("/validate-nsnp-kubesphere-io-v1alpha1-network", &webhook.Admission{Handler: &nsnetworkpolicy.NSNPValidator{Client: mgr.GetClient()}})
|
||||
|
||||
klog.V(0).Info("Starting the controllers.")
|
||||
if err = mgr.Start(stopCh); err != nil {
|
||||
klog.Fatalf("unable to run the manager: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ func (c *StorageCapabilityController) handlerCSIDriver(obj interface{}) {
|
||||
}
|
||||
for _, storageClass := range storageClasses {
|
||||
if storageClass.Provisioner == csiDriver.Name {
|
||||
klog.Info("enqueue StorageClass when handler csiDriver", storageClass)
|
||||
klog.V(4).Infof("enqueue StorageClass %s when handling csiDriver", storageClass.Name)
|
||||
c.enqueueStorageClass(storageClass)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user