Merge pull request #3380 from yuswift/refactor_cluster_controller
refactor cluster controller
@@ -21,7 +21,6 @@ import (
    "context"
    "encoding/json"
    "fmt"
    "math/rand"
    "net/http"
    "reflect"
    "sync"
@@ -33,7 +32,6 @@ import (
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/util/intstr"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/wait"
@@ -44,6 +42,7 @@ import (
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/retry"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog"
    fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
@@ -56,12 +55,12 @@ import (
)

// The cluster controller only runs in multicluster mode and follows the steps below:
// 1. Populate the proxy spec if the cluster connection type is proxy
// 1.1 Wait for the cluster agent to be ready if the connection type is proxy
// 1. Wait for the cluster agent to be ready if the connection type is proxy
// 2. Join the cluster into the federation control plane once its kubeconfig is ready.
// 3. Pull the cluster version and configz, and set the result on the cluster status
// Also put all clusters back into the queue every 5 * time.Minute to sync cluster status; this is needed
// in case no cluster changes are made.
// Also check whether every cluster is ready via spec.connection.kubeconfig each resync period

const (
    // maxRetries is the number of times a service will be retried before it is dropped out of the queue.
@@ -83,17 +82,14 @@ const (
    portRangeMin = 6000
    portRangeMax = 7000

    // Proxy service ports
    kubernetesPort = 6443
    kubespherePort = 80

    defaultAgentNamespace = "kubesphere-system"

    // proxy format
    proxyFormat = "%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:80/proxy/%s"

    // multicluster configuration name
    configzMultiCluster = "multicluster"

    // probe cluster timeout
    probeClusterTimeout = 3 * time.Second
)

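For reference, a minimal sketch (not part of this diff) of how a constant like proxyFormat expands into a proxied ks-apiserver URL through the host kube-apiserver's service proxy; the host endpoint and request path below are hypothetical placeholder values.

package main

import "fmt"

const proxyFormat = "%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:80/proxy/%s"

func main() {
    hostAPIServer := "https://192.168.0.2:6443" // assumed host kube-apiserver endpoint
    requestPath := "kapis/config.kubesphere.io/v1alpha2/configs/configz" // assumed configz path
    url := fmt.Sprintf(proxyFormat, hostAPIServer, requestPath)
    fmt.Println(url) // the member cluster's ks-apiserver reached via the host apiserver proxy
}
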
// Cluster template for reconciling the host cluster if there is none.
@@ -223,12 +219,16 @@ func (c *clusterController) Run(workers int, stopCh <-chan struct{}) error {
        go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
    }

    // refresh cluster configz every 2 minutes
    // refresh cluster configz every resync period
    go wait.Until(func() {
        if err := c.reconcileHostCluster(); err != nil {
            klog.Errorf("Error creating host cluster, error %v", err)
        }

        if err := c.probeClusters(); err != nil {
            klog.Errorf("failed to reconcile cluster ready status, err: %v", err)
        }

    }, c.resyncPeriod, stopCh)

    <-stopCh
@@ -348,6 +348,80 @@ func (c *clusterController) reconcileHostCluster() error {
    return err
}

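A small standalone sketch (illustrative only, not part of this diff) of the wait.Until pattern used above: the given function runs once per period until the stop channel is closed, which is how the controller refreshes cluster status every resync period.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    go wait.Until(func() {
        fmt.Println("resync tick") // stands in for reconcileHostCluster + probeClusters
    }, 2*time.Second, stopCh)

    time.Sleep(5 * time.Second)
    close(stopCh) // stops the periodic loop
}
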
func (c *clusterController) probeClusters() error {
    clusters, err := c.clusterLister.List(labels.Everything())
    if err != nil {
        return err
    }

    for _, cluster := range clusters {
        if len(cluster.Spec.Connection.KubeConfig) == 0 {
            continue
        }

        clientConfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Connection.KubeConfig)
        if err != nil {
            klog.Error(err)
            continue
        }

        config, err := clientConfig.ClientConfig()
        if err != nil {
            klog.Error(err)
            continue
        }
        config.Timeout = probeClusterTimeout

        clientSet, err := kubernetes.NewForConfig(config)
        if err != nil {
            klog.Error(err)
            continue
        }

        var con clusterv1alpha1.ClusterCondition
        _, err = clientSet.Discovery().ServerVersion()
        if err == nil {
            con = clusterv1alpha1.ClusterCondition{
                Type: clusterv1alpha1.ClusterReady,
                Status: v1.ConditionTrue,
                LastUpdateTime: metav1.Now(),
                LastTransitionTime: metav1.Now(),
                Reason: string(clusterv1alpha1.ClusterReady),
                Message: "Cluster is available now",
            }
        } else {
            con = clusterv1alpha1.ClusterCondition{
                Type: clusterv1alpha1.ClusterReady,
                Status: v1.ConditionFalse,
                LastUpdateTime: metav1.Now(),
                LastTransitionTime: metav1.Now(),
                Reason: "Failed to connect and get kubernetes version",
                Message: "Cluster is not available now",
            }
        }

        c.updateClusterCondition(cluster, con)
        err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
            ct, err := c.clusterClient.Get(context.TODO(), cluster.Name, metav1.GetOptions{})
            if err != nil {
                return err
            }

            ct.Status.Conditions = cluster.Status.Conditions
            ct, err = c.clusterClient.Update(context.TODO(), ct, metav1.UpdateOptions{})
            return err
        })
        if err != nil {
            klog.Errorf("failed to update cluster %s status, err: %v", cluster.Name, err)
        } else {
            klog.V(4).Infof("successfully updated cluster %s to status %v", cluster.Name, con)
        }

    }

    return nil
}

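The probe above calls c.updateClusterCondition, whose body is not shown in this diff. A hypothetical sketch of what such a helper typically does (replace any existing condition of the same type on cluster.Status.Conditions, otherwise append), assuming the file's existing imports and types:

// Hypothetical sketch, not from this diff.
func updateClusterCondition(cluster *clusterv1alpha1.Cluster, condition clusterv1alpha1.ClusterCondition) {
    newConditions := make([]clusterv1alpha1.ClusterCondition, 0, len(cluster.Status.Conditions))
    for _, cond := range cluster.Status.Conditions {
        if cond.Type == condition.Type {
            continue // drop the stale condition of the same type
        }
        newConditions = append(newConditions, cond)
    }
    newConditions = append(newConditions, condition)
    cluster.Status.Conditions = newConditions
}
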
func (c *clusterController) syncCluster(key string) error {
    klog.V(5).Infof("starting to sync cluster %s", key)
    startTime := time.Now()
@@ -363,6 +437,7 @@ func (c *clusterController) syncCluster(key string) error {
    }()

    cluster, err := c.clusterLister.Get(name)

    if err != nil {
        // cluster not found, possibly been deleted
        // need to do the cleanup
@@ -374,9 +449,6 @@ func (c *clusterController) syncCluster(key string) error {
        return err
    }

    // proxy service name if needed
    serviceName := fmt.Sprintf("mc-%s", cluster.Name)

    if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
        // The object is not being deleted, so if it does not have our finalizer,
        // then let's add the finalizer and update the object. This is equivalent
@@ -399,22 +471,6 @@ func (c *clusterController) syncCluster(key string) error {
        return err
    }

    _, err = c.client.CoreV1().Services(defaultAgentNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
    if err != nil {
        if errors.IsNotFound(err) {
            // nothing to do
        } else {
            klog.Errorf("Failed to get proxy service %s, error %v", serviceName, err)
            return err
        }
    } else {
        err = c.client.CoreV1().Services(defaultAgentNamespace).Delete(context.TODO(), serviceName, *metav1.NewDeleteOptions(0))
        if err != nil {
            klog.Errorf("Unable to delete service %s, error %v", serviceName, err)
            return err
        }
    }

    // clean up the openpitrix runtime of the cluster
    if _, ok := cluster.Annotations[openpitrixRuntime]; ok {
        if c.openpitrixClient != nil {
@@ -438,136 +494,18 @@ func (c *clusterController) syncCluster(key string) error {
        return nil
    }

    // save an old copy of the cluster
    oldCluster := cluster.DeepCopy()

    // currently we don't set cluster.Spec.Enable when creating a cluster on the client side, so only check
    // whether cluster.Spec.JoinFederation is enabled for now
    if cluster.Spec.JoinFederation == false {
        return nil
    }

    // save an old copy of the cluster
    oldCluster := cluster.DeepCopy()

    // prepare for proxying to the member cluster
    if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy {

        // allocate ports for the kubernetes and kubesphere endpoints
        if cluster.Spec.Connection.KubeSphereAPIServerPort == 0 ||
            cluster.Spec.Connection.KubernetesAPIServerPort == 0 {
            port, err := c.allocatePort()
            if err != nil {
                klog.Error(err)
                return err
            }

            cluster.Spec.Connection.KubernetesAPIServerPort = port
            cluster.Spec.Connection.KubeSphereAPIServerPort = port + 10000
        }

        // token uninitialized, generate a new token
        if len(cluster.Spec.Connection.Token) == 0 {
            cluster.Spec.Connection.Token = c.generateToken()
        }

        // create a proxy service spec
        mcService := v1.Service{
            ObjectMeta: metav1.ObjectMeta{
                Name: serviceName,
                Namespace: cluster.Namespace,
                Labels: map[string]string{
                    "app.kubernetes.io/name": serviceName,
                    "app": serviceName,
                },
            },
            Spec: v1.ServiceSpec{
                Selector: map[string]string{
                    "app.kubernetes.io/name": "tower",
                    "app": "tower",
                },
                Ports: []v1.ServicePort{
                    {
                        Name: "kubernetes",
                        Protocol: v1.ProtocolTCP,
                        Port: kubernetesPort,
                        TargetPort: intstr.FromInt(int(cluster.Spec.Connection.KubernetesAPIServerPort)),
                    },
                    {
                        Name: "kubesphere",
                        Protocol: v1.ProtocolTCP,
                        Port: kubespherePort,
                        TargetPort: intstr.FromInt(int(cluster.Spec.Connection.KubeSphereAPIServerPort)),
                    },
                },
            },
        }

        service, err := c.client.CoreV1().Services(defaultAgentNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
        if err != nil { // proxy service not found
            if errors.IsNotFound(err) {
                service, err = c.client.CoreV1().Services(defaultAgentNamespace).Create(context.TODO(), &mcService, metav1.CreateOptions{})
                if err != nil {
                    return err
                }
            }

            return err
        } else { // update the existing proxy service
            if !reflect.DeepEqual(service.Spec, mcService.Spec) {
                mcService.ObjectMeta = service.ObjectMeta
                mcService.Spec.ClusterIP = service.Spec.ClusterIP

                service, err = c.client.CoreV1().Services(defaultAgentNamespace).Update(context.TODO(), &mcService, metav1.UpdateOptions{})
                if err != nil {
                    return err
                }
            }
        }

        // populate the kubernetes apiEndpoint and kubesphere apiEndpoint
        cluster.Spec.Connection.KubernetesAPIEndpoint = fmt.Sprintf("https://%s:%d", service.Spec.ClusterIP, kubernetesPort)
        cluster.Spec.Connection.KubeSphereAPIEndpoint = fmt.Sprintf("http://%s:%d", service.Spec.ClusterIP, kubespherePort)

        initializedCondition := clusterv1alpha1.ClusterCondition{
            Type: clusterv1alpha1.ClusterInitialized,
            Status: v1.ConditionTrue,
            Reason: string(clusterv1alpha1.ClusterInitialized),
            Message: "Cluster has been initialized",
            LastUpdateTime: metav1.Now(),
            LastTransitionTime: metav1.Now(),
        }

        if !isConditionTrue(cluster, clusterv1alpha1.ClusterInitialized) {
            c.updateClusterCondition(cluster, initializedCondition)
        }

        if !reflect.DeepEqual(oldCluster, cluster) {
            cluster, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{})
            if err != nil {
                klog.Errorf("Error updating cluster %s, error %s", cluster.Name, err)
                return err
            }
        }
    }

    // agent status unavailable, which means the agent has disconnected from the server or has not connected yet,
    // so we need to set the cluster ready status to unavailable and return.
    if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy &&
        !isConditionTrue(cluster, clusterv1alpha1.ClusterAgentAvailable) {
        clusterNotReadyCondition := clusterv1alpha1.ClusterCondition{
            Type: clusterv1alpha1.ClusterReady,
            Status: v1.ConditionFalse,
            LastUpdateTime: metav1.Now(),
            LastTransitionTime: metav1.Now(),
            Reason: "Unable to establish connection with cluster",
            Message: "Cluster is not available now",
        }

        c.updateClusterCondition(cluster, clusterNotReadyCondition)

        cluster, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{})
        if err != nil {
            klog.Errorf("Error updating cluster %s, error %s", cluster.Name, err)
        }
        return err
    // cluster not ready, nothing to do
    if !isConditionTrue(cluster, clusterv1alpha1.ClusterReady) {
        return nil
    }

    // build up cached cluster data if there isn't any
@@ -594,10 +532,10 @@ func (c *clusterController) syncCluster(key string) error {
    _, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels)
    if err != nil {
        klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err)
        c.eventRecorder.Event(cluster, v1.EventTypeWarning, "JoinFederation", err.Error())
        return err
    }
    c.eventRecorder.Event(cluster, v1.EventTypeNormal, "JoinFederation", "Cluster has joined federation.")

    klog.Infof("successfully joined federation for cluster %s", cluster.Name)

    federationReadyCondition := clusterv1alpha1.ClusterCondition{
        Type: clusterv1alpha1.ClusterFederated,
@@ -611,7 +549,7 @@ func (c *clusterController) syncCluster(key string) error {
        c.updateClusterCondition(cluster, federationReadyCondition)
    }

    // cluster agent is ready, we can pull kubernetes cluster info through the agent
    // cluster is ready, we can pull kubernetes cluster info through the agent
    // since no agent is necessary for the host cluster, updates for the host cluster
    // are safe.
    if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 {
@@ -647,17 +585,6 @@ func (c *clusterController) syncCluster(key string) error {
        cluster.Labels[clusterv1alpha1.HostCluster] = ""
    }

    clusterReadyCondition := clusterv1alpha1.ClusterCondition{
        Type: clusterv1alpha1.ClusterReady,
        Status: v1.ConditionTrue,
        LastUpdateTime: metav1.Now(),
        LastTransitionTime: metav1.Now(),
        Reason: string(clusterv1alpha1.ClusterReady),
        Message: "Cluster is available now",
    }

    c.updateClusterCondition(cluster, clusterReadyCondition)

    if c.openpitrixClient != nil { // OpenPitrix is enabled, create runtime
        if cluster.GetAnnotations() == nil {
            cluster.Annotations = make(map[string]string)
@@ -746,16 +673,6 @@ func (c *clusterController) addCluster(obj interface{}) {
    c.queue.Add(key)
}

func hasHostClusterLabel(cluster *clusterv1alpha1.Cluster) bool {
    if cluster.Labels == nil || len(cluster.Labels) == 0 {
        return false
    }

    _, ok := cluster.Labels[clusterv1alpha1.HostCluster]

    return ok
}

func (c *clusterController) handleErr(err error, key interface{}) {
    if err == nil {
        c.queue.Forget(key)
@@ -855,43 +772,3 @@ func (c *clusterController) unJoinFederation(clusterConfig *rest.Config, unjoini
        }
    }
}

// allocatePort finds an available port between [portRangeMin, portRangeMax] within maximumRetries attempts
// TODO: only works with a handful of clusters
func (c *clusterController) allocatePort() (uint16, error) {
    rand.Seed(time.Now().UnixNano())

    clusters, err := c.clusterLister.List(labels.Everything())
    if err != nil {
        return 0, err
    }

    const maximumRetries = 10
    for i := 0; i < maximumRetries; i++ {
        collision := false
        port := uint16(portRangeMin + rand.Intn(portRangeMax-portRangeMin+1))

        for _, item := range clusters {
            if item.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy &&
                item.Spec.Connection.KubernetesAPIServerPort != 0 &&
                item.Spec.Connection.KubeSphereAPIServerPort == port {
                collision = true
                break
            }
        }

        if !collision {
            return port, nil
        }
    }

    return 0, fmt.Errorf("unable to allocate port after %d retries", maximumRetries)
}

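Illustrative only (not part of this diff): the relationship between the port picked by allocatePort and the derived KubeSphere API server port assigned earlier in syncCluster (port + 10000), using a made-up sample port.

package main

import "fmt"

const (
    portRangeMin = 6000
    portRangeMax = 7000
)

func main() {
    // Suppose allocatePort returned 6123 (any value in [portRangeMin, portRangeMax]).
    var kubernetesAPIServerPort uint16 = 6123
    kubeSphereAPIServerPort := kubernetesAPIServerPort + 10000 // 16123

    fmt.Printf("kubernetes: %d, kubesphere: %d\n", kubernetesAPIServerPort, kubeSphereAPIServerPort)
    fmt.Printf("kubernetes range: [%d, %d], kubesphere range: [%d, %d]\n",
        portRangeMin, portRangeMax, portRangeMin+10000, portRangeMax+10000)
}
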
// generateToken returns a random token derived from 32 random bytes, hex encoded
func (c *clusterController) generateToken() string {
    rand.Seed(time.Now().UnixNano())
    b := make([]byte, 32)
    rand.Read(b)
    return fmt.Sprintf("%x", b)
}

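generateToken above uses math/rand seeded with the current time. If a cryptographically unpredictable token were desired, a minimal alternative sketch (not part of this diff) using crypto/rand might look like this:

package main

import (
    "crypto/rand"
    "fmt"
)

// generateSecureToken returns 32 random bytes from crypto/rand, hex encoded.
func generateSecureToken() (string, error) {
    b := make([]byte, 32)
    if _, err := rand.Read(b); err != nil { // crypto/rand.Read reports entropy failures
        return "", err
    }
    return fmt.Sprintf("%x", b), nil
}

func main() {
    token, err := generateSecureToken()
    if err != nil {
        panic(err)
    }
    fmt.Println(token)
}
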
@@ -22,7 +22,7 @@ import (
    "github.com/spf13/pflag"
)

const DefaultResyncPeriod = time.Duration(120) * time.Second
const DefaultResyncPeriod = 120 * time.Second

type Options struct {
    // Enable
@@ -79,5 +79,5 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, s *Options) {
        "This field is used when generating deployment yaml for agent.")

    fs.DurationVar(&o.ClusterControllerResyncSecond, "cluster-controller-resync-second", s.ClusterControllerResyncSecond,
        "Cluster controller resync second to sync cluster resource.")
        "Cluster controller resync period for syncing cluster resources, e.g. 2m, 5m, 10m; defaults to 2m.")
}
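Since the flag is a time.Duration, values such as 2m or 5m parse directly. A minimal sketch (not part of this diff) of parsing the cluster-controller-resync-second flag with pflag, the package already imported in this file; the flag set name and sample value are illustrative.

package main

import (
    "fmt"
    "time"

    "github.com/spf13/pflag"
)

const DefaultResyncPeriod = 120 * time.Second

func main() {
    fs := pflag.NewFlagSet("example", pflag.ExitOnError)
    resync := fs.Duration("cluster-controller-resync-second", DefaultResyncPeriod,
        "Cluster controller resync period, e.g. 2m, 5m, 10m.")

    // e.g. a command line of: --cluster-controller-resync-second=5m
    _ = fs.Parse([]string{"--cluster-controller-resync-second=5m"})
    fmt.Println(*resync) // 5m0s
}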