fix host cluster reconcile bug (#2479)

Signed-off-by: Jeff <zw0948@gmail.com>
This commit is contained in:
zryfish
2020-07-19 01:22:32 +08:00
committed by GitHub
parent b7eb64dd95
commit 89f850466d
5 changed files with 112 additions and 36 deletions

View File

@@ -1,6 +1,7 @@
package cluster package cluster
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@@ -53,6 +54,7 @@ const (
kubefedNamespace = "kube-federation-system" kubefedNamespace = "kube-federation-system"
openpitrixRuntime = "openpitrix.io/runtime" openpitrixRuntime = "openpitrix.io/runtime"
kubesphereManaged = "kubesphere.io/managed"
// Actually host cluster name can be anything, there is only necessary when calling JoinFederation function // Actually host cluster name can be anything, there is only necessary when calling JoinFederation function
hostClusterName = "kubesphere" hostClusterName = "kubesphere"
@@ -81,12 +83,12 @@ var hostCluster = &clusterv1alpha1.Cluster{
Name: "host", Name: "host",
Annotations: map[string]string{ Annotations: map[string]string{
"kubesphere.io/description": "Automatically created by kubesphere, " + "kubesphere.io/description": "Automatically created by kubesphere, " +
"we encourage you use host cluster for cluster management only, " + "we encourage you to use host cluster for clusters management only, " +
"deploy workloads to member clusters.", "deploy workloads to member clusters.",
}, },
Labels: map[string]string{ Labels: map[string]string{
clusterv1alpha1.HostCluster: "", clusterv1alpha1.HostCluster: "",
clusterv1alpha1.ClusterGroup: "production", kubesphereManaged: "true",
}, },
}, },
Spec: clusterv1alpha1.ClusterSpec{ Spec: clusterv1alpha1.ClusterSpec{
@@ -119,6 +121,7 @@ type clusterController struct {
eventBroadcaster record.EventBroadcaster eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
// build this only for host cluster
client kubernetes.Interface client kubernetes.Interface
hostConfig *rest.Config hostConfig *rest.Config
@@ -296,17 +299,40 @@ func (c *clusterController) reconcileHostCluster() error {
return err return err
} }
if len(clusters) == 0 { hostKubeConfig, err := buildKubeconfigFromRestConfig(c.hostConfig)
hostKubeConfig, err := buildKubeconfigFromRestConfig(c.hostConfig) if err != nil {
if err != nil {
return err
}
hostCluster.Spec.Connection.KubeConfig = hostKubeConfig
_, err = c.clusterClient.Create(hostCluster)
return err return err
} }
return nil // no host cluster, create one
if len(clusters) == 0 {
hostCluster.Spec.Connection.KubeConfig = hostKubeConfig
_, err = c.clusterClient.Create(hostCluster)
return err
} else if len(clusters) > 1 {
return fmt.Errorf("there MUST not be more than one host clusters, while there are %d", len(clusters))
}
// only deal with cluster managed by kubesphere
cluster := clusters[0]
managedByKubesphere, ok := cluster.Labels[kubesphereManaged]
if !ok || managedByKubesphere != "true" {
return nil
}
// no kubeconfig, not likely to happen
if len(cluster.Spec.Connection.KubeConfig) == 0 {
cluster.Spec.Connection.KubeConfig = hostKubeConfig
} else {
// if kubeconfig are the same, then there is nothing to do
if bytes.Equal(cluster.Spec.Connection.KubeConfig, hostKubeConfig) {
return nil
}
}
// update host cluster config
_, err = c.clusterClient.Update(cluster)
return err
} }
func (c *clusterController) syncCluster(key string) error { func (c *clusterController) syncCluster(key string) error {
@@ -699,6 +725,16 @@ func (c *clusterController) addCluster(obj interface{}) {
c.queue.Add(key) c.queue.Add(key)
} }
func hasHostClusterLabel(cluster *clusterv1alpha1.Cluster) bool {
if cluster.Labels == nil || len(cluster.Labels) == 0 {
return false
}
_, ok := cluster.Labels[clusterv1alpha1.HostCluster]
return ok
}
func (c *clusterController) handleErr(err error, key interface{}) { func (c *clusterController) handleErr(err error, key interface{}) {
if err == nil { if err == nil {
c.queue.Forget(key) c.queue.Forget(key)
@@ -771,14 +807,40 @@ func (c *clusterController) joinFederation(clusterConfig *rest.Config, joiningCl
} }
// unJoinFederation unjoins a cluster from federation control plane. // unJoinFederation unjoins a cluster from federation control plane.
// It will first do normal unjoin process, if maximum retries reached, it will skip
// member cluster resource deletion, only delete resources in host cluster.
func (c *clusterController) unJoinFederation(clusterConfig *rest.Config, unjoiningClusterName string) error { func (c *clusterController) unJoinFederation(clusterConfig *rest.Config, unjoiningClusterName string) error {
return unjoinCluster(c.hostConfig, localMaxRetries := 5
clusterConfig, retries := 0
kubefedNamespace,
hostClusterName, for {
unjoiningClusterName, err := unjoinCluster(c.hostConfig,
true, clusterConfig,
false) kubefedNamespace,
hostClusterName,
unjoiningClusterName,
true,
false,
false)
if err != nil {
klog.Errorf("Failed to unJoin federation for cluster %s, error %v", unjoiningClusterName, err)
} else {
return nil
}
retries += 1
if retries >= localMaxRetries {
err = unjoinCluster(c.hostConfig,
clusterConfig,
kubefedNamespace,
hostClusterName,
unjoiningClusterName,
true,
false,
true)
return err
}
}
} }
// allocatePort find a available port between [portRangeMin, portRangeMax] in maximumRetries // allocatePort find a available port between [portRangeMin, portRangeMax] in maximumRetries

View File

@@ -1,6 +1,7 @@
package cluster package cluster
import ( import (
"io/ioutil"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/tools/clientcmd/api"
@@ -9,19 +10,29 @@ import (
func buildKubeconfigFromRestConfig(config *rest.Config) ([]byte, error) { func buildKubeconfigFromRestConfig(config *rest.Config) ([]byte, error) {
apiConfig := api.NewConfig() apiConfig := api.NewConfig()
apiConfig.Clusters["kubernetes"] = &api.Cluster{ apiCluster := &api.Cluster{
Server: config.Host, Server: config.Host,
CertificateAuthorityData: config.CAData, CertificateAuthorityData: config.CAData,
CertificateAuthority: config.CAFile,
} }
// generated kubeconfig will be used by cluster federation, CAFile is not
// accepted by kubefed, so we need read CAFile
if len(apiCluster.CertificateAuthorityData) == 0 && len(config.CAFile) != 0 {
caData, err := ioutil.ReadFile(config.CAFile)
if err != nil {
return nil, err
}
apiCluster.CertificateAuthorityData = caData
}
apiConfig.Clusters["kubernetes"] = apiCluster
apiConfig.AuthInfos["kubernetes-admin"] = &api.AuthInfo{ apiConfig.AuthInfos["kubernetes-admin"] = &api.AuthInfo{
ClientCertificate: config.CertFile,
ClientCertificateData: config.CertData, ClientCertificateData: config.CertData,
ClientKey: config.KeyFile,
ClientKeyData: config.KeyData, ClientKeyData: config.KeyData,
TokenFile: config.BearerTokenFile,
Token: config.BearerToken, Token: config.BearerToken,
TokenFile: config.BearerTokenFile,
Username: config.Username, Username: config.Username,
Password: config.Password, Password: config.Password,
} }

View File

@@ -18,7 +18,7 @@ import (
// UnjoinCluster performs all the necessary steps to remove the // UnjoinCluster performs all the necessary steps to remove the
// registration of a cluster from a KubeFed control plane provided the // registration of a cluster from a KubeFed control plane provided the
// required set of parameters are passed in. // required set of parameters are passed in.
func unjoinCluster(hostConfig, clusterConfig *rest.Config, kubefedNamespace, hostClusterName, unjoiningClusterName string, forceDeletion, dryRun bool) error { func unjoinCluster(hostConfig, clusterConfig *rest.Config, kubefedNamespace, hostClusterName, unjoiningClusterName string, forceDeletion, dryRun bool, skipMemberClusterResources bool) error {
hostClientset, err := util.HostClientset(hostConfig) hostClientset, err := util.HostClientset(hostConfig)
if err != nil { if err != nil {
@@ -43,7 +43,7 @@ func unjoinCluster(hostConfig, clusterConfig *rest.Config, kubefedNamespace, hos
return err return err
} }
if clusterClientset != nil { if clusterClientset != nil && !skipMemberClusterResources {
err := deleteRBACResources(clusterClientset, kubefedNamespace, unjoiningClusterName, hostClusterName, forceDeletion, dryRun) err := deleteRBACResources(clusterClientset, kubefedNamespace, unjoiningClusterName, hostClusterName, forceDeletion, dryRun)
if err != nil { if err != nil {
if !forceDeletion { if !forceDeletion {

View File

@@ -17,7 +17,7 @@ limitations under the License.
package customresourcedefinition package customresourcedefinition
import ( import (
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"kubesphere.io/kubesphere/pkg/api" "kubesphere.io/kubesphere/pkg/api"
@@ -35,12 +35,15 @@ func New(informers apiextensionsinformers.SharedInformerFactory) v1alpha3.Interf
} }
} }
// The reason we are still using v1beta1 instead of stable v1 is v1 is not released yet
// in Kubernetes v1.15.x, while v1.15.x is in our supporting list. Maybe we can change
// it to v1 when v1.15.x is no longer officially supported.
func (c crdGetter) Get(_, name string) (runtime.Object, error) { func (c crdGetter) Get(_, name string) (runtime.Object, error) {
return c.informers.Apiextensions().V1().CustomResourceDefinitions().Lister().Get(name) return c.informers.Apiextensions().V1beta1().CustomResourceDefinitions().Lister().Get(name)
} }
func (c crdGetter) List(_ string, query *query.Query) (*api.ListResult, error) { func (c crdGetter) List(_ string, query *query.Query) (*api.ListResult, error) {
crds, err := c.informers.Apiextensions().V1().CustomResourceDefinitions().Lister().List(query.Selector()) crds, err := c.informers.Apiextensions().V1beta1().CustomResourceDefinitions().Lister().List(query.Selector())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -54,12 +57,12 @@ func (c crdGetter) List(_ string, query *query.Query) (*api.ListResult, error) {
} }
func (c crdGetter) compare(left runtime.Object, right runtime.Object, field query.Field) bool { func (c crdGetter) compare(left runtime.Object, right runtime.Object, field query.Field) bool {
leftCRD, ok := left.(*v1.CustomResourceDefinition) leftCRD, ok := left.(*v1beta1.CustomResourceDefinition)
if !ok { if !ok {
return false return false
} }
rightCRD, ok := right.(*v1.CustomResourceDefinition) rightCRD, ok := right.(*v1beta1.CustomResourceDefinition)
if !ok { if !ok {
return false return false
} }
@@ -68,7 +71,7 @@ func (c crdGetter) compare(left runtime.Object, right runtime.Object, field quer
} }
func (c crdGetter) filter(object runtime.Object, filter query.Filter) bool { func (c crdGetter) filter(object runtime.Object, filter query.Filter) bool {
crd, ok := object.(*v1.CustomResourceDefinition) crd, ok := object.(*v1beta1.CustomResourceDefinition)
if !ok { if !ok {
return false return false
} }

View File

@@ -18,7 +18,7 @@ package customresourcedefinition
import ( import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -28,7 +28,7 @@ import (
"testing" "testing"
) )
var crds = []*v1.CustomResourceDefinition{ var crds = []*v1beta1.CustomResourceDefinition{
{ {
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "clusters.cluster.kubesphere.io", Name: "clusters.cluster.kubesphere.io",
@@ -47,7 +47,7 @@ var crds = []*v1.CustomResourceDefinition{
}, },
} }
func crdsToRuntimeObjects(crds ...*v1.CustomResourceDefinition) []runtime.Object { func crdsToRuntimeObjects(crds ...*v1beta1.CustomResourceDefinition) []runtime.Object {
items := make([]runtime.Object, 0) items := make([]runtime.Object, 0)
for _, crd := range crds { for _, crd := range crds {
@@ -57,7 +57,7 @@ func crdsToRuntimeObjects(crds ...*v1.CustomResourceDefinition) []runtime.Object
return items return items
} }
func crdsToInterface(crds ...*v1.CustomResourceDefinition) []interface{} { func crdsToInterface(crds ...*v1beta1.CustomResourceDefinition) []interface{} {
items := make([]interface{}, 0) items := make([]interface{}, 0)
for _, crd := range crds { for _, crd := range crds {
@@ -91,7 +91,7 @@ func TestCrdGetterList(t *testing.T) {
informers := apiextensionsinformers.NewSharedInformerFactory(client, 0) informers := apiextensionsinformers.NewSharedInformerFactory(client, 0)
for _, crd := range crds { for _, crd := range crds {
informers.Apiextensions().V1().CustomResourceDefinitions().Informer().GetIndexer().Add(crd) informers.Apiextensions().V1beta1().CustomResourceDefinitions().Informer().GetIndexer().Add(crd)
} }
for _, testCase := range testCases { for _, testCase := range testCases {