1. refactor kubesphere dependency service client creation, we can disable dependency by config 2. dependencies can be configured by configuration file 3. refactor cmd package using cobra.Command, so we can use hypersphere to invoke command sepearately. Later we only need to build one image to contains all kubesphere core components. One command to rule them all! 4. live reloading configuration currently not implemented
260 lines
8.0 KiB
Go
260 lines
8.0 KiB
Go
package application
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
applicationclient "github.com/kubernetes-sigs/application/pkg/client/clientset/versioned"
|
|
applicationinformers "github.com/kubernetes-sigs/application/pkg/client/informers/externalversions/app/v1beta1"
|
|
applicationlister "github.com/kubernetes-sigs/application/pkg/client/listers/app/v1beta1"
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
informersv1 "k8s.io/client-go/informers/apps/v1"
|
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
listersv1 "k8s.io/client-go/listers/apps/v1"
|
|
corelisters "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/client-go/util/workqueue"
|
|
log "k8s.io/klog"
|
|
"k8s.io/kubernetes/pkg/util/metrics"
|
|
servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/servicemesh/v1alpha2"
|
|
servicemeshlisters "kubesphere.io/kubesphere/pkg/client/listers/servicemesh/v1alpha2"
|
|
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
|
|
)
|
|
|
|
const (
|
|
// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
|
|
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
|
|
// sequence of delays between successive queuings of a service.
|
|
//
|
|
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
|
|
maxRetries = 15
|
|
)
|
|
|
|
type ApplicationController struct {
|
|
client clientset.Interface
|
|
|
|
applicationClient applicationclient.Interface
|
|
|
|
eventBroadcaster record.EventBroadcaster
|
|
eventRecorder record.EventRecorder
|
|
|
|
applicationLister applicationlister.ApplicationLister
|
|
applicationSynced cache.InformerSynced
|
|
|
|
serviceLister corelisters.ServiceLister
|
|
serviceSynced cache.InformerSynced
|
|
|
|
deploymentLister listersv1.DeploymentLister
|
|
deploymentSynced cache.InformerSynced
|
|
|
|
statefulSetLister listersv1.StatefulSetLister
|
|
statefulSetSynced cache.InformerSynced
|
|
|
|
strategyLister servicemeshlisters.StrategyLister
|
|
strategySynced cache.InformerSynced
|
|
|
|
servicePolicyLister servicemeshlisters.ServicePolicyLister
|
|
servicePolicySynced cache.InformerSynced
|
|
|
|
queue workqueue.RateLimitingInterface
|
|
|
|
workerLoopPeriod time.Duration
|
|
}
|
|
|
|
func NewApplicationController(serviceInformer coreinformers.ServiceInformer,
|
|
deploymentInformer informersv1.DeploymentInformer,
|
|
statefulSetInformer informersv1.StatefulSetInformer,
|
|
strategyInformer servicemeshinformers.StrategyInformer,
|
|
servicePolicyInformer servicemeshinformers.ServicePolicyInformer,
|
|
applicationInformer applicationinformers.ApplicationInformer,
|
|
client clientset.Interface,
|
|
applicationClient applicationclient.Interface) *ApplicationController {
|
|
|
|
broadcaster := record.NewBroadcaster()
|
|
broadcaster.StartLogging(func(format string, args ...interface{}) {
|
|
log.Info(fmt.Sprintf(format, args))
|
|
})
|
|
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
|
|
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "application-controller"})
|
|
|
|
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
|
|
metrics.RegisterMetricAndTrackRateLimiterUsage("virtualservice_controller", client.CoreV1().RESTClient().GetRateLimiter())
|
|
}
|
|
|
|
v := &ApplicationController{
|
|
client: client,
|
|
applicationClient: applicationClient,
|
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "application"),
|
|
workerLoopPeriod: time.Second,
|
|
}
|
|
|
|
v.deploymentLister = deploymentInformer.Lister()
|
|
v.deploymentSynced = deploymentInformer.Informer().HasSynced
|
|
|
|
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: v.enqueueObject,
|
|
DeleteFunc: v.enqueueObject,
|
|
})
|
|
|
|
v.statefulSetLister = statefulSetInformer.Lister()
|
|
v.statefulSetSynced = statefulSetInformer.Informer().HasSynced
|
|
|
|
statefulSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: v.enqueueObject,
|
|
DeleteFunc: v.enqueueObject,
|
|
})
|
|
|
|
v.serviceLister = serviceInformer.Lister()
|
|
v.serviceSynced = serviceInformer.Informer().HasSynced
|
|
|
|
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: v.enqueueObject,
|
|
DeleteFunc: v.enqueueObject,
|
|
})
|
|
|
|
v.strategyLister = strategyInformer.Lister()
|
|
v.strategySynced = strategyInformer.Informer().HasSynced
|
|
|
|
strategyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: v.enqueueObject,
|
|
DeleteFunc: v.enqueueObject,
|
|
})
|
|
|
|
v.servicePolicyLister = servicePolicyInformer.Lister()
|
|
v.servicePolicySynced = servicePolicyInformer.Informer().HasSynced
|
|
|
|
servicePolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: v.enqueueObject,
|
|
DeleteFunc: v.enqueueObject,
|
|
})
|
|
|
|
v.applicationLister = applicationInformer.Lister()
|
|
v.applicationSynced = applicationInformer.Informer().HasSynced
|
|
|
|
v.eventBroadcaster = broadcaster
|
|
v.eventRecorder = recorder
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
func (v *ApplicationController) Start(stopCh <-chan struct{}) error {
|
|
return v.Run(2, stopCh)
|
|
}
|
|
|
|
func (v *ApplicationController) Run(workers int, stopCh <-chan struct{}) error {
|
|
defer utilruntime.HandleCrash()
|
|
defer v.queue.ShutDown()
|
|
|
|
log.Info("starting application controller")
|
|
defer log.Info("shutting down application controller")
|
|
|
|
if !cache.WaitForCacheSync(stopCh, v.deploymentSynced, v.statefulSetSynced, v.serviceSynced, v.strategySynced, v.servicePolicySynced, v.applicationSynced) {
|
|
return fmt.Errorf("failed to wait for caches to sync")
|
|
}
|
|
|
|
for i := 0; i < workers; i++ {
|
|
go wait.Until(v.worker, v.workerLoopPeriod, stopCh)
|
|
}
|
|
<-stopCh
|
|
return nil
|
|
}
|
|
|
|
func (v *ApplicationController) worker() {
|
|
|
|
for v.processNextWorkItem() {
|
|
}
|
|
}
|
|
|
|
func (v *ApplicationController) processNextWorkItem() bool {
|
|
eKey, quit := v.queue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
|
|
defer v.queue.Done(eKey)
|
|
|
|
err := v.syncApplication(eKey.(string))
|
|
v.handleErr(err, eKey)
|
|
|
|
return true
|
|
}
|
|
|
|
func (v *ApplicationController) syncApplication(key string) error {
|
|
startTime := time.Now()
|
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
log.Error(err, "not a valid controller key", "key", key)
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
log.V(4).Info("Finished updating application.", "namespace", namespace, "name", name, "duration", time.Since(startTime))
|
|
}()
|
|
|
|
application, err := v.applicationLister.Applications(namespace).Get(name)
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
// application has been deleted
|
|
return nil
|
|
}
|
|
log.Error(err, "get application failed")
|
|
}
|
|
|
|
annotations := application.GetAnnotations()
|
|
annotations["kubesphere.io/last-updated"] = time.Now().String()
|
|
application.SetAnnotations(annotations)
|
|
|
|
_, err = v.applicationClient.AppV1beta1().Applications(namespace).Update(application)
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
log.V(4).Info("application has been deleted during update")
|
|
return nil
|
|
}
|
|
log.Error(err, "failed to update application", "namespace", namespace, "name", name)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (v *ApplicationController) enqueueObject(obj interface{}) {
|
|
var resource = obj.(metav1.Object)
|
|
|
|
if resource.GetLabels() == nil || !util.IsApplicationComponent(resource.GetLabels()) {
|
|
return
|
|
}
|
|
|
|
applicationName := util.GetApplictionName(resource.GetLabels())
|
|
|
|
if len(applicationName) > 0 {
|
|
key := resource.GetNamespace() + "/" + applicationName
|
|
v.queue.Add(key)
|
|
}
|
|
}
|
|
|
|
func (v *ApplicationController) handleErr(err error, key interface{}) {
|
|
if err != nil {
|
|
v.queue.Forget(key)
|
|
return
|
|
}
|
|
|
|
if v.queue.NumRequeues(key) < maxRetries {
|
|
log.V(2).Info("Error syncing virtualservice for service retrying.", "key", key, "error", err)
|
|
v.queue.AddRateLimited(key)
|
|
return
|
|
}
|
|
|
|
log.V(4).Info("Dropping service out of the queue.", "key", key, "error", err)
|
|
v.queue.Forget(key)
|
|
utilruntime.HandleError(err)
|
|
}
|