Files
kubesphere/pkg/controller/cluster/cluster_controller.go

716 lines
21 KiB
Go

package cluster
import (
"encoding/json"
"fmt"
v1 "k8s.io/api/core/v1"
apiextv1b1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
clusterclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/cluster/v1alpha1"
clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1"
clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
"math/rand"
"net/http"
"reflect"
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
"sync"
"time"
)
// Cluster controller only runs under multicluster mode. Cluster controller is following below steps,
// 1. Populates proxy spec if cluster connection type is proxy
// 1.1 Wait for cluster agent is ready if connection type is proxy
// 2. Join cluster into federation control plane if kubeconfig is ready.
// 3. Pull cluster version and configz, set result to cluster status
// Also put all clusters back into queue every 5 * time.Minute to sync cluster status, this is needed
// in case there aren't any cluster changes made.
const (
// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
// sequence of delays between successive queuings of a service.
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
kubefedNamespace = "kube-federation-system"
// Actually host cluster name can be anything, there is only necessary when calling JoinFederation function
hostClusterName = "kubesphere"
// allocate kubernetesAPIServer port in range [portRangeMin, portRangeMax] for agents if port is not specified
// kubesphereAPIServer port is defaulted to kubernetesAPIServerPort + 10000
portRangeMin = 6000
portRangeMax = 7000
// Proxy service port
kubernetesPort = 6443
kubespherePort = 80
defaultAgentNamespace = "kubesphere-system"
// proxy format
proxyFormat = "%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:80/proxy/%s"
)
// ClusterData stores cluster client
type clusterData struct {
// cached rest.Config
config *rest.Config
// cached kubernetes client, rebuild once cluster changed
client kubernetes.Interface
// cached kubeconfig
cachedKubeconfig []byte
// cached transport, used to proxy kubesphere version request
transport http.RoundTripper
}
type clusterController struct {
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
client kubernetes.Interface
hostConfig *rest.Config
clusterClient clusterclient.ClusterInterface
clusterLister clusterlister.ClusterLister
clusterHasSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
workerLoopPeriod time.Duration
mu sync.RWMutex
clusterMap map[string]*clusterData
}
func NewClusterController(
client kubernetes.Interface,
config *rest.Config,
clusterInformer clusterinformer.ClusterInformer,
clusterClient clusterclient.ClusterInterface,
) *clusterController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(func(format string, args ...interface{}) {
klog.Info(fmt.Sprintf(format, args))
})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cluster-controller"})
c := &clusterController{
eventBroadcaster: broadcaster,
eventRecorder: recorder,
client: client,
hostConfig: config,
clusterClient: clusterClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
workerLoopPeriod: time.Second,
clusterMap: make(map[string]*clusterData),
}
c.clusterLister = clusterInformer.Lister()
c.clusterHasSynced = clusterInformer.Informer().HasSynced
clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addCluster,
UpdateFunc: func(oldObj, newObj interface{}) {
newCluster := newObj.(*clusterv1alpha1.Cluster)
oldCluster := oldObj.(*clusterv1alpha1.Cluster)
if newCluster.ResourceVersion == oldCluster.ResourceVersion {
return
}
c.addCluster(newObj)
},
DeleteFunc: c.addCluster,
})
return c
}
func (c *clusterController) Start(stopCh <-chan struct{}) error {
return c.Run(3, stopCh)
}
func (c *clusterController) Run(workers int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.V(0).Info("starting cluster controller")
defer klog.Info("shutting down cluster controller")
if !cache.WaitForCacheSync(stopCh, c.clusterHasSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < workers; i++ {
go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
}
go wait.Until(func() {
if err := c.syncStatus(); err != nil {
klog.Errorf("Error periodically sync cluster status, %v", err)
}
}, 5*time.Minute, stopCh)
<-stopCh
return nil
}
func (c *clusterController) worker() {
for c.processNextItem() {
}
}
func (c *clusterController) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncCluster(key.(string))
c.handleErr(err, key)
return true
}
func buildClusterData(kubeconfig []byte) (*clusterData, error) {
// prepare for
clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
if err != nil {
klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err)
return nil, err
}
clusterConfig, err := clientConfig.ClientConfig()
if err != nil {
klog.Errorf("Failed to get client config, %#v", err)
return nil, err
}
transport, err := rest.TransportFor(clusterConfig)
if err != nil {
klog.Errorf("Failed to create transport, %#v", err)
return nil, err
}
clientSet, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
klog.Errorf("Failed to create ClientSet from config, %#v", err)
return nil, err
}
return &clusterData{
cachedKubeconfig: kubeconfig,
config: clusterConfig,
client: clientSet,
transport: transport,
}, nil
}
func (c *clusterController) syncStatus() error {
clusters, err := c.clusterLister.List(labels.Everything())
if err != nil {
return err
}
for _, cluster := range clusters {
key, err := cache.MetaNamespaceKeyFunc(cluster)
if err != nil {
return err
}
c.queue.AddRateLimited(key)
}
return nil
}
func (c *clusterController) syncCluster(key string) error {
startTime := time.Now()
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Errorf("not a valid controller key %s, %#v", key, err)
return err
}
defer func() {
klog.V(4).Infof("Finished syncing cluster %s in %s", name, time.Since(startTime))
}()
cluster, err := c.clusterLister.Get(name)
if err != nil {
// cluster not found, possibly been deleted
// need to do the cleanup
if errors.IsNotFound(err) {
return nil
}
klog.Errorf("Failed to get cluster with name %s, %#v", name, err)
return err
}
// proxy service name if needed
serviceName := fmt.Sprintf("mc-%s", cluster.Name)
if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !sets.NewString(cluster.ObjectMeta.Finalizers...).Has(clusterv1alpha1.Finalizer) {
cluster.ObjectMeta.Finalizers = append(cluster.ObjectMeta.Finalizers, clusterv1alpha1.Finalizer)
if cluster, err = c.clusterClient.Update(cluster); err != nil {
return err
}
}
} else {
// The object is being deleted
if sets.NewString(cluster.ObjectMeta.Finalizers...).Has(clusterv1alpha1.Finalizer) {
// need to unJoin federation first, before there are
// some cleanup work to do in member cluster which depends
// agent to proxy traffic
err = c.unJoinFederation(nil, name)
if err != nil {
klog.Errorf("Failed to unjoin federation for cluster %s, error %v", name, err)
return err
}
_, err = c.client.CoreV1().Services(defaultAgentNamespace).Get(serviceName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// nothing to do
} else {
klog.Errorf("Failed to get proxy service %s, error %v", serviceName, err)
return err
}
} else {
err = c.client.CoreV1().Services(defaultAgentNamespace).Delete(serviceName, metav1.NewDeleteOptions(0))
if err != nil {
klog.Errorf("Unable to delete service %s, error %v", serviceName, err)
return err
}
}
// remove our cluster finalizer
finalizers := sets.NewString(cluster.ObjectMeta.Finalizers...)
finalizers.Delete(clusterv1alpha1.Finalizer)
cluster.ObjectMeta.Finalizers = finalizers.List()
if _, err = c.clusterClient.Update(cluster); err != nil {
return err
}
}
return nil
}
// save a old copy of cluster
oldCluster := cluster.DeepCopy()
// prepare for proxy to member cluster
if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy {
// allocate ports for kubernetes and kubesphere endpoint
if cluster.Spec.Connection.KubeSphereAPIServerPort == 0 ||
cluster.Spec.Connection.KubernetesAPIServerPort == 0 {
port, err := c.allocatePort()
if err != nil {
klog.Error(err)
return err
}
cluster.Spec.Connection.KubernetesAPIServerPort = port
cluster.Spec.Connection.KubeSphereAPIServerPort = port + 10000
}
// token uninitialized, generate a new token
if len(cluster.Spec.Connection.Token) == 0 {
cluster.Spec.Connection.Token = c.generateToken()
}
// create a proxy service spec
mcService := v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: cluster.Namespace,
Labels: map[string]string{
"app.kubernetes.io/name": serviceName,
"app": serviceName,
},
},
Spec: v1.ServiceSpec{
Selector: map[string]string{
"app.kubernetes.io/name": "tower",
"app": "tower",
},
Ports: []v1.ServicePort{
{
Name: "kubernetes",
Protocol: v1.ProtocolTCP,
Port: kubernetesPort,
TargetPort: intstr.FromInt(int(cluster.Spec.Connection.KubernetesAPIServerPort)),
},
{
Name: "kubesphere",
Protocol: v1.ProtocolTCP,
Port: kubespherePort,
TargetPort: intstr.FromInt(int(cluster.Spec.Connection.KubeSphereAPIServerPort)),
},
},
},
}
service, err := c.client.CoreV1().Services(defaultAgentNamespace).Get(serviceName, metav1.GetOptions{})
if err != nil { // proxy service not found
if errors.IsNotFound(err) {
service, err = c.client.CoreV1().Services(defaultAgentNamespace).Create(&mcService)
if err != nil {
return err
}
}
return err
} else { // update existed proxy service
if !reflect.DeepEqual(service.Spec, mcService.Spec) {
mcService.ObjectMeta = service.ObjectMeta
mcService.Spec.ClusterIP = service.Spec.ClusterIP
service, err = c.client.CoreV1().Services(defaultAgentNamespace).Update(&mcService)
if err != nil {
return err
}
}
}
// populates the kubernetes apiEndpoint and kubesphere apiEndpoint
cluster.Spec.Connection.KubernetesAPIEndpoint = fmt.Sprintf("https://%s:%d", service.Spec.ClusterIP, kubernetesPort)
cluster.Spec.Connection.KubeSphereAPIEndpoint = fmt.Sprintf("http://%s:%d", service.Spec.ClusterIP, kubespherePort)
initializedCondition := clusterv1alpha1.ClusterCondition{
Type: clusterv1alpha1.ClusterInitialized,
Status: v1.ConditionTrue,
Reason: string(clusterv1alpha1.ClusterInitialized),
Message: "Cluster has been initialized",
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
}
c.updateClusterCondition(cluster, initializedCondition)
if !reflect.DeepEqual(oldCluster, cluster) {
cluster, err = c.clusterClient.Update(cluster)
if err != nil {
klog.Errorf("Error updating cluster %s, error %s", cluster.Name, err)
return err
}
return nil
}
}
// kubeconfig not ready, nothing to do
if len(cluster.Spec.Connection.KubeConfig) == 0 {
return nil
}
// build up cached cluster data if there isn't any
c.mu.Lock()
clusterDt, ok := c.clusterMap[cluster.Name]
if !ok || clusterDt == nil || !equality.Semantic.DeepEqual(clusterDt.cachedKubeconfig, cluster.Spec.Connection.KubeConfig) {
clusterDt, err = buildClusterData(cluster.Spec.Connection.KubeConfig)
if err != nil {
c.mu.Unlock()
return err
}
c.clusterMap[cluster.Name] = clusterDt
}
c.mu.Unlock()
if !cluster.Spec.JoinFederation { // trying to unJoin federation
err = c.unJoinFederation(clusterDt.config, cluster.Name)
if err != nil {
klog.Errorf("Failed to unJoin federation for cluster %s, error %v", cluster.Name, err)
c.eventRecorder.Event(cluster, v1.EventTypeWarning, "UnJoinFederation", err.Error())
return err
}
} else { // join federation
_, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels)
if err != nil {
klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err)
c.eventRecorder.Event(cluster, v1.EventTypeWarning, "JoinFederation", err.Error())
return err
}
c.eventRecorder.Event(cluster, v1.EventTypeNormal, "JoinFederation", "Cluster has joined federation.")
federationReadyCondition := clusterv1alpha1.ClusterCondition{
Type: clusterv1alpha1.ClusterFederated,
Status: v1.ConditionTrue,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "",
Message: "Cluster has joined federation control plane successfully",
}
c.updateClusterCondition(cluster, federationReadyCondition)
}
// cluster agent is ready, we can pull kubernetes cluster info through agent
// since there is no agent necessary for host cluster, so updates for host cluster
// is safe.
if isConditionTrue(cluster, clusterv1alpha1.ClusterAgentAvailable) ||
cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeDirect {
if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 {
cluster.Spec.Connection.KubernetesAPIEndpoint = clusterDt.config.Host
}
version, err := clusterDt.client.Discovery().ServerVersion()
if err != nil {
klog.Errorf("Failed to get kubernetes version, %#v", err)
return err
}
cluster.Status.KubernetesVersion = version.GitVersion
nodes, err := clusterDt.client.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to get cluster nodes, %#v", err)
return err
}
cluster.Status.NodeCount = len(nodes.Items)
configz, err := c.tryToFetchKubeSphereComponents(clusterDt.config.Host, clusterDt.transport)
if err == nil {
cluster.Status.Configz = configz
}
clusterReadyCondition := clusterv1alpha1.ClusterCondition{
Type: clusterv1alpha1.ClusterReady,
Status: v1.ConditionTrue,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: string(clusterv1alpha1.ClusterReady),
Message: "Cluster is available now",
}
c.updateClusterCondition(cluster, clusterReadyCondition)
}
if !isConditionTrue(cluster, clusterv1alpha1.ClusterAgentAvailable) {
clusterNotReadyCondition := clusterv1alpha1.ClusterCondition{
Type: clusterv1alpha1.ClusterReady,
Status: v1.ConditionFalse,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: "Unable to establish connection with cluster",
Message: "Cluster is not available now",
}
c.updateClusterCondition(cluster, clusterNotReadyCondition)
}
if !reflect.DeepEqual(oldCluster, cluster) {
_, err = c.clusterClient.Update(cluster)
if err != nil {
klog.Errorf("Failed to update cluster status, %#v", err)
return err
}
}
return nil
}
// tryToFetchKubeSphereComponents will send requests to member cluster configz api using kube-apiserver proxy way
func (c *clusterController) tryToFetchKubeSphereComponents(host string, transport http.RoundTripper) (map[string]bool, error) {
client := http.Client{
Transport: transport,
Timeout: 5 * time.Second,
}
response, err := client.Get(fmt.Sprintf(proxyFormat, host, "kapis/config.kubesphere.io/v1alpha2/configs/configz"))
if err != nil {
klog.V(4).Infof("Failed to get kubesphere components, error %v", err)
return nil, err
}
if response.StatusCode != http.StatusOK {
klog.V(4).Infof("Response status code isn't 200.")
return nil, fmt.Errorf("response code %d", response.StatusCode)
}
configz := make(map[string]bool)
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&configz)
if err != nil {
klog.V(4).Infof("Decode error %v", err)
return nil, err
}
return configz, nil
}
func (c *clusterController) addCluster(obj interface{}) {
cluster := obj.(*clusterv1alpha1.Cluster)
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("get cluster key %s failed", cluster.Name))
return
}
c.queue.Add(key)
}
func (c *clusterController) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
return
}
if c.queue.NumRequeues(key) < maxRetries {
klog.V(2).Infof("Error syncing cluster %s, retrying, %v", key, err)
c.queue.AddRateLimited(key)
return
}
klog.V(4).Infof("Dropping cluster %s out of the queue.", key)
c.queue.Forget(key)
utilruntime.HandleError(err)
}
// isConditionTrue checks cluster specific condition value is True, return false if condition not exists
func isConditionTrue(cluster *clusterv1alpha1.Cluster, conditionType clusterv1alpha1.ClusterConditionType) bool {
for _, condition := range cluster.Status.Conditions {
if condition.Type == conditionType && condition.Status == v1.ConditionTrue {
return true
}
}
return false
}
// updateClusterCondition updates condition in cluster conditions using giving condition
// adds condition if not existed
func (c *clusterController) updateClusterCondition(cluster *clusterv1alpha1.Cluster, condition clusterv1alpha1.ClusterCondition) {
if cluster.Status.Conditions == nil {
cluster.Status.Conditions = make([]clusterv1alpha1.ClusterCondition, 0)
}
newConditions := make([]clusterv1alpha1.ClusterCondition, 0)
needToUpdate := true
for _, cond := range cluster.Status.Conditions {
if cond.Type == condition.Type {
if cond.Status == condition.Status {
needToUpdate = false
continue
} else {
newConditions = append(newConditions, cond)
}
}
newConditions = append(newConditions, cond)
}
if needToUpdate {
newConditions = append(newConditions, condition)
cluster.Status.Conditions = newConditions
}
}
func isHostCluster(cluster *clusterv1alpha1.Cluster) bool {
for k, v := range cluster.Annotations {
if k == clusterv1alpha1.IsHostCluster && v == "true" {
return true
}
}
return false
}
// joinFederation joins a cluster into federation clusters.
// return nil error if kubefed cluster already exists.
func (c *clusterController) joinFederation(clusterConfig *rest.Config, joiningClusterName string, labels map[string]string) (*fedv1b1.KubeFedCluster, error) {
return joinClusterForNamespace(c.hostConfig,
clusterConfig,
kubefedNamespace,
kubefedNamespace,
hostClusterName,
joiningClusterName,
fmt.Sprintf("%s-secret", joiningClusterName),
labels,
apiextv1b1.ClusterScoped,
false,
false)
}
// unJoinFederation unjoins a cluster from federation control plane.
func (c *clusterController) unJoinFederation(clusterConfig *rest.Config, unjoiningClusterName string) error {
return unjoinCluster(c.hostConfig,
clusterConfig,
kubefedNamespace,
hostClusterName,
unjoiningClusterName,
true,
false)
}
// allocatePort find a available port between [portRangeMin, portRangeMax] in maximumRetries
// TODO: only works with handful clusters
func (c *clusterController) allocatePort() (uint16, error) {
rand.Seed(time.Now().UnixNano())
clusters, err := c.clusterLister.List(labels.Everything())
if err != nil {
return 0, err
}
const maximumRetries = 10
for i := 0; i < maximumRetries; i++ {
collision := false
port := uint16(portRangeMin + rand.Intn(portRangeMax-portRangeMin+1))
for _, item := range clusters {
if item.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeProxy &&
item.Spec.Connection.KubernetesAPIServerPort != 0 &&
item.Spec.Connection.KubeSphereAPIServerPort == port {
collision = true
break
}
}
if !collision {
return port, nil
}
}
return 0, fmt.Errorf("unable to allocate port after %d retries", maximumRetries)
}
// generateToken returns a random 32-byte string as token
func (c *clusterController) generateToken() string {
rand.Seed(time.Now().UnixNano())
b := make([]byte, 32)
rand.Read(b)
return fmt.Sprintf("%x", b)
}