Merge pull request #4877 from wansir/fix-4876

Reduce unnecessary status updates
This commit is contained in:
KubeSphere CI Bot
2022-05-16 17:18:06 +08:00
committed by GitHub
3 changed files with 42 additions and 183 deletions

View File

@@ -493,7 +493,6 @@ func addAllControllers(mgr manager.Manager, client k8s.Client, informerFactory i
client.KubeSphere().ClusterV1alpha1().Clusters(),
cmOptions.MultiClusterOptions.ClusterControllerResyncPeriod,
cmOptions.MultiClusterOptions.HostClusterName,
kubernetesInformer.Core().V1().ConfigMaps(),
)
addController(mgr, "cluster", clusterController)
}

View File

@@ -25,20 +25,17 @@ import (
"fmt"
"net/http"
"reflect"
"sync"
"time"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -46,7 +43,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
@@ -125,22 +121,6 @@ var hostCluster = &clusterv1alpha1.Cluster{
},
}
// clusterData holds the per-cluster connection artifacts that the controller
// caches in clusterMap, keyed by cluster name. An entry is rebuilt by
// buildClusterData when the cluster's kubeconfig no longer matches
// cachedKubeconfig.
type clusterData struct {
// config is the rest.Config derived from the cluster's kubeconfig; it is
// passed to the federation join/unjoin helpers and supplies the API host.
config *rest.Config
// client is the cached Kubernetes clientset built from config; it is
// rebuilt whenever the cluster's kubeconfig changes.
client kubernetes.Interface
// cachedKubeconfig is the raw kubeconfig this entry was built from; it is
// compared against the cluster spec to detect a stale entry.
cachedKubeconfig []byte
// transport is the cached round tripper built from config, used to proxy
// the KubeSphere version and component-status requests to the cluster.
transport http.RoundTripper
}
type clusterController struct {
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
@@ -158,10 +138,6 @@ type clusterController struct {
workerLoopPeriod time.Duration
mu sync.RWMutex
clusterMap map[string]*clusterData
resyncPeriod time.Duration
hostClusterName string
@@ -174,7 +150,6 @@ func NewClusterController(
clusterClient clusterclient.ClusterInterface,
resyncPeriod time.Duration,
hostClusterName string,
configmapInformer coreinformers.ConfigMapInformer,
) *clusterController {
broadcaster := record.NewBroadcaster()
@@ -192,7 +167,6 @@ func NewClusterController(
clusterClient: clusterClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
workerLoopPeriod: time.Second,
clusterMap: make(map[string]*clusterData),
resyncPeriod: resyncPeriod,
hostClusterName: hostClusterName,
}
@@ -200,23 +174,15 @@ func NewClusterController(
c.clusterHasSynced = clusterInformer.Informer().HasSynced
clusterInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: c.addCluster,
AddFunc: c.enqueueCluster,
UpdateFunc: func(oldObj, newObj interface{}) {
c.addCluster(newObj)
},
DeleteFunc: c.addCluster,
}, resyncPeriod)
configmapInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
oldCM := oldObj.(*v1.ConfigMap)
newCM := newObj.(*v1.ConfigMap)
if oldCM.ResourceVersion == newCM.ResourceVersion {
return
oldCluster := oldObj.(*clusterv1alpha1.Cluster)
newCluster := newObj.(*clusterv1alpha1.Cluster)
if !reflect.DeepEqual(oldCluster.Spec, newCluster.Spec) {
c.enqueueCluster(newObj)
}
// Update the clusterName field when the kubesphere-config configmap is updated.
c.syncClusterNameInConfigMap()
},
DeleteFunc: c.enqueueCluster,
}, resyncPeriod)
return c
@@ -250,7 +216,6 @@ func (c *clusterController) Run(workers int, stopCh <-chan struct{}) error {
if err := c.probeClusters(); err != nil {
klog.Errorf("failed to reconcile cluster ready status, err: %v", err)
}
}, c.resyncPeriod, stopCh)
<-stopCh
@@ -275,40 +240,6 @@ func (c *clusterController) processNextItem() bool {
return true
}
// buildClusterData turns raw kubeconfig bytes into a fully populated
// clusterData: the parsed rest.Config, an HTTP transport and a Kubernetes
// clientset, together with the kubeconfig they were derived from.
//
// The first step that fails is logged and its error is returned unchanged,
// in which case the returned *clusterData is nil.
func buildClusterData(kubeconfig []byte) (*clusterData, error) {
	// Parse the kubeconfig bytes into a loadable client configuration.
	loader, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
	if err != nil {
		klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err)
		return nil, err
	}

	// Resolve the configuration into a concrete rest.Config.
	restConfig, err := loader.ClientConfig()
	if err != nil {
		klog.Errorf("Failed to get client config, %#v", err)
		return nil, err
	}

	// The transport is created before the clientset, as in the original order.
	roundTripper, err := rest.TransportFor(restConfig)
	if err != nil {
		klog.Errorf("Failed to create transport, %#v", err)
		return nil, err
	}

	kubeClient, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		klog.Errorf("Failed to create ClientSet from config, %#v", err)
		return nil, err
	}

	data := &clusterData{
		config:           restConfig,
		client:           kubeClient,
		cachedKubeconfig: kubeconfig,
		transport:        roundTripper,
	}
	return data, nil
}
// reconcileHostCluster will create a host cluster if there are no clusters labeled 'cluster-role.kubesphere.io/host'
func (c *clusterController) reconcileHostCluster() error {
clusters, err := c.clusterLister.List(labels.SelectorFromSet(labels.Set{clusterv1alpha1.HostCluster: ""}))
@@ -360,64 +291,8 @@ func (c *clusterController) probeClusters() error {
}
for _, cluster := range clusters {
// if the cluster is not federated, we skip it and consider it not ready.
if !isConditionTrue(cluster, clusterv1alpha1.ClusterFederated) {
continue
}
if len(cluster.Spec.Connection.KubeConfig) == 0 {
continue
}
clientConfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Connection.KubeConfig)
if err != nil {
klog.Error(err)
continue
}
config, err := clientConfig.ClientConfig()
if err != nil {
klog.Error(err)
continue
}
config.Timeout = probeClusterTimeout
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Error(err)
continue
}
condition := clusterv1alpha1.ClusterCondition{
Type: clusterv1alpha1.ClusterReady,
Status: v1.ConditionTrue,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: string(clusterv1alpha1.ClusterReady),
Message: "Cluster is available now",
}
if _, err = clientSet.Discovery().ServerVersion(); err != nil {
condition.Status = v1.ConditionFalse
condition.Reason = "failed to connect get kubernetes version"
condition.Message = "Cluster is not available now"
}
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
newCluster, err := c.clusterClient.Get(context.TODO(), cluster.Name, metav1.GetOptions{})
if err != nil {
return err
}
c.updateClusterCondition(newCluster, condition)
_, err = c.clusterClient.Update(context.TODO(), newCluster, metav1.UpdateOptions{})
return err
})
if err != nil {
klog.Errorf("failed to update cluster %s status, err: %v", cluster.Name, err)
} else {
klog.V(4).Infof("successfully updated cluster %s to status %v", cluster.Name, condition)
}
c.syncCluster(cluster.Name)
}
return nil
}
@@ -495,28 +370,30 @@ func (c *clusterController) syncCluster(key string) error {
return nil
}
// build up cached cluster data if there isn't any
c.mu.Lock()
clusterDt, ok := c.clusterMap[cluster.Name]
if !ok || clusterDt == nil || !equality.Semantic.DeepEqual(clusterDt.cachedKubeconfig, cluster.Spec.Connection.KubeConfig) {
clusterDt, err = buildClusterData(cluster.Spec.Connection.KubeConfig)
if err != nil {
c.mu.Unlock()
return err
}
c.clusterMap[cluster.Name] = clusterDt
clusterConfig, err := clientcmd.RESTConfigFromKubeConfig(cluster.Spec.Connection.KubeConfig)
if err != nil {
return fmt.Errorf("failed to create cluster config for %s: %s", cluster.Name, err)
}
clusterClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
return fmt.Errorf("failed to create cluster client for %s: %s", cluster.Name, err)
}
proxyTransport, err := rest.TransportFor(clusterConfig)
if err != nil {
return fmt.Errorf("failed to create proxy transport for %s: %s", cluster.Name, err)
}
c.mu.Unlock()
if !cluster.Spec.JoinFederation { // trying to unJoin federation
err = c.unJoinFederation(clusterDt.config, cluster.Name)
err = c.unJoinFederation(clusterConfig, cluster.Name)
if err != nil {
klog.Errorf("Failed to unJoin federation for cluster %s, error %v", cluster.Name, err)
c.eventRecorder.Event(cluster, v1.EventTypeWarning, "UnJoinFederation", err.Error())
return err
}
} else { // join federation
_, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels)
_, err = c.joinFederation(clusterConfig, cluster.Name, cluster.Labels)
if err != nil {
klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err)
@@ -556,29 +433,33 @@ func (c *clusterController) syncCluster(key string) error {
// since there is no agent necessary for host cluster, so updates for host cluster
// is safe.
if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 {
cluster.Spec.Connection.KubernetesAPIEndpoint = clusterDt.config.Host
cluster.Spec.Connection.KubernetesAPIEndpoint = clusterConfig.Host
}
version, err := clusterDt.client.Discovery().ServerVersion()
serverVersion, err := clusterClient.Discovery().ServerVersion()
if err != nil {
klog.Errorf("Failed to get kubernetes version, %#v", err)
return err
}
cluster.Status.KubernetesVersion = version.GitVersion
cluster.Status.KubernetesVersion = serverVersion.GitVersion
nodes, err := clusterDt.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
nodes, err := clusterClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to get cluster nodes, %#v", err)
return err
}
cluster.Status.NodeCount = len(nodes.Items)
configz, err := c.tryToFetchKubeSphereComponents(clusterDt.config.Host, clusterDt.transport)
if err == nil {
// TODO use rest.Interface instead
configz, err := c.tryToFetchKubeSphereComponents(clusterConfig.Host, proxyTransport)
if err != nil {
klog.Warningf("failed to fetch kubesphere components status in cluster %s: %s", cluster.Name, err)
} else {
cluster.Status.Configz = configz
}
v, err := c.tryFetchKubeSphereVersion(clusterDt.config.Host, clusterDt.transport)
// TODO use rest.Interface instead
v, err := c.tryFetchKubeSphereVersion(clusterConfig.Host, proxyTransport)
if err != nil {
klog.Errorf("failed to get KubeSphere version, err: %#v", err)
} else {
@@ -586,7 +467,7 @@ func (c *clusterController) syncCluster(key string) error {
}
// Use kube-system namespace UID as cluster ID
kubeSystem, err := clusterDt.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceSystem, metav1.GetOptions{})
kubeSystem, err := clusterClient.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceSystem, metav1.GetOptions{})
if err != nil {
return err
}
@@ -600,7 +481,7 @@ func (c *clusterController) syncCluster(key string) error {
cluster.Labels[clusterv1alpha1.HostCluster] = ""
}
readyConditon := clusterv1alpha1.ClusterCondition{
readyCondition := clusterv1alpha1.ClusterCondition{
Type: clusterv1alpha1.ClusterReady,
Status: v1.ConditionTrue,
LastUpdateTime: metav1.Now(),
@@ -608,7 +489,7 @@ func (c *clusterController) syncCluster(key string) error {
Reason: string(clusterv1alpha1.ClusterReady),
Message: "Cluster is available now",
}
c.updateClusterCondition(cluster, readyConditon)
c.updateClusterCondition(cluster, readyCondition)
if err = c.updateKubeConfigExpirationDateCondition(cluster); err != nil {
klog.Errorf("sync KubeConfig expiration date for cluster %s failed: %v", cluster.Name, err)
@@ -623,7 +504,7 @@ func (c *clusterController) syncCluster(key string) error {
}
}
if err = c.setClusterNameInConfigMap(clusterDt.client, cluster.Name); err != nil {
if err = c.setClusterNameInConfigMap(clusterClient, cluster.Name); err != nil {
return err
}
@@ -659,27 +540,6 @@ func (c *clusterController) setClusterNameInConfigMap(client kubernetes.Interfac
return nil
}
// syncClusterNameInConfigMap walks every cluster known to the lister and, for
// each one that has a cached client in c.clusterMap, re-applies the cluster
// name through setClusterNameInConfigMap, retrying on update conflicts.
//
// Clusters without a usable cache entry are skipped. A failure for one
// cluster is logged and does not stop the remaining clusters.
func (c *clusterController) syncClusterNameInConfigMap() {
	clusters, err := c.clusterLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("list clusters failed: %v", err)
		return
	}

	for _, cluster := range clusters {
		// clusterMap is mutated by syncCluster while holding c.mu, and this
		// method runs from an informer callback, so the lookup must be done
		// under the read lock. The lock is released before any network call.
		c.mu.RLock()
		clusterDt, ok := c.clusterMap[cluster.Name]
		c.mu.RUnlock()

		// syncCluster treats a nil entry as "not built yet"; do the same here
		// instead of dereferencing it.
		if !ok || clusterDt == nil {
			continue
		}

		err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
			return c.setClusterNameInConfigMap(clusterDt.client, cluster.Name)
		})
		if err != nil {
			klog.Errorf("update configmap %s failed: %v", constants.KubeSphereConfigName, err)
		}
	}
}
func (c *clusterController) checkIfClusterIsHostCluster(memberClusterNodes *v1.NodeList) bool {
hostNodes, err := c.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
@@ -766,7 +626,7 @@ func (c *clusterController) tryFetchKubeSphereVersion(host string, transport htt
return info.GitVersion, nil
}
func (c *clusterController) addCluster(obj interface{}) {
func (c *clusterController) enqueueCluster(obj interface{}) {
cluster := obj.(*clusterv1alpha1.Cluster)
key, err := cache.MetaNamespaceKeyFunc(obj)

View File

@@ -19,6 +19,7 @@ package clusterclient
import (
"net/http"
"net/url"
"reflect"
"sync"
corev1 "k8s.io/api/core/v1"
@@ -72,12 +73,11 @@ func NewClusterClient(clusterInformer clusterinformer.ClusterInformer) ClusterCl
c.addCluster(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newCluster := newObj.(*clusterv1alpha1.Cluster)
oldCluster := oldObj.(*clusterv1alpha1.Cluster)
if newCluster.ResourceVersion == oldCluster.ResourceVersion {
return
newCluster := newObj.(*clusterv1alpha1.Cluster)
if !reflect.DeepEqual(oldCluster.Spec, newCluster.Spec) {
c.addCluster(newObj)
}
c.addCluster(newObj)
},
DeleteFunc: func(obj interface{}) {
c.removeCluster(obj)