Merge remote-tracking branch 'upstream/dev' into dev
# Conflicts: # pkg/apis/iam/v1alpha2/user_types.go
This commit is contained in:
347
pkg/controller/cluster/cluster_controller.go
Normal file
347
pkg/controller/cluster/cluster_controller.go
Normal file
@@ -0,0 +1,347 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog"
|
||||
clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
|
||||
clusterclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/cluster/v1alpha1"
|
||||
clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1"
|
||||
clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
|
||||
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
|
||||
// sequence of delays between successive queuings of a service.
|
||||
//
|
||||
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
|
||||
maxRetries = 15
|
||||
)
|
||||
|
||||
type ClusterController struct {
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
eventRecorder record.EventRecorder
|
||||
|
||||
agentClient clusterclient.AgentInterface
|
||||
clusterClient clusterclient.ClusterInterface
|
||||
|
||||
agentLister clusterlister.AgentLister
|
||||
agentHasSynced cache.InformerSynced
|
||||
|
||||
clusterLister clusterlister.ClusterLister
|
||||
clusterHasSynced cache.InformerSynced
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
workerLoopPeriod time.Duration
|
||||
}
|
||||
|
||||
func NewClusterController(
|
||||
client kubernetes.Interface,
|
||||
clusterInformer clusterinformer.ClusterInformer,
|
||||
agentInformer clusterinformer.AgentInformer,
|
||||
agentClient clusterclient.AgentInterface,
|
||||
clusterClient clusterclient.ClusterInterface,
|
||||
) *ClusterController {
|
||||
|
||||
broadcaster := record.NewBroadcaster()
|
||||
broadcaster.StartLogging(func(format string, args ...interface{}) {
|
||||
klog.Info(fmt.Sprintf(format, args))
|
||||
})
|
||||
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
|
||||
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cluster-controller"})
|
||||
|
||||
c := &ClusterController{
|
||||
eventBroadcaster: broadcaster,
|
||||
eventRecorder: recorder,
|
||||
agentClient: agentClient,
|
||||
clusterClient: clusterClient,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
|
||||
workerLoopPeriod: time.Second,
|
||||
}
|
||||
|
||||
c.agentLister = agentInformer.Lister()
|
||||
c.agentHasSynced = agentInformer.Informer().HasSynced
|
||||
|
||||
c.clusterLister = clusterInformer.Lister()
|
||||
c.clusterHasSynced = clusterInformer.Informer().HasSynced
|
||||
|
||||
clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addCluster,
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
c.addCluster(newObj)
|
||||
},
|
||||
DeleteFunc: c.addCluster,
|
||||
})
|
||||
|
||||
agentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: nil,
|
||||
UpdateFunc: nil,
|
||||
DeleteFunc: nil,
|
||||
})
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *ClusterController) Start(stopCh <-chan struct{}) error {
|
||||
return c.Run(5, stopCh)
|
||||
}
|
||||
|
||||
func (c *ClusterController) Run(workers int, stopCh <-chan struct{}) error {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
klog.V(0).Info("starting cluster controller")
|
||||
defer klog.Info("shutting down cluster controller")
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, c.clusterHasSynced, c.agentHasSynced) {
|
||||
return fmt.Errorf("failed to wait for caches to sync")
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ClusterController) worker() {
|
||||
for c.processNextItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ClusterController) processNextItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := c.syncCluster(key.(string))
|
||||
c.handleErr(err, key)
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *ClusterController) syncCluster(key string) error {
|
||||
startTime := time.Now()
|
||||
|
||||
_, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
klog.Errorf("not a valid controller key %s, %#v", key, err)
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
klog.V(4).Infof("Finished syncing cluster %s in %s", name, time.Since(startTime))
|
||||
}()
|
||||
|
||||
cluster, err := c.clusterLister.Get(name)
|
||||
if err != nil {
|
||||
// cluster not found, possibly been deleted
|
||||
// need to do the cleanup
|
||||
if errors.IsNotFound(err) {
|
||||
_, err = c.agentLister.Get(name)
|
||||
if err != nil && errors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster agent %s, %#v", name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// do the real cleanup work
|
||||
err = c.agentClient.Delete(name, &metav1.DeleteOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
klog.Errorf("Failed to get cluster with name %s, %#v", name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
newAgent := &clusterv1alpha1.Agent{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Labels: map[string]string{
|
||||
"app.kubernetes.io/name": "tower",
|
||||
"cluster.kubesphere.io/name": name,
|
||||
},
|
||||
},
|
||||
Spec: clusterv1alpha1.AgentSpec{
|
||||
Token: "",
|
||||
KubeSphereAPIServerPort: 0,
|
||||
KubernetesAPIServerPort: 0,
|
||||
Proxy: "",
|
||||
Paused: !cluster.Spec.Active,
|
||||
},
|
||||
}
|
||||
|
||||
agent, err := c.agentLister.Get(name)
|
||||
if err != nil && errors.IsNotFound(err) {
|
||||
agent, err = c.agentClient.Create(newAgent)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create agent %s, %#v", name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get agent %s, %#v", name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if agent.Spec.Paused != newAgent.Spec.Paused {
|
||||
agent.Spec.Paused = newAgent.Spec.Paused
|
||||
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
_, err = c.agentClient.Update(agent)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// agent connection is ready, update cluster status
|
||||
// set
|
||||
if len(agent.Status.KubeConfig) != 0 && c.isAgentReady(agent) {
|
||||
clientConfig, err := clientcmd.NewClientConfigFromBytes(agent.Status.KubeConfig)
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
config, err := clientConfig.ClientConfig()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get client config, %#v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
clientSet, err := kubernetes.NewForConfig(config)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create ClientSet from config, %#v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
version, err := clientSet.Discovery().ServerVersion()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get kubernetes version, %#v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
cluster.Status.KubernetesVersion = version.GitVersion
|
||||
|
||||
nodes, err := clientSet.CoreV1().Nodes().List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get cluster nodes, %#v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
cluster.Status.NodeCount = len(nodes.Items)
|
||||
}
|
||||
|
||||
agentReadyCondition := clusterv1alpha1.ClusterCondition{
|
||||
Type: clusterv1alpha1.ClusterAgentAvailable,
|
||||
LastUpdateTime: metav1.NewTime(time.Now()),
|
||||
LastTransitionTime: metav1.NewTime(time.Now()),
|
||||
Reason: "",
|
||||
Message: "Cluster agent is available now.",
|
||||
}
|
||||
|
||||
if c.isAgentReady(agent) {
|
||||
agentReadyCondition.Status = v1.ConditionTrue
|
||||
} else {
|
||||
agentReadyCondition.Status = v1.ConditionFalse
|
||||
}
|
||||
|
||||
c.addClusterCondition(cluster, agentReadyCondition)
|
||||
|
||||
_, err = c.clusterClient.Update(cluster)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to update cluster status, %#v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ClusterController) addCluster(obj interface{}) {
|
||||
cluster := obj.(*clusterv1alpha1.Cluster)
|
||||
|
||||
key, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("get cluster key %s failed", cluster.Name))
|
||||
return
|
||||
}
|
||||
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
func (c *ClusterController) handleErr(err error, key interface{}) {
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return
|
||||
}
|
||||
|
||||
if c.queue.NumRequeues(key) < maxRetries {
|
||||
klog.V(2).Infof("Error syncing virtualservice %s for service retrying, %#v", key, err)
|
||||
c.queue.AddRateLimited(key)
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(4).Infof("Dropping service %s out of the queue.", key)
|
||||
c.queue.Forget(key)
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
|
||||
func (c *ClusterController) addAgent(obj interface{}) {
|
||||
agent := obj.(*clusterv1alpha1.Agent)
|
||||
key, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("get agent key %s failed", agent.Name))
|
||||
return
|
||||
}
|
||||
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
func (c *ClusterController) isAgentReady(agent *clusterv1alpha1.Agent) bool {
|
||||
for _, condition := range agent.Status.Conditions {
|
||||
if condition.Type == clusterv1alpha1.AgentConnected && condition.Status == v1.ConditionTrue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// addClusterCondition add condition
|
||||
func (c *ClusterController) addClusterCondition(cluster *clusterv1alpha1.Cluster, condition clusterv1alpha1.ClusterCondition) {
|
||||
if cluster.Status.Conditions == nil {
|
||||
cluster.Status.Conditions = make([]clusterv1alpha1.ClusterCondition, 0)
|
||||
}
|
||||
|
||||
newConditions := make([]clusterv1alpha1.ClusterCondition, 0)
|
||||
for _, cond := range cluster.Status.Conditions {
|
||||
if cond.Type == condition.Type {
|
||||
continue
|
||||
}
|
||||
newConditions = append(newConditions, cond)
|
||||
}
|
||||
|
||||
newConditions = append(newConditions, condition)
|
||||
cluster.Status.Conditions = newConditions
|
||||
}
|
||||
1
pkg/controller/cluster/cluster_controller_test.go
Normal file
1
pkg/controller/cluster/cluster_controller_test.go
Normal file
@@ -0,0 +1 @@
|
||||
package cluster
|
||||
@@ -199,7 +199,7 @@ func (v *DestinationRuleController) processNextWorkItem() bool {
|
||||
func (v *DestinationRuleController) syncService(key string) error {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
log.V(4).Info("Finished syncing service destinationrule.", "key", key, "duration", time.Since(startTime))
|
||||
log.V(4).Infof("Finished syncing service destinationrule %s in %s.", key, time.Since(startTime))
|
||||
}()
|
||||
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
@@ -212,14 +212,14 @@ func (v *DestinationRuleController) syncService(key string) error {
|
||||
// delete the corresponding destinationrule if there is any, as the service has been deleted.
|
||||
err = v.destinationRuleClient.NetworkingV1alpha3().DestinationRules(namespace).Delete(name, nil)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
log.Error(err, "delete destination rule failed", "namespace", namespace, "name", name)
|
||||
log.Errorf("delete destination rule failed %s/%s, error %v.", namespace, name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// delete orphan service policy if there is any
|
||||
err = v.servicemeshClient.ServicemeshV1alpha2().ServicePolicies(namespace).Delete(name, nil)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
log.Error(err, "delete orphan service policy failed", "namespace", namespace, "name", name)
|
||||
log.Errorf("delete orphan service policy %s/%s failed, %#v", namespace, name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -259,7 +259,7 @@ func (v *DestinationRuleController) syncService(key string) error {
|
||||
version := util.GetComponentVersion(&deployment.ObjectMeta)
|
||||
|
||||
if len(version) == 0 {
|
||||
log.V(4).Info("Deployment doesn't have a version label", "key", types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name}.String())
|
||||
log.V(4).Infof("Deployment %s doesn't have a version label", types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name}.String())
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -213,7 +213,7 @@ func (v *VirtualServiceController) syncService(key string) error {
|
||||
appName := name
|
||||
|
||||
defer func() {
|
||||
log.V(4).Info("Finished syncing service virtualservice.", "namespace", namespace, "name", name, "duration", time.Since(startTime))
|
||||
log.V(4).Infof("Finished syncing service virtualservice %s/%s in %s.", namespace, name, time.Since(startTime))
|
||||
}()
|
||||
|
||||
service, err := v.serviceLister.Services(namespace).Get(name)
|
||||
|
||||
Reference in New Issue
Block a user