From 89f850466db8c16c346ed27392e5cf6928fe45a5 Mon Sep 17 00:00:00 2001 From: zryfish Date: Sun, 19 Jul 2020 01:22:32 +0800 Subject: [PATCH] fix host cluster reconcile bug (#2479) Signed-off-by: Jeff --- pkg/controller/cluster/cluster_controller.go | 98 +++++++++++++++---- pkg/controller/cluster/helper.go | 21 +++- pkg/controller/cluster/unjoin.go | 4 +- .../customresourcedefinitions.go | 15 +-- .../customresourcedefinitions_test.go | 10 +- 5 files changed, 112 insertions(+), 36 deletions(-) diff --git a/pkg/controller/cluster/cluster_controller.go b/pkg/controller/cluster/cluster_controller.go index 30fd67966..94d8aaae7 100644 --- a/pkg/controller/cluster/cluster_controller.go +++ b/pkg/controller/cluster/cluster_controller.go @@ -1,6 +1,7 @@ package cluster import ( + "bytes" "encoding/json" "fmt" v1 "k8s.io/api/core/v1" @@ -53,6 +54,7 @@ const ( kubefedNamespace = "kube-federation-system" openpitrixRuntime = "openpitrix.io/runtime" + kubesphereManaged = "kubesphere.io/managed" // Actually host cluster name can be anything, there is only necessary when calling JoinFederation function hostClusterName = "kubesphere" @@ -81,12 +83,12 @@ var hostCluster = &clusterv1alpha1.Cluster{ Name: "host", Annotations: map[string]string{ "kubesphere.io/description": "Automatically created by kubesphere, " + - "we encourage you use host cluster for cluster management only, " + + "we encourage you to use host cluster for clusters management only, " + "deploy workloads to member clusters.", }, Labels: map[string]string{ - clusterv1alpha1.HostCluster: "", - clusterv1alpha1.ClusterGroup: "production", + clusterv1alpha1.HostCluster: "", + kubesphereManaged: "true", }, }, Spec: clusterv1alpha1.ClusterSpec{ @@ -119,6 +121,7 @@ type clusterController struct { eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder + // build this only for host cluster client kubernetes.Interface hostConfig *rest.Config @@ -296,17 +299,40 @@ func (c *clusterController) reconcileHostCluster() error { return err } - if len(clusters) == 0 { - hostKubeConfig, err := buildKubeconfigFromRestConfig(c.hostConfig) - if err != nil { - return err - } - hostCluster.Spec.Connection.KubeConfig = hostKubeConfig - _, err = c.clusterClient.Create(hostCluster) + hostKubeConfig, err := buildKubeconfigFromRestConfig(c.hostConfig) + if err != nil { return err } - return nil + // no host cluster, create one + if len(clusters) == 0 { + hostCluster.Spec.Connection.KubeConfig = hostKubeConfig + _, err = c.clusterClient.Create(hostCluster) + return err + } else if len(clusters) > 1 { + return fmt.Errorf("there MUST not be more than one host clusters, while there are %d", len(clusters)) + } + + // only deal with cluster managed by kubesphere + cluster := clusters[0] + managedByKubesphere, ok := cluster.Labels[kubesphereManaged] + if !ok || managedByKubesphere != "true" { + return nil + } + + // no kubeconfig, not likely to happen + if len(cluster.Spec.Connection.KubeConfig) == 0 { + cluster.Spec.Connection.KubeConfig = hostKubeConfig + } else { + // if kubeconfig are the same, then there is nothing to do + if bytes.Equal(cluster.Spec.Connection.KubeConfig, hostKubeConfig) { + return nil + } + } + + // update host cluster config + _, err = c.clusterClient.Update(cluster) + return err } func (c *clusterController) syncCluster(key string) error { @@ -699,6 +725,16 @@ func (c *clusterController) addCluster(obj interface{}) { c.queue.Add(key) } +func hasHostClusterLabel(cluster *clusterv1alpha1.Cluster) bool { + if cluster.Labels == nil || len(cluster.Labels) == 0 { + return false + } + + _, ok := cluster.Labels[clusterv1alpha1.HostCluster] + + return ok +} + func (c *clusterController) handleErr(err error, key interface{}) { if err == nil { c.queue.Forget(key) @@ -771,14 +807,40 @@ func (c *clusterController) joinFederation(clusterConfig *rest.Config, joiningCl } // unJoinFederation unjoins a cluster from federation control plane. +// It will first do normal unjoin process, if maximum retries reached, it will skip +// member cluster resource deletion, only delete resources in host cluster. func (c *clusterController) unJoinFederation(clusterConfig *rest.Config, unjoiningClusterName string) error { - return unjoinCluster(c.hostConfig, - clusterConfig, - kubefedNamespace, - hostClusterName, - unjoiningClusterName, - true, - false) + localMaxRetries := 5 + retries := 0 + + for { + err := unjoinCluster(c.hostConfig, + clusterConfig, + kubefedNamespace, + hostClusterName, + unjoiningClusterName, + true, + false, + false) + if err != nil { + klog.Errorf("Failed to unJoin federation for cluster %s, error %v", unjoiningClusterName, err) + } else { + return nil + } + + retries += 1 + if retries >= localMaxRetries { + err = unjoinCluster(c.hostConfig, + clusterConfig, + kubefedNamespace, + hostClusterName, + unjoiningClusterName, + true, + false, + true) + return err + } + } } // allocatePort find a available port between [portRangeMin, portRangeMax] in maximumRetries diff --git a/pkg/controller/cluster/helper.go b/pkg/controller/cluster/helper.go index 396629b67..e3e5d4cfd 100644 --- a/pkg/controller/cluster/helper.go +++ b/pkg/controller/cluster/helper.go @@ -1,6 +1,7 @@ package cluster import ( + "io/ioutil" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd/api" @@ -9,19 +10,29 @@ import ( func buildKubeconfigFromRestConfig(config *rest.Config) ([]byte, error) { apiConfig := api.NewConfig() - apiConfig.Clusters["kubernetes"] = &api.Cluster{ + apiCluster := &api.Cluster{ Server: config.Host, CertificateAuthorityData: config.CAData, - CertificateAuthority: config.CAFile, } + // generated kubeconfig will be used by cluster federation, CAFile is not + // accepted by kubefed, so we need read CAFile + if len(apiCluster.CertificateAuthorityData) == 0 && len(config.CAFile) != 0 { + caData, err := ioutil.ReadFile(config.CAFile) + if err != nil { + return nil, err + } + + apiCluster.CertificateAuthorityData = caData + } + + apiConfig.Clusters["kubernetes"] = apiCluster + apiConfig.AuthInfos["kubernetes-admin"] = &api.AuthInfo{ - ClientCertificate: config.CertFile, ClientCertificateData: config.CertData, - ClientKey: config.KeyFile, ClientKeyData: config.KeyData, - TokenFile: config.BearerTokenFile, Token: config.BearerToken, + TokenFile: config.BearerTokenFile, Username: config.Username, Password: config.Password, } diff --git a/pkg/controller/cluster/unjoin.go b/pkg/controller/cluster/unjoin.go index d89727322..0b73d55f6 100644 --- a/pkg/controller/cluster/unjoin.go +++ b/pkg/controller/cluster/unjoin.go @@ -18,7 +18,7 @@ import ( // UnjoinCluster performs all the necessary steps to remove the // registration of a cluster from a KubeFed control plane provided the // required set of parameters are passed in. -func unjoinCluster(hostConfig, clusterConfig *rest.Config, kubefedNamespace, hostClusterName, unjoiningClusterName string, forceDeletion, dryRun bool) error { +func unjoinCluster(hostConfig, clusterConfig *rest.Config, kubefedNamespace, hostClusterName, unjoiningClusterName string, forceDeletion, dryRun bool, skipMemberClusterResources bool) error { hostClientset, err := util.HostClientset(hostConfig) if err != nil { @@ -43,7 +43,7 @@ func unjoinCluster(hostConfig, clusterConfig *rest.Config, kubefedNamespace, hos return err } - if clusterClientset != nil { + if clusterClientset != nil && !skipMemberClusterResources { err := deleteRBACResources(clusterClientset, kubefedNamespace, unjoiningClusterName, hostClusterName, forceDeletion, dryRun) if err != nil { if !forceDeletion { diff --git a/pkg/models/resources/v1alpha3/customresourcedefinition/customresourcedefinitions.go b/pkg/models/resources/v1alpha3/customresourcedefinition/customresourcedefinitions.go index d40daa600..bc4b69544 100644 --- a/pkg/models/resources/v1alpha3/customresourcedefinition/customresourcedefinitions.go +++ b/pkg/models/resources/v1alpha3/customresourcedefinition/customresourcedefinitions.go @@ -17,7 +17,7 @@ limitations under the License. package customresourcedefinition import ( - "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" "k8s.io/apimachinery/pkg/runtime" "kubesphere.io/kubesphere/pkg/api" @@ -35,12 +35,15 @@ func New(informers apiextensionsinformers.SharedInformerFactory) v1alpha3.Interf } } +// The reason we are still using v1beta1 instead of stable v1 is v1 is not released yet +// in Kubernetes v1.15.x, while v1.15.x is in our supporting list. Maybe we can change +// it to v1 when v1.15.x is no longer officially supported. func (c crdGetter) Get(_, name string) (runtime.Object, error) { - return c.informers.Apiextensions().V1().CustomResourceDefinitions().Lister().Get(name) + return c.informers.Apiextensions().V1beta1().CustomResourceDefinitions().Lister().Get(name) } func (c crdGetter) List(_ string, query *query.Query) (*api.ListResult, error) { - crds, err := c.informers.Apiextensions().V1().CustomResourceDefinitions().Lister().List(query.Selector()) + crds, err := c.informers.Apiextensions().V1beta1().CustomResourceDefinitions().Lister().List(query.Selector()) if err != nil { return nil, err } @@ -54,12 +57,12 @@ func (c crdGetter) List(_ string, query *query.Query) (*api.ListResult, error) { } func (c crdGetter) compare(left runtime.Object, right runtime.Object, field query.Field) bool { - leftCRD, ok := left.(*v1.CustomResourceDefinition) + leftCRD, ok := left.(*v1beta1.CustomResourceDefinition) if !ok { return false } - rightCRD, ok := right.(*v1.CustomResourceDefinition) + rightCRD, ok := right.(*v1beta1.CustomResourceDefinition) if !ok { return false } @@ -68,7 +71,7 @@ func (c crdGetter) compare(left runtime.Object, right runtime.Object, field quer } func (c crdGetter) filter(object runtime.Object, filter query.Filter) bool { - crd, ok := object.(*v1.CustomResourceDefinition) + crd, ok := object.(*v1beta1.CustomResourceDefinition) if !ok { return false } diff --git a/pkg/models/resources/v1alpha3/customresourcedefinition/customresourcedefinitions_test.go b/pkg/models/resources/v1alpha3/customresourcedefinition/customresourcedefinitions_test.go index 4986c996e..0901d294a 100644 --- a/pkg/models/resources/v1alpha3/customresourcedefinition/customresourcedefinitions_test.go +++ b/pkg/models/resources/v1alpha3/customresourcedefinition/customresourcedefinitions_test.go @@ -18,7 +18,7 @@ package customresourcedefinition import ( "github.com/google/go-cmp/cmp" - v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,7 +28,7 @@ import ( "testing" ) -var crds = []*v1.CustomResourceDefinition{ +var crds = []*v1beta1.CustomResourceDefinition{ { ObjectMeta: metav1.ObjectMeta{ Name: "clusters.cluster.kubesphere.io", @@ -47,7 +47,7 @@ var crds = []*v1.CustomResourceDefinition{ }, } -func crdsToRuntimeObjects(crds ...*v1.CustomResourceDefinition) []runtime.Object { +func crdsToRuntimeObjects(crds ...*v1beta1.CustomResourceDefinition) []runtime.Object { items := make([]runtime.Object, 0) for _, crd := range crds { @@ -57,7 +57,7 @@ func crdsToRuntimeObjects(crds ...*v1.CustomResourceDefinition) []runtime.Object return items } -func crdsToInterface(crds ...*v1.CustomResourceDefinition) []interface{} { +func crdsToInterface(crds ...*v1beta1.CustomResourceDefinition) []interface{} { items := make([]interface{}, 0) for _, crd := range crds { @@ -91,7 +91,7 @@ func TestCrdGetterList(t *testing.T) { informers := apiextensionsinformers.NewSharedInformerFactory(client, 0) for _, crd := range crds { - informers.Apiextensions().V1().CustomResourceDefinitions().Informer().GetIndexer().Add(crd) + informers.Apiextensions().V1beta1().CustomResourceDefinitions().Informer().GetIndexer().Add(crd) } for _, testCase := range testCases {