diff --git a/Makefile b/Makefile index f7aad38f9..b0612bd0e 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ # Use of this source code is governed by a Apache license # that can be found in the LICENSE file. -TARG.Name:=kubesphere +TRAG.Name:=kubesphere-apiserver TRAG.Gopkg:=kubesphere.io/kubesphere TRAG.Version:=$(TRAG.Gopkg)/pkg/version @@ -90,7 +90,8 @@ build: fmt mkdir -p ./tmp/bin && cp -r ./install/ ./tmp/ $(call get_build_flags) $(RUN_IN_DOCKER) time go install -ldflags '$(BUILD_FLAG)' $(TRAG.Gopkg)/cmd/... - @docker build -t $(TARG.Name) -f ./Dockerfile.dev ./tmp + mv ./tmp/bin/cmd ./tmp/bin/$(TRAG.Name) + @docker build -t $(TRAG.Name) -f ./Dockerfile.dev ./tmp @docker image prune -f 1>/dev/null 2>&1 @echo "build done" diff --git a/pkg/apis/v1alpha/nodes/nodes_handler.go b/pkg/apis/v1alpha/nodes/nodes_handler.go index 62ed76c90..dd4aa5fc3 100644 --- a/pkg/apis/v1alpha/nodes/nodes_handler.go +++ b/pkg/apis/v1alpha/nodes/nodes_handler.go @@ -38,6 +38,10 @@ func Register(ws *restful.WebService, subPath string) { ws.Route(ws.POST(subPath+"/{nodename}/drainage").To(handleDrainNode).Filter(route.RouteLogging)). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) + + ws.Route(ws.GET(subPath+"/{nodename}/drainage").To(handleDrainStatus).Filter(route.RouteLogging)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) } func handleNodes(request *restful.Request, response *restful.Response) { @@ -84,3 +88,20 @@ func handleDrainNode(request *restful.Request, response *restful.Response) { } } + +func handleDrainStatus(request *restful.Request, response *restful.Response) { + + nodeName := request.PathParameter("nodename") + + result, err := models.DrainStatus(nodeName) + + if err != nil { + + response.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()}) + + } else { + + response.WriteAsJson(result) + + } +} diff --git a/pkg/client/dbclient.go b/pkg/client/dbclient.go index 9695b992c..87d44a66f 100644 --- a/pkg/client/dbclient.go +++ b/pkg/client/dbclient.go @@ -1,12 +1,9 @@ /* Copyright 2018 The KubeSphere Authors. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,13 +14,15 @@ limitations under the License. package client import ( - "github.com/jinzhu/gorm" - //_ "github.com/jinzhu/gorm/dialects/mysql" "fmt" - _ "github.com/go-sql-driver/mysql" "github.com/golang/glog" + "github.com/jinzhu/gorm" + _ "github.com/jinzhu/gorm/dialects/mysql" + "log" + + "kubesphere.io/kubesphere/pkg/logs" "kubesphere.io/kubesphere/pkg/options" ) @@ -34,13 +33,32 @@ const database = "kubesphere" func NewDBClient() *gorm.DB { if dbClient != nil { - return dbClient + err := dbClient.DB().Ping() + if err == nil { + return dbClient + } else { + glog.Error(err) + panic(err) + } } user := options.ServerOptions.GetMysqlUser() passwd := options.ServerOptions.GetMysqlPassword() addr := options.ServerOptions.GetMysqlAddr() - conn := fmt.Sprintf("%s:%s@tcp(%s)/mysql?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr) + if dbClient == nil { + conn := fmt.Sprintf("%s:%s@tcp(%s)/mysql?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr) + db, err := gorm.Open("mysql", conn) + + if err != nil { + glog.Error(err) + panic(err) + } + + db.Exec(fmt.Sprintf("create database if not exists %s;", database)) + db.Close() + } + + conn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr, database) db, err := gorm.Open("mysql", conn) if err != nil { @@ -48,15 +66,7 @@ func NewDBClient() *gorm.DB { panic(err) } - db.Exec(fmt.Sprintf("create database if not exists %s;", database)) - - conn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr, database) - db, err = gorm.Open("mysql", conn) - - if err != nil { - glog.Error(err) - panic(err) - } + db.SetLogger(log.New(logs.GlogWriter{}, " ", 0)) dbClient = db return dbClient } diff --git a/pkg/client/k8sclient.go b/pkg/client/k8sclient.go index b5f31b256..304b42ef3 100644 --- a/pkg/client/k8sclient.go +++ b/pkg/client/k8sclient.go @@ -1,12 +1,9 @@ /* Copyright 2018 The KubeSphere Authors. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,43 +16,18 @@ package client import ( "github.com/golang/glog" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "kubesphere.io/kubesphere/pkg/options" ) var k8sClient *kubernetes.Clientset -func getKubeConfig() (kubeConfig *rest.Config, err error) { - - kubeConfigFile := options.ServerOptions.GetKubeConfigFile() - - if len(kubeConfigFile) > 0 { - - kubeConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfigFile) - if err != nil { - return nil, err - } - - } else { - - kubeConfig, err = rest.InClusterConfig() - if err != nil { - return nil, err - } - } - - return kubeConfig, nil - -} - func NewK8sClient() *kubernetes.Clientset { if k8sClient != nil { return k8sClient } - kubeConfig, err := getKubeConfig() + kubeConfig, err := options.ServerOptions.GetKubeConfig() if err != nil { glog.Error(err) panic(err) diff --git a/pkg/models/components.go b/pkg/models/components.go index a4ae72e3f..ddf55fa6b 100644 --- a/pkg/models/components.go +++ b/pkg/models/components.go @@ -43,7 +43,7 @@ type Components struct { SelfLink string `json:"selfLink"` Label interface{} `json:"label"` HealthStatus string `json:"healthStatus"` - CreateTime time.Time `json:"updateTime"` + CreateTime time.Time `json:"createTime"` } /*** @@ -173,7 +173,6 @@ func GetComponentsByNamespace(ns string) ([]Components, error) { if ns != KUBESYSTEM { option.LabelSelector = "" } - servicelists, err := k8sClient.CoreV1().Services(ns).List(option) if err != nil { diff --git a/pkg/models/controllers/clusterRoles.go b/pkg/models/controllers/clusterRoles.go index 28b308534..1355bb1e2 100644 --- a/pkg/models/controllers/clusterRoles.go +++ b/pkg/models/controllers/clusterRoles.go @@ -17,17 +17,17 @@ limitations under the License. package controllers import ( - "encoding/json" "strings" "time" "github.com/golang/glog" "k8s.io/api/rbac/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) -func (ctl *ClusterRoleCtl) generateObjec(item v1.ClusterRole) *ClusterRole { +func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole { name := item.Name if strings.HasPrefix(name, "system:") { return nil @@ -38,9 +38,7 @@ func (ctl *ClusterRoleCtl) generateObjec(item v1.ClusterRole) *ClusterRole { createTime = time.Now() } - annotation, _ := json.Marshal(item.Annotations) - - object := &ClusterRole{Name: name, CreateTime: createTime, AnnotationStr: string(annotation)} + object := &ClusterRole{Name: name, CreateTime: createTime, Annotation: Annotation{item.Annotations}} return object } @@ -64,47 +62,48 @@ func (ctl *ClusterRoleCtl) listAndWatch() { db = db.CreateTable(&ClusterRole{}) k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Rbac().V1().ClusterRoles().Informer() + lister := kubeInformerFactory.Rbac().V1().ClusterRoles().Lister() - clusterRoleList, err := k8sClient.RbacV1().ClusterRoles().List(metaV1.ListOptions{}) + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range clusterRoleList.Items { - obj := ctl.generateObjec(item) - if obj != nil { - db.Create(obj) - } + for _, item := range list { + obj := ctl.generateObject(*item) + db.Create(obj) + } - clusterRoleWatcher, err := k8sClient.RbacV1().ClusterRoles().Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-clusterRoleWatcher.ResultChan(): - var role ClusterRole - if event.Object == nil { - panic("watch timeout, restart clusterRole controller") + object := obj.(*v1.ClusterRole) + mysqlObject := ctl.generateObject(*object) + if mysqlObject != nil { + db.Create(mysqlObject) } - object := event.Object.(*v1.ClusterRole) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, "\"\"").Find(&role) - db.Delete(role) - break + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.ClusterRole) + mysqlObject := ctl.generateObject(*object) + if mysqlObject != nil { + db.Save(mysqlObject) } - obj := ctl.generateObjec(*object) - if obj != nil { - db.Save(obj) - } - } - } + }, + DeleteFunc: func(obj interface{}) { + var item ClusterRole + object := obj.(*v1.ClusterRole) + db.Where("name=?", object.Name).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *ClusterRoleCtl) CountWithConditions(conditions string) int { @@ -123,12 +122,6 @@ func (ctl *ClusterRoleCtl) ListWithConditions(conditions string, paging *Paging) listWithConditions(db, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/common.go b/pkg/models/controllers/common.go new file mode 100644 index 000000000..3575ea8bc --- /dev/null +++ b/pkg/models/controllers/common.go @@ -0,0 +1,52 @@ +/* +Copyright 2018 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import "github.com/jinzhu/gorm" + +func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) { + if len(conditions) == 0 { + db.Model(object).Count(total) + } else { + db.Model(object).Where(conditions).Count(total) + } + + if paging != nil { + if len(conditions) > 0 { + db.Where(conditions).Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list) + } else { + db.Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list) + } + + } else { + if len(conditions) > 0 { + db.Where(conditions).Order(order).Find(list) + } else { + db.Order(order).Find(list) + } + } +} + +func countWithConditions(db *gorm.DB, conditions string, object interface{}) int { + var count int + if len(conditions) == 0 { + db.Model(object).Count(&count) + } else { + db.Model(object).Where(conditions).Count(&count) + } + return count +} diff --git a/pkg/models/controllers/daemonsets.go b/pkg/models/controllers/daemonsets.go index 2792b1bae..f2acea614 100644 --- a/pkg/models/controllers/daemonsets.go +++ b/pkg/models/controllers/daemonsets.go @@ -21,14 +21,13 @@ import ( "time" "github.com/golang/glog" - "k8s.io/api/apps/v1beta2" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) -func (ctl *DaemonsetCtl) generateObject(item v1beta2.DaemonSet) *Daemonset { +func (ctl *DaemonsetCtl) generateObject(item v1.DaemonSet) *Daemonset { var app string var status string name := item.Name @@ -53,24 +52,20 @@ func (ctl *DaemonsetCtl) generateObject(item v1beta2.DaemonSet) *Daemonset { } if availablePodNum >= desirePodNum { - status = running + status = Running } else { - status = updating + status = Updating } - annotation, _ := json.Marshal(item.Annotations) - object := &Daemonset{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum, - App: app, CreateTime: createTime, Status: status, NodeSelector: string(nodeSelectorStr), AnnotationStr: string(annotation)} + App: app, CreateTime: createTime, Status: status, NodeSelector: string(nodeSelectorStr), Annotation: Annotation{item.Annotations}} return object } func (ctl *DaemonsetCtl) listAndWatch() { defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { glog.Error(err) return @@ -86,43 +81,45 @@ func (ctl *DaemonsetCtl) listAndWatch() { db = db.CreateTable(&Daemonset{}) - k8sClient := client.NewK8sClient() - deoloyList, err := k8sClient.AppsV1beta2().DaemonSets("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Apps().V1().DaemonSets().Informer() + lister := kubeInformerFactory.Apps().V1().DaemonSets().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range deoloyList.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) + } - watcher, err := k8sClient.AppsV1beta2().DaemonSets("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var ss Daemonset - if event.Object == nil { - panic("watch timeout, restart daemonset controller") - } - object := event.Object.(*v1beta2.DaemonSet) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&ss) - db.Delete(ss) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + object := obj.(*v1.DaemonSet) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.DaemonSet) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Daemonset + object := obj.(*v1.DaemonSet) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *DaemonsetCtl) CountWithConditions(conditions string) int { @@ -140,12 +137,6 @@ func (ctl *DaemonsetCtl) ListWithConditions(conditions string, paging *Paging) ( listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/deployments.go b/pkg/models/controllers/deployments.go index 87f3b7892..680c091c8 100644 --- a/pkg/models/controllers/deployments.go +++ b/pkg/models/controllers/deployments.go @@ -17,16 +17,17 @@ limitations under the License. package controllers import ( - "encoding/json" "time" "github.com/golang/glog" - "k8s.io/api/apps/v1beta2" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/api/apps/v1" + "k8s.io/client-go/informers" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" ) -func (ctl *DeploymentCtl) generateObject(item v1beta2.Deployment) *Deployment { +func (ctl *DeploymentCtl) generateObject(item v1.Deployment) *Deployment { var app string var status string var updateTime time.Time @@ -53,19 +54,17 @@ func (ctl *DeploymentCtl) generateObject(item v1beta2.Deployment) *Deployment { } if item.Annotations["state"] == "stop" { - status = stopping + status = Stopped } else { if availablePodNum >= desirePodNum { - status = running + status = Running } else { - status = updating + status = Updating } } - annotation, _ := json.Marshal(item.Annotations) - return &Deployment{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum, - App: app, UpdateTime: updateTime, Status: status, AnnotationStr: string(annotation)} + App: app, UpdateTime: updateTime, Status: status, Annotation: Annotation{item.Annotations}} } func (ctl *DeploymentCtl) listAndWatch() { @@ -85,45 +84,44 @@ func (ctl *DeploymentCtl) listAndWatch() { db = db.CreateTable(&Deployment{}) k8sClient := ctl.K8sClient - deoloyList, err := k8sClient.AppsV1beta2().Deployments("").List(metaV1.ListOptions{}) + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Apps().V1().Deployments().Informer() + lister := kubeInformerFactory.Apps().V1().Deployments().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range deoloyList.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) } - watcher, err := k8sClient.AppsV1beta2().Deployments("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - glog.Error("here") - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): + object := obj.(*v1.Deployment) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Deployment) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { var deploy Deployment - if event.Object == nil { - panic("watch timeout, restart deployment controller") - } - object := event.Object.(*v1beta2.Deployment) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&deploy) - db.Delete(deploy) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + object := obj.(*v1.Deployment) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&deploy) + db.Delete(deploy) + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *DeploymentCtl) CountWithConditions(conditions string) int { @@ -141,12 +139,6 @@ func (ctl *DeploymentCtl) ListWithConditions(conditions string, paging *Paging) listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/ingresses.go b/pkg/models/controllers/ingresses.go index cd644cefb..67c22029b 100644 --- a/pkg/models/controllers/ingresses.go +++ b/pkg/models/controllers/ingresses.go @@ -17,16 +17,14 @@ limitations under the License. package controllers import ( - "encoding/json" "strings" "time" "github.com/golang/glog" "k8s.io/api/extensions/v1beta1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress { @@ -49,15 +47,14 @@ func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress { ip = strings.Join(ipList, ",") } - annotation, _ := json.Marshal(item.Annotations) - object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, AnnotationStr: string(annotation)} + object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}} return object } func (ctl *IngressCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -73,43 +70,45 @@ func (ctl *IngressCtl) listAndWatch() { db = db.CreateTable(&Ingress{}) - k8sClient := client.NewK8sClient() - list, err := k8sClient.ExtensionsV1beta1().Ingresses("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Extensions().V1beta1().Ingresses().Informer() + lister := kubeInformerFactory.Extensions().V1beta1().Ingresses().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range list.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) + } - watcher, err := k8sClient.ExtensionsV1beta1().Ingresses("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var ing Ingress - if event.Object == nil { - panic("watch timeout, restart ingress controller") - } - object := event.Object.(*v1beta1.Ingress) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&ing) - db.Delete(ing) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + object := obj.(*v1beta1.Ingress) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1beta1.Ingress) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Ingress + object := obj.(*v1beta1.Ingress) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *IngressCtl) CountWithConditions(conditions string) int { @@ -127,12 +126,6 @@ func (ctl *IngressCtl) ListWithConditions(conditions string, paging *Paging) (in listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/namespaces.go b/pkg/models/controllers/namespaces.go index f0d013420..50aa4df66 100644 --- a/pkg/models/controllers/namespaces.go +++ b/pkg/models/controllers/namespaces.go @@ -27,8 +27,11 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/resource" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/options" @@ -229,15 +232,14 @@ func (ctl *NamespaceCtl) generateObject(item v1.Namespace) *Namespace { createTime = time.Now() } - annotation, _ := json.Marshal(item.Annotations) - object := &Namespace{Name: name, CreateTime: createTime, Status: status, AnnotationStr: string(annotation)} + object := &Namespace{Name: name, CreateTime: createTime, Status: status, Annotation: Annotation{item.Annotations}} return object } func (ctl *NamespaceCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -252,50 +254,45 @@ func (ctl *NamespaceCtl) listAndWatch() { db = db.CreateTable(&Namespace{}) - k8sClient := client.NewK8sClient() - list, err := k8sClient.CoreV1().Namespaces().List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Core().V1().Namespaces().Informer() + lister := kubeInformerFactory.Core().V1().Namespaces().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range list.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) - ctl.createRoleAndRuntime(item) } - watcher, err := k8sClient.CoreV1().Namespaces().Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var ns Namespace - if event.Object == nil { - panic("watch timeout, restart namespace controller") - } - object := event.Object.(*v1.Namespace) - if event.Type == watch.Deleted { - db.Where("name=?", object.Name).Find(&ns) - db.Delete(ns) + object := obj.(*v1.Namespace) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Namespace) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Namespace + object := obj.(*v1.Namespace) + db.Where("name=?", object.Name).Find(&item) + db.Delete(item) - ctl.deleteOpRuntime(*object) - break - } + }, + }) - ctl.createRoleAndRuntime(*object) - - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + informer.Run(ctl.stopChan) } func (ctl *NamespaceCtl) CountWithConditions(conditions string) int { @@ -313,11 +310,12 @@ func (ctl *NamespaceCtl) ListWithConditions(conditions string, paging *Paging) ( listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" + for index := range list { + usage, err := ctl.GetNamespaceQuota(list[index].Name) + if err == nil { + list[index].Usaeg = usage + } + } return total, list, nil } @@ -328,3 +326,28 @@ func (ctl *NamespaceCtl) Count(namespace string) int { db.Model(&Namespace{}).Count(&count) return count } + +func getUsage(namespace, resource string) int { + ctl := rec.controllers[resource] + return ctl.Count(namespace) +} + +func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, error) { + + usage := make(v1.ResourceList) + + resourceList := []string{Daemonsets, Deployments, Ingresses, Roles, Services, Statefulsets, PersistentVolumeClaim, Pods} + for _, resourceName := range resourceList { + used := getUsage(namespace, resourceName) + var quantity resource.Quantity + quantity.Set(int64(used)) + usage[v1.ResourceName(resourceName)] = quantity + } + + podCtl := rec.controllers[Pods] + var quantity resource.Quantity + used := podCtl.CountWithConditions(fmt.Sprintf("status=\"%s\" And namespace=\"%s\"", "Running", namespace)) + quantity.Set(int64(used)) + usage["runningPods"] = quantity + return usage, nil +} diff --git a/pkg/models/controllers/pods.go b/pkg/models/controllers/pods.go index 666858282..4dc7452f7 100644 --- a/pkg/models/controllers/pods.go +++ b/pkg/models/controllers/pods.go @@ -18,15 +18,75 @@ package controllers import ( "encoding/json" + "time" "github.com/golang/glog" "k8s.io/api/core/v1" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) +const inUse = "in_use_pods" + +func (ctl *PodCtl) addAnnotationToPvc(item v1.Pod) { + volumes := item.Spec.Volumes + for _, volume := range volumes { + pvc := volume.PersistentVolumeClaim + if pvc != nil { + name := pvc.ClaimName + + Pvc, _ := ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Get(name, metaV1.GetOptions{}) + if Pvc.Annotations == nil { + Pvc.Annotations = make(map[string]string) + } + annotation := Pvc.Annotations + if len(annotation[inUse]) == 0 { + pods := []string{item.Name} + str, _ := json.Marshal(pods) + annotation[inUse] = string(str) + } else { + var pods []string + json.Unmarshal([]byte(annotation[inUse]), pods) + for _, pod := range pods { + if pod == item.Name { + return + } + pods = append(pods, item.Name) + str, _ := json.Marshal(pods) + annotation[inUse] = string(str) + } + } + ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Update(Pvc) + } + } +} + +func (ctl *PodCtl) delAnnotationFromPvc(item v1.Pod) { + volumes := item.Spec.Volumes + for _, volume := range volumes { + pvc := volume.PersistentVolumeClaim + if pvc != nil { + name := pvc.ClaimName + Pvc, _ := ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Get(name, metaV1.GetOptions{}) + annotation := Pvc.Annotations + var pods []string + json.Unmarshal([]byte(annotation[inUse]), pods) + + for index, pod := range pods { + if pod == item.Name { + pods = append(pods[:index], pods[index+1:]...) + } + } + + str, _ := json.Marshal(pods) + annotation[inUse] = string(str) + ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Update(Pvc) + } + } +} + func (ctl *PodCtl) generateObject(item v1.Pod) *Pod { name := item.Name namespace := item.Namespace @@ -37,13 +97,15 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod { createTime := item.CreationTimestamp.Time containerStatus := item.Status.ContainerStatuses containerSpecs := item.Spec.Containers - var containers []Container + + var containers Containers for _, containerSpec := range containerSpecs { var container Container container.Name = containerSpec.Name container.Image = containerSpec.Image container.Ports = containerSpec.Ports + container.Resources = containerSpec.Resources for _, status := range containerStatus { if container.Name == status.Name { container.Ready = status.Ready @@ -53,25 +115,13 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod { containers = append(containers, container) } - containerStr, _ := json.Marshal(containers) - - annotation, _ := json.Marshal(item.Annotations) - object := &Pod{Namespace: namespace, Name: name, Node: nodeName, PodIp: podIp, Status: status, NodeIp: nodeIp, - CreateTime: createTime, ContainerStr: string(containerStr), AnnotationStr: string(annotation)} + CreateTime: createTime, Annotation: Annotation{item.Annotations}, Containers: containers} return object } func (ctl *PodCtl) listAndWatch() { - defer func() { - defer close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() - db := ctl.DB if db.HasTable(&Pod{}) { @@ -81,43 +131,47 @@ func (ctl *PodCtl) listAndWatch() { db = db.CreateTable(&Pod{}) - k8sClient := client.NewK8sClient() - list, err := k8sClient.CoreV1().Pods("").List(meta_v1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Core().V1().Pods().Informer() + lister := kubeInformerFactory.Core().V1().Pods().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) - return + panic(err) } - for _, item := range list.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) } - watcher, err := k8sClient.CoreV1().Pods("").Watch(meta_v1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + object := obj.(*v1.Pod) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var po Pod - if event.Object == nil { - panic("watch timeout, restart pod controller") - } - object := event.Object.(*v1.Pod) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&po) - db.Delete(po) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + ctl.addAnnotationToPvc(*object) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Pod) + mysqlObject := ctl.generateObject(*object) + + db.Save(mysqlObject) + + }, + DeleteFunc: func(obj interface{}) { + var item Pod + object := obj.(*v1.Pod) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + ctl.delAnnotationFromPvc(*object) + db.Delete(item) + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *PodCtl) CountWithConditions(conditions string) int { @@ -135,17 +189,6 @@ func (ctl *PodCtl) ListWithConditions(conditions string, paging *Paging) (int, i listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - var containers []Container - json.Unmarshal([]byte(item.ContainerStr), &containers) - list[index].Containers = containers - list[index].ContainerStr = "" - - annotation := make(Annotation) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/pvcs.go b/pkg/models/controllers/pvcs.go index 8956c61ff..e1eeb3af3 100644 --- a/pkg/models/controllers/pvcs.go +++ b/pkg/models/controllers/pvcs.go @@ -24,10 +24,9 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) const creator = "creator" @@ -60,17 +59,16 @@ func (ctl *PvcCtl) generateObject(item *v1.PersistentVolumeClaim) *Pvc { } accessModeStr = strings.Join(accessModeList, ",") - annotation, _ := json.Marshal(item.Annotations) object := &Pvc{Namespace: namespace, Name: name, Status: status, Capacity: capacity, - AccessMode: accessModeStr, StorageClassName: storageClass, CreateTime: createTime, AnnotationStr: string(annotation)} + AccessMode: accessModeStr, StorageClassName: storageClass, CreateTime: createTime, Annotation: Annotation{item.Annotations}} return object } func (ctl *PvcCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -86,43 +84,44 @@ func (ctl *PvcCtl) listAndWatch() { db = db.CreateTable(&Pvc{}) - k8sClient := client.NewK8sClient() - pvcList, err := k8sClient.CoreV1().PersistentVolumeClaims("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer() + lister := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range pvcList.Items { - obj := ctl.generateObject(&item) + for _, item := range list { + obj := ctl.generateObject(item) db.Create(obj) + } - watcher, err := k8sClient.CoreV1().PersistentVolumeClaims("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var pvc Pvc - if event.Object == nil { - panic("watch timeout, restart pvc controller") - } - object := event.Object.(*v1.PersistentVolumeClaim) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&pvc) - db.Delete(pvc) - break - } - obj := ctl.generateObject(object) - db.Save(obj) - } - } + object := obj.(*v1.PersistentVolumeClaim) + mysqlObject := ctl.generateObject(object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.PersistentVolumeClaim) + mysqlObject := ctl.generateObject(object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Pvc + object := obj.(*v1.PersistentVolumeClaim) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *PvcCtl) CountWithConditions(conditions string) int { @@ -140,11 +139,17 @@ func (ctl *PvcCtl) ListWithConditions(conditions string, paging *Paging) (int, i listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" + for index := range list { + inUsePods := list[index].Annotation.Values[inUse] + var pods []string + + json.Unmarshal([]byte(inUsePods), &pods) + + if len(pods) > 0 { + list[index].InUse = true + } else { + list[index].InUse = false + } } return total, list, nil diff --git a/pkg/models/controllers/roles.go b/pkg/models/controllers/roles.go index 866b77f05..e99f06057 100644 --- a/pkg/models/controllers/roles.go +++ b/pkg/models/controllers/roles.go @@ -17,16 +17,14 @@ limitations under the License. package controllers import ( - "encoding/json" "strings" "time" "github.com/golang/glog" "k8s.io/api/rbac/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) func (ctl *RoleCtl) generateObject(item v1.Role) *Role { @@ -40,9 +38,7 @@ func (ctl *RoleCtl) generateObject(item v1.Role) *Role { createTime = time.Now() } - annotation, _ := json.Marshal(item.Annotations) - - object := &Role{Namespace: namespace, Name: name, CreateTime: createTime, AnnotationStr: string(annotation)} + object := &Role{Namespace: namespace, Name: name, CreateTime: createTime, Annotation: Annotation{item.Annotations}} return object } @@ -65,49 +61,49 @@ func (ctl *RoleCtl) listAndWatch() { db = db.CreateTable(&Role{}) - k8sClient := client.NewK8sClient() - roleList, err := k8sClient.RbacV1().Roles("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Rbac().V1().Roles().Informer() + lister := kubeInformerFactory.Rbac().V1().Roles().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range roleList.Items { - obj := ctl.generateObject(item) - if obj != nil { - db.Create(obj) - } + for _, item := range list { + obj := ctl.generateObject(*item) + db.Create(obj) } - roleWatcher, err := k8sClient.RbacV1().Roles("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-roleWatcher.ResultChan(): - var role Role - if event.Object == nil { - panic("watch timeout, restart role controller") + object := obj.(*v1.Role) + mysqlObject := ctl.generateObject(*object) + if mysqlObject != nil { + db.Create(mysqlObject) } - object := event.Object.(*v1.Role) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&role) - db.Delete(role) - break + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Role) + mysqlObject := ctl.generateObject(*object) + if mysqlObject != nil { + db.Save(mysqlObject) } - obj := ctl.generateObject(*object) - if obj != nil { - db.Save(obj) - } - break - } - } + }, + DeleteFunc: func(obj interface{}) { + var item Role + object := obj.(*v1.Role) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *RoleCtl) CountWithConditions(conditions string) int { @@ -125,12 +121,6 @@ func (ctl *RoleCtl) ListWithConditions(conditions string, paging *Paging) (int, listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/run.go b/pkg/models/controllers/run.go index cd1a4c96d..db4f0f9fe 100644 --- a/pkg/models/controllers/run.go +++ b/pkg/models/controllers/run.go @@ -28,15 +28,15 @@ import ( type resourceControllers struct { controllers map[string]Controller - db *gorm.DB k8sClient *kubernetes.Clientset } var stopChan chan struct{} +var rec resourceControllers func (rec *resourceControllers) runContoller(name string) { var ctl Controller - attr := CommonAttribute{DB: rec.db, K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})} + attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})} switch name { case Deployments: ctl = &DeploymentCtl{attr} @@ -69,21 +69,39 @@ func (rec *resourceControllers) runContoller(name string) { } +func dbHealthCheck(db *gorm.DB) { + for { + count := 0 + var err error + for k := 0; k < 5; k++ { + err = db.DB().Ping() + if err != nil { + count++ + } + time.Sleep(1 * time.Second) + } + + if count > 3 { + panic(err) + } + } + +} + func Run() { - db := client.NewDBClient() stopChan := make(chan struct{}) - defer db.Commit() - defer db.Close() defer close(stopChan) - rec := resourceControllers{k8sClient: client.NewK8sClient(), db: db, controllers: make(map[string]Controller)} + rec = resourceControllers{k8sClient: client.NewK8sClient(), controllers: make(map[string]Controller)} for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services, Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses} { rec.runContoller(item) } + go dbHealthCheck(client.NewDBClient()) + for { for ctlName, controller := range rec.controllers { select { diff --git a/pkg/models/controllers/services.go b/pkg/models/controllers/services.go index a20b2cf6c..4a1bb4f32 100644 --- a/pkg/models/controllers/services.go +++ b/pkg/models/controllers/services.go @@ -17,18 +17,16 @@ limitations under the License. package controllers import ( - "encoding/json" "fmt" "strings" "time" "github.com/golang/glog" "k8s.io/api/core/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) const ( @@ -119,16 +117,15 @@ func (ctl *ServiceCtl) generateObject(item v1.Service) *Service { ports = ports[0 : len(ports)-1] } - annotation, _ := json.Marshal(item.Annotations) object := &Service{Namespace: namespace, Name: name, ServiceType: serviceType, ExternalIp: externalIp, - VirtualIp: vip, CreateTime: createTime, Ports: ports, AnnotationStr: string(annotation)} + VirtualIp: vip, CreateTime: createTime, Ports: ports, Annotation: Annotation{item.Annotations}} return object } func (ctl *ServiceCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -143,45 +140,45 @@ func (ctl *ServiceCtl) listAndWatch() { db = db.CreateTable(&Service{}) - k8sClient := client.NewK8sClient() - svcList, err := k8sClient.CoreV1().Services("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Core().V1().Services().Informer() + lister := kubeInformerFactory.Core().V1().Services().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range svcList.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) + } - watcher, err := k8sClient.CoreV1().Services("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var svc Service + object := obj.(*v1.Service) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Service) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Service + object := obj.(*v1.Service) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) - if event.Object == nil { - panic("watch timeout, restart service controller") - } - object := event.Object.(*v1.Service) + }, + }) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&svc) - db.Delete(svc) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + informer.Run(ctl.stopChan) } func (ctl *ServiceCtl) CountWithConditions(conditions string) int { @@ -199,12 +196,6 @@ func (ctl *ServiceCtl) ListWithConditions(conditions string, paging *Paging) (in listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/statefulsets.go b/pkg/models/controllers/statefulsets.go index babc211c2..90bc31be8 100644 --- a/pkg/models/controllers/statefulsets.go +++ b/pkg/models/controllers/statefulsets.go @@ -17,18 +17,17 @@ limitations under the License. package controllers import ( - "encoding/json" "time" "github.com/golang/glog" - "k8s.io/api/apps/v1beta2" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) -func (ctl *StatefulsetCtl) generateObject(item v1beta2.StatefulSet) *Statefulset { +func (ctl *StatefulsetCtl) generateObject(item v1.StatefulSet) *Statefulset { var app string var status string name := item.Name @@ -50,26 +49,24 @@ func (ctl *StatefulsetCtl) generateObject(item v1beta2.StatefulSet) *Statefulset } if item.Annotations["state"] == "stop" { - status = stopping + status = Stopped } else { if availablePodNum >= desirePodNum { - status = running + status = Running } else { - status = updating + status = Updating } } - annotation, _ := json.Marshal(item.Annotations) - statefulSetObject := &Statefulset{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum, - App: app, CreateTime: createTime, Status: status, AnnotationStr: string(annotation)} + App: app, CreateTime: createTime, Status: status, Annotation: Annotation{item.Annotations}} return statefulSetObject } func (ctl *StatefulsetCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -82,42 +79,45 @@ func (ctl *StatefulsetCtl) listAndWatch() { } db = db.CreateTable(&Statefulset{}) - k8sClient := client.NewK8sClient() - deoloyList, err := k8sClient.AppsV1beta2().StatefulSets("").List(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - } + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Apps().V1().StatefulSets().Informer() + lister := kubeInformerFactory.Apps().V1().StatefulSets().Lister() - for _, item := range deoloyList.Items { - obj := ctl.generateObject(item) - db.Create(obj) - } - - watcher, err := k8sClient.AppsV1beta2().StatefulSets("").Watch(metaV1.ListOptions{}) + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var tmp Statefulset - if event.Object == nil { - panic("watch timeout, restart statefulset controller") - } - object := event.Object.(*v1beta2.StatefulSet) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&tmp) - db.Delete(tmp) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } + for _, item := range list { + obj := ctl.generateObject(*item) + db.Create(obj) + } + + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + + object := obj.(*v1.StatefulSet) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.StatefulSet) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Statefulset + object := obj.(*v1.StatefulSet) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *StatefulsetCtl) CountWithConditions(conditions string) int { @@ -135,12 +135,6 @@ func (ctl *StatefulsetCtl) ListWithConditions(conditions string, paging *Paging) listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/storageclasses.go b/pkg/models/controllers/storageclasses.go index 2d07843c2..db7f8a57d 100644 --- a/pkg/models/controllers/storageclasses.go +++ b/pkg/models/controllers/storageclasses.go @@ -17,19 +17,18 @@ limitations under the License. package controllers import ( - "encoding/json" "fmt" "time" "github.com/golang/glog" - "k8s.io/api/storage/v1beta1" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/api/storage/v1" - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) -func (ctl *StorageClassCtl) generateObject(item v1beta1.StorageClass) *StorageClass { +func (ctl *StorageClassCtl) generateObject(item v1.StorageClass) *StorageClass { name := item.Name createTime := item.CreationTimestamp.Time @@ -42,15 +41,14 @@ func (ctl *StorageClassCtl) generateObject(item v1beta1.StorageClass) *StorageCl createTime = time.Now() } - annotation, _ := json.Marshal(item.Annotations) - object := &StorageClass{Name: name, CreateTime: createTime, IsDefault: isDefault, AnnotationStr: string(annotation)} + object := &StorageClass{Name: name, CreateTime: createTime, IsDefault: isDefault, Annotation: Annotation{item.Annotations}} return object } func (ctl *StorageClassCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -65,43 +63,46 @@ func (ctl *StorageClassCtl) listAndWatch() { db = db.CreateTable(&StorageClass{}) - k8sClient := client.NewK8sClient() - list, err := k8sClient.StorageV1beta1().StorageClasses().List(meta_v1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Storage().V1().StorageClasses().Informer() + lister := kubeInformerFactory.Storage().V1().StorageClasses().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range list.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) + } - watcher, err := k8sClient.StorageV1beta1().StorageClasses().Watch(meta_v1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + + object := obj.(*v1.StorageClass) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.StorageClass) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item StorageClass + object := obj.(*v1.StorageClass) + db.Where("name=?", object.Name).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var sc StorageClass - if event.Object == nil { - panic("watch timeout, restart storageClass controller") - } - object := event.Object.(*v1beta1.StorageClass) - if event.Type == watch.Deleted { - db.Where("name=?", object.Name).Find(&sc) - db.Delete(sc) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } } func (ctl *StorageClassCtl) CountWithConditions(conditions string) int { @@ -121,10 +122,6 @@ func (ctl *StorageClassCtl) ListWithConditions(conditions string, paging *Paging for index, storageClass := range list { name := storageClass.Name - annotation := make(map[string]string) - json.Unmarshal([]byte(storageClass.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" pvcCtl := PvcCtl{CommonAttribute{K8sClient: ctl.K8sClient, DB: ctl.DB}} list[index].Count = pvcCtl.CountWithConditions(fmt.Sprintf("storage_class=\"%s\"", name)) diff --git a/pkg/models/controllers/types.go b/pkg/models/controllers/types.go index 5ad55fcb3..f3a764f2c 100644 --- a/pkg/models/controllers/types.go +++ b/pkg/models/controllers/types.go @@ -19,15 +19,20 @@ package controllers import ( "time" + "database/sql/driver" + "encoding/json" + "errors" + "github.com/jinzhu/gorm" "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" ) const ( - stopping = "stopped" - running = "running" - updating = "updating" + resyncCircle = 180 + Stopped = "stopped" + Running = "running" + Updating = "updating" tablePods = "pods" tableDeployments = "deployments" tableDaemonsets = "daemonsets" @@ -53,41 +58,37 @@ const ( StorageClasses = "storage-classes" ) -var ResourceTable = map[string]string{Deployments: tableDeployments, Statefulsets: tableStatefulsets, Daemonsets: tableDaemonsets, - Pods: tablePods, Namespaces: tableNamespaces, Ingresses: tableIngresses, PersistentVolumeClaim: tablePersistentVolumeClaim, Roles: tableRoles, - Services: tableServices, StorageClasses: tableStorageClasses, ClusterRoles: tableClusterRoles} +type Annotation struct { + Values map[string]string `gorm:"type:TEXT"` +} -type Annotation map[string]string +func (annotation *Annotation) Scan(val interface{}) error { + switch val := val.(type) { + case string: + return json.Unmarshal([]byte(val), annotation) + case []byte: + return json.Unmarshal(val, annotation) + default: + return errors.New("not support") + } + return nil +} -// -//func (annotation *Annotation)Scan(val interface{}) error{ -// switch val := val.(type){ -// case string: -// return json.Unmarshal([]byte(val), annotation) -// case []byte: -// return json.Unmarshal(val, annotation) -// default: -// return errors.New("not support") -// } -// return nil -//} -// -//func (annotation *Annotation)Value() (driver.Value, error){ -// bytes, err := json.Marshal(annotation) -// return string(bytes), err -//} +func (annotation Annotation) Value() (driver.Value, error) { + bytes, err := json.Marshal(annotation) + return string(bytes), err +} type Deployment struct { Name string `gorm:"primary_key" json:"name"` Namespace string `gorm:"primary_key" json:"namespace"` App string `json:"app,omitempty"` - Available int32 `json:"available"` - Desire int32 `json:"desire"` - Status string `json:"status"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - UpdateTime time.Time `gorm:"column:updateTime" json:"updateTime,omitempty"` + Available int32 `json:"available"` + Desire int32 `json:"desire"` + Status string `json:"status"` + Annotation Annotation `json:"annotations"` + UpdateTime time.Time `gorm:"column:updateTime" json:"updateTime,omitempty"` } func (Deployment) TableName() string { @@ -99,12 +100,11 @@ type Statefulset struct { Namespace string `gorm:"primary_key" json:"namespace,omitempty"` App string `json:"app,omitempty"` - Available int32 `json:"available"` - Desire int32 `json:"desire"` - Status string `json:"status"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Available int32 `json:"available"` + Desire int32 `json:"desire"` + Status string `json:"status"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (Statefulset) TableName() string { @@ -116,13 +116,12 @@ type Daemonset struct { Namespace string `gorm:"primary_key" json:"namespace,omitempty"` App string `json:"app,omitempty"` - Available int32 `json:"available"` - Desire int32 `json:"desire"` - Status string `json:"status"` - NodeSelector string `json:"nodeSelector, omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Available int32 `json:"available"` + Desire int32 `json:"desire"` + Status string `json:"status"` + NodeSelector string `json:"nodeSelector, omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (Daemonset) TableName() string { @@ -137,10 +136,9 @@ type Service struct { VirtualIp string `json:"virtualIp,omitempty"` ExternalIp string `json:"externalIp,omitempty"` - Ports string `json:"ports,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Ports string `json:"ports,omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (Service) TableName() string { @@ -153,10 +151,10 @@ type Pvc struct { Status string `json:"status,omitempty"` Capacity string `json:"capacity,omitempty"` AccessMode string `json:"accessMode,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` + Annotation Annotation `json:"annotations"` CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` StorageClassName string `gorm:"column:storage_class" json:"storage_class,omitempty"` + InUse bool `gorm:"-" json:"inUse"` } func (Pvc) TableName() string { @@ -168,8 +166,7 @@ type Ingress struct { Namespace string `gorm:"primary_key" json:"namespace"` Ip string `json:"ip,omitempty"` TlsTermination string `json:"tlsTermination,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` + Annotation Annotation `json:"annotations"` CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } @@ -178,24 +175,41 @@ func (Ingress) TableName() string { } type Pod struct { - Name string `gorm:"primary_key" json:"name"` - Namespace string `gorm:"primary_key" json:"namespace"` - Status string `json:"status,omitempty"` - Node string `json:"node,omitempty"` - NodeIp string `json:"nodeIp,omitempty"` - PodIp string `json:"podIp,omitempty"` - ContainerStr string `gorm:"type:text" json:",omitempty"` - Containers []Container `json:"containers,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Name string `gorm:"primary_key" json:"name"` + Namespace string `gorm:"primary_key" json:"namespace"` + Status string `json:"status,omitempty"` + Node string `json:"node,omitempty"` + NodeIp string `json:"nodeIp,omitempty"` + PodIp string `json:"podIp,omitempty"` + Containers Containers `gorm:"type:text" json:"containers,omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } type Container struct { - Name string `json:"name"` - Ready bool `json:"ready,omitempty"` - Image string `json:"image"` - Ports []v1.ContainerPort `json:"ports"` + Name string `json:"name"` + Ready bool `json:"ready,omitempty"` + Image string `json:"image"` + Resources v1.ResourceRequirements `json:"resources"` + Ports []v1.ContainerPort `json:"ports"` +} +type Containers []Container + +func (containers *Containers) Scan(val interface{}) error { + switch val := val.(type) { + case string: + return json.Unmarshal([]byte(val), containers) + case []byte: + return json.Unmarshal(val, containers) + default: + return errors.New("not support") + } + return nil +} + +func (containers Containers) Value() (driver.Value, error) { + bytes, err := json.Marshal(containers) + return string(bytes), err } func (Pod) TableName() string { @@ -203,11 +217,10 @@ func (Pod) TableName() string { } type Role struct { - Name string `gorm:"primary_key" json:"name"` - Namespace string `gorm:"primary_key" json:"namespace"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Name string `gorm:"primary_key" json:"name"` + Namespace string `gorm:"primary_key" json:"namespace"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (Role) TableName() string { @@ -215,10 +228,9 @@ func (Role) TableName() string { } type ClusterRole struct { - Name string `gorm:"primary_key" json:"name"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Name string `gorm:"primary_key" json:"name"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (ClusterRole) TableName() string { @@ -230,10 +242,10 @@ type Namespace struct { Creator string `json:"creator,omitempty"` Status string `json:"status"` - Descrition string `json:"description,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Descrition string `json:"description,omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Usaeg v1.ResourceList `gorm:"-" json:"usage,omitempty"` } func (Namespace) TableName() string { @@ -241,13 +253,12 @@ func (Namespace) TableName() string { } type StorageClass struct { - Name string `gorm:"primary_key" json:"name"` - Creator string `json:"creator,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` - IsDefault bool `json:"default"` - Count int `json:"count"` + Name string `gorm:"primary_key" json:"name"` + Creator string `json:"creator,omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + IsDefault bool `json:"default"` + Count int `json:"count"` } func (StorageClass) TableName() string { @@ -262,7 +273,8 @@ type Controller interface { listAndWatch() chanStop() chan struct{} chanAlive() chan struct{} - Count(conditions string) int + Count(namespace string) int + CountWithConditions(condition string) int ListWithConditions(condition string, paging *Paging) (int, interface{}, error) } @@ -326,36 +338,3 @@ type RoleCtl struct { type ClusterRoleCtl struct { CommonAttribute } - -func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) { - if len(conditions) == 0 { - db.Model(object).Count(total) - } else { - db.Model(object).Where(conditions).Count(total) - } - - if paging != nil { - if len(conditions) > 0 { - db.Where(conditions).Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list) - } else { - db.Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list) - } - - } else { - if len(conditions) > 0 { - db.Where(conditions).Order(order).Find(list) - } else { - db.Order(order).Find(list) - } - } -} - -func countWithConditions(db *gorm.DB, conditions string, object interface{}) int { - var count int - if len(conditions) == 0 { - db.Model(object).Count(&count) - } else { - db.Model(object).Where(conditions).Count(&count) - } - return count -} diff --git a/pkg/models/nodes.go b/pkg/models/nodes.go index cfe006e80..2f99153a9 100644 --- a/pkg/models/nodes.go +++ b/pkg/models/nodes.go @@ -19,15 +19,19 @@ package models import ( "encoding/json" "fmt" + "math" "strconv" "strings" + "time" "github.com/golang/glog" - v1beta2 "k8s.io/api/apps/v1beta2" + "k8s.io/api/apps/v1beta2" "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - types "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "kubesphere.io/kubesphere/pkg/client" kubeclient "kubesphere.io/kubesphere/pkg/client" @@ -48,6 +52,8 @@ const ( KubeletReady = "Ready" ) +const GracePeriods = 900 + type ResultNode struct { NodeName string `json:"node_name"` NodeStatus string `json:"node_status"` @@ -221,6 +227,33 @@ func getNodeFileSystemStatus(node *v1.Node) (string, string, string) { func DrainNode(nodename string) (msg constants.MessageResponse, err error) { + k8sclient := kubeclient.NewK8sClient() + node, err := k8sclient.CoreV1().Nodes().Get(nodename, metav1.GetOptions{}) + if err != nil { + glog.Fatal(err) + return msg, err + } + + if node.Spec.Unschedulable { + glog.Info(node.Spec.Unschedulable) + msg.Message = fmt.Sprintf("node %s have been drained", nodename) + return msg, nil + } + + data := []byte(" {\"spec\":{\"unschedulable\":true}}") + nodestatus, err := k8sclient.CoreV1().Nodes().Patch(nodename, types.StrategicMergePatchType, data) + glog.Info(nodestatus) + + if err != nil { + glog.Fatal(err) + return msg, err + } + msg.Message = "success" + return msg, nil +} + +func DrainStatus(nodename string) (msg constants.MessageResponse, err error) { + k8sclient := kubeclient.NewK8sClient() var options metav1.ListOptions pods := make([]v1.Pod, 0) @@ -241,56 +274,74 @@ func DrainNode(nodename string) (msg constants.MessageResponse, err error) { return msg, err } + // remove mirror pod static pod if len(podList.Items) > 0 { for _, pod := range podList.Items { if !containDaemonset(pod, *daemonsetList) { + //static or mirror pod + if isStaticPod(&pod) || isMirrorPod(&pod) { + + continue + + } else { + + pods = append(pods, pod) + + } - pods = append(pods, pod) } + } } - //create eviction - var eviction policy.Eviction - eviction.Kind = "Eviction" - eviction.APIVersion = "policy/v1beta1" - if len(pods) > 0 { - - for _, pod := range pods { - - eviction.Namespace = pod.Namespace - eviction.Name = pod.Name - err := k8sclient.CoreV1().Pods(pod.Namespace).Evict(&eviction) - if err != nil { - return msg, err - } - } - } - - data := []byte(" {\"spec\":{\"unschedulable\":true}}") - nodestatus, err := k8sclient.CoreV1().Nodes().Patch(nodename, types.StrategicMergePatchType, data) - - if err != nil { - - glog.Fatal(err) - return msg, err - - } - - if nodestatus.Spec.Unschedulable { + if len(pods) == 0 { msg.Message = fmt.Sprintf("success") + return msg, nil } else { - glog.Fatal(err) - return msg, err + //create eviction + getPodFn := func(namespace, name string) (*v1.Pod, error) { + k8sclient := kubeclient.NewK8sClient() + return k8sclient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) + } + evicerr := evictPods(pods, 900, getPodFn) + + if evicerr == nil { + + msg.Message = fmt.Sprintf("success") + return msg, nil + + } else { + + glog.Info(evicerr) + msg.Message = evicerr.Error() + return msg, nil + } } - return msg, nil +} +func getPodSource(pod *v1.Pod) (string, error) { + if pod.Annotations != nil { + if source, ok := pod.Annotations["kubernetes.io/config.source"]; ok { + return source, nil + } + } + return "", fmt.Errorf("cannot get source of pod %q", pod.UID) +} + +func isStaticPod(pod *v1.Pod) bool { + source, err := getPodSource(pod) + return err == nil && source != "api" +} + +func isMirrorPod(pod *v1.Pod) bool { + _, ok := pod.Annotations[v1.MirrorPodAnnotationKey] + return ok } func containDaemonset(pod v1.Pod, daemonsetList v1beta2.DaemonSetList) bool { @@ -307,3 +358,119 @@ func containDaemonset(pod v1.Pod, daemonsetList v1beta2.DaemonSetList) bool { return flag } + +func evictPod(pod v1.Pod, GracePeriodSeconds int) error { + + k8sclient := kubeclient.NewK8sClient() + deleteOptions := &metav1.DeleteOptions{} + if GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + + var eviction policy.Eviction + eviction.Kind = "Eviction" + eviction.APIVersion = "policy/v1beta1" + eviction.Namespace = pod.Namespace + eviction.Name = pod.Name + eviction.DeleteOptions = deleteOptions + err := k8sclient.CoreV1().Pods(pod.Namespace).Evict(&eviction) + if err != nil { + return err + } + + return nil +} + +func evictPods(pods []v1.Pod, GracePeriodSeconds int, getPodFn func(namespace, name string) (*v1.Pod, error)) error { + doneCh := make(chan bool, len(pods)) + errCh := make(chan error, 1) + + for _, pod := range pods { + go func(pod v1.Pod, doneCh chan bool, errCh chan error) { + var err error + for { + err = evictPod(pod, GracePeriodSeconds) + if err == nil { + break + } else if apierrors.IsNotFound(err) { + doneCh <- true + glog.Info(fmt.Sprintf("pod %s evict", pod.Name)) + return + } else if apierrors.IsTooManyRequests(err) { + time.Sleep(5 * time.Second) + } else { + errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + return + } + } + + podArray := []v1.Pod{pod} + _, err = waitForDelete(podArray, time.Second, time.Duration(math.MaxInt64), getPodFn) + if err == nil { + doneCh <- true + glog.Info(fmt.Sprintf("pod %s delete", pod.Name)) + } else { + errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, doneCh, errCh) + } + + Timeout := GracePeriods * power(10, 9) + doneCount := 0 + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = time.Duration(Timeout) + } + for { + select { + case err := <-errCh: + return err + case <-doneCh: + doneCount++ + if doneCount == len(pods) { + return nil + } + case <-time.After(globalTimeout): + return fmt.Errorf("Drain did not complete within %v, please check node status in a few minutes", globalTimeout) + } + } +} + +func waitForDelete(pods []v1.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*v1.Pod, error)) ([]v1.Pod, error) { + + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []v1.Pod{} + for i, pod := range pods { + p, err := getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + continue + } else if err != nil { + return false, err + } else { + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + return false, nil + } + return true, nil + }) + return pods, err +} + +func power(x int64, n int) int64 { + + var res int64 = 1 + for n != 0 { + res *= x + n-- + } + + return res + +} diff --git a/pkg/models/resources.go b/pkg/models/resources.go index f5fea4ebc..1d80a69b6 100644 --- a/pkg/models/resources.go +++ b/pkg/models/resources.go @@ -134,24 +134,31 @@ func generateConditionAndPaging(conditions map[string]string, paging map[string] } type workLoadStatus struct { - NameSpace string `json:"namespace"` - Count map[string]int `json:"data"` - Items map[string]interface{} + NameSpace string `json:"namespace"` + Count map[string]int `json:"data"` + Items map[string]interface{} `json:"items,omitempty"` } func GetNamespacesResourceStatus(namespace string) (*workLoadStatus, error) { res := workLoadStatus{Count: make(map[string]int), NameSpace: namespace, Items: make(map[string]interface{})} + var status *ResourceList + var err error for _, resource := range []string{controllers.Deployments, controllers.Statefulsets, controllers.Daemonsets} { - status, err := ListResource(resource, "status=updating", "") + if len(namespace) > 0 { + status, err = ListResource(resource, fmt.Sprintf("status=%s,namespace=%s", controllers.Updating, namespace), "") + } else { + status, err = ListResource(resource, fmt.Sprintf("status=%s", controllers.Updating), "") + } + if err != nil { return nil, err } count := status.Total - items := status.Items + //items := status.Items res.Count[resource] = count - res.Items[resource] = items + //res.Items[resource] = items } return &res, nil diff --git a/pkg/models/storage.go b/pkg/models/storage.go index c61e4cd25..bb4d900ba 100644 --- a/pkg/models/storage.go +++ b/pkg/models/storage.go @@ -47,11 +47,12 @@ func GetPvcListBySc(storageclass string) (res []v12.PersistentVolumeClaim, err e // Select persistent volume claims which // storage class name is equal to the specific storage class. for _, claim := range claimList.Items { - if claim.Spec.StorageClassName != nil && - *claim.Spec.StorageClassName == storageclass { + if claim.Spec.StorageClassName != nil { + if *claim.Spec.StorageClassName == storageclass { + res = append(res, claim) + } + } else if claim.GetAnnotations()[v12.BetaStorageClassAnnotation] == storageclass { res = append(res, claim) - } else { - continue } } return res, nil diff --git a/pkg/models/terminal.go b/pkg/models/terminal.go index 45ed71bc3..95c0d1f99 100644 --- a/pkg/models/terminal.go +++ b/pkg/models/terminal.go @@ -11,6 +11,10 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// +// the code is mainly from: +// https://github.com/kubernetes/dashboard/blob/master/src/app/backend/handler/terminal.go +// thanks to the related developer package models