Merge pull request #193 from richardxz/master
update job's "rerun" function
This commit is contained in:
@@ -29,11 +29,16 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/client"
|
||||
)
|
||||
|
||||
var k8sClient *kubernetes.Clientset
|
||||
|
||||
const retryTimes = 3
|
||||
|
||||
func (ctl *JobCtl) generateObject(item v1.Job) *Job {
|
||||
var status, displayName string
|
||||
|
||||
@@ -134,11 +139,13 @@ func (ctl *JobCtl) initListerAndInformer() {
|
||||
|
||||
object := obj.(*v1.Job)
|
||||
mysqlObject := ctl.generateObject(*object)
|
||||
ctl.makeRevision(object)
|
||||
db.Create(mysqlObject)
|
||||
},
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
object := new.(*v1.Job)
|
||||
mysqlObject := ctl.generateObject(*object)
|
||||
ctl.makeRevision(object)
|
||||
db.Save(mysqlObject)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
@@ -186,41 +193,41 @@ func getRevisions(job v1.Job) (JobRevisions, error) {
|
||||
|
||||
err := json.Unmarshal([]byte(revisionsStr), &revisions)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
|
||||
return nil, fmt.Errorf("failed to rerun job %s", job.Name)
|
||||
return nil, fmt.Errorf("failed to get job %s's revisions, reason: %s", job.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return revisions, nil
|
||||
}
|
||||
|
||||
func getStatus(item *v1.Job) JobStatus {
|
||||
var status JobStatus
|
||||
func getCurrentRevision(item *v1.Job) JobRevision {
|
||||
var revision JobRevision
|
||||
for _, condition := range item.Status.Conditions {
|
||||
if condition.Type == "Failed" && condition.Status == "True" {
|
||||
status.Status = Failed
|
||||
status.Reasons = append(status.Reasons, condition.Reason)
|
||||
status.Messages = append(status.Messages, condition.Message)
|
||||
revision.Status = Failed
|
||||
revision.Reasons = append(revision.Reasons, condition.Reason)
|
||||
revision.Messages = append(revision.Messages, condition.Message)
|
||||
}
|
||||
|
||||
if condition.Type == "Complete" && condition.Status == "True" {
|
||||
status.Status = Completed
|
||||
revision.Status = Completed
|
||||
}
|
||||
}
|
||||
|
||||
if len(status.Status) == 0 {
|
||||
status.Status = Unfinished
|
||||
if len(revision.Status) == 0 {
|
||||
revision.Status = Running
|
||||
}
|
||||
|
||||
status.DesirePodNum = *item.Spec.Completions
|
||||
status.Succeed = item.Status.Succeeded
|
||||
status.Failed = item.Status.Failed
|
||||
status.StartTime = item.Status.StartTime.Time
|
||||
revision.DesirePodNum = *item.Spec.Completions
|
||||
revision.Succeed = item.Status.Succeeded
|
||||
revision.Failed = item.Status.Failed
|
||||
revision.StartTime = item.CreationTimestamp.Time
|
||||
revision.Uid = string(item.UID)
|
||||
if item.Status.CompletionTime != nil {
|
||||
status.CompletionTime = item.Status.CompletionTime.Time
|
||||
revision.CompletionTime = item.Status.CompletionTime.Time
|
||||
}
|
||||
|
||||
return status
|
||||
return revision
|
||||
}
|
||||
|
||||
func deleteJob(namespace, job string) error {
|
||||
@@ -229,46 +236,81 @@ func deleteJob(namespace, job string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (ctl *JobCtl) makeRevision(job *v1.Job) {
|
||||
revisionIndex := -1
|
||||
revisions, err := getRevisions(*job)
|
||||
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
uid := job.UID
|
||||
for index, revision := range revisions {
|
||||
if revision.Uid == string(uid) {
|
||||
currentRevision := getCurrentRevision(job)
|
||||
if reflect.DeepEqual(currentRevision, revision) {
|
||||
return
|
||||
} else {
|
||||
revisionIndex = index
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if revisionIndex == -1 {
|
||||
revisionIndex = len(revisions) + 1
|
||||
}
|
||||
|
||||
revisions[revisionIndex] = getCurrentRevision(job)
|
||||
|
||||
revisionsByte, err := json.Marshal(revisions)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
}
|
||||
|
||||
if job.Annotations == nil {
|
||||
job.Annotations = make(map[string]string)
|
||||
}
|
||||
|
||||
job.Annotations["revisions"] = string(revisionsByte)
|
||||
ctl.K8sClient.BatchV1().Jobs(job.Namespace).Update(job)
|
||||
|
||||
}
|
||||
|
||||
func JobReRun(namespace, jobName string) (string, error) {
|
||||
k8sClient = client.NewK8sClient()
|
||||
job, err := k8sClient.BatchV1().Jobs(namespace).Get(jobName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
newJob := *job
|
||||
newJob.ResourceVersion = ""
|
||||
newJob.Status = v1.JobStatus{}
|
||||
newJob.ObjectMeta.UID = ""
|
||||
newJob.Annotations["revisions"] = strings.Replace(job.Annotations["revisions"], Running, Unfinished, -1)
|
||||
|
||||
delete(newJob.Spec.Selector.MatchLabels, "controller-uid")
|
||||
delete(newJob.Spec.Template.ObjectMeta.Labels, "controller-uid")
|
||||
|
||||
revisions, err := getRevisions(*job)
|
||||
|
||||
err = deleteJob(namespace, jobName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
index := len(revisions) + 1
|
||||
value := getStatus(job)
|
||||
revisions[index] = value
|
||||
|
||||
revisionsByte, err := json.Marshal(revisions)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
|
||||
glog.Errorf("failed to rerun job %s, reason: %s", jobName, err)
|
||||
return "", fmt.Errorf("failed to rerun job %s", jobName)
|
||||
}
|
||||
|
||||
newJob.Annotations["revisions"] = string(revisionsByte)
|
||||
|
||||
err = deleteJob(job.Namespace, job.Name)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
|
||||
return "", fmt.Errorf("failed to rerun job %s", jobName)
|
||||
for i := 0; i < retryTimes; i++ {
|
||||
_, err = k8sClient.BatchV1().Jobs(namespace).Create(&newJob)
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
_, err = k8sClient.BatchV1().Jobs(namespace).Create(&newJob)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
|
||||
glog.Errorf("failed to rerun job %s, reason: %s", jobName, err)
|
||||
return "", fmt.Errorf("failed to rerun job %s", jobName)
|
||||
}
|
||||
|
||||
|
||||
@@ -284,15 +284,16 @@ type StorageClass struct {
|
||||
Provisioner string `json:"provisioner"`
|
||||
}
|
||||
|
||||
type JobRevisions map[int]JobStatus
|
||||
type JobRevisions map[int]JobRevision
|
||||
|
||||
type JobStatus struct {
|
||||
type JobRevision struct {
|
||||
Status string `json:"status"`
|
||||
Reasons []string `json:"reasons"`
|
||||
Messages []string `json:"messages"`
|
||||
Succeed int32 `json:"succeed"`
|
||||
DesirePodNum int32 `json:"desire"`
|
||||
Failed int32 `json:"failed"`
|
||||
Uid string `json:"uid"`
|
||||
StartTime time.Time `json:"start-time"`
|
||||
CompletionTime time.Time `json:"completion-time"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user