diff --git a/cmd/controller-manager/app/controllers.go b/cmd/controller-manager/app/controllers.go index f62d496b3..b7e3d564b 100644 --- a/cmd/controller-manager/app/controllers.go +++ b/cmd/controller-manager/app/controllers.go @@ -22,6 +22,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "kubesphere.io/kubesphere/pkg/controller/destinationrule" + "kubesphere.io/kubesphere/pkg/controller/job" "kubesphere.io/kubesphere/pkg/controller/virtualservice" "sigs.k8s.io/controller-runtime/pkg/manager" "time" @@ -77,6 +78,8 @@ func AddControllers(mgr manager.Manager, cfg *rest.Config, stopCh <-chan struct{ kubeClient, istioclient) + jobController := job.NewJobController(informerFactory.Batch().V1().Jobs(), kubeClient) + servicemeshinformer.Start(stopCh) istioInformer.Start(stopCh) informerFactory.Start(stopCh) @@ -84,6 +87,7 @@ func AddControllers(mgr manager.Manager, cfg *rest.Config, stopCh <-chan struct{ controllers := map[string]manager.Runnable{ "virtualservice-controller": vsController, "destinationrule-controller": drController, + "job-controller": jobController, } for name, ctrl := range controllers { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go new file mode 100644 index 000000000..4e730f908 --- /dev/null +++ b/pkg/controller/job/job_controller.go @@ -0,0 +1,299 @@ +/* + + Copyright 2019 The KubeSphere Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/
+
+// Package job contains a controller that records a per-run "revision"
+// history for batch/v1 Jobs in an annotation on the Job object.
+package job
+
+import (
+	"encoding/json"
+	"fmt"
+	"reflect"
+	"time"
+
+	batchv1 "k8s.io/api/batch/v1"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	batchv1informers "k8s.io/client-go/informers/batch/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	batchv1listers "k8s.io/client-go/listers/batch/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/util/metrics"
+	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+)
+
+const (
+	// maxRetries is the number of times a job key will be retried before it
+	// is dropped out of the queue. With the default rate-limiter in use
+	// (5ms*2^(maxRetries-1)) the following numbers represent the sequence of
+	// delays between successive requeuings of a key:
+	//
+	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
+	maxRetries = 15
+
+	// revisionsAnnotationKey is the Job annotation under which the
+	// JSON-encoded revision history (see types.go) is stored.
+	revisionsAnnotationKey = "revisions"
+)
+
+var log = logf.Log.WithName("job-controller")
+
+// JobController watches batch/v1 Jobs and persists a revision entry for each
+// observed run in the job's "revisions" annotation.
+type JobController struct {
+	client           clientset.Interface
+	eventBroadcaster record.EventBroadcaster
+	eventRecorder    record.EventRecorder
+
+	jobLister batchv1listers.JobLister
+	jobSynced cache.InformerSynced
+
+	queue workqueue.RateLimitingInterface
+
+	// workerLoopPeriod is the period with which wait.Until restarts each
+	// worker goroutine should it return.
+	workerLoopPeriod time.Duration
+}
+
+// NewJobController creates a JobController driven by the given shared Job
+// informer. The caller is responsible for starting the informer factory and
+// then running the controller (Start / Run).
+func NewJobController(jobInformer batchv1informers.JobInformer, client clientset.Interface) *JobController {
+	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
+		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", client.CoreV1().RESTClient().GetRateLimiter())
+	}
+
+	v := &JobController{
+		client:           client,
+		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job"),
+		workerLoopPeriod: time.Second,
+	}
+
+	v.jobLister = jobInformer.Lister()
+	v.jobSynced = jobInformer.Informer().HasSynced
+
+	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: v.enqueueJob,
+		UpdateFunc: func(old, cur interface{}) {
+			v.enqueueJob(cur)
+		},
+	})
+
+	return v
+}
+
+// Start implements manager.Runnable; it blocks until stopCh is closed.
+func (v *JobController) Start(stopCh <-chan struct{}) error {
+	v.Run(5, stopCh)
+	return nil
+}
+
+// Run waits for the informer cache to sync, then starts the given number of
+// worker goroutines and blocks until stopCh is closed.
+func (v *JobController) Run(workers int, stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	defer v.queue.ShutDown()
+
+	log.Info("starting job controller")
+	defer log.Info("shutting down job controller")
+
+	if !controller.WaitForCacheSync("job-controller", stopCh, v.jobSynced) {
+		return
+	}
+
+	for i := 0; i < workers; i++ {
+		go wait.Until(v.worker, v.workerLoopPeriod, stopCh)
+	}
+
+	<-stopCh
+}
+
+// enqueueJob adds the namespace/name key of obj to the work queue.
+func (v *JobController) enqueueJob(obj interface{}) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+	v.queue.Add(key)
+}
+
+// worker processes queue items until the queue is shut down.
+func (v *JobController) worker() {
+	for v.processNextWorkItem() {
+	}
+}
+
+// processNextWorkItem pops one key, syncs it, and reports the outcome to
+// handleErr. It returns false only when the queue has been shut down.
+func (v *JobController) processNextWorkItem() bool {
+	eKey, quit := v.queue.Get()
+	if quit {
+		return false
+	}
+	defer v.queue.Done(eKey)
+
+	err := v.syncJob(eKey.(string))
+	v.handleErr(err, eKey)
+
+	return true
+}
+
+// syncJob is the main reconcile function: it refreshes the revision
+// annotation of the job identified by its namespace/name key.
+func (v *JobController) syncJob(key string) error {
+	startTime := time.Now()
+	defer func() {
+		log.V(4).Info("Finished syncing job.", "key", key, "duration", time.Since(startTime))
+	}()
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+
+	job, err := v.jobLister.Jobs(namespace).Get(name)
+	if err != nil {
+		// the job has been deleted; nothing left to reconcile
+		if errors.IsNotFound(err) {
+			return nil
+		}
+		log.Error(err, "get job failed", "namespace", namespace, "name", name)
+		return err
+	}
+
+	if err = v.makeRevision(job); err != nil {
+		log.Error(err, "make job revision failed", "namespace", namespace, "name", name)
+		// return the error so handleErr requeues the key with backoff
+		// (previously the error was swallowed and the revision silently lost)
+		return err
+	}
+
+	return nil
+}
+
+// addJob enqueues a newly added job. obj must be of type *batchv1.Job.
+// NOTE(review): this handler is not registered anywhere — enqueueJob is wired
+// to the informer instead. It now delegates to enqueueJob so that, if it is
+// ever used, it enqueues a namespace/name key; the previous version enqueued
+// the bare job name, which syncJob would have treated as cluster-scoped.
+func (v *JobController) addJob(obj interface{}) {
+	v.enqueueJob(obj)
+}
+
+// handleErr forgets the key on success and requeues it with rate limiting on
+// failure, dropping it entirely after maxRetries attempts.
+func (v *JobController) handleErr(err error, key interface{}) {
+	// BUG FIX: the condition was inverted (err != nil), which dropped every
+	// failing key immediately and endlessly requeued every successful one.
+	if err == nil {
+		v.queue.Forget(key)
+		return
+	}
+
+	if v.queue.NumRequeues(key) < maxRetries {
+		log.V(2).Info("Error syncing job, retrying.", "key", key, "error", err)
+		v.queue.AddRateLimited(key)
+		return
+	}
+
+	log.V(4).Info("Dropping job out of the queue", "key", key, "error", err)
+	v.queue.Forget(key)
+	utilruntime.HandleError(err)
+}
+
+// makeRevision records the job's current state in its "revisions" annotation.
+// It is a no-op when the stored revision for this job's UID already matches
+// the current state; otherwise it updates (or appends) the revision and
+// writes the job back to the API server.
+func (v *JobController) makeRevision(job *batchv1.Job) error {
+	revisions, err := v.getRevisions(job)
+	if err != nil {
+		// previously this error was silently dropped (returned nil)
+		return err
+	}
+
+	current := v.getCurrentRevision(job)
+
+	revisionIndex := -1
+	for index, revision := range revisions {
+		if revision.Uid == string(job.UID) {
+			if reflect.DeepEqual(current, revision) {
+				// nothing changed for this run; avoid a useless update
+				return nil
+			}
+			revisionIndex = index
+			break
+		}
+	}
+
+	if revisionIndex == -1 {
+		// Pick the next free index: one past the largest existing key.
+		// (The previous len(revisions)+1 could collide with an existing key
+		// on a sparse map, silently overwriting an old revision.)
+		revisionIndex = 0
+		for index := range revisions {
+			if index >= revisionIndex {
+				revisionIndex = index + 1
+			}
+		}
+	}
+
+	revisions[revisionIndex] = current
+
+	revisionsByte, err := json.Marshal(revisions)
+	if err != nil {
+		return fmt.Errorf("generate revision string for job %s failed: %v", job.Name, err)
+	}
+
+	// the job object comes from the shared informer cache; copy before mutating
+	job = job.DeepCopy()
+	if job.Annotations == nil {
+		job.Annotations = make(map[string]string)
+	}
+	job.Annotations[revisionsAnnotationKey] = string(revisionsByte)
+
+	_, err = v.client.BatchV1().Jobs(job.Namespace).Update(job)
+	return err
+}
+
+// getRevisions decodes the revision history stored in the job's annotation.
+// A job without the annotation yields an empty (non-nil) JobRevisions map.
+func (v *JobController) getRevisions(job *batchv1.Job) (JobRevisions, error) {
+	revisions := make(JobRevisions)
+
+	if revisionsStr := job.Annotations[revisionsAnnotationKey]; revisionsStr != "" {
+		if err := json.Unmarshal([]byte(revisionsStr), &revisions); err != nil {
+			return nil, fmt.Errorf("failed to get job %s's revisions, reason: %s", job.Name, err)
+		}
+	}
+
+	return revisions, nil
+}
+
+// getCurrentRevision builds a JobRevision snapshot from the job's current
+// status and metadata.
+func (v *JobController) getCurrentRevision(item *batchv1.Job) JobRevision {
+	var revision JobRevision
+	for _, condition := range item.Status.Conditions {
+		if condition.Type == batchv1.JobFailed && condition.Status == v1.ConditionTrue {
+			revision.Status = Failed
+			revision.Reasons = append(revision.Reasons, condition.Reason)
+			revision.Messages = append(revision.Messages, condition.Message)
+		}
+
+		if condition.Type == batchv1.JobComplete && condition.Status == v1.ConditionTrue {
+			revision.Status = Completed
+		}
+	}
+
+	// no terminal condition observed yet means the job is still running
+	if len(revision.Status) == 0 {
+		revision.Status = Running
+	}
+
+	if item.Spec.Completions != nil {
+		revision.DesirePodNum = *item.Spec.Completions
+	}
+
+	revision.Succeed = item.Status.Succeeded
+	revision.Failed = item.Status.Failed
+	revision.StartTime = item.CreationTimestamp.Time
+	revision.Uid = string(item.UID)
+	if item.Status.CompletionTime != nil {
+		revision.CompletionTime = item.Status.CompletionTime.Time
+	}
+
+	return revision
+}
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
new file mode 100644
index 000000000..1643c6613
--- /dev/null
+++ b/pkg/controller/job/job_controller_test.go
@@ -0,0 +1,18 @@
+/*
+
+ Copyright 2019 The KubeSphere Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+*/
+package job
diff --git a/pkg/controller/job/types.go b/pkg/controller/job/types.go
new file mode 100644
index 000000000..999df657a
--- /dev/null
+++ b/pkg/controller/job/types.go
@@ -0,0 +1,42 @@
+/*
+
+ Copyright 2019 The KubeSphere Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+*/
+package job
+
+import "time"
+
+// Lifecycle states recorded in JobRevision.Status. The controller in this
+// package only sets Running, Failed, and Completed; Unfinished and Pause are
+// presumably consumed elsewhere — NOTE(review): verify against API consumers.
+const (
+	Running    = "running"
+	Failed     = "failed"
+	Unfinished = "unfinished"
+	Completed  = "completed"
+	Pause      = "pause"
+)
+
+// JobRevisions maps a revision index to the recorded state of one job run.
+// It is persisted as JSON in the job's "revisions" annotation.
+type JobRevisions map[int]JobRevision
+
+// JobRevision is a snapshot of one job run's observed status.
+type JobRevision struct {
+	Status         string    `json:"status"`                    // one of the lifecycle constants above
+	Reasons        []string  `json:"reasons,omitempty"`         // condition reasons collected for failed runs
+	Messages       []string  `json:"messages,omitempty"`        // condition messages collected for failed runs
+	Succeed        int32     `json:"succeed,omitempty"`         // Status.Succeeded pod count
+	DesirePodNum   int32     `json:"desire,omitempty"`          // Spec.Completions, when set
+	Failed         int32     `json:"failed,omitempty"`          // Status.Failed pod count
+	Uid            string    `json:"uid"`                       // UID of the job object this revision describes
+	StartTime      time.Time `json:"start-time,omitempty"`      // job creation time
+	CompletionTime time.Time `json:"completion-time,omitempty"` // Status.CompletionTime, when set
+}
diff --git a/pkg/models/iam/am.go b/pkg/models/iam/am.go
index aef40bd5f..6e22357de 100644
--- a/pkg/models/iam/am.go
+++ b/pkg/models/iam/am.go
@@ -671,16 +671,13 @@ func CreateClusterRoleBinding(username string, clusterRoleName string) error {
 		maxRetries := 3
 		for i := 0; i < maxRetries; i++ {
 			_, err = k8s.Client().RbacV1().ClusterRoleBindings().Create(clusterRoleBinding)
-			if apierrors.IsAlreadyExists(err) {
-				time.Sleep(300 * time.Millisecond)
-				continue
-			}
-			if err != nil {
-				glog.Errorln("create cluster role binding", err)
-				return err
+			if err == nil {
+				return nil
 			}
+			time.Sleep(300 * time.Millisecond)
 		}
-		return nil
+		glog.Errorln("create cluster role binding", err)
+		return err
 	}
 
 	if !k8sutil.ContainsUser(found.Subjects, username) {