refactor cluster controller
Signed-off-by: yuswift <yuswiftli@yunify.com>
This commit is contained in:
@@ -21,7 +21,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -33,7 +32,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
@@ -44,6 +42,7 @@ import (
|
|||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/client-go/util/retry"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
|
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
|
||||||
@@ -56,12 +55,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Cluster controller only runs under multicluster mode. Cluster controller is following below steps,
|
// Cluster controller only runs under multicluster mode. Cluster controller is following below steps,
|
||||||
// 1. Populates proxy spec if cluster connection type is proxy
|
// 1. Wait for cluster agent is ready if connection type is proxy
|
||||||
// 1.1 Wait for cluster agent is ready if connection type is proxy
|
|
||||||
// 2. Join cluster into federation control plane if kubeconfig is ready.
|
// 2. Join cluster into federation control plane if kubeconfig is ready.
|
||||||
// 3. Pull cluster version and configz, set result to cluster status
|
// 3. Pull cluster version and configz, set result to cluster status
|
||||||
// Also put all clusters back into queue every 5 * time.Minute to sync cluster status, this is needed
|
// Also put all clusters back into queue every 5 * time.Minute to sync cluster status, this is needed
|
||||||
// in case there aren't any cluster changes made.
|
// in case there aren't any cluster changes made.
|
||||||
|
// Also check if all of the clusters are ready by the spec.connection.kubeconfig every resync period
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
|
// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
|
||||||
@@ -83,17 +82,14 @@ const (
|
|||||||
portRangeMin = 6000
|
portRangeMin = 6000
|
||||||
portRangeMax = 7000
|
portRangeMax = 7000
|
||||||
|
|
||||||
// Proxy service port
|
|
||||||
kubernetesPort = 6443
|
|
||||||
kubespherePort = 80
|
|
||||||
|
|
||||||
defaultAgentNamespace = "kubesphere-system"
|
|
||||||
|
|
||||||
// proxy format
|
// proxy format
|
||||||
proxyFormat = "%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:80/proxy/%s"
|
proxyFormat = "%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:80/proxy/%s"
|
||||||
|
|
||||||
// mulitcluster configuration name
|
// mulitcluster configuration name
|
||||||
configzMultiCluster = "multicluster"
|
configzMultiCluster = "multicluster"
|
||||||
|
|
||||||
|
// probe cluster timeout
|
||||||
|
probeClusterTimeout = 3 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Cluster template for reconcile host cluster if there is none.
|
// Cluster template for reconcile host cluster if there is none.
|
||||||
@@ -223,12 +219,16 @@ func (c *clusterController) Run(workers int, stopCh <-chan struct{}) error {
|
|||||||
go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
|
go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// refresh cluster configz every 2 minutes
|
// refresh cluster configz every resync period
|
||||||
go wait.Until(func() {
|
go wait.Until(func() {
|
||||||
if err := c.reconcileHostCluster(); err != nil {
|
if err := c.reconcileHostCluster(); err != nil {
|
||||||
klog.Errorf("Error create host cluster, error %v", err)
|
klog.Errorf("Error create host cluster, error %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := c.probeClusters(); err != nil {
|
||||||
|
klog.Errorf("failed to reconcile cluster ready status, err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
}, c.resyncPeriod, stopCh)
|
}, c.resyncPeriod, stopCh)
|
||||||
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
@@ -348,6 +348,80 @@ func (c *clusterController) reconcileHostCluster() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *clusterController) probeClusters() error {
|
||||||
|
clusters, err := c.clusterLister.List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
if len(cluster.Spec.Connection.KubeConfig) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
clientConfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Connection.KubeConfig)
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
config, err := clientConfig.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
config.Timeout = probeClusterTimeout
|
||||||
|
|
||||||
|
clientSet, err := kubernetes.NewForConfig(config)
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var con clusterv1alpha1.ClusterCondition
|
||||||
|
_, err = clientSet.Discovery().ServerVersion()
|
||||||
|
if err == nil {
|
||||||
|
con = clusterv1alpha1.ClusterCondition{
|
||||||
|
Type: clusterv1alpha1.ClusterReady,
|
||||||
|
Status: v1.ConditionTrue,
|
||||||
|
LastUpdateTime: metav1.Now(),
|
||||||
|
LastTransitionTime: metav1.Now(),
|
||||||
|
Reason: string(clusterv1alpha1.ClusterReady),
|
||||||
|
Message: "Cluster is available now",
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
con = clusterv1alpha1.ClusterCondition{
|
||||||
|
Type: clusterv1alpha1.ClusterReady,
|
||||||
|
Status: v1.ConditionFalse,
|
||||||
|
LastUpdateTime: metav1.Now(),
|
||||||
|
LastTransitionTime: metav1.Now(),
|
||||||
|
Reason: "failed to connect get kubernetes version",
|
||||||
|
Message: "Cluster is not available now",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.updateClusterCondition(cluster, con)
|
||||||
|
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||||
|
ct, err := c.clusterClient.Get(context.TODO(), cluster.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ct.Status.Conditions = cluster.Status.Conditions
|
||||||
|
ct, err = c.clusterClient.Update(context.TODO(), ct, metav1.UpdateOptions{})
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("failed to update cluster %s status, err: %v", cluster.Name, err)
|
||||||
|
} else {
|
||||||
|
klog.V(4).Infof("successfully updated cluster %s to status %v", cluster.Name, con)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *clusterController) syncCluster(key string) error {
|
func (c *clusterController) syncCluster(key string) error {
|
||||||
klog.V(5).Infof("starting to sync cluster %s", key)
|
klog.V(5).Infof("starting to sync cluster %s", key)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
@@ -363,6 +437,7 @@ func (c *clusterController) syncCluster(key string) error {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
cluster, err := c.clusterLister.Get(name)
|
cluster, err := c.clusterLister.Get(name)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// cluster not found, possibly been deleted
|
// cluster not found, possibly been deleted
|
||||||
// need to do the cleanup
|
// need to do the cleanup
|
||||||
@@ -374,9 +449,6 @@ func (c *clusterController) syncCluster(key string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// proxy service name if needed
|
|
||||||
serviceName := fmt.Sprintf("mc-%s", cluster.Name)
|
|
||||||
|
|
||||||
if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
|
if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||||
// The object is not being deleted, so if it does not have our finalizer,
|
// The object is not being deleted, so if it does not have our finalizer,
|
||||||
// then lets add the finalizer and update the object. This is equivalent
|
// then lets add the finalizer and update the object. This is equivalent
|
||||||
@@ -399,22 +471,6 @@ func (c *clusterController) syncCluster(key string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = c.client.CoreV1().Services(defaultAgentNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
|
|
||||||
if err != nil {
|
|
||||||
if errors.IsNotFound(err) {
|
|
||||||
// nothing to do
|
|
||||||
} else {
|
|
||||||
klog.Errorf("Failed to get proxy service %s, error %v", serviceName, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
err = c.client.CoreV1().Services(defaultAgentNamespace).Delete(context.TODO(), serviceName, *metav1.NewDeleteOptions(0))
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Unable to delete service %s, error %v", serviceName, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// clean up openpitrix runtime of the cluster
|
// clean up openpitrix runtime of the cluster
|
||||||
if _, ok := cluster.Annotations[openpitrixRuntime]; ok {
|
if _, ok := cluster.Annotations[openpitrixRuntime]; ok {
|
||||||
if c.openpitrixClient != nil {
|
if c.openpitrixClient != nil {
|
||||||
@@ -438,136 +494,18 @@ func (c *clusterController) syncCluster(key string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// save a old copy of cluster
|
||||||
|
oldCluster := cluster.DeepCopy()
|
||||||
|
|
||||||
// currently we didn't set cluster.Spec.Enable when creating cluster at client side, so only check
|
// currently we didn't set cluster.Spec.Enable when creating cluster at client side, so only check
|
||||||
// if we enable cluster.Spec.JoinFederation now
|
// if we enable cluster.Spec.JoinFederation now
|
||||||
if cluster.Spec.JoinFederation == false {
|
if cluster.Spec.JoinFederation == false {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// save a old copy of cluster
|
// cluster not ready, nothing to do
|
||||||
oldCluster := cluster.DeepCopy()
|
if !isConditionTrue(cluster, clusterv1alpha1.ClusterReady) {
|
||||||
|
return nil
|
||||||
// prepare for proxy to member cluster
|
|
||||||
if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy {
|
|
||||||
|
|
||||||
// allocate ports for kubernetes and kubesphere endpoint
|
|
||||||
if cluster.Spec.Connection.KubeSphereAPIServerPort == 0 ||
|
|
||||||
cluster.Spec.Connection.KubernetesAPIServerPort == 0 {
|
|
||||||
port, err := c.allocatePort()
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
cluster.Spec.Connection.KubernetesAPIServerPort = port
|
|
||||||
cluster.Spec.Connection.KubeSphereAPIServerPort = port + 10000
|
|
||||||
}
|
|
||||||
|
|
||||||
// token uninitialized, generate a new token
|
|
||||||
if len(cluster.Spec.Connection.Token) == 0 {
|
|
||||||
cluster.Spec.Connection.Token = c.generateToken()
|
|
||||||
}
|
|
||||||
|
|
||||||
// create a proxy service spec
|
|
||||||
mcService := v1.Service{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: serviceName,
|
|
||||||
Namespace: cluster.Namespace,
|
|
||||||
Labels: map[string]string{
|
|
||||||
"app.kubernetes.io/name": serviceName,
|
|
||||||
"app": serviceName,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Spec: v1.ServiceSpec{
|
|
||||||
Selector: map[string]string{
|
|
||||||
"app.kubernetes.io/name": "tower",
|
|
||||||
"app": "tower",
|
|
||||||
},
|
|
||||||
Ports: []v1.ServicePort{
|
|
||||||
{
|
|
||||||
Name: "kubernetes",
|
|
||||||
Protocol: v1.ProtocolTCP,
|
|
||||||
Port: kubernetesPort,
|
|
||||||
TargetPort: intstr.FromInt(int(cluster.Spec.Connection.KubernetesAPIServerPort)),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "kubesphere",
|
|
||||||
Protocol: v1.ProtocolTCP,
|
|
||||||
Port: kubespherePort,
|
|
||||||
TargetPort: intstr.FromInt(int(cluster.Spec.Connection.KubeSphereAPIServerPort)),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
service, err := c.client.CoreV1().Services(defaultAgentNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
|
|
||||||
if err != nil { // proxy service not found
|
|
||||||
if errors.IsNotFound(err) {
|
|
||||||
service, err = c.client.CoreV1().Services(defaultAgentNamespace).Create(context.TODO(), &mcService, metav1.CreateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
} else { // update existed proxy service
|
|
||||||
if !reflect.DeepEqual(service.Spec, mcService.Spec) {
|
|
||||||
mcService.ObjectMeta = service.ObjectMeta
|
|
||||||
mcService.Spec.ClusterIP = service.Spec.ClusterIP
|
|
||||||
|
|
||||||
service, err = c.client.CoreV1().Services(defaultAgentNamespace).Update(context.TODO(), &mcService, metav1.UpdateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// populates the kubernetes apiEndpoint and kubesphere apiEndpoint
|
|
||||||
cluster.Spec.Connection.KubernetesAPIEndpoint = fmt.Sprintf("https://%s:%d", service.Spec.ClusterIP, kubernetesPort)
|
|
||||||
cluster.Spec.Connection.KubeSphereAPIEndpoint = fmt.Sprintf("http://%s:%d", service.Spec.ClusterIP, kubespherePort)
|
|
||||||
|
|
||||||
initializedCondition := clusterv1alpha1.ClusterCondition{
|
|
||||||
Type: clusterv1alpha1.ClusterInitialized,
|
|
||||||
Status: v1.ConditionTrue,
|
|
||||||
Reason: string(clusterv1alpha1.ClusterInitialized),
|
|
||||||
Message: "Cluster has been initialized",
|
|
||||||
LastUpdateTime: metav1.Now(),
|
|
||||||
LastTransitionTime: metav1.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
if !isConditionTrue(cluster, clusterv1alpha1.ClusterInitialized) {
|
|
||||||
c.updateClusterCondition(cluster, initializedCondition)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(oldCluster, cluster) {
|
|
||||||
cluster, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Error updating cluster %s, error %s", cluster.Name, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// agent status unavailable, which means the agent disconnected from the server or has not connected to the server
|
|
||||||
// we need to update the cluster ready status unavailable and return.
|
|
||||||
if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy &&
|
|
||||||
!isConditionTrue(cluster, clusterv1alpha1.ClusterAgentAvailable) {
|
|
||||||
clusterNotReadyCondition := clusterv1alpha1.ClusterCondition{
|
|
||||||
Type: clusterv1alpha1.ClusterReady,
|
|
||||||
Status: v1.ConditionFalse,
|
|
||||||
LastUpdateTime: metav1.Now(),
|
|
||||||
LastTransitionTime: metav1.Now(),
|
|
||||||
Reason: "Unable to establish connection with cluster",
|
|
||||||
Message: "Cluster is not available now",
|
|
||||||
}
|
|
||||||
|
|
||||||
c.updateClusterCondition(cluster, clusterNotReadyCondition)
|
|
||||||
|
|
||||||
cluster, err = c.clusterClient.Update(context.TODO(), cluster, metav1.UpdateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Error updating cluster %s, error %s", cluster.Name, err)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// build up cached cluster data if there isn't any
|
// build up cached cluster data if there isn't any
|
||||||
@@ -594,10 +532,10 @@ func (c *clusterController) syncCluster(key string) error {
|
|||||||
_, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels)
|
_, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err)
|
klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err)
|
||||||
c.eventRecorder.Event(cluster, v1.EventTypeWarning, "JoinFederation", err.Error())
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.eventRecorder.Event(cluster, v1.EventTypeNormal, "JoinFederation", "Cluster has joined federation.")
|
|
||||||
|
klog.Infof("successfully joined federation for cluster %s", cluster.Name)
|
||||||
|
|
||||||
federationReadyCondition := clusterv1alpha1.ClusterCondition{
|
federationReadyCondition := clusterv1alpha1.ClusterCondition{
|
||||||
Type: clusterv1alpha1.ClusterFederated,
|
Type: clusterv1alpha1.ClusterFederated,
|
||||||
@@ -611,7 +549,7 @@ func (c *clusterController) syncCluster(key string) error {
|
|||||||
c.updateClusterCondition(cluster, federationReadyCondition)
|
c.updateClusterCondition(cluster, federationReadyCondition)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cluster agent is ready, we can pull kubernetes cluster info through agent
|
// cluster is ready, we can pull kubernetes cluster info through agent
|
||||||
// since there is no agent necessary for host cluster, so updates for host cluster
|
// since there is no agent necessary for host cluster, so updates for host cluster
|
||||||
// is safe.
|
// is safe.
|
||||||
if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 {
|
if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 {
|
||||||
@@ -647,17 +585,6 @@ func (c *clusterController) syncCluster(key string) error {
|
|||||||
cluster.Labels[clusterv1alpha1.HostCluster] = ""
|
cluster.Labels[clusterv1alpha1.HostCluster] = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
clusterReadyCondition := clusterv1alpha1.ClusterCondition{
|
|
||||||
Type: clusterv1alpha1.ClusterReady,
|
|
||||||
Status: v1.ConditionTrue,
|
|
||||||
LastUpdateTime: metav1.Now(),
|
|
||||||
LastTransitionTime: metav1.Now(),
|
|
||||||
Reason: string(clusterv1alpha1.ClusterReady),
|
|
||||||
Message: "Cluster is available now",
|
|
||||||
}
|
|
||||||
|
|
||||||
c.updateClusterCondition(cluster, clusterReadyCondition)
|
|
||||||
|
|
||||||
if c.openpitrixClient != nil { // OpenPitrix is enabled, create runtime
|
if c.openpitrixClient != nil { // OpenPitrix is enabled, create runtime
|
||||||
if cluster.GetAnnotations() == nil {
|
if cluster.GetAnnotations() == nil {
|
||||||
cluster.Annotations = make(map[string]string)
|
cluster.Annotations = make(map[string]string)
|
||||||
@@ -746,16 +673,6 @@ func (c *clusterController) addCluster(obj interface{}) {
|
|||||||
c.queue.Add(key)
|
c.queue.Add(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func hasHostClusterLabel(cluster *clusterv1alpha1.Cluster) bool {
|
|
||||||
if cluster.Labels == nil || len(cluster.Labels) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
_, ok := cluster.Labels[clusterv1alpha1.HostCluster]
|
|
||||||
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clusterController) handleErr(err error, key interface{}) {
|
func (c *clusterController) handleErr(err error, key interface{}) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.queue.Forget(key)
|
c.queue.Forget(key)
|
||||||
@@ -855,43 +772,3 @@ func (c *clusterController) unJoinFederation(clusterConfig *rest.Config, unjoini
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// allocatePort find a available port between [portRangeMin, portRangeMax] in maximumRetries
|
|
||||||
// TODO: only works with handful clusters
|
|
||||||
func (c *clusterController) allocatePort() (uint16, error) {
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
|
||||||
|
|
||||||
clusters, err := c.clusterLister.List(labels.Everything())
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
const maximumRetries = 10
|
|
||||||
for i := 0; i < maximumRetries; i++ {
|
|
||||||
collision := false
|
|
||||||
port := uint16(portRangeMin + rand.Intn(portRangeMax-portRangeMin+1))
|
|
||||||
|
|
||||||
for _, item := range clusters {
|
|
||||||
if item.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy &&
|
|
||||||
item.Spec.Connection.KubernetesAPIServerPort != 0 &&
|
|
||||||
item.Spec.Connection.KubeSphereAPIServerPort == port {
|
|
||||||
collision = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !collision {
|
|
||||||
return port, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0, fmt.Errorf("unable to allocate port after %d retries", maximumRetries)
|
|
||||||
}
|
|
||||||
|
|
||||||
// generateToken returns a random 32-byte string as token
|
|
||||||
func (c *clusterController) generateToken() string {
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
|
||||||
b := make([]byte, 32)
|
|
||||||
rand.Read(b)
|
|
||||||
return fmt.Sprintf("%x", b)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ import (
|
|||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
)
|
)
|
||||||
|
|
||||||
const DefaultResyncPeriod = time.Duration(120) * time.Second
|
const DefaultResyncPeriod = 120 * time.Second
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Enable
|
// Enable
|
||||||
@@ -79,5 +79,5 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, s *Options) {
|
|||||||
"This field is used when generating deployment yaml for agent.")
|
"This field is used when generating deployment yaml for agent.")
|
||||||
|
|
||||||
fs.DurationVar(&o.ClusterControllerResyncSecond, "cluster-controller-resync-second", s.ClusterControllerResyncSecond,
|
fs.DurationVar(&o.ClusterControllerResyncSecond, "cluster-controller-resync-second", s.ClusterControllerResyncSecond,
|
||||||
"Cluster controller resync second to sync cluster resource.")
|
"Cluster controller resync second to sync cluster resource. e.g. 2m 5m 10m ... default set to 2m")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user