refactor application controller

Signed-off-by: zackzhang <zackzhang@yunify.com>
This commit is contained in:
Zack Zhang
2020-12-29 16:18:31 +08:00
committed by hongming
parent fe6c5de00f
commit f20c1f33f1
69 changed files with 1015 additions and 3155 deletions

View File

@@ -18,262 +18,124 @@ package application
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1beta12 "k8s.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informersv1 "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
listersv1 "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
log "k8s.io/klog"
servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/servicemesh/v1alpha2"
servicemeshlisters "kubesphere.io/kubesphere/pkg/client/listers/servicemesh/v1alpha2"
"k8s.io/klog"
servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
"kubesphere.io/kubesphere/pkg/controller/virtualservice/util"
applicationclient "kubesphere.io/kubesphere/pkg/simple/client/app/clientset/versioned"
applicationinformers "kubesphere.io/kubesphere/pkg/simple/client/app/informers/externalversions/app/v1beta1"
applicationlister "kubesphere.io/kubesphere/pkg/simple/client/app/listers/app/v1beta1"
"sigs.k8s.io/application/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"time"
)
const (
// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
// sequence of delays between successive queuings of a service.
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)
type ApplicationController struct {
client clientset.Interface
applicationClient applicationclient.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
applicationLister applicationlister.ApplicationLister
applicationSynced cache.InformerSynced
serviceLister corelisters.ServiceLister
serviceSynced cache.InformerSynced
deploymentLister listersv1.DeploymentLister
deploymentSynced cache.InformerSynced
statefulSetLister listersv1.StatefulSetLister
statefulSetSynced cache.InformerSynced
strategyLister servicemeshlisters.StrategyLister
strategySynced cache.InformerSynced
servicePolicyLister servicemeshlisters.ServicePolicyLister
servicePolicySynced cache.InformerSynced
queue workqueue.RateLimitingInterface
workerLoopPeriod time.Duration
// Add creates a new Workspace Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
return add(mgr, newReconciler(mgr))
}
func NewApplicationController(serviceInformer coreinformers.ServiceInformer,
deploymentInformer informersv1.DeploymentInformer,
statefulSetInformer informersv1.StatefulSetInformer,
strategyInformer servicemeshinformers.StrategyInformer,
servicePolicyInformer servicemeshinformers.ServicePolicyInformer,
applicationInformer applicationinformers.ApplicationInformer,
client clientset.Interface,
applicationClient applicationclient.Interface) *ApplicationController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(func(format string, args ...interface{}) {
log.Info(fmt.Sprintf(format, args))
})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "application-controller"})
v := &ApplicationController{
client: client,
applicationClient: applicationClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "application"),
workerLoopPeriod: time.Second,
}
v.deploymentLister = deploymentInformer.Lister()
v.deploymentSynced = deploymentInformer.Informer().HasSynced
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: v.enqueueObject,
UpdateFunc: func(old, new interface{}) { v.enqueueObject(new) },
DeleteFunc: v.enqueueObject,
})
v.statefulSetLister = statefulSetInformer.Lister()
v.statefulSetSynced = statefulSetInformer.Informer().HasSynced
statefulSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: v.enqueueObject,
UpdateFunc: func(old, new interface{}) { v.enqueueObject(new) },
DeleteFunc: v.enqueueObject,
})
v.serviceLister = serviceInformer.Lister()
v.serviceSynced = serviceInformer.Informer().HasSynced
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: v.enqueueObject,
UpdateFunc: func(old, new interface{}) { v.enqueueObject(new) },
DeleteFunc: v.enqueueObject,
})
v.strategyLister = strategyInformer.Lister()
v.strategySynced = strategyInformer.Informer().HasSynced
strategyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: v.enqueueObject,
UpdateFunc: func(old, new interface{}) { v.enqueueObject(new) },
DeleteFunc: v.enqueueObject,
})
v.servicePolicyLister = servicePolicyInformer.Lister()
v.servicePolicySynced = servicePolicyInformer.Informer().HasSynced
servicePolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: v.enqueueObject,
UpdateFunc: func(old, new interface{}) { v.enqueueObject(new) },
DeleteFunc: v.enqueueObject,
})
v.applicationLister = applicationInformer.Lister()
v.applicationSynced = applicationInformer.Informer().HasSynced
v.eventBroadcaster = broadcaster
v.eventRecorder = recorder
return v
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileApplication{Client: mgr.GetClient(), scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor("application-controller")}
}
func (v *ApplicationController) Start(stopCh <-chan struct{}) error {
return v.Run(2, stopCh)
}
func (v *ApplicationController) Run(workers int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer v.queue.ShutDown()
log.Info("starting application controller")
defer log.Info("shutting down application controller")
if !cache.WaitForCacheSync(stopCh, v.deploymentSynced, v.statefulSetSynced, v.serviceSynced, v.strategySynced, v.servicePolicySynced, v.applicationSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < workers; i++ {
go wait.Until(v.worker, v.workerLoopPeriod, stopCh)
}
<-stopCh
return nil
}
func (v *ApplicationController) worker() {
for v.processNextWorkItem() {
}
}
func (v *ApplicationController) processNextWorkItem() bool {
eKey, quit := v.queue.Get()
if quit {
return false
}
defer v.queue.Done(eKey)
err := v.syncApplication(eKey.(string))
v.handleErr(err, eKey)
return true
}
func (v *ApplicationController) syncApplication(key string) error {
startTime := time.Now()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New("application-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
log.Error(err, "not a valid controller key", "key", key)
return err
}
defer func() {
log.V(4).Info("Finished updating application.", "namespace", namespace, "name", name, "duration", time.Since(startTime))
}()
application, err := v.applicationLister.Applications(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
// application has been deleted
return nil
}
log.Error(err, "get application failed")
sources := []runtime.Object{
&v1.Deployment{},
&corev1.Service{},
&v1.StatefulSet{},
&v1beta12.Ingress{},
&servicemeshv1alpha2.ServicePolicy{},
&servicemeshv1alpha2.Strategy{},
}
annotations := application.GetAnnotations()
for _, s := range sources {
// Watch for changes to Application
err = c.Watch(&source.Kind{Type: s},
&handler.EnqueueRequestForOwner{OwnerType: &v1beta1.Application{}, IsController: false},
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return isApp(e.MetaOld)
},
CreateFunc: func(e event.CreateEvent) bool {
return isApp(e.Meta)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return isApp(e.Meta)
},
})
if err != nil {
return err
}
}
return nil
}
var _ reconcile.Reconciler = &ReconcileApplication{}
// ReconcileApplication reconciles a Workspace object
type ReconcileApplication struct {
client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
}
// +kubebuilder:rbac:groups=app.k8s.io,resources=applications,verbs=get;list;watch;create;update;patch;delete
func (r *ReconcileApplication) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// Fetch the Application instance
ctx := context.Background()
app := &v1beta1.Application{}
err := r.Get(ctx, request.NamespacedName, app)
if err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
// add specified annotation for app when triggered by sub-resources,
// so the application in sigs.k8s.io can reconcile to update status
annotations := app.GetObjectMeta().GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations["kubesphere.io/last-updated"] = time.Now().String()
application.SetAnnotations(annotations)
_, err = v.applicationClient.AppV1beta1().Applications(namespace).Update(context.Background(), application, metav1.UpdateOptions{})
app.SetAnnotations(annotations)
err = r.Update(ctx, app)
if err != nil {
if errors.IsNotFound(err) {
log.V(4).Info("application has been deleted during update")
return nil
klog.V(4).Info("application has been deleted during update")
return reconcile.Result{}, nil
}
log.Error(err, "failed to update application", "namespace", namespace, "name", name)
return err
}
return nil
return reconcile.Result{}, nil
}
func (v *ApplicationController) enqueueObject(obj interface{}) {
var resource = obj.(metav1.Object)
if resource.GetLabels() == nil || !util.IsApplicationComponent(resource.GetLabels()) {
return
}
applicationName := util.GetApplictionName(resource.GetLabels())
if len(applicationName) > 0 {
key := resource.GetNamespace() + "/" + applicationName
v.queue.Add(key)
func isApp(o metav1.Object) bool {
if o.GetLabels() == nil || !util.IsApplicationComponent(o.GetLabels()) {
return false
}
}
func (v *ApplicationController) handleErr(err error, key interface{}) {
if err == nil {
v.queue.Forget(key)
return
}
if v.queue.NumRequeues(key) < maxRetries {
log.V(2).Info("Error syncing virtualservice for service retrying.", "key", key, "error", err)
v.queue.AddRateLimited(key)
return
}
log.V(4).Info("Dropping service out of the queue.", "key", key, "error", err)
v.queue.Forget(key)
utilruntime.HandleError(err)
return true
}