diff --git a/pkg/client/dbclient.go b/pkg/client/dbclient.go index 9695b992c..87d44a66f 100644 --- a/pkg/client/dbclient.go +++ b/pkg/client/dbclient.go @@ -1,12 +1,9 @@ /* Copyright 2018 The KubeSphere Authors. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,13 +14,15 @@ limitations under the License. package client import ( - "github.com/jinzhu/gorm" - //_ "github.com/jinzhu/gorm/dialects/mysql" "fmt" - _ "github.com/go-sql-driver/mysql" "github.com/golang/glog" + "github.com/jinzhu/gorm" + _ "github.com/jinzhu/gorm/dialects/mysql" + "log" + + "kubesphere.io/kubesphere/pkg/logs" "kubesphere.io/kubesphere/pkg/options" ) @@ -34,13 +33,32 @@ const database = "kubesphere" func NewDBClient() *gorm.DB { if dbClient != nil { - return dbClient + err := dbClient.DB().Ping() + if err == nil { + return dbClient + } else { + glog.Error(err) + panic(err) + } } user := options.ServerOptions.GetMysqlUser() passwd := options.ServerOptions.GetMysqlPassword() addr := options.ServerOptions.GetMysqlAddr() - conn := fmt.Sprintf("%s:%s@tcp(%s)/mysql?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr) + if dbClient == nil { + conn := fmt.Sprintf("%s:%s@tcp(%s)/mysql?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr) + db, err := gorm.Open("mysql", conn) + + if err != nil { + glog.Error(err) + panic(err) + } + + db.Exec(fmt.Sprintf("create database if not exists %s;", database)) + db.Close() + } + + conn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr, database) db, err := gorm.Open("mysql", conn) if err != nil { @@ -48,15 +66,7 @@ func NewDBClient() *gorm.DB { panic(err) } - db.Exec(fmt.Sprintf("create database if not exists %s;", database)) - - conn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", user, passwd, addr, database) - db, err = gorm.Open("mysql", conn) - - if err != nil { - glog.Error(err) - panic(err) - } + db.SetLogger(log.New(logs.GlogWriter{}, " ", 0)) dbClient = db return dbClient } diff --git a/pkg/client/k8sclient.go b/pkg/client/k8sclient.go index b5f31b256..304b42ef3 100644 --- a/pkg/client/k8sclient.go +++ b/pkg/client/k8sclient.go @@ -1,12 +1,9 @@ /* Copyright 2018 The KubeSphere Authors. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,43 +16,18 @@ package client import ( "github.com/golang/glog" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "kubesphere.io/kubesphere/pkg/options" ) var k8sClient *kubernetes.Clientset -func getKubeConfig() (kubeConfig *rest.Config, err error) { - - kubeConfigFile := options.ServerOptions.GetKubeConfigFile() - - if len(kubeConfigFile) > 0 { - - kubeConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfigFile) - if err != nil { - return nil, err - } - - } else { - - kubeConfig, err = rest.InClusterConfig() - if err != nil { - return nil, err - } - } - - return kubeConfig, nil - -} - func NewK8sClient() *kubernetes.Clientset { if k8sClient != nil { return k8sClient } - kubeConfig, err := getKubeConfig() + kubeConfig, err := options.ServerOptions.GetKubeConfig() if err != nil { glog.Error(err) panic(err) diff --git a/pkg/models/controllers/clusterRoles.go b/pkg/models/controllers/clusterRoles.go index 28b308534..1355bb1e2 100644 --- a/pkg/models/controllers/clusterRoles.go +++ b/pkg/models/controllers/clusterRoles.go @@ -17,17 +17,17 @@ limitations under the License. package controllers import ( - "encoding/json" "strings" "time" "github.com/golang/glog" "k8s.io/api/rbac/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) -func (ctl *ClusterRoleCtl) generateObjec(item v1.ClusterRole) *ClusterRole { +func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole { name := item.Name if strings.HasPrefix(name, "system:") { return nil @@ -38,9 +38,7 @@ func (ctl *ClusterRoleCtl) generateObjec(item v1.ClusterRole) *ClusterRole { createTime = time.Now() } - annotation, _ := json.Marshal(item.Annotations) - - object := &ClusterRole{Name: name, CreateTime: createTime, AnnotationStr: string(annotation)} + object := &ClusterRole{Name: name, CreateTime: createTime, Annotation: Annotation{item.Annotations}} return object } @@ -64,47 +62,48 @@ func (ctl *ClusterRoleCtl) listAndWatch() { db = db.CreateTable(&ClusterRole{}) k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Rbac().V1().ClusterRoles().Informer() + lister := kubeInformerFactory.Rbac().V1().ClusterRoles().Lister() - clusterRoleList, err := k8sClient.RbacV1().ClusterRoles().List(metaV1.ListOptions{}) + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range clusterRoleList.Items { - obj := ctl.generateObjec(item) - if obj != nil { - db.Create(obj) - } + for _, item := range list { + obj := ctl.generateObject(*item) + db.Create(obj) + } - clusterRoleWatcher, err := k8sClient.RbacV1().ClusterRoles().Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-clusterRoleWatcher.ResultChan(): - var role ClusterRole - if event.Object == nil { - panic("watch timeout, restart clusterRole controller") + object := obj.(*v1.ClusterRole) + mysqlObject := ctl.generateObject(*object) + if mysqlObject != nil { + db.Create(mysqlObject) } - object := event.Object.(*v1.ClusterRole) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, "\"\"").Find(&role) - db.Delete(role) - break + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.ClusterRole) + mysqlObject := ctl.generateObject(*object) + if mysqlObject != nil { + db.Save(mysqlObject) } - obj := ctl.generateObjec(*object) - if obj != nil { - db.Save(obj) - } - } - } + }, + DeleteFunc: func(obj interface{}) { + var item ClusterRole + object := obj.(*v1.ClusterRole) + db.Where("name=?", object.Name).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *ClusterRoleCtl) CountWithConditions(conditions string) int { @@ -123,12 +122,6 @@ func (ctl *ClusterRoleCtl) ListWithConditions(conditions string, paging *Paging) listWithConditions(db, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/common.go b/pkg/models/controllers/common.go new file mode 100644 index 000000000..3575ea8bc --- /dev/null +++ b/pkg/models/controllers/common.go @@ -0,0 +1,52 @@ +/* +Copyright 2018 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import "github.com/jinzhu/gorm" + +func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) { + if len(conditions) == 0 { + db.Model(object).Count(total) + } else { + db.Model(object).Where(conditions).Count(total) + } + + if paging != nil { + if len(conditions) > 0 { + db.Where(conditions).Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list) + } else { + db.Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list) + } + + } else { + if len(conditions) > 0 { + db.Where(conditions).Order(order).Find(list) + } else { + db.Order(order).Find(list) + } + } +} + +func countWithConditions(db *gorm.DB, conditions string, object interface{}) int { + var count int + if len(conditions) == 0 { + db.Model(object).Count(&count) + } else { + db.Model(object).Where(conditions).Count(&count) + } + return count +} diff --git a/pkg/models/controllers/daemonsets.go b/pkg/models/controllers/daemonsets.go index 2792b1bae..f2acea614 100644 --- a/pkg/models/controllers/daemonsets.go +++ b/pkg/models/controllers/daemonsets.go @@ -21,14 +21,13 @@ import ( "time" "github.com/golang/glog" - "k8s.io/api/apps/v1beta2" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) -func (ctl *DaemonsetCtl) generateObject(item v1beta2.DaemonSet) *Daemonset { +func (ctl *DaemonsetCtl) generateObject(item v1.DaemonSet) *Daemonset { var app string var status string name := item.Name @@ -53,24 +52,20 @@ func (ctl *DaemonsetCtl) generateObject(item v1beta2.DaemonSet) *Daemonset { } if availablePodNum >= desirePodNum { - status = running + status = Running } else { - status = updating + status = Updating } - annotation, _ := json.Marshal(item.Annotations) - object := &Daemonset{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum, - App: app, CreateTime: createTime, Status: status, NodeSelector: string(nodeSelectorStr), AnnotationStr: string(annotation)} + App: app, CreateTime: createTime, Status: status, NodeSelector: string(nodeSelectorStr), Annotation: Annotation{item.Annotations}} return object } func (ctl *DaemonsetCtl) listAndWatch() { defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { glog.Error(err) return @@ -86,43 +81,45 @@ func (ctl *DaemonsetCtl) listAndWatch() { db = db.CreateTable(&Daemonset{}) - k8sClient := client.NewK8sClient() - deoloyList, err := k8sClient.AppsV1beta2().DaemonSets("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Apps().V1().DaemonSets().Informer() + lister := kubeInformerFactory.Apps().V1().DaemonSets().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range deoloyList.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) + } - watcher, err := k8sClient.AppsV1beta2().DaemonSets("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var ss Daemonset - if event.Object == nil { - panic("watch timeout, restart daemonset controller") - } - object := event.Object.(*v1beta2.DaemonSet) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&ss) - db.Delete(ss) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + object := obj.(*v1.DaemonSet) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.DaemonSet) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Daemonset + object := obj.(*v1.DaemonSet) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *DaemonsetCtl) CountWithConditions(conditions string) int { @@ -140,12 +137,6 @@ func (ctl *DaemonsetCtl) ListWithConditions(conditions string, paging *Paging) ( listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/deployments.go b/pkg/models/controllers/deployments.go index 87f3b7892..680c091c8 100644 --- a/pkg/models/controllers/deployments.go +++ b/pkg/models/controllers/deployments.go @@ -17,16 +17,17 @@ limitations under the License. package controllers import ( - "encoding/json" "time" "github.com/golang/glog" - "k8s.io/api/apps/v1beta2" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/api/apps/v1" + "k8s.io/client-go/informers" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" ) -func (ctl *DeploymentCtl) generateObject(item v1beta2.Deployment) *Deployment { +func (ctl *DeploymentCtl) generateObject(item v1.Deployment) *Deployment { var app string var status string var updateTime time.Time @@ -53,19 +54,17 @@ func (ctl *DeploymentCtl) generateObject(item v1beta2.Deployment) *Deployment { } if item.Annotations["state"] == "stop" { - status = stopping + status = Stopped } else { if availablePodNum >= desirePodNum { - status = running + status = Running } else { - status = updating + status = Updating } } - annotation, _ := json.Marshal(item.Annotations) - return &Deployment{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum, - App: app, UpdateTime: updateTime, Status: status, AnnotationStr: string(annotation)} + App: app, UpdateTime: updateTime, Status: status, Annotation: Annotation{item.Annotations}} } func (ctl *DeploymentCtl) listAndWatch() { @@ -85,45 +84,44 @@ func (ctl *DeploymentCtl) listAndWatch() { db = db.CreateTable(&Deployment{}) k8sClient := ctl.K8sClient - deoloyList, err := k8sClient.AppsV1beta2().Deployments("").List(metaV1.ListOptions{}) + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Apps().V1().Deployments().Informer() + lister := kubeInformerFactory.Apps().V1().Deployments().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range deoloyList.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) } - watcher, err := k8sClient.AppsV1beta2().Deployments("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - glog.Error("here") - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): + object := obj.(*v1.Deployment) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Deployment) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { var deploy Deployment - if event.Object == nil { - panic("watch timeout, restart deployment controller") - } - object := event.Object.(*v1beta2.Deployment) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&deploy) - db.Delete(deploy) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + object := obj.(*v1.Deployment) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&deploy) + db.Delete(deploy) + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *DeploymentCtl) CountWithConditions(conditions string) int { @@ -141,12 +139,6 @@ func (ctl *DeploymentCtl) ListWithConditions(conditions string, paging *Paging) listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/ingresses.go b/pkg/models/controllers/ingresses.go index cd644cefb..67c22029b 100644 --- a/pkg/models/controllers/ingresses.go +++ b/pkg/models/controllers/ingresses.go @@ -17,16 +17,14 @@ limitations under the License. package controllers import ( - "encoding/json" "strings" "time" "github.com/golang/glog" "k8s.io/api/extensions/v1beta1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress { @@ -49,15 +47,14 @@ func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress { ip = strings.Join(ipList, ",") } - annotation, _ := json.Marshal(item.Annotations) - object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, AnnotationStr: string(annotation)} + object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}} return object } func (ctl *IngressCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -73,43 +70,45 @@ func (ctl *IngressCtl) listAndWatch() { db = db.CreateTable(&Ingress{}) - k8sClient := client.NewK8sClient() - list, err := k8sClient.ExtensionsV1beta1().Ingresses("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Extensions().V1beta1().Ingresses().Informer() + lister := kubeInformerFactory.Extensions().V1beta1().Ingresses().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range list.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) + } - watcher, err := k8sClient.ExtensionsV1beta1().Ingresses("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var ing Ingress - if event.Object == nil { - panic("watch timeout, restart ingress controller") - } - object := event.Object.(*v1beta1.Ingress) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&ing) - db.Delete(ing) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + object := obj.(*v1beta1.Ingress) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1beta1.Ingress) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Ingress + object := obj.(*v1beta1.Ingress) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *IngressCtl) CountWithConditions(conditions string) int { @@ -127,12 +126,6 @@ func (ctl *IngressCtl) ListWithConditions(conditions string, paging *Paging) (in listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/namespaces.go b/pkg/models/controllers/namespaces.go index f0d013420..50aa4df66 100644 --- a/pkg/models/controllers/namespaces.go +++ b/pkg/models/controllers/namespaces.go @@ -27,8 +27,11 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/resource" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/options" @@ -229,15 +232,14 @@ func (ctl *NamespaceCtl) generateObject(item v1.Namespace) *Namespace { createTime = time.Now() } - annotation, _ := json.Marshal(item.Annotations) - object := &Namespace{Name: name, CreateTime: createTime, Status: status, AnnotationStr: string(annotation)} + object := &Namespace{Name: name, CreateTime: createTime, Status: status, Annotation: Annotation{item.Annotations}} return object } func (ctl *NamespaceCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -252,50 +254,45 @@ func (ctl *NamespaceCtl) listAndWatch() { db = db.CreateTable(&Namespace{}) - k8sClient := client.NewK8sClient() - list, err := k8sClient.CoreV1().Namespaces().List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Core().V1().Namespaces().Informer() + lister := kubeInformerFactory.Core().V1().Namespaces().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range list.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) - ctl.createRoleAndRuntime(item) } - watcher, err := k8sClient.CoreV1().Namespaces().Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var ns Namespace - if event.Object == nil { - panic("watch timeout, restart namespace controller") - } - object := event.Object.(*v1.Namespace) - if event.Type == watch.Deleted { - db.Where("name=?", object.Name).Find(&ns) - db.Delete(ns) + object := obj.(*v1.Namespace) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Namespace) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Namespace + object := obj.(*v1.Namespace) + db.Where("name=?", object.Name).Find(&item) + db.Delete(item) - ctl.deleteOpRuntime(*object) - break - } + }, + }) - ctl.createRoleAndRuntime(*object) - - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + informer.Run(ctl.stopChan) } func (ctl *NamespaceCtl) CountWithConditions(conditions string) int { @@ -313,11 +310,12 @@ func (ctl *NamespaceCtl) ListWithConditions(conditions string, paging *Paging) ( listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" + for index := range list { + usage, err := ctl.GetNamespaceQuota(list[index].Name) + if err == nil { + list[index].Usaeg = usage + } + } return total, list, nil } @@ -328,3 +326,28 @@ func (ctl *NamespaceCtl) Count(namespace string) int { db.Model(&Namespace{}).Count(&count) return count } + +func getUsage(namespace, resource string) int { + ctl := rec.controllers[resource] + return ctl.Count(namespace) +} + +func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, error) { + + usage := make(v1.ResourceList) + + resourceList := []string{Daemonsets, Deployments, Ingresses, Roles, Services, Statefulsets, PersistentVolumeClaim, Pods} + for _, resourceName := range resourceList { + used := getUsage(namespace, resourceName) + var quantity resource.Quantity + quantity.Set(int64(used)) + usage[v1.ResourceName(resourceName)] = quantity + } + + podCtl := rec.controllers[Pods] + var quantity resource.Quantity + used := podCtl.CountWithConditions(fmt.Sprintf("status=\"%s\" And namespace=\"%s\"", "Running", namespace)) + quantity.Set(int64(used)) + usage["runningPods"] = quantity + return usage, nil +} diff --git a/pkg/models/controllers/pods.go b/pkg/models/controllers/pods.go index 666858282..4dc7452f7 100644 --- a/pkg/models/controllers/pods.go +++ b/pkg/models/controllers/pods.go @@ -18,15 +18,75 @@ package controllers import ( "encoding/json" + "time" "github.com/golang/glog" "k8s.io/api/core/v1" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) +const inUse = "in_use_pods" + +func (ctl *PodCtl) addAnnotationToPvc(item v1.Pod) { + volumes := item.Spec.Volumes + for _, volume := range volumes { + pvc := volume.PersistentVolumeClaim + if pvc != nil { + name := pvc.ClaimName + + Pvc, _ := ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Get(name, metaV1.GetOptions{}) + if Pvc.Annotations == nil { + Pvc.Annotations = make(map[string]string) + } + annotation := Pvc.Annotations + if len(annotation[inUse]) == 0 { + pods := []string{item.Name} + str, _ := json.Marshal(pods) + annotation[inUse] = string(str) + } else { + var pods []string + json.Unmarshal([]byte(annotation[inUse]), pods) + for _, pod := range pods { + if pod == item.Name { + return + } + pods = append(pods, item.Name) + str, _ := json.Marshal(pods) + annotation[inUse] = string(str) + } + } + ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Update(Pvc) + } + } +} + +func (ctl *PodCtl) delAnnotationFromPvc(item v1.Pod) { + volumes := item.Spec.Volumes + for _, volume := range volumes { + pvc := volume.PersistentVolumeClaim + if pvc != nil { + name := pvc.ClaimName + Pvc, _ := ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Get(name, metaV1.GetOptions{}) + annotation := Pvc.Annotations + var pods []string + json.Unmarshal([]byte(annotation[inUse]), pods) + + for index, pod := range pods { + if pod == item.Name { + pods = append(pods[:index], pods[index+1:]...) + } + } + + str, _ := json.Marshal(pods) + annotation[inUse] = string(str) + ctl.K8sClient.CoreV1().PersistentVolumeClaims(item.Namespace).Update(Pvc) + } + } +} + func (ctl *PodCtl) generateObject(item v1.Pod) *Pod { name := item.Name namespace := item.Namespace @@ -37,13 +97,15 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod { createTime := item.CreationTimestamp.Time containerStatus := item.Status.ContainerStatuses containerSpecs := item.Spec.Containers - var containers []Container + + var containers Containers for _, containerSpec := range containerSpecs { var container Container container.Name = containerSpec.Name container.Image = containerSpec.Image container.Ports = containerSpec.Ports + container.Resources = containerSpec.Resources for _, status := range containerStatus { if container.Name == status.Name { container.Ready = status.Ready @@ -53,25 +115,13 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod { containers = append(containers, container) } - containerStr, _ := json.Marshal(containers) - - annotation, _ := json.Marshal(item.Annotations) - object := &Pod{Namespace: namespace, Name: name, Node: nodeName, PodIp: podIp, Status: status, NodeIp: nodeIp, - CreateTime: createTime, ContainerStr: string(containerStr), AnnotationStr: string(annotation)} + CreateTime: createTime, Annotation: Annotation{item.Annotations}, Containers: containers} return object } func (ctl *PodCtl) listAndWatch() { - defer func() { - defer close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() - db := ctl.DB if db.HasTable(&Pod{}) { @@ -81,43 +131,47 @@ func (ctl *PodCtl) listAndWatch() { db = db.CreateTable(&Pod{}) - k8sClient := client.NewK8sClient() - list, err := k8sClient.CoreV1().Pods("").List(meta_v1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Core().V1().Pods().Informer() + lister := kubeInformerFactory.Core().V1().Pods().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) - return + panic(err) } - for _, item := range list.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) } - watcher, err := k8sClient.CoreV1().Pods("").Watch(meta_v1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + object := obj.(*v1.Pod) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var po Pod - if event.Object == nil { - panic("watch timeout, restart pod controller") - } - object := event.Object.(*v1.Pod) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&po) - db.Delete(po) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + ctl.addAnnotationToPvc(*object) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Pod) + mysqlObject := ctl.generateObject(*object) + + db.Save(mysqlObject) + + }, + DeleteFunc: func(obj interface{}) { + var item Pod + object := obj.(*v1.Pod) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + ctl.delAnnotationFromPvc(*object) + db.Delete(item) + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *PodCtl) CountWithConditions(conditions string) int { @@ -135,17 +189,6 @@ func (ctl *PodCtl) ListWithConditions(conditions string, paging *Paging) (int, i listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - var containers []Container - json.Unmarshal([]byte(item.ContainerStr), &containers) - list[index].Containers = containers - list[index].ContainerStr = "" - - annotation := make(Annotation) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/pvcs.go b/pkg/models/controllers/pvcs.go index 8956c61ff..e1eeb3af3 100644 --- a/pkg/models/controllers/pvcs.go +++ b/pkg/models/controllers/pvcs.go @@ -24,10 +24,9 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) const creator = "creator" @@ -60,17 +59,16 @@ func (ctl *PvcCtl) generateObject(item *v1.PersistentVolumeClaim) *Pvc { } accessModeStr = strings.Join(accessModeList, ",") - annotation, _ := json.Marshal(item.Annotations) object := &Pvc{Namespace: namespace, Name: name, Status: status, Capacity: capacity, - AccessMode: accessModeStr, StorageClassName: storageClass, CreateTime: createTime, AnnotationStr: string(annotation)} + AccessMode: accessModeStr, StorageClassName: storageClass, CreateTime: createTime, Annotation: Annotation{item.Annotations}} return object } func (ctl *PvcCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -86,43 +84,44 @@ func (ctl *PvcCtl) listAndWatch() { db = db.CreateTable(&Pvc{}) - k8sClient := client.NewK8sClient() - pvcList, err := k8sClient.CoreV1().PersistentVolumeClaims("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer() + lister := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range pvcList.Items { - obj := ctl.generateObject(&item) + for _, item := range list { + obj := ctl.generateObject(item) db.Create(obj) + } - watcher, err := k8sClient.CoreV1().PersistentVolumeClaims("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var pvc Pvc - if event.Object == nil { - panic("watch timeout, restart pvc controller") - } - object := event.Object.(*v1.PersistentVolumeClaim) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&pvc) - db.Delete(pvc) - break - } - obj := ctl.generateObject(object) - db.Save(obj) - } - } + object := obj.(*v1.PersistentVolumeClaim) + mysqlObject := ctl.generateObject(object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.PersistentVolumeClaim) + mysqlObject := ctl.generateObject(object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Pvc + object := obj.(*v1.PersistentVolumeClaim) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *PvcCtl) CountWithConditions(conditions string) int { @@ -140,11 +139,17 @@ func (ctl *PvcCtl) ListWithConditions(conditions string, paging *Paging) (int, i listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" + for index := range list { + inUsePods := list[index].Annotation.Values[inUse] + var pods []string + + json.Unmarshal([]byte(inUsePods), &pods) + + if len(pods) > 0 { + list[index].InUse = true + } else { + list[index].InUse = false + } } return total, list, nil diff --git a/pkg/models/controllers/roles.go b/pkg/models/controllers/roles.go index 866b77f05..e99f06057 100644 --- a/pkg/models/controllers/roles.go +++ b/pkg/models/controllers/roles.go @@ -17,16 +17,14 @@ limitations under the License. package controllers import ( - "encoding/json" "strings" "time" "github.com/golang/glog" "k8s.io/api/rbac/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) func (ctl *RoleCtl) generateObject(item v1.Role) *Role { @@ -40,9 +38,7 @@ func (ctl *RoleCtl) generateObject(item v1.Role) *Role { createTime = time.Now() } - annotation, _ := json.Marshal(item.Annotations) - - object := &Role{Namespace: namespace, Name: name, CreateTime: createTime, AnnotationStr: string(annotation)} + object := &Role{Namespace: namespace, Name: name, CreateTime: createTime, Annotation: Annotation{item.Annotations}} return object } @@ -65,49 +61,49 @@ func (ctl *RoleCtl) listAndWatch() { db = db.CreateTable(&Role{}) - k8sClient := client.NewK8sClient() - roleList, err := k8sClient.RbacV1().Roles("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Rbac().V1().Roles().Informer() + lister := kubeInformerFactory.Rbac().V1().Roles().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range roleList.Items { - obj := ctl.generateObject(item) - if obj != nil { - db.Create(obj) - } + for _, item := range list { + obj := ctl.generateObject(*item) + db.Create(obj) } - roleWatcher, err := k8sClient.RbacV1().Roles("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-roleWatcher.ResultChan(): - var role Role - if event.Object == nil { - panic("watch timeout, restart role controller") + object := obj.(*v1.Role) + mysqlObject := ctl.generateObject(*object) + if mysqlObject != nil { + db.Create(mysqlObject) } - object := event.Object.(*v1.Role) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&role) - db.Delete(role) - break + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Role) + mysqlObject := ctl.generateObject(*object) + if mysqlObject != nil { + db.Save(mysqlObject) } - obj := ctl.generateObject(*object) - if obj != nil { - db.Save(obj) - } - break - } - } + }, + DeleteFunc: func(obj interface{}) { + var item Role + object := obj.(*v1.Role) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *RoleCtl) CountWithConditions(conditions string) int { @@ -125,12 +121,6 @@ func (ctl *RoleCtl) ListWithConditions(conditions string, paging *Paging) (int, listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/run.go b/pkg/models/controllers/run.go index cd1a4c96d..db4f0f9fe 100644 --- a/pkg/models/controllers/run.go +++ b/pkg/models/controllers/run.go @@ -28,15 +28,15 @@ import ( type resourceControllers struct { controllers map[string]Controller - db *gorm.DB k8sClient *kubernetes.Clientset } var stopChan chan struct{} +var rec resourceControllers func (rec *resourceControllers) runContoller(name string) { var ctl Controller - attr := CommonAttribute{DB: rec.db, K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})} + attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})} switch name { case Deployments: ctl = &DeploymentCtl{attr} @@ -69,21 +69,39 @@ func (rec *resourceControllers) runContoller(name string) { } +func dbHealthCheck(db *gorm.DB) { + for { + count := 0 + var err error + for k := 0; k < 5; k++ { + err = db.DB().Ping() + if err != nil { + count++ + } + time.Sleep(1 * time.Second) + } + + if count > 3 { + panic(err) + } + } + +} + func Run() { - db := client.NewDBClient() stopChan := make(chan struct{}) - defer db.Commit() - defer db.Close() defer close(stopChan) - rec := resourceControllers{k8sClient: client.NewK8sClient(), db: db, controllers: make(map[string]Controller)} + rec = resourceControllers{k8sClient: client.NewK8sClient(), controllers: make(map[string]Controller)} for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services, Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses} { rec.runContoller(item) } + go dbHealthCheck(client.NewDBClient()) + for { for ctlName, controller := range rec.controllers { select { diff --git a/pkg/models/controllers/services.go b/pkg/models/controllers/services.go index a20b2cf6c..4a1bb4f32 100644 --- a/pkg/models/controllers/services.go +++ b/pkg/models/controllers/services.go @@ -17,18 +17,16 @@ limitations under the License. package controllers import ( - "encoding/json" "fmt" "strings" "time" "github.com/golang/glog" "k8s.io/api/core/v1" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" - - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) const ( @@ -119,16 +117,15 @@ func (ctl *ServiceCtl) generateObject(item v1.Service) *Service { ports = ports[0 : len(ports)-1] } - annotation, _ := json.Marshal(item.Annotations) object := &Service{Namespace: namespace, Name: name, ServiceType: serviceType, ExternalIp: externalIp, - VirtualIp: vip, CreateTime: createTime, Ports: ports, AnnotationStr: string(annotation)} + VirtualIp: vip, CreateTime: createTime, Ports: ports, Annotation: Annotation{item.Annotations}} return object } func (ctl *ServiceCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -143,45 +140,45 @@ func (ctl *ServiceCtl) listAndWatch() { db = db.CreateTable(&Service{}) - k8sClient := client.NewK8sClient() - svcList, err := k8sClient.CoreV1().Services("").List(metaV1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Core().V1().Services().Informer() + lister := kubeInformerFactory.Core().V1().Services().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range svcList.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) + } - watcher, err := k8sClient.CoreV1().Services("").Watch(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var svc Service + object := obj.(*v1.Service) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.Service) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Service + object := obj.(*v1.Service) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) - if event.Object == nil { - panic("watch timeout, restart service controller") - } - object := event.Object.(*v1.Service) + }, + }) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&svc) - db.Delete(svc) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } + informer.Run(ctl.stopChan) } func (ctl *ServiceCtl) CountWithConditions(conditions string) int { @@ -199,12 +196,6 @@ func (ctl *ServiceCtl) ListWithConditions(conditions string, paging *Paging) (in listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/statefulsets.go b/pkg/models/controllers/statefulsets.go index babc211c2..90bc31be8 100644 --- a/pkg/models/controllers/statefulsets.go +++ b/pkg/models/controllers/statefulsets.go @@ -17,18 +17,17 @@ limitations under the License. package controllers import ( - "encoding/json" "time" "github.com/golang/glog" - "k8s.io/api/apps/v1beta2" - metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) -func (ctl *StatefulsetCtl) generateObject(item v1beta2.StatefulSet) *Statefulset { +func (ctl *StatefulsetCtl) generateObject(item v1.StatefulSet) *Statefulset { var app string var status string name := item.Name @@ -50,26 +49,24 @@ func (ctl *StatefulsetCtl) generateObject(item v1beta2.StatefulSet) *Statefulset } if item.Annotations["state"] == "stop" { - status = stopping + status = Stopped } else { if availablePodNum >= desirePodNum { - status = running + status = Running } else { - status = updating + status = Updating } } - annotation, _ := json.Marshal(item.Annotations) - statefulSetObject := &Statefulset{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum, - App: app, CreateTime: createTime, Status: status, AnnotationStr: string(annotation)} + App: app, CreateTime: createTime, Status: status, Annotation: Annotation{item.Annotations}} return statefulSetObject } func (ctl *StatefulsetCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -82,42 +79,45 @@ func (ctl *StatefulsetCtl) listAndWatch() { } db = db.CreateTable(&Statefulset{}) - k8sClient := client.NewK8sClient() - deoloyList, err := k8sClient.AppsV1beta2().StatefulSets("").List(metaV1.ListOptions{}) - if err != nil { - glog.Error(err) - } + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Apps().V1().StatefulSets().Informer() + lister := kubeInformerFactory.Apps().V1().StatefulSets().Lister() - for _, item := range deoloyList.Items { - obj := ctl.generateObject(item) - db.Create(obj) - } - - watcher, err := k8sClient.AppsV1beta2().StatefulSets("").Watch(metaV1.ListOptions{}) + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var tmp Statefulset - if event.Object == nil { - panic("watch timeout, restart statefulset controller") - } - object := event.Object.(*v1beta2.StatefulSet) - if event.Type == watch.Deleted { - db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&tmp) - db.Delete(tmp) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } + for _, item := range list { + obj := ctl.generateObject(*item) + db.Create(obj) + } + + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + + object := obj.(*v1.StatefulSet) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.StatefulSet) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item Statefulset + object := obj.(*v1.StatefulSet) + db.Where("name=? And namespace=?", object.Name, object.Namespace).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) } func (ctl *StatefulsetCtl) CountWithConditions(conditions string) int { @@ -135,12 +135,6 @@ func (ctl *StatefulsetCtl) ListWithConditions(conditions string, paging *Paging) listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index, item := range list { - annotation := make(map[string]string) - json.Unmarshal([]byte(item.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" - } return total, list, nil } diff --git a/pkg/models/controllers/storageclasses.go b/pkg/models/controllers/storageclasses.go index 2d07843c2..db7f8a57d 100644 --- a/pkg/models/controllers/storageclasses.go +++ b/pkg/models/controllers/storageclasses.go @@ -17,19 +17,18 @@ limitations under the License. package controllers import ( - "encoding/json" "fmt" "time" "github.com/golang/glog" - "k8s.io/api/storage/v1beta1" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/api/storage/v1" - "kubesphere.io/kubesphere/pkg/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) -func (ctl *StorageClassCtl) generateObject(item v1beta1.StorageClass) *StorageClass { +func (ctl *StorageClassCtl) generateObject(item v1.StorageClass) *StorageClass { name := item.Name createTime := item.CreationTimestamp.Time @@ -42,15 +41,14 @@ func (ctl *StorageClassCtl) generateObject(item v1beta1.StorageClass) *StorageCl createTime = time.Now() } - annotation, _ := json.Marshal(item.Annotations) - object := &StorageClass{Name: name, CreateTime: createTime, IsDefault: isDefault, AnnotationStr: string(annotation)} + object := &StorageClass{Name: name, CreateTime: createTime, IsDefault: isDefault, Annotation: Annotation{item.Annotations}} return object } func (ctl *StorageClassCtl) listAndWatch() { defer func() { - defer close(ctl.aliveChan) + close(ctl.aliveChan) if err := recover(); err != nil { glog.Error(err) return @@ -65,43 +63,46 @@ func (ctl *StorageClassCtl) listAndWatch() { db = db.CreateTable(&StorageClass{}) - k8sClient := client.NewK8sClient() - list, err := k8sClient.StorageV1beta1().StorageClasses().List(meta_v1.ListOptions{}) + k8sClient := ctl.K8sClient + kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) + informer := kubeInformerFactory.Storage().V1().StorageClasses().Informer() + lister := kubeInformerFactory.Storage().V1().StorageClasses().Lister() + + list, err := lister.List(labels.Everything()) if err != nil { glog.Error(err) return } - for _, item := range list.Items { - obj := ctl.generateObject(item) + for _, item := range list { + obj := ctl.generateObject(*item) db.Create(obj) + } - watcher, err := k8sClient.StorageV1beta1().StorageClasses().Watch(meta_v1.ListOptions{}) - if err != nil { - glog.Error(err) - return - } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + + object := obj.(*v1.StorageClass) + mysqlObject := ctl.generateObject(*object) + db.Create(mysqlObject) + }, + UpdateFunc: func(old, new interface{}) { + object := new.(*v1.StorageClass) + mysqlObject := ctl.generateObject(*object) + db.Save(mysqlObject) + }, + DeleteFunc: func(obj interface{}) { + var item StorageClass + object := obj.(*v1.StorageClass) + db.Where("name=?", object.Name).Find(&item) + db.Delete(item) + + }, + }) + + informer.Run(ctl.stopChan) - for { - select { - case <-ctl.stopChan: - return - case event := <-watcher.ResultChan(): - var sc StorageClass - if event.Object == nil { - panic("watch timeout, restart storageClass controller") - } - object := event.Object.(*v1beta1.StorageClass) - if event.Type == watch.Deleted { - db.Where("name=?", object.Name).Find(&sc) - db.Delete(sc) - break - } - obj := ctl.generateObject(*object) - db.Save(obj) - } - } } func (ctl *StorageClassCtl) CountWithConditions(conditions string) int { @@ -121,10 +122,6 @@ func (ctl *StorageClassCtl) ListWithConditions(conditions string, paging *Paging for index, storageClass := range list { name := storageClass.Name - annotation := make(map[string]string) - json.Unmarshal([]byte(storageClass.AnnotationStr), &annotation) - list[index].Annotation = annotation - list[index].AnnotationStr = "" pvcCtl := PvcCtl{CommonAttribute{K8sClient: ctl.K8sClient, DB: ctl.DB}} list[index].Count = pvcCtl.CountWithConditions(fmt.Sprintf("storage_class=\"%s\"", name)) diff --git a/pkg/models/controllers/types.go b/pkg/models/controllers/types.go index 5ad55fcb3..f3a764f2c 100644 --- a/pkg/models/controllers/types.go +++ b/pkg/models/controllers/types.go @@ -19,15 +19,20 @@ package controllers import ( "time" + "database/sql/driver" + "encoding/json" + "errors" + "github.com/jinzhu/gorm" "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" ) const ( - stopping = "stopped" - running = "running" - updating = "updating" + resyncCircle = 180 + Stopped = "stopped" + Running = "running" + Updating = "updating" tablePods = "pods" tableDeployments = "deployments" tableDaemonsets = "daemonsets" @@ -53,41 +58,37 @@ const ( StorageClasses = "storage-classes" ) -var ResourceTable = map[string]string{Deployments: tableDeployments, Statefulsets: tableStatefulsets, Daemonsets: tableDaemonsets, - Pods: tablePods, Namespaces: tableNamespaces, Ingresses: tableIngresses, PersistentVolumeClaim: tablePersistentVolumeClaim, Roles: tableRoles, - Services: tableServices, StorageClasses: tableStorageClasses, ClusterRoles: tableClusterRoles} +type Annotation struct { + Values map[string]string `gorm:"type:TEXT"` +} -type Annotation map[string]string +func (annotation *Annotation) Scan(val interface{}) error { + switch val := val.(type) { + case string: + return json.Unmarshal([]byte(val), annotation) + case []byte: + return json.Unmarshal(val, annotation) + default: + return errors.New("not support") + } + return nil +} -// -//func (annotation *Annotation)Scan(val interface{}) error{ -// switch val := val.(type){ -// case string: -// return json.Unmarshal([]byte(val), annotation) -// case []byte: -// return json.Unmarshal(val, annotation) -// default: -// return errors.New("not support") -// } -// return nil -//} -// -//func (annotation *Annotation)Value() (driver.Value, error){ -// bytes, err := json.Marshal(annotation) -// return string(bytes), err -//} +func (annotation Annotation) Value() (driver.Value, error) { + bytes, err := json.Marshal(annotation) + return string(bytes), err +} type Deployment struct { Name string `gorm:"primary_key" json:"name"` Namespace string `gorm:"primary_key" json:"namespace"` App string `json:"app,omitempty"` - Available int32 `json:"available"` - Desire int32 `json:"desire"` - Status string `json:"status"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - UpdateTime time.Time `gorm:"column:updateTime" json:"updateTime,omitempty"` + Available int32 `json:"available"` + Desire int32 `json:"desire"` + Status string `json:"status"` + Annotation Annotation `json:"annotations"` + UpdateTime time.Time `gorm:"column:updateTime" json:"updateTime,omitempty"` } func (Deployment) TableName() string { @@ -99,12 +100,11 @@ type Statefulset struct { Namespace string `gorm:"primary_key" json:"namespace,omitempty"` App string `json:"app,omitempty"` - Available int32 `json:"available"` - Desire int32 `json:"desire"` - Status string `json:"status"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Available int32 `json:"available"` + Desire int32 `json:"desire"` + Status string `json:"status"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (Statefulset) TableName() string { @@ -116,13 +116,12 @@ type Daemonset struct { Namespace string `gorm:"primary_key" json:"namespace,omitempty"` App string `json:"app,omitempty"` - Available int32 `json:"available"` - Desire int32 `json:"desire"` - Status string `json:"status"` - NodeSelector string `json:"nodeSelector, omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Available int32 `json:"available"` + Desire int32 `json:"desire"` + Status string `json:"status"` + NodeSelector string `json:"nodeSelector, omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (Daemonset) TableName() string { @@ -137,10 +136,9 @@ type Service struct { VirtualIp string `json:"virtualIp,omitempty"` ExternalIp string `json:"externalIp,omitempty"` - Ports string `json:"ports,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Ports string `json:"ports,omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (Service) TableName() string { @@ -153,10 +151,10 @@ type Pvc struct { Status string `json:"status,omitempty"` Capacity string `json:"capacity,omitempty"` AccessMode string `json:"accessMode,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` + Annotation Annotation `json:"annotations"` CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` StorageClassName string `gorm:"column:storage_class" json:"storage_class,omitempty"` + InUse bool `gorm:"-" json:"inUse"` } func (Pvc) TableName() string { @@ -168,8 +166,7 @@ type Ingress struct { Namespace string `gorm:"primary_key" json:"namespace"` Ip string `json:"ip,omitempty"` TlsTermination string `json:"tlsTermination,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` + Annotation Annotation `json:"annotations"` CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } @@ -178,24 +175,41 @@ func (Ingress) TableName() string { } type Pod struct { - Name string `gorm:"primary_key" json:"name"` - Namespace string `gorm:"primary_key" json:"namespace"` - Status string `json:"status,omitempty"` - Node string `json:"node,omitempty"` - NodeIp string `json:"nodeIp,omitempty"` - PodIp string `json:"podIp,omitempty"` - ContainerStr string `gorm:"type:text" json:",omitempty"` - Containers []Container `json:"containers,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Name string `gorm:"primary_key" json:"name"` + Namespace string `gorm:"primary_key" json:"namespace"` + Status string `json:"status,omitempty"` + Node string `json:"node,omitempty"` + NodeIp string `json:"nodeIp,omitempty"` + PodIp string `json:"podIp,omitempty"` + Containers Containers `gorm:"type:text" json:"containers,omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } type Container struct { - Name string `json:"name"` - Ready bool `json:"ready,omitempty"` - Image string `json:"image"` - Ports []v1.ContainerPort `json:"ports"` + Name string `json:"name"` + Ready bool `json:"ready,omitempty"` + Image string `json:"image"` + Resources v1.ResourceRequirements `json:"resources"` + Ports []v1.ContainerPort `json:"ports"` +} +type Containers []Container + +func (containers *Containers) Scan(val interface{}) error { + switch val := val.(type) { + case string: + return json.Unmarshal([]byte(val), containers) + case []byte: + return json.Unmarshal(val, containers) + default: + return errors.New("not support") + } + return nil +} + +func (containers Containers) Value() (driver.Value, error) { + bytes, err := json.Marshal(containers) + return string(bytes), err } func (Pod) TableName() string { @@ -203,11 +217,10 @@ func (Pod) TableName() string { } type Role struct { - Name string `gorm:"primary_key" json:"name"` - Namespace string `gorm:"primary_key" json:"namespace"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Name string `gorm:"primary_key" json:"name"` + Namespace string `gorm:"primary_key" json:"namespace"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (Role) TableName() string { @@ -215,10 +228,9 @@ func (Role) TableName() string { } type ClusterRole struct { - Name string `gorm:"primary_key" json:"name"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Name string `gorm:"primary_key" json:"name"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` } func (ClusterRole) TableName() string { @@ -230,10 +242,10 @@ type Namespace struct { Creator string `json:"creator,omitempty"` Status string `json:"status"` - Descrition string `json:"description,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Descrition string `json:"description,omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + Usaeg v1.ResourceList `gorm:"-" json:"usage,omitempty"` } func (Namespace) TableName() string { @@ -241,13 +253,12 @@ func (Namespace) TableName() string { } type StorageClass struct { - Name string `gorm:"primary_key" json:"name"` - Creator string `json:"creator,omitempty"` - AnnotationStr string `gorm:"type:text" json:"annotationStr,omitempty"` - Annotation Annotation `gorm:"-" json:"annotation"` - CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` - IsDefault bool `json:"default"` - Count int `json:"count"` + Name string `gorm:"primary_key" json:"name"` + Creator string `json:"creator,omitempty"` + Annotation Annotation `json:"annotations"` + CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` + IsDefault bool `json:"default"` + Count int `json:"count"` } func (StorageClass) TableName() string { @@ -262,7 +273,8 @@ type Controller interface { listAndWatch() chanStop() chan struct{} chanAlive() chan struct{} - Count(conditions string) int + Count(namespace string) int + CountWithConditions(condition string) int ListWithConditions(condition string, paging *Paging) (int, interface{}, error) } @@ -326,36 +338,3 @@ type RoleCtl struct { type ClusterRoleCtl struct { CommonAttribute } - -func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) { - if len(conditions) == 0 { - db.Model(object).Count(total) - } else { - db.Model(object).Where(conditions).Count(total) - } - - if paging != nil { - if len(conditions) > 0 { - db.Where(conditions).Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list) - } else { - db.Order(order).Limit(paging.Limit).Offset(paging.Offset).Find(list) - } - - } else { - if len(conditions) > 0 { - db.Where(conditions).Order(order).Find(list) - } else { - db.Order(order).Find(list) - } - } -} - -func countWithConditions(db *gorm.DB, conditions string, object interface{}) int { - var count int - if len(conditions) == 0 { - db.Model(object).Count(&count) - } else { - db.Model(object).Where(conditions).Count(&count) - } - return count -} diff --git a/pkg/models/resources.go b/pkg/models/resources.go index f5fea4ebc..1d80a69b6 100644 --- a/pkg/models/resources.go +++ b/pkg/models/resources.go @@ -134,24 +134,31 @@ func generateConditionAndPaging(conditions map[string]string, paging map[string] } type workLoadStatus struct { - NameSpace string `json:"namespace"` - Count map[string]int `json:"data"` - Items map[string]interface{} + NameSpace string `json:"namespace"` + Count map[string]int `json:"data"` + Items map[string]interface{} `json:"items,omitempty"` } func GetNamespacesResourceStatus(namespace string) (*workLoadStatus, error) { res := workLoadStatus{Count: make(map[string]int), NameSpace: namespace, Items: make(map[string]interface{})} + var status *ResourceList + var err error for _, resource := range []string{controllers.Deployments, controllers.Statefulsets, controllers.Daemonsets} { - status, err := ListResource(resource, "status=updating", "") + if len(namespace) > 0 { + status, err = ListResource(resource, fmt.Sprintf("status=%s,namespace=%s", controllers.Updating, namespace), "") + } else { + status, err = ListResource(resource, fmt.Sprintf("status=%s", controllers.Updating), "") + } + if err != nil { return nil, err } count := status.Total - items := status.Items + //items := status.Items res.Count[resource] = count - res.Items[resource] = items + //res.Items[resource] = items } return &res, nil diff --git a/pkg/models/terminal.go b/pkg/models/terminal.go index 45ed71bc3..95c0d1f99 100644 --- a/pkg/models/terminal.go +++ b/pkg/models/terminal.go @@ -11,6 +11,10 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +// +// the code is mainly from: +// https://github.com/kubernetes/dashboard/blob/master/src/app/backend/handler/terminal.go +// thanks to the related developer package models