diff --git a/pkg/models/controllers/jobs.go b/pkg/models/controllers/jobs.go index 9e93a1e2a..cb3f95a0d 100644 --- a/pkg/models/controllers/jobs.go +++ b/pkg/models/controllers/jobs.go @@ -29,11 +29,16 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "reflect" + "strings" + "kubesphere.io/kubesphere/pkg/client" ) var k8sClient *kubernetes.Clientset +const retryTimes = 3 + func (ctl *JobCtl) generateObject(item v1.Job) *Job { var status, displayName string @@ -134,11 +139,13 @@ func (ctl *JobCtl) initListerAndInformer() { object := obj.(*v1.Job) mysqlObject := ctl.generateObject(*object) + ctl.makeRevision(object) db.Create(mysqlObject) }, UpdateFunc: func(old, new interface{}) { object := new.(*v1.Job) mysqlObject := ctl.generateObject(*object) + ctl.makeRevision(object) db.Save(mysqlObject) }, DeleteFunc: func(obj interface{}) { @@ -186,41 +193,41 @@ func getRevisions(job v1.Job) (JobRevisions, error) { err := json.Unmarshal([]byte(revisionsStr), &revisions) if err != nil { - glog.Errorf("failed to rerun job %s, reason: %s", err, err) - return nil, fmt.Errorf("failed to rerun job %s", job.Name) + return nil, fmt.Errorf("failed to get job %s's revisions, reason: %s", job.Name, err) } } return revisions, nil } -func getStatus(item *v1.Job) JobStatus { - var status JobStatus +func getCurrentRevision(item *v1.Job) JobRevision { + var revision JobRevision for _, condition := range item.Status.Conditions { if condition.Type == "Failed" && condition.Status == "True" { - status.Status = Failed - status.Reasons = append(status.Reasons, condition.Reason) - status.Messages = append(status.Messages, condition.Message) + revision.Status = Failed + revision.Reasons = append(revision.Reasons, condition.Reason) + revision.Messages = append(revision.Messages, condition.Message) } if condition.Type == "Complete" && condition.Status == "True" { - status.Status = Completed + revision.Status = Completed } } - if len(status.Status) == 0 { - status.Status = Unfinished + if len(revision.Status) == 0 { + revision.Status = Running } - status.DesirePodNum = *item.Spec.Completions - status.Succeed = item.Status.Succeeded - status.Failed = item.Status.Failed - status.StartTime = item.Status.StartTime.Time + revision.DesirePodNum = *item.Spec.Completions + revision.Succeed = item.Status.Succeeded + revision.Failed = item.Status.Failed + revision.StartTime = item.CreationTimestamp.Time + revision.Uid = string(item.UID) if item.Status.CompletionTime != nil { - status.CompletionTime = item.Status.CompletionTime.Time + revision.CompletionTime = item.Status.CompletionTime.Time } - return status + return revision } func deleteJob(namespace, job string) error { @@ -229,46 +236,81 @@ func deleteJob(namespace, job string) error { return err } +func (ctl *JobCtl) makeRevision(job *v1.Job) { + revisionIndex := -1 + revisions, err := getRevisions(*job) + + if err != nil { + glog.Error(err) + return + } + + uid := job.UID + for index, revision := range revisions { + if revision.Uid == string(uid) { + currentRevision := getCurrentRevision(job) + if reflect.DeepEqual(currentRevision, revision) { + return + } else { + revisionIndex = index + break + } + } + } + + if revisionIndex == -1 { + revisionIndex = len(revisions) + 1 + } + + revisions[revisionIndex] = getCurrentRevision(job) + + revisionsByte, err := json.Marshal(revisions) + if err != nil { + glog.Error(err) + } + + if job.Annotations == nil { + job.Annotations = make(map[string]string) + } + + job.Annotations["revisions"] = string(revisionsByte) + ctl.K8sClient.BatchV1().Jobs(job.Namespace).Update(job) + +} + func JobReRun(namespace, jobName string) (string, error) { k8sClient = client.NewK8sClient() job, err := k8sClient.BatchV1().Jobs(namespace).Get(jobName, metav1.GetOptions{}) if err != nil { return "", err } + newJob := *job newJob.ResourceVersion = "" newJob.Status = v1.JobStatus{} newJob.ObjectMeta.UID = "" + newJob.Annotations["revisions"] = strings.Replace(job.Annotations["revisions"], Running, Unfinished, -1) + delete(newJob.Spec.Selector.MatchLabels, "controller-uid") delete(newJob.Spec.Template.ObjectMeta.Labels, "controller-uid") - revisions, err := getRevisions(*job) - + err = deleteJob(namespace, jobName) if err != nil { - return "", err - } - - index := len(revisions) + 1 - value := getStatus(job) - revisions[index] = value - - revisionsByte, err := json.Marshal(revisions) - if err != nil { - glog.Errorf("failed to rerun job %s, reason: %s", err, err) + glog.Errorf("failed to rerun job %s, reason: %s", jobName, err) return "", fmt.Errorf("failed to rerun job %s", jobName) } - newJob.Annotations["revisions"] = string(revisionsByte) - - err = deleteJob(job.Namespace, job.Name) - if err != nil { - glog.Errorf("failed to rerun job %s, reason: %s", err, err) - return "", fmt.Errorf("failed to rerun job %s", jobName) + for i := 0; i < retryTimes; i++ { + _, err = k8sClient.BatchV1().Jobs(namespace).Create(&newJob) + if err != nil { + time.Sleep(time.Second) + continue + } + break } - _, err = k8sClient.BatchV1().Jobs(namespace).Create(&newJob) if err != nil { - glog.Errorf("failed to rerun job %s, reason: %s", err, err) + glog.Errorf("failed to rerun job %s, reason: %s", jobName, err) return "", fmt.Errorf("failed to rerun job %s", jobName) } diff --git a/pkg/models/controllers/types.go b/pkg/models/controllers/types.go index 35ef692a2..d73d4bc07 100644 --- a/pkg/models/controllers/types.go +++ b/pkg/models/controllers/types.go @@ -284,15 +284,16 @@ type StorageClass struct { Provisioner string `json:"provisioner"` } -type JobRevisions map[int]JobStatus +type JobRevisions map[int]JobRevision -type JobStatus struct { +type JobRevision struct { Status string `json:"status"` Reasons []string `json:"reasons"` Messages []string `json:"messages"` Succeed int32 `json:"succeed"` DesirePodNum int32 `json:"desire"` Failed int32 `json:"failed"` + Uid string `json:"uid"` StartTime time.Time `json:"start-time"` CompletionTime time.Time `json:"completion-time"` }