From 194d0549738bbeafc72bf2ab714e53d107a5f6fa Mon Sep 17 00:00:00 2001 From: yuswift Date: Tue, 23 Feb 2021 16:24:22 +0800 Subject: [PATCH] refactor cluster controller Signed-off-by: yuswift --- pkg/controller/cluster/cluster_controller.go | 313 ++++++------------- pkg/simple/client/multicluster/options.go | 4 +- 2 files changed, 97 insertions(+), 220 deletions(-) diff --git a/pkg/controller/cluster/cluster_controller.go b/pkg/controller/cluster/cluster_controller.go index c138e6cae..500d22653 100644 --- a/pkg/controller/cluster/cluster_controller.go +++ b/pkg/controller/cluster/cluster_controller.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "math/rand" "net/http" "reflect" "sync" @@ -33,7 +32,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/intstr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -44,6 +42,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" "k8s.io/klog" fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1" @@ -56,12 +55,12 @@ import ( ) // Cluster controller only runs under multicluster mode. Cluster controller is following below steps, -// 1. Populates proxy spec if cluster connection type is proxy -// 1.1 Wait for cluster agent is ready if connection type is proxy +// 1. Wait for cluster agent is ready if connection type is proxy // 2. Join cluster into federation control plane if kubeconfig is ready. // 3. Pull cluster version and configz, set result to cluster status // Also put all clusters back into queue every 5 * time.Minute to sync cluster status, this is needed // in case there aren't any cluster changes made. +// Also check if all of the clusters are ready by the spec.connection.kubeconfig every resync period const ( // maxRetries is the number of times a service will be retried before it is dropped out of the queue. @@ -83,17 +82,14 @@ const ( portRangeMin = 6000 portRangeMax = 7000 - // Proxy service port - kubernetesPort = 6443 - kubespherePort = 80 - - defaultAgentNamespace = "kubesphere-system" - // proxy format proxyFormat = "%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:80/proxy/%s" // mulitcluster configuration name configzMultiCluster = "multicluster" + + // probe cluster timeout + probeClusterTimeout = 3 * time.Second ) // Cluster template for reconcile host cluster if there is none. @@ -223,12 +219,16 @@ func (c *clusterController) Run(workers int, stopCh <-chan struct{}) error { go wait.Until(c.worker, c.workerLoopPeriod, stopCh) } - // refresh cluster configz every 2 minutes + // refresh cluster configz every resync period go wait.Until(func() { if err := c.reconcileHostCluster(); err != nil { klog.Errorf("Error create host cluster, error %v", err) } + if err := c.probeClusters(); err != nil { + klog.Errorf("failed to reconcile cluster ready status, err: %v", err) + } + }, c.resyncPeriod, stopCh) <-stopCh @@ -348,6 +348,80 @@ func (c *clusterController) reconcileHostCluster() error { return err } +func (c *clusterController) probeClusters() error { + clusters, err := c.clusterLister.List(labels.Everything()) + if err != nil { + return err + } + + for _, cluster := range clusters { + if len(cluster.Spec.Connection.KubeConfig) == 0 { + continue + } + + clientConfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Connection.KubeConfig) + if err != nil { + klog.Error(err) + continue + } + + config, err := clientConfig.ClientConfig() + if err != nil { + klog.Error(err) + continue + } + config.Timeout = probeClusterTimeout + + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + klog.Error(err) + continue + } + + var con clusterv1alpha1.ClusterCondition + _, err = clientSet.Discovery().ServerVersion() + if err == nil { + con = clusterv1alpha1.ClusterCondition{ + Type: clusterv1alpha1.ClusterReady, + Status: v1.ConditionTrue, + LastUpdateTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: string(clusterv1alpha1.ClusterReady), + Message: "Cluster is available now", + } + } else { + con = clusterv1alpha1.ClusterCondition{ + Type: clusterv1alpha1.ClusterReady, + Status: v1.ConditionFalse, + LastUpdateTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: "failed to connect get kubernetes version", + Message: "Cluster is not available now", + } + } + + c.updateClusterCondition(cluster, con) + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + ct, err := c.clusterClient.Get(context.TODO(), cluster.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + ct.Status.Conditions = cluster.Status.Conditions + ct, err = c.clusterClient.Update(context.TODO(), ct, metav1.UpdateOptions{}) + return err + }) + if err != nil { + klog.Errorf("failed to update cluster %s status, err: %v", cluster.Name, err) + } else { + klog.V(4).Infof("successfully updated cluster %s to status %v", cluster.Name, con) + } + + } + + return nil +} + func (c *clusterController) syncCluster(key string) error { klog.V(5).Infof("starting to sync cluster %s", key) startTime := time.Now() @@ -363,6 +437,7 @@ func (c *clusterController) syncCluster(key string) error { }() cluster, err := c.clusterLister.Get(name) + if err != nil { // cluster not found, possibly been deleted // need to do the cleanup @@ -374,9 +449,6 @@ func (c *clusterController) syncCluster(key string) error { return err } - // proxy service name if needed - serviceName := fmt.Sprintf("mc-%s", cluster.Name) - if cluster.ObjectMeta.DeletionTimestamp.IsZero() { // The object is not being deleted, so if it does not have our finalizer, // then lets add the finalizer and update the object. This is equivalent @@ -399,22 +471,6 @@ func (c *clusterController) syncCluster(key string) error { return err } - _, err = c.client.CoreV1().Services(defaultAgentNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - // nothing to do - } else { - klog.Errorf("Failed to get proxy service %s, error %v", serviceName, err) - return err - } - } else { - err = c.client.CoreV1().Services(defaultAgentNamespace).Delete(context.TODO(), serviceName, *metav1.NewDeleteOptions(0)) - if err != nil { - klog.Errorf("Unable to delete service %s, error %v", serviceName, err) - return err - } - } - // clean up openpitrix runtime of the cluster if _, ok := cluster.Annotations[openpitrixRuntime]; ok { if c.openpitrixClient != nil { @@ -438,136 +494,18 @@ func (c *clusterController) syncCluster(key string) error { return nil } + // save a old copy of cluster + oldCluster := cluster.DeepCopy() + // currently we didn't set cluster.Spec.Enable when creating cluster at client side, so only check // if we enable cluster.Spec.JoinFederation now if cluster.Spec.JoinFederation == false { return nil } - // save a old copy of cluster - oldCluster := cluster.DeepCopy() - - // prepare for proxy to member cluster - if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy { - - // allocate ports for kubernetes and kubesphere endpoint - if cluster.Spec.Connection.KubeSphereAPIServerPort == 0 || - cluster.Spec.Connection.KubernetesAPIServerPort == 0 { - port, err := c.allocatePort() - if err != nil { - klog.Error(err) - return err - } - - cluster.Spec.Connection.KubernetesAPIServerPort = port - cluster.Spec.Connection.KubeSphereAPIServerPort = port + 10000 - } - - // token uninitialized, generate a new token - if len(cluster.Spec.Connection.Token) == 0 { - cluster.Spec.Connection.Token = c.generateToken() - } - - // create a proxy service spec - mcService := v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: serviceName, - Namespace: cluster.Namespace, - Labels: map[string]string{ - "app.kubernetes.io/name": serviceName, - "app": serviceName, - }, - }, - Spec: v1.ServiceSpec{ - Selector: map[string]string{ - "app.kubernetes.io/name": "tower", - "app": "tower", - }, - Ports: []v1.ServicePort{ - { - Name: "kubernetes", - Protocol: v1.ProtocolTCP, - Port: kubernetesPort, - TargetPort: intstr.FromInt(int(cluster.Spec.Connection.KubernetesAPIServerPort)), - }, - { - Name: "kubesphere", - Protocol: v1.ProtocolTCP, - Port: kubespherePort, - TargetPort: intstr.FromInt(int(cluster.Spec.Connection.KubeSphereAPIServerPort)), - }, - }, - }, - } - - service, err := c.client.CoreV1().Services(defaultAgentNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) - if err != nil { // proxy service not found - if errors.IsNotFound(err) { - service, err = c.client.CoreV1().Services(defaultAgentNamespace).Create(context.TODO(), &mcService, metav1.CreateOptions{}) - if err != nil { - return err - } - } - - return err - } else { // update existed proxy service - if !reflect.DeepEqual(service.Spec, mcService.Spec) { - mcService.ObjectMeta = service.ObjectMeta - mcService.Spec.ClusterIP = service.Spec.ClusterIP - - service, err = c.client.CoreV1().Services(defaultAgentNamespace).Update(context.TODO(), &mcService, metav1.UpdateOptions{}) - if err != nil { - return err - } - } - } - - // populates the kubernetes apiEndpoint and kubesphere apiEndpoint - cluster.Spec.Connection.KubernetesAPIEndpoint = fmt.Sprintf("https://%s:%d", service.Spec.ClusterIP, kubernetesPort) - cluster.Spec.Connection.KubeSphereAPIEndpoint = fmt.Sprintf("http://%s:%d", service.Spec.ClusterIP, kubespherePort) - - initializedCondition := clusterv1alpha1.ClusterCondition{ - Type: clusterv1alpha1.ClusterInitialized, - Status: v1.ConditionTrue, - Reason: string(clusterv1alpha1.ClusterInitialized), - Message: "Cluster has been initialized", - LastUpdateTime: metav1.Now(), - LastTransitionTime: metav1.Now(), - } - - if !isConditionTrue(cluster, clusterv1alpha1.ClusterInitialized) { - c.updateClusterCondition(cluster, initializedCondition) - } - - if !reflect.DeepEqual(oldCluster, cluster) { - cluster, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{}) - if err != nil { - klog.Errorf("Error updating cluster %s, error %s", cluster.Name, err) - return err - } - } - } - - // agent status unavailable, which means the agent disconnected from the server or has not connected to the server - // we need to update the cluster ready status unavailable and return. - if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy && - !isConditionTrue(cluster, clusterv1alpha1.ClusterAgentAvailable) { - clusterNotReadyCondition := clusterv1alpha1.ClusterCondition{ - Type: clusterv1alpha1.ClusterReady, - Status: v1.ConditionFalse, - LastUpdateTime: metav1.Now(), - LastTransitionTime: metav1.Now(), - Reason: "Unable to establish connection with cluster", - Message: "Cluster is not available now", - } - - c.updateClusterCondition(cluster, clusterNotReadyCondition) - - cluster, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{}) - if err != nil { - klog.Errorf("Error updating cluster %s, error %s", cluster.Name, err) - } - return err + // cluster not ready, nothing to do + if !isConditionTrue(cluster, clusterv1alpha1.ClusterReady) { + return nil } // build up cached cluster data if there isn't any @@ -594,10 +532,10 @@ func (c *clusterController) syncCluster(key string) error { _, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels) if err != nil { klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err) - c.eventRecorder.Event(cluster, v1.EventTypeWarning, "JoinFederation", err.Error()) return err } - c.eventRecorder.Event(cluster, v1.EventTypeNormal, "JoinFederation", "Cluster has joined federation.") + + klog.Infof("successfully joined federation for cluster %s", cluster.Name) federationReadyCondition := clusterv1alpha1.ClusterCondition{ Type: clusterv1alpha1.ClusterFederated, @@ -611,7 +549,7 @@ func (c *clusterController) syncCluster(key string) error { c.updateClusterCondition(cluster, federationReadyCondition) } - // cluster agent is ready, we can pull kubernetes cluster info through agent + // cluster is ready, we can pull kubernetes cluster info through agent // since there is no agent necessary for host cluster, so updates for host cluster // is safe. if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 { @@ -647,17 +585,6 @@ func (c *clusterController) syncCluster(key string) error { cluster.Labels[clusterv1alpha1.HostCluster] = "" } - clusterReadyCondition := clusterv1alpha1.ClusterCondition{ - Type: clusterv1alpha1.ClusterReady, - Status: v1.ConditionTrue, - LastUpdateTime: metav1.Now(), - LastTransitionTime: metav1.Now(), - Reason: string(clusterv1alpha1.ClusterReady), - Message: "Cluster is available now", - } - - c.updateClusterCondition(cluster, clusterReadyCondition) - if c.openpitrixClient != nil { // OpenPitrix is enabled, create runtime if cluster.GetAnnotations() == nil { cluster.Annotations = make(map[string]string) @@ -746,16 +673,6 @@ func (c *clusterController) addCluster(obj interface{}) { c.queue.Add(key) } -func hasHostClusterLabel(cluster *clusterv1alpha1.Cluster) bool { - if cluster.Labels == nil || len(cluster.Labels) == 0 { - return false - } - - _, ok := cluster.Labels[clusterv1alpha1.HostCluster] - - return ok -} - func (c *clusterController) handleErr(err error, key interface{}) { if err == nil { c.queue.Forget(key) @@ -855,43 +772,3 @@ func (c *clusterController) unJoinFederation(clusterConfig *rest.Config, unjoini } } } - -// allocatePort find a available port between [portRangeMin, portRangeMax] in maximumRetries -// TODO: only works with handful clusters -func (c *clusterController) allocatePort() (uint16, error) { - rand.Seed(time.Now().UnixNano()) - - clusters, err := c.clusterLister.List(labels.Everything()) - if err != nil { - return 0, err - } - - const maximumRetries = 10 - for i := 0; i < maximumRetries; i++ { - collision := false - port := uint16(portRangeMin + rand.Intn(portRangeMax-portRangeMin+1)) - - for _, item := range clusters { - if item.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy && - item.Spec.Connection.KubernetesAPIServerPort != 0 && - item.Spec.Connection.KubeSphereAPIServerPort == port { - collision = true - break - } - } - - if !collision { - return port, nil - } - } - - return 0, fmt.Errorf("unable to allocate port after %d retries", maximumRetries) -} - -// generateToken returns a random 32-byte string as token -func (c *clusterController) generateToken() string { - rand.Seed(time.Now().UnixNano()) - b := make([]byte, 32) - rand.Read(b) - return fmt.Sprintf("%x", b) -} diff --git a/pkg/simple/client/multicluster/options.go b/pkg/simple/client/multicluster/options.go index 7c31cc3fd..c6ece89fd 100644 --- a/pkg/simple/client/multicluster/options.go +++ b/pkg/simple/client/multicluster/options.go @@ -22,7 +22,7 @@ import ( "github.com/spf13/pflag" ) -const DefaultResyncPeriod = time.Duration(120) * time.Second +const DefaultResyncPeriod = 120 * time.Second type Options struct { // Enable @@ -79,5 +79,5 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, s *Options) { "This field is used when generating deployment yaml for agent.") fs.DurationVar(&o.ClusterControllerResyncSecond, "cluster-controller-resync-second", s.ClusterControllerResyncSecond, - "Cluster controller resync second to sync cluster resource.") + "Cluster controller resync second to sync cluster resource. e.g. 2m 5m 10m ... default set to 2m") }