add component status to cluster status (#2160)

This commit is contained in:
zryfish
2020-06-05 14:51:05 +08:00
committed by GitHub
parent 98b34a7760
commit bb4d3fee7a
163 changed files with 3431 additions and 1889 deletions

View File

@@ -1,9 +1,11 @@
package cluster
import (
"encoding/json"
"fmt"
v1 "k8s.io/api/core/v1"
apiextv1b1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -25,8 +27,10 @@ import (
clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1"
clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
"math/rand"
"net/http"
"reflect"
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
"sync"
"time"
)
@@ -52,9 +56,28 @@ const (
kubespherePort = 80
defaultAgentNamespace = "kubesphere-system"
// proxy format
proxyFormat = "%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:80/proxy/%s"
)
type ClusterController struct {
// ClusterData stores cluster client
type clusterData struct {
// cached rest.Config
config *rest.Config
// cached kubernetes client, rebuild once cluster changed
client kubernetes.Interface
// cached kubeconfig
cachedKubeconfig []byte
// cached transport, used to proxy kubesphere version request
transport http.RoundTripper
}
type clusterController struct {
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
@@ -69,6 +92,10 @@ type ClusterController struct {
queue workqueue.RateLimitingInterface
workerLoopPeriod time.Duration
mu sync.RWMutex
clusterMap map[string]*clusterData
}
func NewClusterController(
@@ -76,7 +103,7 @@ func NewClusterController(
config *rest.Config,
clusterInformer clusterinformer.ClusterInformer,
clusterClient clusterclient.ClusterInterface,
) *ClusterController {
) *clusterController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(func(format string, args ...interface{}) {
@@ -85,7 +112,7 @@ func NewClusterController(
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cluster-controller"})
c := &ClusterController{
c := &clusterController{
eventBroadcaster: broadcaster,
eventRecorder: recorder,
client: client,
@@ -93,6 +120,7 @@ func NewClusterController(
clusterClient: clusterClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster"),
workerLoopPeriod: time.Second,
clusterMap: make(map[string]*clusterData),
}
c.clusterLister = clusterInformer.Lister()
@@ -114,11 +142,11 @@ func NewClusterController(
return c
}
func (c *ClusterController) Start(stopCh <-chan struct{}) error {
return c.Run(5, stopCh)
func (c *clusterController) Start(stopCh <-chan struct{}) error {
return c.Run(3, stopCh)
}
func (c *ClusterController) Run(workers int, stopCh <-chan struct{}) error {
func (c *clusterController) Run(workers int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
@@ -133,16 +161,22 @@ func (c *ClusterController) Run(workers int, stopCh <-chan struct{}) error {
go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
}
go wait.Until(func() {
if err := c.syncStatus(); err != nil {
klog.Errorf("Error periodically sync cluster status, %v", err)
}
}, 5*time.Minute, stopCh)
<-stopCh
return nil
}
func (c *ClusterController) worker() {
func (c *clusterController) worker() {
for c.processNextItem() {
}
}
func (c *ClusterController) processNextItem() bool {
func (c *clusterController) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
@@ -155,7 +189,59 @@ func (c *ClusterController) processNextItem() bool {
return true
}
func (c *ClusterController) syncCluster(key string) error {
func buildClusterData(kubeconfig []byte) (*clusterData, error) {
// prepare for
clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
if err != nil {
klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err)
return nil, err
}
clusterConfig, err := clientConfig.ClientConfig()
if err != nil {
klog.Errorf("Failed to get client config, %#v", err)
return nil, err
}
transport, err := rest.TransportFor(clusterConfig)
if err != nil {
klog.Errorf("Failed to create transport, %#v", err)
return nil, err
}
clientSet, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
klog.Errorf("Failed to create ClientSet from config, %#v", err)
return nil, err
}
return &clusterData{
cachedKubeconfig: kubeconfig,
config: clusterConfig,
client: clientSet,
transport: transport,
}, nil
}
func (c *clusterController) syncStatus() error {
clusters, err := c.clusterLister.List(labels.Everything())
if err != nil {
return err
}
for _, cluster := range clusters {
key, err := cache.MetaNamespaceKeyFunc(cluster)
if err != nil {
return err
}
c.queue.AddRateLimited(key)
}
return nil
}
func (c *clusterController) syncCluster(key string) error {
startTime := time.Now()
_, name, err := cache.SplitMetaNamespaceKey(key)
@@ -333,37 +419,27 @@ func (c *ClusterController) syncCluster(key string) error {
return nil
}
var clientSet kubernetes.Interface
var clusterConfig *rest.Config
// prepare for
clientConfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Connection.KubeConfig)
if err != nil {
klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err)
return err
}
clusterConfig, err = clientConfig.ClientConfig()
if err != nil {
klog.Errorf("Failed to get client config, %#v", err)
return err
}
clientSet, err = kubernetes.NewForConfig(clusterConfig)
if err != nil {
klog.Errorf("Failed to create ClientSet from config, %#v", err)
return err
c.mu.Lock()
clusterDt, ok := c.clusterMap[cluster.Name]
if !ok || clusterDt == nil || !equality.Semantic.DeepEqual(clusterDt.cachedKubeconfig, cluster.Spec.Connection.KubeConfig) {
clusterDt, err = buildClusterData(cluster.Spec.Connection.KubeConfig)
if err != nil {
c.mu.Unlock()
return err
}
c.clusterMap[cluster.Name] = clusterDt
}
c.mu.Unlock()
if !cluster.Spec.JoinFederation { // trying to unJoin federation
err = c.unJoinFederation(clusterConfig, cluster.Name)
err = c.unJoinFederation(clusterDt.config, cluster.Name)
if err != nil {
klog.Errorf("Failed to unJoin federation for cluster %s, error %v", cluster.Name, err)
c.eventRecorder.Event(cluster, v1.EventTypeWarning, "UnJoinFederation", err.Error())
return err
}
} else { // join federation
_, err = c.joinFederation(clusterConfig, cluster.Name, cluster.Labels)
_, err = c.joinFederation(clusterDt.config, cluster.Name, cluster.Labels)
if err != nil {
klog.Errorf("Failed to join federation for cluster %s, error %v", cluster.Name, err)
c.eventRecorder.Event(cluster, v1.EventTypeWarning, "JoinFederation", err.Error())
@@ -390,10 +466,10 @@ func (c *ClusterController) syncCluster(key string) error {
cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeDirect {
if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 {
cluster.Spec.Connection.KubernetesAPIEndpoint = clusterConfig.Host
cluster.Spec.Connection.KubernetesAPIEndpoint = clusterDt.config.Host
}
version, err := clientSet.Discovery().ServerVersion()
version, err := clusterDt.client.Discovery().ServerVersion()
if err != nil {
klog.Errorf("Failed to get kubernetes version, %#v", err)
return err
@@ -401,7 +477,7 @@ func (c *ClusterController) syncCluster(key string) error {
cluster.Status.KubernetesVersion = version.GitVersion
nodes, err := clientSet.CoreV1().Nodes().List(metav1.ListOptions{})
nodes, err := clusterDt.client.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to get cluster nodes, %#v", err)
return err
@@ -409,6 +485,11 @@ func (c *ClusterController) syncCluster(key string) error {
cluster.Status.NodeCount = len(nodes.Items)
configz, err := c.tryToFetchKubeSphereComponents(clusterDt.config.Host, clusterDt.transport)
if err == nil {
cluster.Status.Configz = configz
}
clusterReadyCondition := clusterv1alpha1.ClusterCondition{
Type: clusterv1alpha1.ClusterReady,
Status: v1.ConditionTrue,
@@ -432,7 +513,34 @@ func (c *ClusterController) syncCluster(key string) error {
return nil
}
func (c *ClusterController) addCluster(obj interface{}) {
func (c *clusterController) tryToFetchKubeSphereComponents(host string, transport http.RoundTripper) (map[string]bool, error) {
client := http.Client{
Transport: transport,
Timeout: 5 * time.Second,
}
response, err := client.Get(fmt.Sprintf(proxyFormat, host, "kapis/config.kubesphere.io/v1alpha2/configs/configz"))
if err != nil {
klog.V(4).Infof("Failed to get kubesphere components, error %v", err)
return nil, err
}
if response.StatusCode != http.StatusOK {
klog.V(4).Infof("Response status code isn't 200.")
return nil, fmt.Errorf("response code %d", response.StatusCode)
}
configz := make(map[string]bool)
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&configz)
if err != nil {
klog.V(4).Infof("Decode error %v", err)
return nil, err
}
return configz, nil
}
func (c *clusterController) addCluster(obj interface{}) {
cluster := obj.(*clusterv1alpha1.Cluster)
key, err := cache.MetaNamespaceKeyFunc(obj)
@@ -444,7 +552,7 @@ func (c *ClusterController) addCluster(obj interface{}) {
c.queue.Add(key)
}
func (c *ClusterController) handleErr(err error, key interface{}) {
func (c *clusterController) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
return
@@ -472,7 +580,7 @@ func isConditionTrue(cluster *clusterv1alpha1.Cluster, conditionType clusterv1al
// updateClusterCondition updates condition in cluster conditions using giving condition
// adds condition if not existed
func (c *ClusterController) updateClusterCondition(cluster *clusterv1alpha1.Cluster, condition clusterv1alpha1.ClusterCondition) {
func (c *clusterController) updateClusterCondition(cluster *clusterv1alpha1.Cluster, condition clusterv1alpha1.ClusterCondition) {
if cluster.Status.Conditions == nil {
cluster.Status.Conditions = make([]clusterv1alpha1.ClusterCondition, 0)
}
@@ -509,7 +617,7 @@ func isHostCluster(cluster *clusterv1alpha1.Cluster) bool {
// joinFederation joins a cluster into federation clusters.
// return nil error if kubefed cluster already exists.
func (c *ClusterController) joinFederation(clusterConfig *rest.Config, joiningClusterName string, labels map[string]string) (*fedv1b1.KubeFedCluster, error) {
func (c *clusterController) joinFederation(clusterConfig *rest.Config, joiningClusterName string, labels map[string]string) (*fedv1b1.KubeFedCluster, error) {
return joinClusterForNamespace(c.hostConfig,
clusterConfig,
@@ -525,7 +633,7 @@ func (c *ClusterController) joinFederation(clusterConfig *rest.Config, joiningCl
}
// unJoinFederation unjoins a cluster from federation control plane.
func (c *ClusterController) unJoinFederation(clusterConfig *rest.Config, unjoiningClusterName string) error {
func (c *clusterController) unJoinFederation(clusterConfig *rest.Config, unjoiningClusterName string) error {
return unjoinCluster(c.hostConfig,
clusterConfig,
kubefedNamespace,
@@ -537,7 +645,7 @@ func (c *ClusterController) unJoinFederation(clusterConfig *rest.Config, unjoini
// allocatePort find a available port between [portRangeMin, portRangeMax] in maximumRetries
// TODO: only works with handful clusters
func (c *ClusterController) allocatePort() (uint16, error) {
func (c *clusterController) allocatePort() (uint16, error) {
rand.Seed(time.Now().UnixNano())
clusters, err := c.clusterLister.List(labels.Everything())
@@ -568,7 +676,7 @@ func (c *ClusterController) allocatePort() (uint16, error) {
}
// generateToken returns a random 32-byte string as token
func (c *ClusterController) generateToken() string {
func (c *clusterController) generateToken() string {
rand.Seed(time.Now().UnixNano())
b := make([]byte, 32)
rand.Read(b)