Files
kubesphere/pkg/utils/clusterclient/clusterclient.go

193 lines
5.4 KiB
Go

/*
* Please refer to the LICENSE file in the root directory of the project.
* https://github.com/kubesphere/kubesphere/blob/master/LICENSE
*/
package clusterclient
import (
"context"
"fmt"
"net/http"
"net/url"
"reflect"
"sync"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"kubesphere.io/kubesphere/pkg/scheme"
)
type Interface interface {
Get(string) (*clusterv1alpha1.Cluster, error)
ListClusters(ctx context.Context) ([]clusterv1alpha1.Cluster, error)
GetClusterClient(string) (*ClusterClient, error)
GetRuntimeClient(string) (runtimeclient.Client, error)
}
type ClusterClient struct {
KubernetesURL *url.URL
KubeSphereURL *url.URL
KubernetesVersion string
RestConfig *rest.Config
Transport http.RoundTripper
Client runtimeclient.Client
KubernetesClient kubernetes.Interface
}
type clusterClients struct {
// build an in memory cluster cache to speed things up
clients *sync.Map
cache runtimecache.Cache
}
func NewClusterClientSet(runtimeCache runtimecache.Cache) (Interface, error) {
c := &clusterClients{
clients: &sync.Map{},
cache: runtimeCache,
}
clusterInformer, err := runtimeCache.GetInformerForKind(context.Background(), clusterv1alpha1.SchemeGroupVersion.WithKind(clusterv1alpha1.ResourceKindCluster))
if err != nil {
return nil, err
}
if _, err = clusterInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if _, err = c.addCluster(obj); err != nil {
klog.Error(err)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldCluster := oldObj.(*clusterv1alpha1.Cluster)
newCluster := newObj.(*clusterv1alpha1.Cluster)
if !reflect.DeepEqual(oldCluster.Spec, newCluster.Spec) {
c.removeCluster(oldCluster)
if _, err = c.addCluster(newObj); err != nil {
klog.Error(err)
}
}
},
DeleteFunc: c.removeCluster,
}); err != nil {
return nil, err
}
return c, nil
}
func (c *clusterClients) addCluster(obj interface{}) (*ClusterClient, error) {
cluster := obj.(*clusterv1alpha1.Cluster)
klog.V(4).Infof("add new cluster %s", cluster.Name)
kubernetesEndpoint, err := url.Parse(cluster.Spec.Connection.KubernetesAPIEndpoint)
if err != nil {
return nil, fmt.Errorf("parse kubernetes apiserver endpoint %s failed: %v", cluster.Spec.Connection.KubernetesAPIEndpoint, err)
}
kubesphereEndpoint, err := url.Parse(cluster.Spec.Connection.KubeSphereAPIEndpoint)
if err != nil {
return nil, fmt.Errorf("parse kubesphere apiserver endpoint %s failed: %v", cluster.Spec.Connection.KubeSphereAPIEndpoint, err)
}
restConfig, err := clientcmd.RESTConfigFromKubeConfig(cluster.Spec.Connection.KubeConfig)
if err != nil {
return nil, err
}
// It also applies saner defaults for QPS and burst based on the Kubernetes
// controller manager defaults (20 QPS, 30 burst)
if restConfig.QPS == 0.0 {
restConfig.QPS = 20.0
}
if restConfig.Burst == 0 {
restConfig.Burst = 30
}
kubernetesClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}
serverVersion, err := kubernetesClient.Discovery().ServerVersion()
if err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(restConfig)
if err != nil {
return nil, err
}
mapper, err := apiutil.NewDynamicRESTMapper(restConfig, httpClient)
if err != nil {
return nil, err
}
client, err := runtimeclient.New(restConfig, runtimeclient.Options{
HTTPClient: httpClient,
Scheme: scheme.Scheme,
Mapper: mapper,
Cache: nil,
})
if err != nil {
return nil, err
}
clusterClient := &ClusterClient{
KubernetesURL: kubernetesEndpoint,
KubeSphereURL: kubesphereEndpoint,
KubernetesVersion: serverVersion.GitVersion,
RestConfig: restConfig,
Transport: httpClient.Transport,
Client: client,
KubernetesClient: kubernetesClient,
}
c.clients.Store(cluster.Name, clusterClient)
return clusterClient, nil
}
func (c *clusterClients) removeCluster(obj interface{}) {
cluster := obj.(*clusterv1alpha1.Cluster)
if _, ok := c.clients.Load(cluster.Name); ok {
klog.V(4).Infof("remove cluster %s", cluster.Name)
c.clients.Delete(cluster.Name)
}
}
func (c *clusterClients) Get(clusterName string) (*clusterv1alpha1.Cluster, error) {
cluster := &clusterv1alpha1.Cluster{}
err := c.cache.Get(context.Background(), types.NamespacedName{Name: clusterName}, cluster)
return cluster, err
}
func (c *clusterClients) ListClusters(ctx context.Context) ([]clusterv1alpha1.Cluster, error) {
clusterList := &clusterv1alpha1.ClusterList{}
if err := c.cache.List(ctx, clusterList); err != nil {
return nil, err
}
return clusterList.Items, nil
}
func (c *clusterClients) GetClusterClient(name string) (*ClusterClient, error) {
if client, ok := c.clients.Load(name); ok {
return client.(*ClusterClient), nil
}
// double check if the cluster exists but is not cached
cluster, err := c.Get(name)
if err != nil {
return nil, err
}
return c.addCluster(cluster)
}
func (c *clusterClients) GetRuntimeClient(name string) (runtimeclient.Client, error) {
clusterClient, err := c.GetClusterClient(name)
if err != nil {
return nil, err
}
return clusterClient.Client, nil
}