diff --git a/cmd/controller-manager/app/controllers.go b/cmd/controller-manager/app/controllers.go index 1f87906c3..4b602a59b 100644 --- a/cmd/controller-manager/app/controllers.go +++ b/cmd/controller-manager/app/controllers.go @@ -493,7 +493,6 @@ func addAllControllers(mgr manager.Manager, client k8s.Client, informerFactory i client.KubeSphere().ClusterV1alpha1().Clusters(), cmOptions.MultiClusterOptions.ClusterControllerResyncPeriod, cmOptions.MultiClusterOptions.HostClusterName, - kubernetesInformer.Core().V1().ConfigMaps(), ) addController(mgr, "cluster", clusterController) } diff --git a/pkg/controller/cluster/cluster_controller.go b/pkg/controller/cluster/cluster_controller.go index f1d96f2a2..a95a7cd8f 100644 --- a/pkg/controller/cluster/cluster_controller.go +++ b/pkg/controller/cluster/cluster_controller.go @@ -25,20 +25,17 @@ import ( "fmt" "net/http" "reflect" - "sync" "time" "gopkg.in/yaml.v2" v1 "k8s.io/api/core/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -125,22 +122,6 @@ var hostCluster = &clusterv1alpha1.Cluster{ }, } -// ClusterData stores cluster client -type clusterData struct { - - // cached rest.Config - config *rest.Config - - // cached kubernetes client, rebuild once cluster changed - client kubernetes.Interface - - // cached kubeconfig - cachedKubeconfig []byte - - // cached transport, used to proxy kubesphere version request - transport http.RoundTripper -} - type clusterController struct { eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder @@ -158,10 +139,6 @@ type clusterController struct { workerLoopPeriod time.Duration - mu sync.RWMutex - - clusterMap map[string]*clusterData - resyncPeriod time.Duration hostClusterName string @@ -174,7 +151,6 @@ func NewClusterController( clusterClient clusterclient.ClusterInterface, resyncPeriod time.Duration, hostClusterName string, - configmapInformer coreinformers.ConfigMapInformer, ) *clusterController { broadcaster := record.NewBroadcaster() @@ -192,7 +168,6 @@ func NewClusterController( clusterClient: clusterClient, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"), workerLoopPeriod: time.Second, - clusterMap: make(map[string]*clusterData), resyncPeriod: resyncPeriod, hostClusterName: hostClusterName, } @@ -200,23 +175,15 @@ func NewClusterController( c.clusterHasSynced = clusterInformer.Informer().HasSynced clusterInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ - AddFunc: c.addCluster, + AddFunc: c.enqueueCluster, UpdateFunc: func(oldObj, newObj interface{}) { - c.addCluster(newObj) - }, - DeleteFunc: c.addCluster, - }, resyncPeriod) - - configmapInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(oldObj, newObj interface{}) { - oldCM := oldObj.(*v1.ConfigMap) - newCM := newObj.(*v1.ConfigMap) - if oldCM.ResourceVersion == newCM.ResourceVersion { - return + oldCluster := oldObj.(*clusterv1alpha1.Cluster) + newCluster := newObj.(*clusterv1alpha1.Cluster) + if !reflect.DeepEqual(oldCluster.Spec, newCluster.Spec) { + c.enqueueCluster(newObj) } - // Update the clusterName field when the kubesphere-config configmap is updated. - c.syncClusterNameInConfigMap() }, + DeleteFunc: c.enqueueCluster, }, resyncPeriod) return c @@ -275,40 +242,6 @@ func (c *clusterController) processNextItem() bool { return true } -func buildClusterData(kubeconfig []byte) (*clusterData, error) { - // prepare for - clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig) - if err != nil { - klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err) - return nil, err - } - - clusterConfig, err := clientConfig.ClientConfig() - if err != nil { - klog.Errorf("Failed to get client config, %#v", err) - return nil, err - } - - transport, err := rest.TransportFor(clusterConfig) - if err != nil { - klog.Errorf("Failed to create transport, %#v", err) - return nil, err - } - - clientSet, err := kubernetes.NewForConfig(clusterConfig) - if err != nil { - klog.Errorf("Failed to create ClientSet from config, %#v", err) - return nil, err - } - - return &clusterData{ - cachedKubeconfig: kubeconfig, - config: clusterConfig, - client: clientSet, - transport: transport, - }, nil -} - // reconcileHostCluster will create a host cluster if there are no clusters labeled 'cluster-role.kubesphere.io/host' func (c *clusterController) reconcileHostCluster() error { clusters, err := c.clusterLister.List(labels.SelectorFromSet(labels.Set{clusterv1alpha1.HostCluster: ""})) @@ -495,28 +428,30 @@ func (c *clusterController) syncCluster(key string) error { return nil } - // build up cached cluster data if there isn't any - c.mu.Lock() - clusterDt, ok := c.clusterMap[cluster.Name] - if !ok || clusterDt == nil || !equality.Semantic.DeepEqual(clusterDt.cachedKubeconfig, cluster.Spec.Connection.KubeConfig) { - clusterDt, err = buildClusterData(cluster.Spec.Connection.KubeConfig) - if err != nil { - c.mu.Unlock() - return err - } - c.clusterMap[cluster.Name] = clusterDt + clusterConfig, err := clientcmd.RESTConfigFromKubeConfig(cluster.Spec.Connection.KubeConfig) + if err != nil { + return fmt.Errorf("failed to create cluster config for %s: %s", cluster.Name, err) + } + + clusterClient, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + return fmt.Errorf("failed to create cluster client for %s: %s", cluster.Name, err) + } + + proxyTransport, err := rest.TransportFor(clusterConfig) + if err != nil { + return fmt.Errorf("failed to create proxy transport for %s: %s", cluster.Name, err) } - c.mu.Unlock() if !cluster.Spec.JoinFederation { // trying to unJoin federation - err = c.unJoinFederation(clusterDt.config, cluster.Name) + err = c.unJoinFederation(clusterConfig, cluster.Name) if err != nil { klog.Errorf("Failed to unJoin federation for cluster %s, error %v", cluster.Name, err) c.eventRecorder.Event(cluster, v1.EventTypeWarning, "UnJoinFederation", err.Error()) return err } } else { // join federation - _, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels) + _, err = c.joinFederation(clusterConfig, cluster.Name, cluster.Labels) if err != nil { klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err) @@ -556,29 +491,33 @@ func (c *clusterController) syncCluster(key string) error { // since there is no agent necessary for host cluster, so updates for host cluster // is safe. if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 { - cluster.Spec.Connection.KubernetesAPIEndpoint = clusterDt.config.Host + cluster.Spec.Connection.KubernetesAPIEndpoint = clusterConfig.Host } - version, err := clusterDt.client.Discovery().ServerVersion() + serverVersion, err := clusterClient.Discovery().ServerVersion() if err != nil { klog.Errorf("Failed to get kubernetes version, %#v", err) return err } - cluster.Status.KubernetesVersion = version.GitVersion + cluster.Status.KubernetesVersion = serverVersion.GitVersion - nodes, err := clusterDt.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + nodes, err := clusterClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) if err != nil { klog.Errorf("Failed to get cluster nodes, %#v", err) return err } cluster.Status.NodeCount = len(nodes.Items) - configz, err := c.tryToFetchKubeSphereComponents(clusterDt.config.Host, clusterDt.transport) - if err == nil { + // TODO use rest.Interface instead + configz, err := c.tryToFetchKubeSphereComponents(clusterConfig.Host, proxyTransport) + if err != nil { + klog.Warningf("failed to fetch kubesphere components status in cluster %s: %s", cluster.Name, err) + } else { cluster.Status.Configz = configz } - v, err := c.tryFetchKubeSphereVersion(clusterDt.config.Host, clusterDt.transport) + // TODO use rest.Interface instead + v, err := c.tryFetchKubeSphereVersion(clusterConfig.Host, proxyTransport) if err != nil { klog.Errorf("failed to get KubeSphere version, err: %#v", err) } else { @@ -586,7 +525,7 @@ func (c *clusterController) syncCluster(key string) error { } // Use kube-system namespace UID as cluster ID - kubeSystem, err := clusterDt.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceSystem, metav1.GetOptions{}) + kubeSystem, err := clusterClient.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceSystem, metav1.GetOptions{}) if err != nil { return err } @@ -600,7 +539,7 @@ func (c *clusterController) syncCluster(key string) error { cluster.Labels[clusterv1alpha1.HostCluster] = "" } - readyConditon := clusterv1alpha1.ClusterCondition{ + readyCondition := clusterv1alpha1.ClusterCondition{ Type: clusterv1alpha1.ClusterReady, Status: v1.ConditionTrue, LastUpdateTime: metav1.Now(), @@ -608,7 +547,7 @@ func (c *clusterController) syncCluster(key string) error { Reason: string(clusterv1alpha1.ClusterReady), Message: "Cluster is available now", } - c.updateClusterCondition(cluster, readyConditon) + c.updateClusterCondition(cluster, readyCondition) if err = c.updateKubeConfigExpirationDateCondition(cluster); err != nil { klog.Errorf("sync KubeConfig expiration date for cluster %s failed: %v", cluster.Name, err) @@ -623,7 +562,7 @@ func (c *clusterController) syncCluster(key string) error { } } - if err = c.setClusterNameInConfigMap(clusterDt.client, cluster.Name); err != nil { + if err = c.setClusterNameInConfigMap(clusterClient, cluster.Name); err != nil { return err } @@ -659,27 +598,6 @@ func (c *clusterController) setClusterNameInConfigMap(client kubernetes.Interfac return nil } -func (c *clusterController) syncClusterNameInConfigMap() { - clusters, err := c.clusterLister.List(labels.Everything()) - if err != nil { - klog.Errorf("list clusters failed: %v", err) - return - } - - for _, cluster := range clusters { - clusterDt, ok := c.clusterMap[cluster.Name] - if !ok { - continue - } - if err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - return c.setClusterNameInConfigMap(clusterDt.client, cluster.Name) - }); err != nil { - klog.Errorf("update configmap %s failed: %v", constants.KubeSphereConfigName, err) - continue - } - } -} - func (c *clusterController) checkIfClusterIsHostCluster(memberClusterNodes *v1.NodeList) bool { hostNodes, err := c.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) if err != nil { @@ -766,7 +684,7 @@ func (c *clusterController) tryFetchKubeSphereVersion(host string, transport htt return info.GitVersion, nil } -func (c *clusterController) addCluster(obj interface{}) { +func (c *clusterController) enqueueCluster(obj interface{}) { cluster := obj.(*clusterv1alpha1.Cluster) key, err := cache.MetaNamespaceKeyFunc(obj) diff --git a/pkg/utils/clusterclient/clusterclient.go b/pkg/utils/clusterclient/clusterclient.go index 890022e8c..e3f88c260 100644 --- a/pkg/utils/clusterclient/clusterclient.go +++ b/pkg/utils/clusterclient/clusterclient.go @@ -19,6 +19,7 @@ package clusterclient import ( "net/http" "net/url" + "reflect" "sync" corev1 "k8s.io/api/core/v1" @@ -72,12 +73,11 @@ func NewClusterClient(clusterInformer clusterinformer.ClusterInformer) ClusterCl c.addCluster(obj) }, UpdateFunc: func(oldObj, newObj interface{}) { - newCluster := newObj.(*clusterv1alpha1.Cluster) oldCluster := oldObj.(*clusterv1alpha1.Cluster) - if newCluster.ResourceVersion == oldCluster.ResourceVersion { - return + newCluster := newObj.(*clusterv1alpha1.Cluster) + if !reflect.DeepEqual(oldCluster.Spec, newCluster.Spec) { + c.addCluster(newObj) } - c.addCluster(newObj) }, DeleteFunc: func(obj interface{}) { c.removeCluster(obj)