refactor workspace api

This commit is contained in:
hongming
2018-10-25 14:55:16 +08:00
parent a8d5f552a0
commit 70065d430d
30 changed files with 2805 additions and 1249 deletions

View File

@@ -0,0 +1,62 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
)
func (ctl *ClusterRoleBindingCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *ClusterRoleBindingCtl) sync(stopChan chan struct{}) {
ctl.initListerAndInformer()
ctl.informer.Run(stopChan)
}
func (ctl *ClusterRoleBindingCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
return len(list)
}
func (ctl *ClusterRoleBindingCtl) initListerAndInformer() {
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Rbac().V1().ClusterRoleBindings().Lister()
ctl.informer = informerFactory.Rbac().V1().ClusterRoleBindings().Informer()
}
func (ctl *ClusterRoleBindingCtl) CountWithConditions(conditions string) int {
return 0
}
func (ctl *ClusterRoleBindingCtl) ListWithConditions(conditions string, paging *Paging, order string) (int, interface{}, error) {
return 0, nil, errors.New("not implement")
}
func (ctl *ClusterRoleBindingCtl) Lister() interface{} {
return ctl.lister
}

View File

@@ -36,7 +36,7 @@ func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole {
}
name := item.Name
if strings.HasPrefix(name, systemPrefix) {
if strings.HasPrefix(name, systemPrefix) || item.Annotations == nil || len(item.Annotations[creator]) == 0 {
return nil
}
@@ -74,7 +74,9 @@ func (ctl *ClusterRoleCtl) sync(stopChan chan struct{}) {
for _, item := range list {
obj := ctl.generateObject(*item)
if obj != nil {
db.Create(obj)
if err := db.Create(obj).Error; err != nil {
glog.Error("cluster roles sync error", err)
}
}
}
@@ -111,14 +113,18 @@ func (ctl *ClusterRoleCtl) initListerAndInformer() {
object := obj.(*v1.ClusterRole)
mysqlObject := ctl.generateObject(*object)
if mysqlObject != nil {
db.Create(mysqlObject)
if err := db.Create(mysqlObject).Error; err != nil {
glog.Error("cluster roles sync error", err)
}
}
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.ClusterRole)
mysqlObject := ctl.generateObject(*object)
if mysqlObject != nil {
db.Save(mysqlObject)
if err := db.Save(mysqlObject).Error; err != nil {
glog.Error("cluster roles update error", err)
}
}
},
DeleteFunc: func(obj interface{}) {

View File

@@ -29,11 +29,16 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"reflect"
"strings"
"kubesphere.io/kubesphere/pkg/client"
)
var k8sClient *kubernetes.Clientset
const retryTimes = 3
func (ctl *JobCtl) generateObject(item v1.Job) *Job {
var status, displayName string
@@ -134,11 +139,13 @@ func (ctl *JobCtl) initListerAndInformer() {
object := obj.(*v1.Job)
mysqlObject := ctl.generateObject(*object)
ctl.makeRevision(object)
db.Create(mysqlObject)
},
UpdateFunc: func(old, new interface{}) {
object := new.(*v1.Job)
mysqlObject := ctl.generateObject(*object)
ctl.makeRevision(object)
db.Save(mysqlObject)
},
DeleteFunc: func(obj interface{}) {
@@ -186,41 +193,41 @@ func getRevisions(job v1.Job) (JobRevisions, error) {
err := json.Unmarshal([]byte(revisionsStr), &revisions)
if err != nil {
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
return nil, fmt.Errorf("failed to rerun job %s", job.Name)
return nil, fmt.Errorf("failed to get job %s's revisions, reason: %s", job.Name, err)
}
}
return revisions, nil
}
func getStatus(item *v1.Job) JobStatus {
var status JobStatus
func getCurrentRevision(item *v1.Job) JobRevision {
var revision JobRevision
for _, condition := range item.Status.Conditions {
if condition.Type == "Failed" && condition.Status == "True" {
status.Status = Failed
status.Reasons = append(status.Reasons, condition.Reason)
status.Messages = append(status.Messages, condition.Message)
revision.Status = Failed
revision.Reasons = append(revision.Reasons, condition.Reason)
revision.Messages = append(revision.Messages, condition.Message)
}
if condition.Type == "Complete" && condition.Status == "True" {
status.Status = Completed
revision.Status = Completed
}
}
if len(status.Status) == 0 {
status.Status = Unfinished
if len(revision.Status) == 0 {
revision.Status = Running
}
status.DesirePodNum = *item.Spec.Completions
status.Succeed = item.Status.Succeeded
status.Failed = item.Status.Failed
status.StartTime = item.Status.StartTime.Time
revision.DesirePodNum = *item.Spec.Completions
revision.Succeed = item.Status.Succeeded
revision.Failed = item.Status.Failed
revision.StartTime = item.CreationTimestamp.Time
revision.Uid = string(item.UID)
if item.Status.CompletionTime != nil {
status.CompletionTime = item.Status.CompletionTime.Time
revision.CompletionTime = item.Status.CompletionTime.Time
}
return status
return revision
}
func deleteJob(namespace, job string) error {
@@ -229,46 +236,81 @@ func deleteJob(namespace, job string) error {
return err
}
func (ctl *JobCtl) makeRevision(job *v1.Job) {
revisionIndex := -1
revisions, err := getRevisions(*job)
if err != nil {
glog.Error(err)
return
}
uid := job.UID
for index, revision := range revisions {
if revision.Uid == string(uid) {
currentRevision := getCurrentRevision(job)
if reflect.DeepEqual(currentRevision, revision) {
return
} else {
revisionIndex = index
break
}
}
}
if revisionIndex == -1 {
revisionIndex = len(revisions) + 1
}
revisions[revisionIndex] = getCurrentRevision(job)
revisionsByte, err := json.Marshal(revisions)
if err != nil {
glog.Error(err)
}
if job.Annotations == nil {
job.Annotations = make(map[string]string)
}
job.Annotations["revisions"] = string(revisionsByte)
ctl.K8sClient.BatchV1().Jobs(job.Namespace).Update(job)
}
func JobReRun(namespace, jobName string) (string, error) {
k8sClient = client.NewK8sClient()
job, err := k8sClient.BatchV1().Jobs(namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
return "", err
}
newJob := *job
newJob.ResourceVersion = ""
newJob.Status = v1.JobStatus{}
newJob.ObjectMeta.UID = ""
newJob.Annotations["revisions"] = strings.Replace(job.Annotations["revisions"], Running, Unfinished, -1)
delete(newJob.Spec.Selector.MatchLabels, "controller-uid")
delete(newJob.Spec.Template.ObjectMeta.Labels, "controller-uid")
revisions, err := getRevisions(*job)
err = deleteJob(namespace, jobName)
if err != nil {
return "", err
}
index := len(revisions) + 1
value := getStatus(job)
revisions[index] = value
revisionsByte, err := json.Marshal(revisions)
if err != nil {
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
glog.Errorf("failed to rerun job %s, reason: %s", jobName, err)
return "", fmt.Errorf("failed to rerun job %s", jobName)
}
newJob.Annotations["revisions"] = string(revisionsByte)
err = deleteJob(job.Namespace, job.Name)
if err != nil {
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
return "", fmt.Errorf("failed to rerun job %s", jobName)
for i := 0; i < retryTimes; i++ {
_, err = k8sClient.BatchV1().Jobs(namespace).Create(&newJob)
if err != nil {
time.Sleep(time.Second)
continue
}
break
}
_, err = k8sClient.BatchV1().Jobs(namespace).Create(&newJob)
if err != nil {
glog.Errorf("failed to rerun job %s, reason: %s", err, err)
glog.Errorf("failed to rerun job %s, reason: %s", jobName, err)
return "", fmt.Errorf("failed to rerun job %s", jobName)
}

View File

@@ -147,9 +147,9 @@ func (ctl *NamespaceCtl) createDefaultRoleBinding(ns, user string) error {
}
func (ctl *NamespaceCtl) createDefaultRole(ns string) error {
adminRole := &rbac.Role{ObjectMeta: metaV1.ObjectMeta{Name: admin, Namespace: ns}, Rules: adminRules}
editorRole := &rbac.Role{ObjectMeta: metaV1.ObjectMeta{Name: editor, Namespace: ns}, Rules: editorRules}
viewerRole := &rbac.Role{ObjectMeta: metaV1.ObjectMeta{Name: viewer, Namespace: ns}, Rules: viewerRules}
adminRole := &rbac.Role{ObjectMeta: metaV1.ObjectMeta{Name: admin, Namespace: ns, Annotations: map[string]string{"creator": "system"}}, Rules: adminRules}
editorRole := &rbac.Role{ObjectMeta: metaV1.ObjectMeta{Name: editor, Namespace: ns, Annotations: map[string]string{"creator": "system"}}, Rules: editorRules}
viewerRole := &rbac.Role{ObjectMeta: metaV1.ObjectMeta{Name: viewer, Namespace: ns, Annotations: map[string]string{"creator": "system"}}, Rules: viewerRules}
_, err := ctl.K8sClient.RbacV1().Roles(ns).Create(adminRole)

View File

@@ -38,8 +38,12 @@ func (ctl *PvcCtl) generateObject(item *v1.PersistentVolumeClaim) *Pvc {
name := item.Name
namespace := item.Namespace
status := fmt.Sprintf("%s", item.Status.Phase)
createTime := item.CreationTimestamp.Time
status := fmt.Sprintf("%s", item.Status.Phase)
if item.DeletionTimestamp != nil {
status = "Terminating"
}
var capacity, storageClass, accessModeStr string
if createTime.IsZero() {

View File

@@ -0,0 +1,64 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
)
func (ctl *RoleBindingCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *RoleBindingCtl) sync(stopChan chan struct{}) {
ctl.initListerAndInformer()
ctl.informer.Run(stopChan)
}
func (ctl *RoleBindingCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
return len(list)
}
func (ctl *RoleBindingCtl) initListerAndInformer() {
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Rbac().V1().RoleBindings().Lister()
ctl.informer = informerFactory.Rbac().V1().RoleBindings().Informer()
}
func (ctl *RoleBindingCtl) CountWithConditions(conditions string) int {
return 0
}
func (ctl *RoleBindingCtl) ListWithConditions(conditions string, paging *Paging, order string) (int, interface{}, error) {
return 0, nil, errors.New("not implement")
}
func (ctl *RoleBindingCtl) Lister() interface{} {
return ctl.lister
}

View File

@@ -30,12 +30,12 @@ import (
func (ctl *RoleCtl) generateObject(item v1.Role) *Role {
var displayName string
if item.Annotations != nil && len(item.Annotations[DisplayName]) > 0 {
if item.Annotations != nil && len(item.Annotations[DisplayName]) == 0 {
displayName = item.Annotations[DisplayName]
}
name := item.Name
if strings.HasPrefix(name, systemPrefix) {
if strings.HasPrefix(name, systemPrefix) || item.Annotations == nil || len(item.Annotations[creator]) == 0 {
return nil
}
namespace := item.Namespace

View File

@@ -37,7 +37,7 @@ type resourceControllers struct {
var ResourceControllers resourceControllers
func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}, wg *sync.WaitGroup) {
func (rec *resourceControllers) runController(name string, stopChan chan struct{}, wg *sync.WaitGroup) {
var ctl Controller
attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan,
aliveChan: make(chan struct{}), Name: name}
@@ -78,6 +78,10 @@ func (rec *resourceControllers) runContoller(name string, stopChan chan struct{}
ctl = &ConfigMapCtl{CommonAttribute: attr}
case Secrets:
ctl = &SecretCtl{CommonAttribute: attr}
case ClusterRoleBindings:
ctl = &ClusterRoleBindingCtl{CommonAttribute: attr}
case RoleBindings:
ctl = &RoleBindingCtl{CommonAttribute: attr}
default:
return
}
@@ -116,9 +120,9 @@ func Run(stopChan chan struct{}, wg *sync.WaitGroup) {
ResourceControllers = resourceControllers{k8sClient: k8sClient, Controllers: make(map[string]Controller)}
for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services,
Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses, Jobs, Cronjobs, Nodes, Replicasets,
Ingresses, Roles, RoleBindings, ClusterRoles, ClusterRoleBindings, Namespaces, StorageClasses, Jobs, Cronjobs, Nodes, Replicasets,
ControllerRevisions, ConfigMaps, Secrets} {
ResourceControllers.runContoller(item, stopChan, wg)
ResourceControllers.runController(item, stopChan, wg)
}
go dbHealthCheck(client.NewDBClient())
@@ -131,7 +135,7 @@ func Run(stopChan chan struct{}, wg *sync.WaitGroup) {
case _, isClose := <-controller.chanAlive():
if !isClose {
glog.Errorf("controller %s have stopped, restart it", ctlName)
ResourceControllers.runContoller(ctlName, stopChan, wg)
ResourceControllers.runController(ctlName, stopChan, wg)
}
default:
time.Sleep(3 * time.Second)

View File

@@ -95,7 +95,7 @@ func generateSvcObject(item v1.Service) *Service {
createTime = time.Now()
}
if len(item.Spec.ClusterIP) == 0 {
if len(item.Spec.ClusterIP) == 0 || item.Spec.ClusterIP == "None" {
if len(item.Spec.Selector) == 0 {
serviceType = "Headless(Selector)"
}

View File

@@ -49,6 +49,7 @@ const (
Warning = "warning"
Error = "error"
DisplayName = "displayName"
creator = "creator"
Pods = "pods"
Deployments = "deployments"
@@ -58,7 +59,9 @@ const (
Ingresses = "ingresses"
PersistentVolumeClaim = "persistent-volume-claims"
Roles = "roles"
RoleBindings = "role-bindings"
ClusterRoles = "cluster-roles"
ClusterRoleBindings = "cluster-role-bindings"
Services = "services"
StorageClasses = "storage-classes"
Applications = "applications"
@@ -284,15 +287,16 @@ type StorageClass struct {
Provisioner string `json:"provisioner"`
}
type JobRevisions map[int]JobStatus
type JobRevisions map[int]JobRevision
type JobStatus struct {
type JobRevision struct {
Status string `json:"status"`
Reasons []string `json:"reasons"`
Messages []string `json:"messages"`
Succeed int32 `json:"succeed"`
DesirePodNum int32 `json:"desire"`
Failed int32 `json:"failed"`
Uid string `json:"uid"`
StartTime time.Time `json:"start-time"`
CompletionTime time.Time `json:"completion-time"`
}
@@ -462,6 +466,17 @@ type ClusterRoleCtl struct {
CommonAttribute
}
type ClusterRoleBindingCtl struct {
lister rbacV1.ClusterRoleBindingLister
informer cache.SharedIndexInformer
CommonAttribute
}
type RoleBindingCtl struct {
lister rbacV1.RoleBindingLister
informer cache.SharedIndexInformer
CommonAttribute
}
type JobCtl struct {
lister batchv1.JobLister
informer cache.SharedIndexInformer