Double check in clusterclient if the cluster exists but is not cached

This commit is contained in:
Xinzhao Xu
2022-04-27 15:32:31 +08:00
parent 7d9563dca1
commit 757fca8ade

View File

@@ -17,7 +17,6 @@ limitations under the License.
package clusterclient package clusterclient
import ( import (
"fmt"
"net/http" "net/http"
"net/url" "net/url"
"sync" "sync"
@@ -31,10 +30,7 @@ import (
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1" clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1"
) clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
var (
ClusterNotExistsFormat = "cluster %s not exists"
) )
type innerCluster struct { type innerCluster struct {
@@ -45,8 +41,7 @@ type innerCluster struct {
type clusterClients struct { type clusterClients struct {
sync.RWMutex sync.RWMutex
clusterMap map[string]*clusterv1alpha1.Cluster clusterLister clusterlister.ClusterLister
clusterKubeconfig map[string]string
// build a in memory cluster cache to speed things up // build a in memory cluster cache to speed things up
innerClusters map[string]*innerCluster innerClusters map[string]*innerCluster
@@ -60,48 +55,16 @@ type ClusterClients interface {
GetInnerCluster(string) *innerCluster GetInnerCluster(string) *innerCluster
} }
func (c *clusterClients) IsClusterReady(cluster *clusterv1alpha1.Cluster) bool { var (
for _, condition := range cluster.Status.Conditions { once sync.Once
if condition.Type == clusterv1alpha1.ClusterReady && condition.Status == corev1.ConditionTrue { c *clusterClients
return true )
}
}
return false
}
func (c *clusterClients) IsHostCluster(cluster *clusterv1alpha1.Cluster) bool {
if _, ok := cluster.Labels[clusterv1alpha1.HostCluster]; ok {
return true
}
return false
}
func (c *clusterClients) GetInnerCluster(name string) *innerCluster {
c.RLock()
defer c.RUnlock()
if cluster, ok := c.innerClusters[name]; ok {
return cluster
}
return nil
}
var c *clusterClients
var lock sync.Mutex
func NewClusterClient(clusterInformer clusterinformer.ClusterInformer) ClusterClients { func NewClusterClient(clusterInformer clusterinformer.ClusterInformer) ClusterClients {
once.Do(func() {
if c == nil {
lock.Lock()
defer lock.Unlock()
if c != nil {
return c
}
c = &clusterClients{ c = &clusterClients{
clusterMap: map[string]*clusterv1alpha1.Cluster{}, innerClusters: make(map[string]*innerCluster),
clusterKubeconfig: map[string]string{}, clusterLister: clusterInformer.Lister(),
innerClusters: make(map[string]*innerCluster),
} }
clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -109,15 +72,18 @@ func NewClusterClient(clusterInformer clusterinformer.ClusterInformer) ClusterCl
c.addCluster(obj) c.addCluster(obj)
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
c.removeCluster(oldObj) newCluster := newObj.(*clusterv1alpha1.Cluster)
oldCluster := oldObj.(*clusterv1alpha1.Cluster)
if newCluster.ResourceVersion == oldCluster.ResourceVersion {
return
}
c.addCluster(newObj) c.addCluster(newObj)
}, },
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
c.removeCluster(obj) c.removeCluster(obj)
}, },
}) })
} })
return c return c
} }
@@ -125,11 +91,7 @@ func (c *clusterClients) removeCluster(obj interface{}) {
cluster := obj.(*clusterv1alpha1.Cluster) cluster := obj.(*clusterv1alpha1.Cluster)
klog.V(4).Infof("remove cluster %s", cluster.Name) klog.V(4).Infof("remove cluster %s", cluster.Name)
c.Lock() c.Lock()
if _, ok := c.clusterMap[cluster.Name]; ok { delete(c.innerClusters, cluster.Name)
delete(c.clusterMap, cluster.Name)
delete(c.innerClusters, cluster.Name)
delete(c.clusterKubeconfig, cluster.Name)
}
c.Unlock() c.Unlock()
} }
@@ -172,39 +134,58 @@ func newInnerCluster(cluster *clusterv1alpha1.Cluster) *innerCluster {
} }
} }
func (c *clusterClients) addCluster(obj interface{}) { func (c *clusterClients) addCluster(obj interface{}) *innerCluster {
cluster := obj.(*clusterv1alpha1.Cluster) cluster := obj.(*clusterv1alpha1.Cluster)
klog.V(4).Infof("add new cluster %s", cluster.Name) klog.V(4).Infof("add new cluster %s", cluster.Name)
_, err := url.Parse(cluster.Spec.Connection.KubernetesAPIEndpoint) _, err := url.Parse(cluster.Spec.Connection.KubernetesAPIEndpoint)
if err != nil { if err != nil {
klog.Errorf("Parse kubernetes apiserver endpoint %s failed, %v", cluster.Spec.Connection.KubernetesAPIEndpoint, err) klog.Errorf("Parse kubernetes apiserver endpoint %s failed, %v", cluster.Spec.Connection.KubernetesAPIEndpoint, err)
return return nil
} }
innerCluster := newInnerCluster(cluster) inner := newInnerCluster(cluster)
c.Lock() c.Lock()
c.clusterMap[cluster.Name] = cluster c.innerClusters[cluster.Name] = inner
c.clusterKubeconfig[cluster.Name] = string(cluster.Spec.Connection.KubeConfig)
c.innerClusters[cluster.Name] = innerCluster
c.Unlock() c.Unlock()
} return inner
func (c *clusterClients) GetClusterKubeconfig(clusterName string) (string, error) {
c.RLock()
defer c.RUnlock()
if c, exists := c.clusterKubeconfig[clusterName]; exists {
return c, nil
} else {
return "", fmt.Errorf(ClusterNotExistsFormat, clusterName)
}
} }
func (c *clusterClients) Get(clusterName string) (*clusterv1alpha1.Cluster, error) { func (c *clusterClients) Get(clusterName string) (*clusterv1alpha1.Cluster, error) {
return c.clusterLister.Get(clusterName)
}
func (c *clusterClients) GetClusterKubeconfig(clusterName string) (string, error) {
cluster, err := c.clusterLister.Get(clusterName)
if err != nil {
return "", err
}
return string(cluster.Spec.Connection.KubeConfig), nil
}
func (c *clusterClients) GetInnerCluster(name string) *innerCluster {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
if cluster, exists := c.clusterMap[clusterName]; exists { if inner, ok := c.innerClusters[name]; ok {
return cluster, nil return inner
} else { } else if cluster, err := c.clusterLister.Get(name); err == nil {
return nil, fmt.Errorf(ClusterNotExistsFormat, clusterName) // double check if the cluster exists but is not cached
return c.addCluster(cluster)
} }
return nil
}
func (c *clusterClients) IsClusterReady(cluster *clusterv1alpha1.Cluster) bool {
for _, condition := range cluster.Status.Conditions {
if condition.Type == clusterv1alpha1.ClusterReady && condition.Status == corev1.ConditionTrue {
return true
}
}
return false
}
func (c *clusterClients) IsHostCluster(cluster *clusterv1alpha1.Cluster) bool {
if _, ok := cluster.Labels[clusterv1alpha1.HostCluster]; ok {
return true
}
return false
} }