Reduce unnecessary status updates

This commit is contained in:
hongming
2022-05-13 17:50:49 +08:00
parent 1c96f99072
commit 825a38f948
3 changed files with 41 additions and 124 deletions

View File

@@ -25,20 +25,17 @@ import (
"fmt"
"net/http"
"reflect"
"sync"
"time"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -125,22 +122,6 @@ var hostCluster = &clusterv1alpha1.Cluster{
},
}
// ClusterData stores cluster client
type clusterData struct {
// cached rest.Config
config *rest.Config
// cached kubernetes client, rebuild once cluster changed
client kubernetes.Interface
// cached kubeconfig
cachedKubeconfig []byte
// cached transport, used to proxy kubesphere version request
transport http.RoundTripper
}
type clusterController struct {
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
@@ -158,10 +139,6 @@ type clusterController struct {
workerLoopPeriod time.Duration
mu sync.RWMutex
clusterMap map[string]*clusterData
resyncPeriod time.Duration
hostClusterName string
@@ -174,7 +151,6 @@ func NewClusterController(
clusterClient clusterclient.ClusterInterface,
resyncPeriod time.Duration,
hostClusterName string,
configmapInformer coreinformers.ConfigMapInformer,
) *clusterController {
broadcaster := record.NewBroadcaster()
@@ -192,7 +168,6 @@ func NewClusterController(
clusterClient: clusterClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
workerLoopPeriod: time.Second,
clusterMap: make(map[string]*clusterData),
resyncPeriod: resyncPeriod,
hostClusterName: hostClusterName,
}
@@ -200,23 +175,15 @@ func NewClusterController(
c.clusterHasSynced = clusterInformer.Informer().HasSynced
clusterInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: c.addCluster,
AddFunc: c.enqueueCluster,
UpdateFunc: func(oldObj, newObj interface{}) {
c.addCluster(newObj)
},
DeleteFunc: c.addCluster,
}, resyncPeriod)
configmapInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
oldCM := oldObj.(*v1.ConfigMap)
newCM := newObj.(*v1.ConfigMap)
if oldCM.ResourceVersion == newCM.ResourceVersion {
return
oldCluster := oldObj.(*clusterv1alpha1.Cluster)
newCluster := newObj.(*clusterv1alpha1.Cluster)
if !reflect.DeepEqual(oldCluster.Spec, newCluster.Spec) {
c.enqueueCluster(newObj)
}
// Update the clusterName field when the kubesphere-config configmap is updated.
c.syncClusterNameInConfigMap()
},
DeleteFunc: c.enqueueCluster,
}, resyncPeriod)
return c
@@ -275,40 +242,6 @@ func (c *clusterController) processNextItem() bool {
return true
}
func buildClusterData(kubeconfig []byte) (*clusterData, error) {
// prepare for
clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
if err != nil {
klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err)
return nil, err
}
clusterConfig, err := clientConfig.ClientConfig()
if err != nil {
klog.Errorf("Failed to get client config, %#v", err)
return nil, err
}
transport, err := rest.TransportFor(clusterConfig)
if err != nil {
klog.Errorf("Failed to create transport, %#v", err)
return nil, err
}
clientSet, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
klog.Errorf("Failed to create ClientSet from config, %#v", err)
return nil, err
}
return &clusterData{
cachedKubeconfig: kubeconfig,
config: clusterConfig,
client: clientSet,
transport: transport,
}, nil
}
// reconcileHostCluster will create a host cluster if there are no clusters labeled 'cluster-role.kubesphere.io/host'
func (c *clusterController) reconcileHostCluster() error {
clusters, err := c.clusterLister.List(labels.SelectorFromSet(labels.Set{clusterv1alpha1.HostCluster: ""}))
@@ -495,28 +428,30 @@ func (c *clusterController) syncCluster(key string) error {
return nil
}
// build up cached cluster data if there isn't any
c.mu.Lock()
clusterDt, ok := c.clusterMap[cluster.Name]
if !ok || clusterDt == nil || !equality.Semantic.DeepEqual(clusterDt.cachedKubeconfig, cluster.Spec.Connection.KubeConfig) {
clusterDt, err = buildClusterData(cluster.Spec.Connection.KubeConfig)
if err != nil {
c.mu.Unlock()
return err
}
c.clusterMap[cluster.Name] = clusterDt
clusterConfig, err := clientcmd.RESTConfigFromKubeConfig(cluster.Spec.Connection.KubeConfig)
if err != nil {
return fmt.Errorf("failed to create cluster config for %s: %s", cluster.Name, err)
}
clusterClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
return fmt.Errorf("failed to create cluster client for %s: %s", cluster.Name, err)
}
proxyTransport, err := rest.TransportFor(clusterConfig)
if err != nil {
return fmt.Errorf("failed to create proxy transport for %s: %s", cluster.Name, err)
}
c.mu.Unlock()
if !cluster.Spec.JoinFederation { // trying to unJoin federation
err = c.unJoinFederation(clusterDt.config, cluster.Name)
err = c.unJoinFederation(clusterConfig, cluster.Name)
if err != nil {
klog.Errorf("Failed to unJoin federation for cluster %s, error %v", cluster.Name, err)
c.eventRecorder.Event(cluster, v1.EventTypeWarning, "UnJoinFederation", err.Error())
return err
}
} else { // join federation
_, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels)
_, err = c.joinFederation(clusterConfig, cluster.Name, cluster.Labels)
if err != nil {
klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err)
@@ -556,29 +491,33 @@ func (c *clusterController) syncCluster(key string) error {
// since there is no agent necessary for host cluster, so updates for host cluster
// is safe.
if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 {
cluster.Spec.Connection.KubernetesAPIEndpoint = clusterDt.config.Host
cluster.Spec.Connection.KubernetesAPIEndpoint = clusterConfig.Host
}
version, err := clusterDt.client.Discovery().ServerVersion()
serverVersion, err := clusterClient.Discovery().ServerVersion()
if err != nil {
klog.Errorf("Failed to get kubernetes version, %#v", err)
return err
}
cluster.Status.KubernetesVersion = version.GitVersion
cluster.Status.KubernetesVersion = serverVersion.GitVersion
nodes, err := clusterDt.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
nodes, err := clusterClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to get cluster nodes, %#v", err)
return err
}
cluster.Status.NodeCount = len(nodes.Items)
configz, err := c.tryToFetchKubeSphereComponents(clusterDt.config.Host, clusterDt.transport)
if err == nil {
// TODO use rest.Interface instead
configz, err := c.tryToFetchKubeSphereComponents(clusterConfig.Host, proxyTransport)
if err != nil {
klog.Warningf("failed to fetch kubesphere components status in cluster %s: %s", cluster.Name, err)
} else {
cluster.Status.Configz = configz
}
v, err := c.tryFetchKubeSphereVersion(clusterDt.config.Host, clusterDt.transport)
// TODO use rest.Interface instead
v, err := c.tryFetchKubeSphereVersion(clusterConfig.Host, proxyTransport)
if err != nil {
klog.Errorf("failed to get KubeSphere version, err: %#v", err)
} else {
@@ -586,7 +525,7 @@ func (c *clusterController) syncCluster(key string) error {
}
// Use kube-system namespace UID as cluster ID
kubeSystem, err := clusterDt.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceSystem, metav1.GetOptions{})
kubeSystem, err := clusterClient.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceSystem, metav1.GetOptions{})
if err != nil {
return err
}
@@ -600,7 +539,7 @@ func (c *clusterController) syncCluster(key string) error {
cluster.Labels[clusterv1alpha1.HostCluster] = ""
}
readyConditon := clusterv1alpha1.ClusterCondition{
readyCondition := clusterv1alpha1.ClusterCondition{
Type: clusterv1alpha1.ClusterReady,
Status: v1.ConditionTrue,
LastUpdateTime: metav1.Now(),
@@ -608,7 +547,7 @@ func (c *clusterController) syncCluster(key string) error {
Reason: string(clusterv1alpha1.ClusterReady),
Message: "Cluster is available now",
}
c.updateClusterCondition(cluster, readyConditon)
c.updateClusterCondition(cluster, readyCondition)
if err = c.updateKubeConfigExpirationDateCondition(cluster); err != nil {
klog.Errorf("sync KubeConfig expiration date for cluster %s failed: %v", cluster.Name, err)
@@ -623,7 +562,7 @@ func (c *clusterController) syncCluster(key string) error {
}
}
if err = c.setClusterNameInConfigMap(clusterDt.client, cluster.Name); err != nil {
if err = c.setClusterNameInConfigMap(clusterClient, cluster.Name); err != nil {
return err
}
@@ -659,27 +598,6 @@ func (c *clusterController) setClusterNameInConfigMap(client kubernetes.Interfac
return nil
}
func (c *clusterController) syncClusterNameInConfigMap() {
clusters, err := c.clusterLister.List(labels.Everything())
if err != nil {
klog.Errorf("list clusters failed: %v", err)
return
}
for _, cluster := range clusters {
clusterDt, ok := c.clusterMap[cluster.Name]
if !ok {
continue
}
if err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
return c.setClusterNameInConfigMap(clusterDt.client, cluster.Name)
}); err != nil {
klog.Errorf("update configmap %s failed: %v", constants.KubeSphereConfigName, err)
continue
}
}
}
func (c *clusterController) checkIfClusterIsHostCluster(memberClusterNodes *v1.NodeList) bool {
hostNodes, err := c.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
@@ -766,7 +684,7 @@ func (c *clusterController) tryFetchKubeSphereVersion(host string, transport htt
return info.GitVersion, nil
}
func (c *clusterController) addCluster(obj interface{}) {
func (c *clusterController) enqueueCluster(obj interface{}) {
cluster := obj.(*clusterv1alpha1.Cluster)
key, err := cache.MetaNamespaceKeyFunc(obj)

View File

@@ -19,6 +19,7 @@ package clusterclient
import (
"net/http"
"net/url"
"reflect"
"sync"
corev1 "k8s.io/api/core/v1"
@@ -72,12 +73,11 @@ func NewClusterClient(clusterInformer clusterinformer.ClusterInformer) ClusterCl
c.addCluster(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newCluster := newObj.(*clusterv1alpha1.Cluster)
oldCluster := oldObj.(*clusterv1alpha1.Cluster)
if newCluster.ResourceVersion == oldCluster.ResourceVersion {
return
newCluster := newObj.(*clusterv1alpha1.Cluster)
if !reflect.DeepEqual(oldCluster.Spec, newCluster.Spec) {
c.addCluster(newObj)
}
c.addCluster(newObj)
},
DeleteFunc: func(obj interface{}) {
c.removeCluster(obj)