enable controller

Signed-off-by: runzexia <runzexia@yunify.com>
This commit is contained in:
runzexia
2020-03-27 15:20:53 +08:00
parent f827116a1d
commit 443eb3f74c
2 changed files with 35 additions and 21 deletions

View File

@@ -21,8 +21,10 @@ import (
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/controller/application"
"kubesphere.io/kubesphere/pkg/controller/destinationrule"
"kubesphere.io/kubesphere/pkg/controller/devopscredential"
"kubesphere.io/kubesphere/pkg/controller/devopsproject"
"kubesphere.io/kubesphere/pkg/controller/job"
"kubesphere.io/kubesphere/pkg/controller/pipeline"
"kubesphere.io/kubesphere/pkg/controller/s2ibinary"
"kubesphere.io/kubesphere/pkg/controller/s2irun"
"kubesphere.io/kubesphere/pkg/controller/storage/expansion"
@@ -89,6 +91,16 @@ func AddControllers(
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubeSphereSharedInformerFactory().Devops().V1alpha3().DevOpsProjects(),
)
devopsPipelineController := pipeline.NewController(client.Kubernetes(),
client.KubeSphere(),
devopsClient,
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubeSphereSharedInformerFactory().Devops().V1alpha3().Pipelines())
devopsCredentialController := devopscredential.NewController(client.Kubernetes(),
devopsClient,
informerFactory.KubernetesSharedInformerFactory().Core().V1().Namespaces(),
informerFactory.KubernetesSharedInformerFactory().Core().V1().Secrets())
volumeExpansionController := expansion.NewVolumeExpansionController(
client.Kubernetes(),
@@ -100,14 +112,16 @@ func AddControllers(
kubernetesInformer.Apps().V1().StatefulSets())
controllers := map[string]manager.Runnable{
"virtualservice-controller": vsController,
"destinationrule-controller": drController,
"application-controller": apController,
"job-controller": jobController,
"s2ibinary-controller": s2iBinaryController,
"s2irun-controller": s2iRunController,
"volumeexpansion-controller": volumeExpansionController,
"devopsprojects-controller": devopsProjectController,
"virtualservice-controller": vsController,
"destinationrule-controller": drController,
"application-controller": apController,
"job-controller": jobController,
"s2ibinary-controller": s2iBinaryController,
"s2irun-controller": s2iRunController,
"volumeexpansion-controller": volumeExpansionController,
"devopsprojects-controller": devopsProjectController,
"pipeline-controller": devopsPipelineController,
"devopscredential-controller": devopsCredentialController,
}
for name, ctrl := range controllers {

View File

@@ -61,12 +61,12 @@ func NewController(client clientset.Interface,
klog.Info(fmt.Sprintf(format, args))
})
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "pipeline-controller"})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "devopscredential-controller"})
v := &Controller{
client: client,
devopsClient: devopsClinet,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pipeline"),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "devopscredential"),
secretLister: secretInformer.Lister(),
secretSynced: secretInformer.Informer().HasSynced,
namespaceLister: namespaceInformer.Lister(),
@@ -79,24 +79,24 @@ func NewController(client clientset.Interface,
secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
secret := obj.(*v1.Secret)
if strings.HasPrefix(string(secret.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
secret, ok := obj.(*v1.Secret)
if ok && strings.HasPrefix(string(secret.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
v.enqueueSecret(obj)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
old := oldObj.(*v1.Secret)
new := newObj.(*v1.Secret)
if old.ResourceVersion == new.ResourceVersion {
old, ook := oldObj.(*v1.Secret)
new, nok := newObj.(*v1.Secret)
if ook && nok && old.ResourceVersion == new.ResourceVersion {
return
}
if strings.HasPrefix(string(new.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
if ook && nok && strings.HasPrefix(string(new.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
v.enqueueSecret(newObj)
}
},
DeleteFunc: func(obj interface{}) {
secret := obj.(*v1.Secret)
if strings.HasPrefix(string(secret.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
secret, ok := obj.(*v1.Secret)
if ok && strings.HasPrefix(string(secret.Type), devopsv1alpha3.DevOpsCredentialPrefix) {
v.enqueueSecret(obj)
}
},
@@ -166,8 +166,8 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
klog.Info("starting pipeline controller")
defer klog.Info("shutting down pipeline controller")
klog.Info("starting devopscredential controller")
defer klog.Info("shutting down devopscredential controller")
if !cache.WaitForCacheSync(stopCh, c.secretSynced) {
return fmt.Errorf("failed to wait for caches to sync")
@@ -182,7 +182,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the pipeline resource
// converge the two. It then updates the Status block of the secret resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
nsName, name, err := cache.SplitMetaNamespaceKey(key)