diff --git a/pkg/apis/v1alpha/resources/resources.go b/pkg/apis/v1alpha/resources/resources.go index 41afd3720..f158cf6e8 100644 --- a/pkg/apis/v1alpha/resources/resources.go +++ b/pkg/apis/v1alpha/resources/resources.go @@ -31,7 +31,7 @@ func Register(ws *restful.WebService, subPath string) { tags := []string{"resources"} - ws.Route(ws.GET(subPath+"/{resource}").To(listResource).Produces(restful.MIME_JSON).Metadata(restfulspec.KeyOpenAPITags, tags).Doc("Get resource" + + ws.Route(ws.GET(subPath+"/{resource}").To(handleResource).Produces(restful.MIME_JSON).Metadata(restfulspec.KeyOpenAPITags, tags).Doc("Get resource" + " list").Param(ws.PathParameter("resource", "resource name").DataType( "string")).Param(ws.QueryParameter("conditions", "search "+ "conditions").DataType("string")).Param(ws.QueryParameter("paging", @@ -39,12 +39,15 @@ func Register(ws *restful.WebService, subPath string) { } -func listResource(req *restful.Request, resp *restful.Response) { +func handleResource(req *restful.Request, resp *restful.Response) { resource := req.PathParameter("resource") + if resource == "applications" { + handleApplication(req, resp) + return + } conditions := req.QueryParameter("conditions") paging := req.QueryParameter("paging") - res, err := models.ListResource(resource, conditions, paging) if err != nil { resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()}) @@ -53,3 +56,30 @@ func listResource(req *restful.Request, resp *restful.Response) { resp.WriteEntity(res) } + +func handleApplication(req *restful.Request, resp *restful.Response) { + //searchWord := req.QueryParameter("search-word") + paging := req.QueryParameter("paging") + clusterId := req.QueryParameter("cluster_id") + runtimeId := req.QueryParameter("runtime_id") + conditions := req.QueryParameter("conditions") + if len(clusterId) > 0 { + app, err := models.GetApplication(clusterId) + if err != nil { + resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()}) + return + } + resp.WriteEntity(app) + return + } + + res, err := models.ListApplication(runtimeId, conditions, paging) + + if err != nil { + resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()}) + return + } + + resp.WriteEntity(res) + +} diff --git a/pkg/apis/v1alpha/users/user.go b/pkg/apis/v1alpha/users/user.go index 48550f7af..45640db9b 100644 --- a/pkg/apis/v1alpha/users/user.go +++ b/pkg/apis/v1alpha/users/user.go @@ -55,7 +55,7 @@ func createUser(req *restful.Request, resp *restful.Response) { return } - err = models.CreateKubectlPod(user) + err = models.CreateKubectlDeploy(user) if err != nil { resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()}) @@ -69,7 +69,7 @@ func delUser(req *restful.Request, resp *restful.Response) { user := req.PathParameter("user") - err := models.DelKubectlPod(user) + err := models.DelKubectlDeploy(user) if err != nil && !apierrors.IsNotFound(err) { resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()}) diff --git a/pkg/app/app.go b/pkg/app/app.go index 979adcae3..b33abb4a8 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -23,7 +23,7 @@ import ( "github.com/emicklei/go-restful" "github.com/golang/glog" "k8s.io/api/core/v1" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "net" "net/http" @@ -31,9 +31,12 @@ import ( "github.com/emicklei/go-restful-openapi" "github.com/go-openapi/spec" + "k8s.io/apimachinery/pkg/api/errors" + _ "kubesphere.io/kubesphere/pkg/apis/v1alpha" "kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/constants" + "kubesphere.io/kubesphere/pkg/models" "kubesphere.io/kubesphere/pkg/models/controllers" "kubesphere.io/kubesphere/pkg/options" ) @@ -64,17 +67,26 @@ func newKubeSphereServer(options *options.ServerRunOptions) *kubeSphereServer { func preCheck() error { k8sClient := client.NewK8sClient() - nsList, err := k8sClient.CoreV1().Namespaces().List(meta_v1.ListOptions{}) - if err != nil { + _, err := k8sClient.CoreV1().Namespaces().Get(constants.KubeSphereControlNamespace, metaV1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { return err } - for _, ns := range nsList.Items { - if ns.Name == constants.KubeSphereControlNamespace { - return nil + + if errors.IsNotFound(err) { + namespace := v1.Namespace{ObjectMeta: metaV1.ObjectMeta{Name: constants.KubeSphereControlNamespace}} + _, err = k8sClient.CoreV1().Namespaces().Create(&namespace) + if err != nil { + return err } } - namespace := v1.Namespace{ObjectMeta: meta_v1.ObjectMeta{Name: constants.KubeSphereControlNamespace}} - _, err = k8sClient.CoreV1().Namespaces().Create(&namespace) + + _, err = k8sClient.AppsV1().Deployments(constants.KubeSphereControlNamespace).Get(constants.AdminUserName, metaV1.GetOptions{}) + + if errors.IsNotFound(err) { + models.CreateKubeConfig(constants.AdminUserName) + models.CreateKubectlDeploy(constants.AdminUserName) + return nil + } return err } diff --git a/pkg/constants/common.go b/pkg/constants/common.go index 8ac6a8cb0..6d35aedeb 100644 --- a/pkg/constants/common.go +++ b/pkg/constants/common.go @@ -34,8 +34,8 @@ const ( KubeSphereNamespace = "kubesphere-system" KubeSphereControlNamespace = "kubesphere-controls-system" IngressControllerNamespace = KubeSphereControlNamespace - - DataHome = "/etc/kubesphere" - IngressControllerFolder = DataHome + "/ingress-controller" - IngressControllerPrefix = "kubesphere-router-" + AdminUserName = "admin" + DataHome = "/etc/kubesphere" + IngressControllerFolder = DataHome + "/ingress-controller" + IngressControllerPrefix = "kubesphere-router-" ) diff --git a/pkg/models/controllers/applications.go b/pkg/models/controllers/applications.go new file mode 100644 index 000000000..84c129755 --- /dev/null +++ b/pkg/models/controllers/applications.go @@ -0,0 +1,498 @@ +/* +Copyright 2018 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + + "kubesphere.io/kubesphere/pkg/client" +) + +const ( + unknown = "-" + deploySurffix = "-Deployment" + daemonSurffix = "-DaemonSet" + stateSurffix = "-StatefulSet" +) + +type ApplicationCtl struct { + OpenpitrixAddr string +} + +type Application struct { + Name string `json:"name"` + RepoName string `json:"repoName"` + Runtime string `json:"namespace"` + RuntimeId string `json:"runtime_id"` + Version string `json:"version"` + VersionId string `json:"version_id"` + Status string `json:"status"` + UpdateTime time.Time `json:"updateTime"` + CreateTime time.Time `json:"createTime"` + App string `json:"app"` + AppId string `json:"app_id"` + Creator string `json:"creator,omitempty"` + WorkLoads *workLoads `json:"workloads,omitempty"` + Services *[]Service `json:"services,omitempty"` + Ingresses *[]ing `json:"ingresses,omitempty"` + ClusterID string `json:"cluster_id"` +} + +type ing struct { + Name string `json:"name"` + Rules []ingressRule `json:"rules"` +} + +type clusterRole struct { + ClusterID string `json:"cluster_id"` + Role string `json:"role"` +} + +type cluster struct { + ClusterID string `json:"cluster_id"` + Name string `json:"name"` + AppID string `json:"app_id"` + VersionID string `json:"version_id"` + Status string `json:"status"` + UpdateTime time.Time `json:"status_time"` + CreateTime time.Time `json:"createTime"` + RunTimeId string `json:"runtime_id"` + Description string `json:"description"` + ClusterRoleSets []clusterRole `json:"cluster_role_set"` +} + +type clusters struct { + Total int `json:"total_count"` + Clusters []cluster `json:"cluster_set"` +} + +type versionList struct { + Total int `json:"total_count"` + Versions []version `json:"app_version_set"` +} + +type version struct { + Name string `json:"name"` + VersionID string `json:"version_id"` +} + +type runtime struct { + RuntimeID string `json:"runtime_id"` + Zone string `json:"zone"` +} + +type runtimeList struct { + Total int `json:"total_count"` + Runtimes []runtime `json:"runtime_set"` +} + +type app struct { + AppId string `json:"app_id"` + Name string `json:"name"` + ChartName string `json:"chart_name"` + RepoId string `json:"repo_id"` +} + +type repo struct { + RepoId string `json:"repo_id"` + Name string `json:"name"` + Url string `json:"url"` +} + +type workLoads struct { + Deployments []Deployment `json:"deployments,omitempty"` + Statefulsets []Statefulset `json:"statefulsets,omitempty"` + Daemonsets []Daemonset `json:"daemonsets,omitempty"` +} + +type description struct { + Creator string `json:"creator"` +} + +type appList struct { + Total int `json:"total_count"` + Apps []app `json:"app_set"` +} + +type repoList struct { + Total int `json:"total_count"` + Repos []repo `json:"repo_set"` +} + +func (ctl *ApplicationCtl) GetAppInfo(appId string) (string, string, string, error) { + url := fmt.Sprintf("%s/v1/apps?app_id=%s", ctl.OpenpitrixAddr, appId) + resp, err := makeHttpRequest("GET", url, "") + if err != nil { + glog.Error(err) + return unknown, unknown, unknown, err + } + + var apps appList + err = json.Unmarshal(resp, &apps) + if err != nil { + glog.Error(err) + return unknown, unknown, unknown, err + } + + if len(apps.Apps) == 0 { + return unknown, unknown, unknown, err + } + + return apps.Apps[0].ChartName, apps.Apps[0].RepoId, apps.Apps[0].AppId, nil +} + +func (ctl *ApplicationCtl) GetRepo(repoId string) (string, error) { + url := fmt.Sprintf("%s/v1/repos?repo_id=%s", ctl.OpenpitrixAddr, repoId) + resp, err := makeHttpRequest("GET", url, "") + if err != nil { + glog.Error(err) + return unknown, err + } + + var repos repoList + err = json.Unmarshal(resp, &repos) + if err != nil { + glog.Error(err) + return unknown, err + } + + if len(repos.Repos) == 0 { + return unknown, err + } + + return repos.Repos[0].Name, nil +} + +func (ctl *ApplicationCtl) GetVersion(versionId string) (string, error) { + versionUrl := fmt.Sprintf("%s/v1/app_versions?version_id=%s", ctl.OpenpitrixAddr, versionId) + resp, err := makeHttpRequest("GET", versionUrl, "") + if err != nil { + glog.Error(err) + return unknown, err + } + + var versions versionList + err = json.Unmarshal(resp, &versions) + if err != nil { + glog.Error(err) + return unknown, err + } + + if len(versions.Versions) == 0 { + return unknown, nil + } + return versions.Versions[0].Name, nil +} + +func (ctl *ApplicationCtl) GetRuntime(runtimeId string) (string, error) { + + versionUrl := fmt.Sprintf("%s/v1/runtimes?runtime_id=%s", ctl.OpenpitrixAddr, runtimeId) + resp, err := makeHttpRequest("GET", versionUrl, "") + if err != nil { + glog.Error(err) + return unknown, err + } + + var runtimes runtimeList + err = json.Unmarshal(resp, &runtimes) + if err != nil { + glog.Error(err) + return unknown, err + } + + if len(runtimes.Runtimes) == 0 { + return unknown, nil + } + + return runtimes.Runtimes[0].Zone, nil +} + +func (ctl *ApplicationCtl) GetWorkLoads(namespace string, clusterRoles []clusterRole) *workLoads { + + var works workLoads + for _, clusterRole := range clusterRoles { + workLoadName := clusterRole.Role + if len(workLoadName) > 0 { + if strings.HasSuffix(workLoadName, deploySurffix) { + name := strings.Split(workLoadName, deploySurffix)[0] + ctl := ResourceControllers.Controllers[Deployments] + _, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil) + works.Deployments = append(works.Deployments, items.([]Deployment)...) + continue + } + + if strings.HasSuffix(workLoadName, daemonSurffix) { + name := strings.Split(workLoadName, daemonSurffix)[0] + ctl := ResourceControllers.Controllers[Daemonsets] + _, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil) + works.Daemonsets = append(works.Daemonsets, items.([]Daemonset)...) + continue + } + + if strings.HasSuffix(workLoadName, stateSurffix) { + name := strings.Split(workLoadName, stateSurffix)[0] + ctl := ResourceControllers.Controllers[Statefulsets] + _, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil) + works.Statefulsets = append(works.Statefulsets, items.([]Statefulset)...) + continue + } + } + } + return &works +} + +func (ctl *ApplicationCtl) getCreator(desc string) string { + var dc description + err := json.Unmarshal([]byte(desc), &dc) + if err != nil { + return unknown + } + return dc.Creator +} + +func (ctl *ApplicationCtl) getLabels(namespace string, workloads *workLoads) *[]map[string]string { + k8sClient := client.NewK8sClient() + + var workloadLables []map[string]string + if workloads == nil { + return nil + } + + for _, workload := range workloads.Deployments { + deploy, err := k8sClient.AppsV1().Deployments(namespace).Get(workload.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + continue + } + workloadLables = append(workloadLables, deploy.Labels) + } + + for _, workload := range workloads.Daemonsets { + daemonset, err := k8sClient.AppsV1().DaemonSets(namespace).Get(workload.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + continue + } + workloadLables = append(workloadLables, daemonset.Labels) + } + + for _, workload := range workloads.Statefulsets { + statefulset, err := k8sClient.AppsV1().StatefulSets(namespace).Get(workload.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + continue + } + workloadLables = append(workloadLables, statefulset.Labels) + } + + return &workloadLables +} + +func isExist(svcs []Service, svc v1.Service) bool { + for _, item := range svcs { + if item.Name == svc.Name && item.Namespace == svc.Namespace { + return true + } + } + return false +} + +func (ctl *ApplicationCtl) getSvcs(namespace string, workLoadLabels *[]map[string]string) *[]Service { + if len(*workLoadLabels) == 0 { + return nil + } + k8sClient := client.NewK8sClient() + var services []Service + for _, label := range *workLoadLabels { + labelSelector := labels.Set(label).AsSelector().String() + svcs, err := k8sClient.CoreV1().Services(namespace).List(metaV1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + glog.Errorf("get app's svc failed, reason: %v", err) + } + for _, item := range svcs.Items { + if !isExist(services, item) { + services = append(services, *generateSvcObject(item)) + } + } + } + + return &services +} + +func (ctl *ApplicationCtl) getIng(namespace string, services *[]Service) *[]ing { + if services == nil { + return nil + } + + ingCtl := ResourceControllers.Controllers[Ingresses] + var ings []ing + for _, svc := range *services { + _, items, err := ingCtl.ListWithConditions(fmt.Sprintf("namespace = '%s' and rules like '%%%s%%' ", namespace, svc.Name), nil) + if err != nil { + glog.Error(err) + return nil + } + + glog.Error(items) + for _, ingress := range items.([]Ingress) { + var rules []ingressRule + err := json.Unmarshal([]byte(ingress.Rules), &rules) + if err != nil { + return nil + } + + exist := false + var tmpRules []ingressRule + for _, rule := range rules { + if rule.Service == svc.Name { + exist = true + tmpRules = append(tmpRules, rule) + } + } + + if exist { + ings = append(ings, ing{Name: ingress.Name, Rules: tmpRules}) + } + } + } + + return &ings +} + +func (ctl *ApplicationCtl) ListApplication(runtimeId string, match, fuzzy map[string]string, paging *Paging) (int, interface{}, error) { + limit := paging.Limit + offset := paging.Offset + if strings.HasSuffix(ctl.OpenpitrixAddr, "/") { + ctl.OpenpitrixAddr = strings.TrimSuffix(ctl.OpenpitrixAddr, "/") + } + + defaultStatus := "status=active&status=stopped&status=pending&status=ceased" + + url := fmt.Sprintf("%s/v1/clusters?limit=%s&offset=%s", ctl.OpenpitrixAddr, strconv.Itoa(limit), strconv.Itoa(offset)) + + if len(fuzzy["name"]) > 0 { + url = fmt.Sprintf("%s&search_word=%s", url, fuzzy["name"]) + } + + if len(match["status"]) > 0 { + url = fmt.Sprintf("%s&status=%s", url, match["status"]) + } else { + url = fmt.Sprintf("%s&%s", url, defaultStatus) + } + + if len(runtimeId) > 0 { + url = fmt.Sprintf("%s&runtime_id=%s", url, runtimeId) + } + + resp, err := makeHttpRequest("GET", url, "") + if err != nil { + glog.Errorf("request %s failed, reason: %s", url, err) + return 0, nil, err + } + + var clusterList clusters + err = json.Unmarshal(resp, &clusterList) + + if err != nil { + return 0, nil, err + } + + var apps []Application + + for _, item := range clusterList.Clusters { + var app Application + + app.Name = item.Name + app.ClusterID = item.ClusterID + app.UpdateTime = item.UpdateTime + app.Status = item.Status + versionInfo, _ := ctl.GetVersion(item.VersionID) + app.Version = versionInfo + app.VersionId = item.VersionID + runtimeInfo, _ := ctl.GetRuntime(item.RunTimeId) + app.Runtime = runtimeInfo + app.RuntimeId = item.RunTimeId + appInfo, _, appId, _ := ctl.GetAppInfo(item.AppID) + app.App = appInfo + app.AppId = appId + + apps = append(apps, app) + } + + return clusterList.Total, apps, nil +} + +func (ctl *ApplicationCtl) GetApp(clusterId string) (*Application, error) { + if strings.HasSuffix(ctl.OpenpitrixAddr, "/") { + ctl.OpenpitrixAddr = strings.TrimSuffix(ctl.OpenpitrixAddr, "/") + } + + url := fmt.Sprintf("%s/v1/clusters?cluster_id=%s", ctl.OpenpitrixAddr, clusterId) + + resp, err := makeHttpRequest("GET", url, "") + if err != nil { + glog.Error(err) + return nil, err + } + + var clusterList clusters + err = json.Unmarshal(resp, &clusterList) + + if err != nil { + glog.Error(err) + return nil, err + } + + if len(clusterList.Clusters) == 0 { + return nil, fmt.Errorf("NotFound, clusterId:%s", clusterId) + } + + item := clusterList.Clusters[0] + var app Application + + app.Name = item.Name + app.ClusterID = item.ClusterID + app.UpdateTime = item.UpdateTime + app.CreateTime = item.CreateTime + app.Status = item.Status + versionInfo, _ := ctl.GetVersion(item.VersionID) + app.Version = versionInfo + app.VersionId = item.VersionID + + runtimeInfo, _ := ctl.GetRuntime(item.RunTimeId) + app.Runtime = runtimeInfo + app.RuntimeId = item.RunTimeId + appInfo, repoId, appId, _ := ctl.GetAppInfo(item.AppID) + app.App = appInfo + app.AppId = appId + app.Creator = ctl.getCreator(item.Description) + + app.RepoName, _ = ctl.GetRepo(repoId) + app.WorkLoads = ctl.GetWorkLoads(app.Runtime, item.ClusterRoleSets) + workloadLabels := ctl.getLabels(app.Runtime, app.WorkLoads) + app.Services = ctl.getSvcs(app.Runtime, workloadLabels) + app.Ingresses = ctl.getIng(app.Runtime, app.Services) + + return &app, nil +} diff --git a/pkg/models/controllers/clusterroles.go b/pkg/models/controllers/clusterroles.go index 1355bb1e2..3aab38c2a 100644 --- a/pkg/models/controllers/clusterroles.go +++ b/pkg/models/controllers/clusterroles.go @@ -27,9 +27,11 @@ import ( "k8s.io/client-go/tools/cache" ) +const systemPrefix = "system:" + func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole { name := item.Name - if strings.HasPrefix(name, "system:") { + if strings.HasPrefix(name, systemPrefix) { return nil } @@ -43,30 +45,22 @@ func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole { return object } -func (ctl *ClusterRoleCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *ClusterRoleCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *ClusterRoleCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&ClusterRole{}) { db.DropTable(&ClusterRole{}) - } db = db.CreateTable(&ClusterRole{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Rbac().V1().ClusterRoles().Informer() - lister := kubeInformerFactory.Rbac().V1().ClusterRoles().Lister() + ctl.initListerAndInformer() - list, err := lister.List(labels.Everything()) + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -74,10 +68,38 @@ func (ctl *ClusterRoleCtl) listAndWatch() { for _, item := range list { obj := ctl.generateObject(*item) - db.Create(obj) - + if obj != nil { + db.Create(obj) + } } + ctl.informer.Run(stopChan) +} + +func (ctl *ClusterRoleCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + + count := 0 + for _, item := range list { + if !strings.HasPrefix(item.Name, systemPrefix) { + count++ + } + } + + return count +} + +func (ctl *ClusterRoleCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + ctl.lister = informerFactory.Rbac().V1().ClusterRoles().Lister() + + informer := informerFactory.Rbac().V1().ClusterRoles().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -102,13 +124,15 @@ func (ctl *ClusterRoleCtl) listAndWatch() { }, }) - - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *ClusterRoleCtl) CountWithConditions(conditions string) int { var object ClusterRole + if strings.Contains(conditions, "namespace") { + conditions = "" + } return countWithConditions(ctl.DB, conditions, &object) } @@ -124,10 +148,3 @@ func (ctl *ClusterRoleCtl) ListWithConditions(conditions string, paging *Paging) return total, list, nil } - -func (ctl *ClusterRoleCtl) Count(namespace string) int { - var count int - db := ctl.DB - db.Model(&ClusterRole{}).Count(&count) - return count -} diff --git a/pkg/models/controllers/common.go b/pkg/models/controllers/common.go index 3575ea8bc..bc2ac4bf2 100644 --- a/pkg/models/controllers/common.go +++ b/pkg/models/controllers/common.go @@ -16,7 +16,17 @@ limitations under the License. package controllers -import "github.com/jinzhu/gorm" +import ( + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/golang/glog" + "github.com/jinzhu/gorm" + "github.com/pkg/errors" +) func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) { if len(conditions) == 0 { @@ -50,3 +60,83 @@ func countWithConditions(db *gorm.DB, conditions string, object interface{}) int } return count } + +func makeHttpRequest(method, url, data string) ([]byte, error) { + var req *http.Request + + var err error + if method == "GET" { + req, err = http.NewRequest(method, url, nil) + } else { + req, err = http.NewRequest(method, url, strings.NewReader(data)) + } + + if err != nil { + glog.Error(err) + return nil, err + } + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + + if err != nil { + err := fmt.Errorf("Request to %s failed, method: %s, reason: %s ", url, method, err) + glog.Error(err) + return nil, err + } + + body, err := ioutil.ReadAll(resp.Body) + defer resp.Body.Close() + if resp.StatusCode >= http.StatusBadRequest { + err = errors.New(string(body)) + } + return body, err +} + +func handleCrash(ctl Controller) { + close(ctl.chanAlive()) + if err := recover(); err != nil { + glog.Errorf("panic occur in %s controller's listAndWatch function, reason: %s", ctl.Name(), err) + return + } +} + +func hasSynced(ctl Controller) bool { + totalInDb := ctl.CountWithConditions("") + totalInK8s := ctl.total() + + if totalInDb == totalInK8s { + return true + } + + return false +} + +func checkAndResync(ctl Controller, stopChan chan struct{}) { + defer close(stopChan) + for { + select { + case <-ctl.chanStop(): + return + default: + time.Sleep(30 * time.Minute) + + if !hasSynced(ctl) { + glog.Errorf("the data in db and kubernetes is inconsistent, resync %s controller", ctl.Name()) + close(stopChan) + stopChan = make(chan struct{}) + go ctl.sync(stopChan) + } + } + } +} + +func listAndWatch(ctl Controller) { + defer handleCrash(ctl) + + stopChan := make(chan struct{}) + + go ctl.sync(stopChan) + + checkAndResync(ctl, stopChan) +} diff --git a/pkg/models/controllers/daemonsets.go b/pkg/models/controllers/daemonsets.go index 7d8b97abd..e96e22fd9 100644 --- a/pkg/models/controllers/daemonsets.go +++ b/pkg/models/controllers/daemonsets.go @@ -61,30 +61,21 @@ func (ctl *DaemonsetCtl) generateObject(item v1.DaemonSet) *Daemonset { return object } -func (ctl *DaemonsetCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *DaemonsetCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *DaemonsetCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&Daemonset{}) { db.DropTable(&Daemonset{}) - } db = db.CreateTable(&Daemonset{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Apps().V1().DaemonSets().Informer() - lister := kubeInformerFactory.Apps().V1().DaemonSets().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -93,9 +84,27 @@ func (ctl *DaemonsetCtl) listAndWatch() { for _, item := range list { obj := ctl.generateObject(*item) db.Create(obj) - } + ctl.informer.Run(stopChan) +} + +func (ctl *DaemonsetCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + return len(list) +} + +func (ctl *DaemonsetCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + ctl.lister = informerFactory.Apps().V1().DaemonSets().Lister() + + informer := informerFactory.Apps().V1().DaemonSets().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -117,7 +126,7 @@ func (ctl *DaemonsetCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *DaemonsetCtl) CountWithConditions(conditions string) int { @@ -138,13 +147,13 @@ func (ctl *DaemonsetCtl) ListWithConditions(conditions string, paging *Paging) ( return total, list, nil } -func (ctl *DaemonsetCtl) Count(namespace string) int { - var count int - db := ctl.DB - if len(namespace) == 0 { - db.Model(&Daemonset{}).Count(&count) - } else { - db.Model(&Daemonset{}).Where("namespace = ?", namespace).Count(&count) - } - return count -} +//func (ctl *DaemonsetCtl) Count(namespace string) int { +// var count int +// db := ctl.DB +// if len(namespace) == 0 { +// db.Model(&Daemonset{}).Count(&count) +// } else { +// db.Model(&Daemonset{}).Where("namespace = ?", namespace).Count(&count) +// } +// return count +//} diff --git a/pkg/models/controllers/deployments.go b/pkg/models/controllers/deployments.go index b2e4410a8..4ae7f097d 100644 --- a/pkg/models/controllers/deployments.go +++ b/pkg/models/controllers/deployments.go @@ -21,9 +21,8 @@ import ( "github.com/golang/glog" "k8s.io/api/apps/v1" - "k8s.io/client-go/informers" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" ) @@ -65,15 +64,11 @@ func (ctl *DeploymentCtl) generateObject(item v1.Deployment) *Deployment { App: app, UpdateTime: updateTime, Status: status, Annotation: Annotation{item.Annotations}} } -func (ctl *DeploymentCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *DeploymentCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *DeploymentCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&Deployment{}) { db.DropTable(&Deployment{}) @@ -81,12 +76,8 @@ func (ctl *DeploymentCtl) listAndWatch() { db = db.CreateTable(&Deployment{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Apps().V1().Deployments().Informer() - lister := kubeInformerFactory.Apps().V1().Deployments().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -95,9 +86,29 @@ func (ctl *DeploymentCtl) listAndWatch() { for _, item := range list { obj := ctl.generateObject(*item) db.Create(obj) - } + ctl.informer.Run(stopChan) +} + +func (ctl *DeploymentCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", ctl.Name(), err) + return 0 + } + + return len(list) +} + +func (ctl *DeploymentCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + + ctl.lister = informerFactory.Apps().V1().Deployments().Lister() + + informer := informerFactory.Apps().V1().Deployments().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -118,8 +129,7 @@ func (ctl *DeploymentCtl) listAndWatch() { }, }) - - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *DeploymentCtl) CountWithConditions(conditions string) int { @@ -139,14 +149,3 @@ func (ctl *DeploymentCtl) ListWithConditions(conditions string, paging *Paging) return total, list, nil } - -func (ctl *DeploymentCtl) Count(namespace string) int { - var count int - db := ctl.DB - if len(namespace) == 0 { - db.Model(&Deployment{}).Count(&count) - } else { - db.Model(&Deployment{}).Where("namespace = ?", namespace).Count(&count) - } - return count -} diff --git a/pkg/models/controllers/ingresses.go b/pkg/models/controllers/ingresses.go index 59baeb269..74b6704c0 100644 --- a/pkg/models/controllers/ingresses.go +++ b/pkg/models/controllers/ingresses.go @@ -20,6 +20,8 @@ import ( "strings" "time" + "encoding/json" + "github.com/golang/glog" "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/labels" @@ -46,35 +48,41 @@ func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress { ip = strings.Join(ipList, ",") } - object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}} + var ingRules []ingressRule + for _, rule := range item.Spec.Rules { + host := rule.Host + for _, path := range rule.HTTP.Paths { + var ingRule ingressRule + ingRule.Host = host + ingRule.Service = path.Backend.ServiceName + ingRule.Port = path.Backend.ServicePort.IntVal + ingRule.Path = path.Path + ingRules = append(ingRules, ingRule) + } + } + + ruleStr, _ := json.Marshal(ingRules) + + object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}, Rules: string(ruleStr)} return object } -func (ctl *IngressCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *IngressCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *IngressCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&Ingress{}) { db.DropTable(&Ingress{}) - } db = db.CreateTable(&Ingress{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Extensions().V1beta1().Ingresses().Informer() - lister := kubeInformerFactory.Extensions().V1beta1().Ingresses().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -83,9 +91,28 @@ func (ctl *IngressCtl) listAndWatch() { for _, item := range list { obj := ctl.generateObject(*item) db.Create(obj) - } + ctl.informer.Run(stopChan) +} + +func (ctl *IngressCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + return len(list) +} + +func (ctl *IngressCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + + ctl.lister = informerFactory.Extensions().V1beta1().Ingresses().Lister() + + informer := informerFactory.Extensions().V1beta1().Ingresses().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -107,7 +134,7 @@ func (ctl *IngressCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *IngressCtl) CountWithConditions(conditions string) int { @@ -128,13 +155,13 @@ func (ctl *IngressCtl) ListWithConditions(conditions string, paging *Paging) (in return total, list, nil } -func (ctl *IngressCtl) Count(namespace string) int { - var count int - db := ctl.DB - if len(namespace) == 0 { - db.Model(&Ingress{}).Count(&count) - } else { - db.Model(&Ingress{}).Where("namespace = ?", namespace).Count(&count) - } - return count -} +//func (ctl *IngressCtl) Count(namespace string) int { +// var count int +// db := ctl.DB +// if len(namespace) == 0 { +// db.Model(&Ingress{}).Count(&count) +// } else { +// db.Model(&Ingress{}).Where("namespace = ?", namespace).Count(&count) +// } +// return count +//} diff --git a/pkg/models/controllers/namespaces.go b/pkg/models/controllers/namespaces.go index 081e43465..1a2622305 100644 --- a/pkg/models/controllers/namespaces.go +++ b/pkg/models/controllers/namespaces.go @@ -19,9 +19,6 @@ package controllers import ( "encoding/json" "fmt" - "io/ioutil" - "net/http" - "strings" "time" "github.com/golang/glog" @@ -30,12 +27,13 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/kubernetes/pkg/util/slice" + "k8s.io/client-go/informers" + "kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/constants" "kubesphere.io/kubesphere/pkg/options" @@ -69,25 +67,6 @@ type DeleteRunTime struct { RuntimeId []string `json:"runtime_id"` } -func makeHttpRequest(method, url, data string) ([]byte, error) { - req, err := http.NewRequest(method, url, strings.NewReader(data)) - if err != nil { - glog.Error(err) - return nil, err - } - - httpClient := &http.Client{} - resp, err := httpClient.Do(req) - if err != nil { - glog.Error(err) - return nil, err - } - - body, err := ioutil.ReadAll(resp.Body) - defer resp.Body.Close() - return body, err -} - func (ctl *NamespaceCtl) getKubeConfig(user string) (string, error) { k8sClient := client.NewK8sClient() configmap, err := k8sClient.CoreV1().ConfigMaps(kubectlNamespace).Get(user, metaV1.GetOptions{}) @@ -262,15 +241,11 @@ func (ctl *NamespaceCtl) generateObject(item v1.Namespace) *Namespace { return object } -func (ctl *NamespaceCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *NamespaceCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *NamespaceCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&Namespace{}) { @@ -279,12 +254,8 @@ func (ctl *NamespaceCtl) listAndWatch() { db = db.CreateTable(&Namespace{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Core().V1().Namespaces().Informer() - lister := kubeInformerFactory.Core().V1().Namespaces().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -294,9 +265,28 @@ func (ctl *NamespaceCtl) listAndWatch() { obj := ctl.generateObject(*item) db.Create(obj) ctl.createRoleAndRuntime(*item) - } + ctl.informer.Run(stopChan) +} + +func (ctl *NamespaceCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + return len(list) +} + +func (ctl *NamespaceCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + + ctl.lister = informerFactory.Core().V1().Namespaces().Lister() + + informer := informerFactory.Core().V1().Namespaces().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -321,7 +311,7 @@ func (ctl *NamespaceCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *NamespaceCtl) CountWithConditions(conditions string) int { @@ -339,26 +329,21 @@ func (ctl *NamespaceCtl) ListWithConditions(conditions string, paging *Paging) ( listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order) - for index := range list { - usage, err := ctl.GetNamespaceQuota(list[index].Name) - if err == nil { - list[index].Usaeg = usage + if paging != nil { + for index := range list { + usage, err := ctl.GetNamespaceQuota(list[index].Name) + if err == nil { + list[index].Usaeg = usage + } } - } + return total, list, nil } -func (ctl *NamespaceCtl) Count(namespace string) int { - var count int - db := ctl.DB - db.Model(&Namespace{}).Count(&count) - return count -} - func getUsage(namespace, resource string) int { - ctl := rec.controllers[resource] - return ctl.Count(namespace) + ctl := ResourceControllers.Controllers[resource] + return ctl.CountWithConditions(fmt.Sprintf("namespace = '%s' ", namespace)) } func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, error) { @@ -373,7 +358,7 @@ func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, e usage[v1.ResourceName(resourceName)] = quantity } - podCtl := rec.controllers[Pods] + podCtl := ResourceControllers.Controllers[Pods] var quantity resource.Quantity used := podCtl.CountWithConditions(fmt.Sprintf("status=\"%s\" And namespace=\"%s\"", "Running", namespace)) quantity.Set(int64(used)) diff --git a/pkg/models/controllers/pods.go b/pkg/models/controllers/pods.go index 0249927ca..62515ecad 100644 --- a/pkg/models/controllers/pods.go +++ b/pkg/models/controllers/pods.go @@ -199,25 +199,24 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod { return object } -func (ctl *PodCtl) listAndWatch() { +func (ctl *PodCtl) Name() string { + return ctl.CommonAttribute.Name +} + +func (ctl *PodCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&Pod{}) { db.DropTable(&Pod{}) - } db = db.CreateTable(&Pod{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Core().V1().Pods().Informer() - lister := kubeInformerFactory.Core().V1().Pods().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) - panic(err) + return } for _, item := range list { @@ -225,6 +224,26 @@ func (ctl *PodCtl) listAndWatch() { db.Create(obj) } + ctl.informer.Run(stopChan) +} + +func (ctl *PodCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + return len(list) +} + +func (ctl *PodCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + + ctl.lister = informerFactory.Core().V1().Pods().Lister() + + informer := informerFactory.Core().V1().Pods().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { object := obj.(*v1.Pod) @@ -249,7 +268,7 @@ func (ctl *PodCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *PodCtl) CountWithConditions(conditions string) int { @@ -270,13 +289,13 @@ func (ctl *PodCtl) ListWithConditions(conditions string, paging *Paging) (int, i return total, list, nil } -func (ctl *PodCtl) Count(namespace string) int { - var count int - db := ctl.DB - if len(namespace) == 0 { - db.Model(&Pod{}).Count(&count) - } else { - db.Model(&Pod{}).Where("namespace = ?", namespace).Count(&count) - } - return count -} +//func (ctl *PodCtl) Count(namespace string) int { +// var count int +// db := ctl.DB +// if len(namespace) == 0 { +// db.Model(&Pod{}).Count(&count) +// } else { +// db.Model(&Pod{}).Where("namespace = ?", namespace).Count(&count) +// } +// return count +//} diff --git a/pkg/models/controllers/pvcs.go b/pkg/models/controllers/pvcs.go index 8b1b09a5e..6295cb9f4 100644 --- a/pkg/models/controllers/pvcs.go +++ b/pkg/models/controllers/pvcs.go @@ -64,30 +64,21 @@ func (ctl *PvcCtl) generateObject(item *v1.PersistentVolumeClaim) *Pvc { return object } -func (ctl *PvcCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *PvcCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *PvcCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&Pvc{}) { db.DropTable(&Pvc{}) - } db = db.CreateTable(&Pvc{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer() - lister := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -96,9 +87,28 @@ func (ctl *PvcCtl) listAndWatch() { for _, item := range list { obj := ctl.generateObject(item) db.Create(obj) - } + ctl.informer.Run(stopChan) +} + +func (ctl *PvcCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + return len(list) +} + +func (ctl *PvcCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + + ctl.lister = informerFactory.Core().V1().PersistentVolumeClaims().Lister() + + informer := informerFactory.Core().V1().PersistentVolumeClaims().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -119,7 +129,7 @@ func (ctl *PvcCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *PvcCtl) CountWithConditions(conditions string) int { @@ -153,13 +163,13 @@ func (ctl *PvcCtl) ListWithConditions(conditions string, paging *Paging) (int, i return total, list, nil } -func (ctl *PvcCtl) Count(namespace string) int { - var count int - db := ctl.DB - if len(namespace) == 0 { - db.Model(&Pvc{}).Count(&count) - } else { - db.Model(&Pvc{}).Where("namespace = ?", namespace).Count(&count) - } - return count -} +//func (ctl *PvcCtl) Count(namespace string) int { +// var count int +// db := ctl.DB +// if len(namespace) == 0 { +// db.Model(&Pvc{}).Count(&count) +// } else { +// db.Model(&Pvc{}).Where("namespace = ?", namespace).Count(&count) +// } +// return count +//} diff --git a/pkg/models/controllers/roles.go b/pkg/models/controllers/roles.go index e99f06057..5f6020b2b 100644 --- a/pkg/models/controllers/roles.go +++ b/pkg/models/controllers/roles.go @@ -29,7 +29,7 @@ import ( func (ctl *RoleCtl) generateObject(item v1.Role) *Role { name := item.Name - if strings.HasPrefix(name, "system:") { + if strings.HasPrefix(name, systemPrefix) { return nil } namespace := item.Namespace @@ -43,30 +43,21 @@ func (ctl *RoleCtl) generateObject(item v1.Role) *Role { return object } -func (ctl *RoleCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *RoleCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *RoleCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&Role{}) { db.DropTable(&Role{}) - } db = db.CreateTable(&Role{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Rbac().V1().Roles().Informer() - lister := kubeInformerFactory.Rbac().V1().Roles().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -74,10 +65,39 @@ func (ctl *RoleCtl) listAndWatch() { for _, item := range list { obj := ctl.generateObject(*item) - db.Create(obj) - + if obj != nil { + db.Create(obj) + } } + ctl.informer.Run(stopChan) +} + +func (ctl *RoleCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + + count := 0 + for _, item := range list { + if !strings.HasPrefix(item.Name, systemPrefix) { + count++ + } + } + + return count +} + +func (ctl *RoleCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + + ctl.lister = informerFactory.Rbac().V1().Roles().Lister() + + informer := informerFactory.Rbac().V1().Roles().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -103,7 +123,7 @@ func (ctl *RoleCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *RoleCtl) CountWithConditions(conditions string) int { @@ -124,9 +144,9 @@ func (ctl *RoleCtl) ListWithConditions(conditions string, paging *Paging) (int, return total, list, nil } -func (ctl *RoleCtl) Count(namespace string) int { - var count int - db := ctl.DB - db.Model(&Role{}).Where("namespace = ?", namespace).Count(&count) - return count -} +//func (ctl *RoleCtl) Count(namespace string) int { +// var count int +// db := ctl.DB +// db.Model(&Role{}).Where("namespace = ?", namespace).Count(&count) +// return count +//} diff --git a/pkg/models/controllers/run.go b/pkg/models/controllers/run.go index db4f0f9fe..4388ab2d3 100644 --- a/pkg/models/controllers/run.go +++ b/pkg/models/controllers/run.go @@ -27,45 +27,46 @@ import ( ) type resourceControllers struct { - controllers map[string]Controller + Controllers map[string]Controller k8sClient *kubernetes.Clientset } var stopChan chan struct{} -var rec resourceControllers +var ResourceControllers resourceControllers func (rec *resourceControllers) runContoller(name string) { var ctl Controller - attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})} + attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan, + aliveChan: make(chan struct{}), Name: name} switch name { case Deployments: - ctl = &DeploymentCtl{attr} + ctl = &DeploymentCtl{CommonAttribute: attr} case Statefulsets: - ctl = &StatefulsetCtl{attr} + ctl = &StatefulsetCtl{CommonAttribute: attr} case Daemonsets: - ctl = &DaemonsetCtl{attr} + ctl = &DaemonsetCtl{CommonAttribute: attr} case Ingresses: - ctl = &IngressCtl{attr} + ctl = &IngressCtl{CommonAttribute: attr} case PersistentVolumeClaim: - ctl = &PvcCtl{attr} + ctl = &PvcCtl{CommonAttribute: attr} case Roles: - ctl = &RoleCtl{attr} + ctl = &RoleCtl{CommonAttribute: attr} case ClusterRoles: - ctl = &ClusterRoleCtl{attr} + ctl = &ClusterRoleCtl{CommonAttribute: attr} case Services: - ctl = &ServiceCtl{attr} + ctl = &ServiceCtl{CommonAttribute: attr} case Pods: - ctl = &PodCtl{attr} + ctl = &PodCtl{CommonAttribute: attr} case Namespaces: - ctl = &NamespaceCtl{attr} + ctl = &NamespaceCtl{CommonAttribute: attr} case StorageClasses: - ctl = &StorageClassCtl{attr} + ctl = &StorageClassCtl{CommonAttribute: attr} default: return } - rec.controllers[name] = ctl - go ctl.listAndWatch() + rec.Controllers[name] = ctl + go listAndWatch(ctl) } @@ -93,22 +94,23 @@ func Run() { stopChan := make(chan struct{}) defer close(stopChan) - rec = resourceControllers{k8sClient: client.NewK8sClient(), controllers: make(map[string]Controller)} + k8sClient := client.NewK8sClient() + ResourceControllers = resourceControllers{k8sClient: k8sClient, Controllers: make(map[string]Controller)} for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services, Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses} { - rec.runContoller(item) + ResourceControllers.runContoller(item) } go dbHealthCheck(client.NewDBClient()) for { - for ctlName, controller := range rec.controllers { + for ctlName, controller := range ResourceControllers.Controllers { select { case _, isClose := <-controller.chanAlive(): if !isClose { glog.Errorf("controller %s have stopped, restart it", ctlName) - rec.runContoller(ctlName) + ResourceControllers.runContoller(ctlName) } default: time.Sleep(5 * time.Second) diff --git a/pkg/models/controllers/services.go b/pkg/models/controllers/services.go index 2ebc40c50..026278265 100644 --- a/pkg/models/controllers/services.go +++ b/pkg/models/controllers/services.go @@ -26,7 +26,7 @@ import ( "k8s.io/client-go/tools/cache" ) -func (ctl *ServiceCtl) loadBalancerStatusStringer(item v1.Service) string { +func loadBalancerStatusStringer(item v1.Service) string { ingress := item.Status.LoadBalancer.Ingress result := sets.NewString() for i := range ingress { @@ -41,7 +41,7 @@ func (ctl *ServiceCtl) loadBalancerStatusStringer(item v1.Service) string { return r } -func (ctl *ServiceCtl) getExternalIp(item v1.Service) string { +func getExternalIp(item v1.Service) string { switch item.Spec.Type { case "ClusterIP", "NodePort": if len(item.Spec.ExternalIPs) > 0 { @@ -51,7 +51,7 @@ func (ctl *ServiceCtl) getExternalIp(item v1.Service) string { return item.Spec.ExternalName case "LoadBalancer": - lbIps := ctl.loadBalancerStatusStringer(item) + lbIps := loadBalancerStatusStringer(item) if len(item.Spec.ExternalIPs) > 0 { results := []string{} if len(lbIps) > 0 { @@ -68,12 +68,11 @@ func (ctl *ServiceCtl) getExternalIp(item v1.Service) string { return "" } -func (ctl *ServiceCtl) generateObject(item v1.Service) *Service { - +func generateSvcObject(item v1.Service) *Service { name := item.Name namespace := item.Namespace createTime := item.CreationTimestamp.Time - externalIp := ctl.getExternalIp(item) + externalIp := getExternalIp(item) serviceType := item.Spec.Type vip := item.Spec.ClusterIP ports := "" @@ -129,17 +128,18 @@ func (ctl *ServiceCtl) generateObject(item v1.Service) *Service { } return object + } -func (ctl *ServiceCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *ServiceCtl) generateObject(item v1.Service) *Service { + return generateSvcObject(item) +} +func (ctl *ServiceCtl) Name() string { + return ctl.CommonAttribute.Name +} + +func (ctl *ServiceCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&Service{}) { @@ -148,12 +148,8 @@ func (ctl *ServiceCtl) listAndWatch() { db = db.CreateTable(&Service{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Core().V1().Services().Informer() - lister := kubeInformerFactory.Core().V1().Services().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -164,6 +160,25 @@ func (ctl *ServiceCtl) listAndWatch() { db.Create(obj) } + ctl.informer.Run(stopChan) +} + +func (ctl *ServiceCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + return len(list) +} + +func (ctl *ServiceCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + ctl.lister = informerFactory.Core().V1().Services().Lister() + + informer := informerFactory.Core().V1().Services().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -185,7 +200,7 @@ func (ctl *ServiceCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *ServiceCtl) CountWithConditions(conditions string) int { @@ -205,14 +220,3 @@ func (ctl *ServiceCtl) ListWithConditions(conditions string, paging *Paging) (in return total, list, nil } - -func (ctl *ServiceCtl) Count(namespace string) int { - var count int - db := ctl.DB - if len(namespace) == 0 { - db.Model(&Service{}).Count(&count) - } else { - db.Model(&Service{}).Where("namespace = ?", namespace).Count(&count) - } - return count -} diff --git a/pkg/models/controllers/statefulsets.go b/pkg/models/controllers/statefulsets.go index 0d74d8ad3..372b25466 100644 --- a/pkg/models/controllers/statefulsets.go +++ b/pkg/models/controllers/statefulsets.go @@ -62,27 +62,21 @@ func (ctl *StatefulsetCtl) generateObject(item v1.StatefulSet) *Statefulset { return statefulSetObject } -func (ctl *StatefulsetCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *StatefulsetCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *StatefulsetCtl) sync(stopChan chan struct{}) { db := ctl.DB + if db.HasTable(&Statefulset{}) { db.DropTable(&Statefulset{}) } db = db.CreateTable(&Statefulset{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Apps().V1().StatefulSets().Informer() - lister := kubeInformerFactory.Apps().V1().StatefulSets().Lister() - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -91,9 +85,28 @@ func (ctl *StatefulsetCtl) listAndWatch() { for _, item := range list { obj := ctl.generateObject(*item) db.Create(obj) - } + ctl.informer.Run(stopChan) +} + +func (ctl *StatefulsetCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + return len(list) +} + +func (ctl *StatefulsetCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + + ctl.lister = informerFactory.Apps().V1().StatefulSets().Lister() + + informer := informerFactory.Apps().V1().StatefulSets().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -115,7 +128,7 @@ func (ctl *StatefulsetCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) + ctl.informer = informer } func (ctl *StatefulsetCtl) CountWithConditions(conditions string) int { @@ -135,14 +148,3 @@ func (ctl *StatefulsetCtl) ListWithConditions(conditions string, paging *Paging) return total, list, nil } - -func (ctl *StatefulsetCtl) Count(namespace string) int { - var count int - db := ctl.DB - if len(namespace) == 0 { - db.Model(&Statefulset{}).Count(&count) - } else { - db.Model(&Statefulset{}).Where("namespace = ?", namespace).Count(&count) - } - return count -} diff --git a/pkg/models/controllers/storageclasses.go b/pkg/models/controllers/storageclasses.go index db7f8a57d..92bb0f732 100644 --- a/pkg/models/controllers/storageclasses.go +++ b/pkg/models/controllers/storageclasses.go @@ -46,15 +46,11 @@ func (ctl *StorageClassCtl) generateObject(item v1.StorageClass) *StorageClass { return object } -func (ctl *StorageClassCtl) listAndWatch() { - defer func() { - close(ctl.aliveChan) - if err := recover(); err != nil { - glog.Error(err) - return - } - }() +func (ctl *StorageClassCtl) Name() string { + return ctl.CommonAttribute.Name +} +func (ctl *StorageClassCtl) sync(stopChan chan struct{}) { db := ctl.DB if db.HasTable(&StorageClass{}) { @@ -63,12 +59,8 @@ func (ctl *StorageClassCtl) listAndWatch() { db = db.CreateTable(&StorageClass{}) - k8sClient := ctl.K8sClient - kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle) - informer := kubeInformerFactory.Storage().V1().StorageClasses().Informer() - lister := kubeInformerFactory.Storage().V1().StorageClasses().Lister() - - list, err := lister.List(labels.Everything()) + ctl.initListerAndInformer() + list, err := ctl.lister.List(labels.Everything()) if err != nil { glog.Error(err) return @@ -77,12 +69,30 @@ func (ctl *StorageClassCtl) listAndWatch() { for _, item := range list { obj := ctl.generateObject(*item) db.Create(obj) - } + ctl.informer.Run(stopChan) +} + +func (ctl *StorageClassCtl) total() int { + list, err := ctl.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("count %s falied, reason:%s", err, ctl.Name()) + return 0 + } + return len(list) +} + +func (ctl *StorageClassCtl) initListerAndInformer() { + db := ctl.DB + + informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle) + + ctl.lister = informerFactory.Storage().V1().StorageClasses().Lister() + + informer := informerFactory.Storage().V1().StorageClasses().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - object := obj.(*v1.StorageClass) mysqlObject := ctl.generateObject(*object) db.Create(mysqlObject) @@ -101,8 +111,7 @@ func (ctl *StorageClassCtl) listAndWatch() { }, }) - informer.Run(ctl.stopChan) - + ctl.informer = informer } func (ctl *StorageClassCtl) CountWithConditions(conditions string) int { @@ -122,17 +131,10 @@ func (ctl *StorageClassCtl) ListWithConditions(conditions string, paging *Paging for index, storageClass := range list { name := storageClass.Name - pvcCtl := PvcCtl{CommonAttribute{K8sClient: ctl.K8sClient, DB: ctl.DB}} + pvcCtl := ResourceControllers.Controllers[PersistentVolumeClaim] list[index].Count = pvcCtl.CountWithConditions(fmt.Sprintf("storage_class=\"%s\"", name)) } return total, list, nil } - -func (ctl *StorageClassCtl) Count(name string) int { - var count int - db := ctl.DB - db.Model(&StorageClass{}).Count(&count) - return count -} diff --git a/pkg/models/controllers/types.go b/pkg/models/controllers/types.go index 5a48d34f0..3f640e7d8 100644 --- a/pkg/models/controllers/types.go +++ b/pkg/models/controllers/types.go @@ -26,10 +26,16 @@ import ( "github.com/jinzhu/gorm" "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" + appV1 "k8s.io/client-go/listers/apps/v1" + coreV1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/listers/extensions/v1beta1" + rbacV1 "k8s.io/client-go/listers/rbac/v1" + storageV1 "k8s.io/client-go/listers/storage/v1" + "k8s.io/client-go/tools/cache" ) const ( - resyncCircle = 180 + resyncCircle = 600 Stopped = "stopped" PvcPending = "Pending" Running = "running" @@ -57,6 +63,7 @@ const ( ClusterRoles = "cluster-roles" Services = "services" StorageClasses = "storage-classes" + Applications = "applications" ) type Annotation struct { @@ -163,10 +170,18 @@ func (Pvc) TableName() string { return tablePersistentVolumeClaim } +type ingressRule struct { + Host string `json:"host"` + Path string `json:"path"` + Service string `json:"service"` + Port int32 `json:"port"` +} + type Ingress struct { Name string `gorm:"primary_key" json:"name"` Namespace string `gorm:"primary_key" json:"namespace"` Ip string `json:"ip,omitempty"` + Rules string `json:"rules, omitempty"` TlsTermination string `json:"tlsTermination,omitempty"` Annotation Annotation `json:"annotations"` CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"` @@ -273,16 +288,19 @@ type Paging struct { } type Controller interface { - listAndWatch() chanStop() chan struct{} chanAlive() chan struct{} - Count(namespace string) int CountWithConditions(condition string) int + total() int + initListerAndInformer() + sync(stopChan chan struct{}) + Name() string ListWithConditions(condition string, paging *Paging) (int, interface{}, error) } type CommonAttribute struct { K8sClient *kubernetes.Clientset + Name string DB *gorm.DB stopChan chan struct{} aliveChan chan struct{} @@ -300,44 +318,66 @@ func (ca *CommonAttribute) chanAlive() chan struct{} { type DeploymentCtl struct { CommonAttribute + lister appV1.DeploymentLister + informer cache.SharedIndexInformer } type StatefulsetCtl struct { CommonAttribute + lister appV1.StatefulSetLister + informer cache.SharedIndexInformer } type DaemonsetCtl struct { CommonAttribute + lister appV1.DaemonSetLister + informer cache.SharedIndexInformer } type ServiceCtl struct { CommonAttribute + lister coreV1.ServiceLister + informer cache.SharedIndexInformer } type PvcCtl struct { CommonAttribute + lister coreV1.PersistentVolumeClaimLister + informer cache.SharedIndexInformer } type PodCtl struct { CommonAttribute + lister coreV1.PodLister + informer cache.SharedIndexInformer } type IngressCtl struct { + lister v1beta1.IngressLister + informer cache.SharedIndexInformer CommonAttribute } type NamespaceCtl struct { CommonAttribute + lister coreV1.NamespaceLister + informer cache.SharedIndexInformer } type StorageClassCtl struct { + lister storageV1.StorageClassLister + informer cache.SharedIndexInformer CommonAttribute } type RoleCtl struct { + lister rbacV1.RoleLister + informer cache.SharedIndexInformer CommonAttribute } type ClusterRoleCtl struct { + lister rbacV1.ClusterRoleLister + informer cache.SharedIndexInformer CommonAttribute } diff --git a/pkg/models/kubeconfig.go b/pkg/models/kubeconfig.go index 761c2ef05..a0d80483a 100644 --- a/pkg/models/kubeconfig.go +++ b/pkg/models/kubeconfig.go @@ -31,11 +31,12 @@ import ( "github.com/golang/glog" "gopkg.in/yaml.v2" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/api/core/v1" + "kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/constants" "kubesphere.io/kubesphere/pkg/options" @@ -46,6 +47,7 @@ const ( keyPath = "/etc/kubernetes/pki/ca.key" clusterName = "kubernetes" kubectlConfigKey = "config" + defaultNamespace = "default" ) type clusterInfo struct { @@ -59,8 +61,9 @@ type cluster struct { } type contextInfo struct { - Cluster string `yaml:"cluster"` - User string `yaml:"user"` + Cluster string `yaml:"cluster"` + User string `yaml:"user"` + NameSpace string `yaml:"namespace"` } type contextObject struct { @@ -186,14 +189,14 @@ func newCertificate(info CertInformation) *x509.Certificate { } func generateCaAndKey(user, caPath, keyPath string) (string, string, error) { - crtinfo := CertInformation{CommonName: user, IsCA: false} + crtInfo := CertInformation{CommonName: user, IsCA: false} crt, pri, err := Parse(caPath, keyPath) if err != nil { glog.Error(err) return "", "", err } - cert, key, err := createCRT(crt, pri, crtinfo) + cert, key, err := createCRT(crt, pri, crtInfo) if err != nil { glog.Error(err) return "", "", err @@ -217,7 +220,7 @@ func createKubeConfig(userName string) (string, error) { tmpKubeConfig.Clusters = append(tmpKubeConfig.Clusters, tmpCluster) contextName := userName + "@" + clusterName - tmpContext := contextObject{Context: contextInfo{User: userName, Cluster: clusterName}, Name: contextName} + tmpContext := contextObject{Context: contextInfo{User: userName, Cluster: clusterName, NameSpace: defaultNamespace}, Name: contextName} tmpKubeConfig.Contexts = append(tmpKubeConfig.Contexts, tmpContext) cert, key, err := generateCaAndKey(userName, caPath, keyPath) @@ -240,42 +243,48 @@ func createKubeConfig(userName string) (string, error) { func CreateKubeConfig(user string) error { k8sClient := client.NewK8sClient() - config, err := createKubeConfig(user) - if err != nil { - glog.Errorln(err) - return err + + _, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metaV1.GetOptions{}) + + if errors.IsNotFound(err) { + config, err := createKubeConfig(user) + if err != nil { + glog.Errorln(err) + return err + } + + data := map[string]string{"config": string(config)} + configMap := v1.ConfigMap{TypeMeta: metaV1.TypeMeta{Kind: "Configmap", APIVersion: "v1"}, ObjectMeta: metaV1.ObjectMeta{Name: user}, Data: data} + _, err = k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Create(&configMap) + if err != nil && !errors.IsAlreadyExists(err) { + glog.Errorf("create user %s's kubeConfig failed, reason:", user, err) + return err + } } - data := map[string]string{"config": string(config)} - var configmap = v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "Configmap", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{Name: user}, Data: data} - _, err = k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Create(&configmap) - if err != nil && !errors.IsAlreadyExists(err) { - glog.Errorf("create user %s's kubeConfig failed, reason:", user, err) - return err - } return nil } func GetKubeConfig(user string) (string, error) { k8sClient := client.NewK8sClient() - configmap, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metav1.GetOptions{}) + configMap, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metaV1.GetOptions{}) if err != nil { glog.Errorf("cannot get user %s's kubeConfig, reason:", user, err) return "", err } - return configmap.Data[kubectlConfigKey], nil + return configMap.Data[kubectlConfigKey], nil } func DelKubeConfig(user string) error { k8sClient := client.NewK8sClient() - _, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metav1.GetOptions{}) + _, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metaV1.GetOptions{}) if errors.IsNotFound(err) { return nil } - deletePolicy := metav1.DeletePropagationBackground - err = k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Delete(user, &metav1.DeleteOptions{PropagationPolicy: &deletePolicy}) + deletePolicy := metaV1.DeletePropagationBackground + err = k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Delete(user, &metaV1.DeleteOptions{PropagationPolicy: &deletePolicy}) if err != nil { glog.Errorf("delete user %s's kubeConfig failed, reason:", user, err) return err diff --git a/pkg/models/kubectl.go b/pkg/models/kubectl.go index 3a9b0f8d5..70aae7ccb 100644 --- a/pkg/models/kubectl.go +++ b/pkg/models/kubectl.go @@ -35,21 +35,20 @@ import ( const ( namespace = constants.KubeSphereControlNamespace - retry = 5 ) -type kubectlPodInfo struct { +type KubectlPodInfo struct { Namespace string `json:"namespace"` Pod string `json:"pod"` Container string `json:"container"` } -func GetKubectlPod(user string) (kubectlPodInfo, error) { +func GetKubectlPod(user string) (KubectlPodInfo, error) { k8sClient := client.NewK8sClient() deploy, err := k8sClient.AppsV1beta2().Deployments(namespace).Get(user, metav1.GetOptions{}) if err != nil { glog.Errorln(err) - return kubectlPodInfo{}, err + return KubectlPodInfo{}, err } selectors := deploy.Spec.Selector.MatchLabels @@ -57,16 +56,16 @@ func GetKubectlPod(user string) (kubectlPodInfo, error) { podList, err := k8sClient.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: labelSelector}) if err != nil { glog.Errorln(err) - return kubectlPodInfo{}, err + return KubectlPodInfo{}, err } pod, err := selectCorrectPod(namespace, podList.Items) if err != nil { glog.Errorln(err) - return kubectlPodInfo{}, err + return KubectlPodInfo{}, err } - info := kubectlPodInfo{Namespace: pod.Namespace, Pod: pod.Name, Container: pod.Status.ContainerStatuses[0].Name} + info := KubectlPodInfo{Namespace: pod.Namespace, Pod: pod.Name, Container: pod.Status.ContainerStatuses[0].Name} return info, nil @@ -91,7 +90,12 @@ func selectCorrectPod(namespace string, pods []v1.Pod) (kubectlPod v1.Pod, err e return kubectPodList[random], nil } -func CreateKubectlPod(user string) error { +func CreateKubectlDeploy(user string) error { + k8sClient := client.NewK8sClient() + _, err := k8sClient.AppsV1().Deployments(namespace).Get(user, metav1.GetOptions{}) + if err == nil { + return nil + } replica := int32(1) selector := metav1.LabelSelector{MatchLabels: map[string]string{"user": user}} @@ -122,17 +126,12 @@ func CreateKubectlPod(user string) error { }, } - k8sClient := client.NewK8sClient() - _, err := k8sClient.AppsV1beta2().Deployments(namespace).Create(&deployment) - - if errors.IsAlreadyExists(err) { - return nil - } + _, err = k8sClient.AppsV1beta2().Deployments(namespace).Create(&deployment) return err } -func DelKubectlPod(user string) error { +func DelKubectlDeploy(user string) error { k8sClient := client.NewK8sClient() _, err := k8sClient.AppsV1beta2().Deployments(namespace).Get(user, metav1.GetOptions{}) if errors.IsNotFound(err) { diff --git a/pkg/models/quota.go b/pkg/models/quota.go index a9985dadc..5cc29ba70 100644 --- a/pkg/models/quota.go +++ b/pkg/models/quota.go @@ -22,6 +22,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "fmt" + "kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/models/controllers" ) @@ -55,7 +57,12 @@ func getUsage(namespace, resource string) int { if err != nil { return 0 } - return ctl.Count(namespace) + + if len(namespace) == 0 { + return ctl.CountWithConditions("") + } + + return ctl.CountWithConditions(fmt.Sprintf("namespace = '%s' ", namespace)) } func GetClusterQuota() (*ResourceQuota, error) { diff --git a/pkg/models/resources.go b/pkg/models/resources.go index 3e6f8630d..b44774e8d 100644 --- a/pkg/models/resources.go +++ b/pkg/models/resources.go @@ -6,8 +6,15 @@ import ( "strconv" "strings" - "kubesphere.io/kubesphere/pkg/client" + "github.com/golang/glog" + "kubesphere.io/kubesphere/pkg/models/controllers" + "kubesphere.io/kubesphere/pkg/options" +) + +const ( + limit = "limit" + page = "page" ) type ResourceList struct { @@ -18,35 +25,16 @@ type ResourceList struct { } func getController(resource string) (controllers.Controller, error) { - var ctl controllers.Controller - attr := controllers.CommonAttribute{DB: client.NewDBClient()} switch resource { - case controllers.Deployments: - ctl = &controllers.DeploymentCtl{attr} - case controllers.Statefulsets: - ctl = &controllers.StatefulsetCtl{attr} - case controllers.Daemonsets: - ctl = &controllers.DaemonsetCtl{attr} - case controllers.Ingresses: - ctl = &controllers.IngressCtl{attr} - case controllers.PersistentVolumeClaim: - ctl = &controllers.PvcCtl{attr} - case controllers.Roles: - ctl = &controllers.RoleCtl{attr} - case controllers.ClusterRoles: - ctl = &controllers.ClusterRoleCtl{attr} - case controllers.Services: - ctl = &controllers.ServiceCtl{attr} - case controllers.Pods: - ctl = &controllers.PodCtl{attr} - case controllers.Namespaces: - ctl = &controllers.NamespaceCtl{attr} - case controllers.StorageClasses: - ctl = &controllers.StorageClassCtl{attr} + case controllers.Deployments, controllers.Statefulsets, controllers.Daemonsets, controllers.Ingresses, + controllers.PersistentVolumeClaim, controllers.Roles, controllers.ClusterRoles, controllers.Services, + controllers.Pods, controllers.Namespaces, controllers.StorageClasses: + + return controllers.ResourceControllers.Controllers[resource], nil default: - return nil, errors.New("invalid resource type") + return nil, fmt.Errorf("invalid resource Name '%s'", resource) } - return ctl, nil + return nil, nil } @@ -90,26 +78,44 @@ func getConditions(str string) (map[string]string, map[string]string, error) { return match, fuzzy, nil } -func getPaging(str string) (map[string]int, error) { - paging := make(map[string]int) - if len(str) == 0 { - return paging, nil +func getPaging(resourceName, pagingStr string) (*controllers.Paging, map[string]int, error) { + defaultPaging := &controllers.Paging{Limit: 10, Offset: 0} + defautlPagingMap := map[string]int{"page": 1, "limit": 10} + if resourceName == controllers.Namespaces { + defaultPaging = nil + defautlPagingMap = map[string]int{"page": 0, "limit": 0} } - list := strings.Split(str, ",") + pagingMap := make(map[string]int) + + if len(pagingStr) == 0 { + return defaultPaging, defautlPagingMap, nil + } + + list := strings.Split(pagingStr, ",") for _, item := range list { kvs := strings.Split(item, "=") if len(kvs) < 2 { - return nil, errors.New("invalid Paging input") + return nil, nil, errors.New("invalid Paging input") } value, err := strconv.Atoi(kvs[1]) if err != nil { - return nil, err + return nil, nil, errors.New("invalid Paging input") } - paging[kvs[0]] = value + pagingMap[kvs[0]] = value } - return paging, nil + + if pagingMap[limit] <= 0 || pagingMap[page] <= 0 { + return nil, nil, errors.New("invalid Paging input") + } + + if pagingMap[limit] > 0 && pagingMap[page] > 0 { + offset := (pagingMap[page] - 1) * pagingMap[limit] + return &controllers.Paging{Limit: pagingMap[limit], Offset: offset}, pagingMap, nil + } + + return defaultPaging, defautlPagingMap, nil } func ListResource(resourceName, conditonSrt, pagingStr string) (*ResourceList, error) { @@ -118,12 +124,12 @@ func ListResource(resourceName, conditonSrt, pagingStr string) (*ResourceList, e return nil, err } - pagingMap, err := getPaging(pagingStr) + paging, pagingMap, err := getPaging(resourceName, pagingStr) if err != nil { return nil, err } - conditionStr, paging := generateConditionAndPaging(match, fuzzy, pagingMap) + conditionStr := generateConditionStr(match, fuzzy) ctl, err := getController(resourceName) if err != nil { @@ -135,10 +141,10 @@ func ListResource(resourceName, conditonSrt, pagingStr string) (*ResourceList, e return nil, err } - return &ResourceList{Total: total, Items: items, Page: pagingMap["page"], Limit: pagingMap["limit"]}, nil + return &ResourceList{Total: total, Items: items, Page: pagingMap[page], Limit: pagingMap[limit]}, nil } -func generateConditionAndPaging(match map[string]string, fuzzy map[string]string, paging map[string]int) (string, *controllers.Paging) { +func generateConditionStr(match map[string]string, fuzzy map[string]string) string { conditionStr := "" for k, v := range match { @@ -157,12 +163,7 @@ func generateConditionAndPaging(match map[string]string, fuzzy map[string]string } } - if paging["limit"] > 0 && paging["page"] >= 0 { - offset := (paging["page"] - 1) * paging["limit"] - return conditionStr, &controllers.Paging{Limit: paging["limit"], Offset: offset} - } - - return conditionStr, nil + return conditionStr } type workLoadStatus struct { @@ -176,14 +177,14 @@ func GetNamespacesResourceStatus(namespace string) (*workLoadStatus, error) { var status *ResourceList var err error for _, resource := range []string{controllers.Deployments, controllers.Statefulsets, controllers.Daemonsets, controllers.PersistentVolumeClaim} { - resourceStatus := controllers.Updating + notReadyStatus := controllers.Updating if resource == controllers.PersistentVolumeClaim { - resourceStatus = controllers.PvcPending + notReadyStatus = controllers.PvcPending } if len(namespace) > 0 { - status, err = ListResource(resource, fmt.Sprintf("status=%s,namespace=%s", resourceStatus, namespace), "") + status, err = ListResource(resource, fmt.Sprintf("status=%s,namespace=%s", notReadyStatus, namespace), "") } else { - status, err = ListResource(resource, fmt.Sprintf("status=%s", resourceStatus), "") + status, err = ListResource(resource, fmt.Sprintf("status=%s", notReadyStatus), "") } if err != nil { @@ -191,9 +192,7 @@ func GetNamespacesResourceStatus(namespace string) (*workLoadStatus, error) { } count := status.Total - //items := status.Items res.Count[resource] = count - //res.Items[resource] = items } return &res, nil @@ -203,3 +202,31 @@ func GetClusterResourceStatus() (*workLoadStatus, error) { return GetNamespacesResourceStatus("") } + +func GetApplication(clusterId string) (interface{}, error) { + ctl := &controllers.ApplicationCtl{OpenpitrixAddr: options.ServerOptions.GetOpAddress()} + return ctl.GetApp(clusterId) +} + +func ListApplication(runtimeId, conditions, pagingStr string) (*ResourceList, error) { + paging, pagingMap, err := getPaging(controllers.Applications, pagingStr) + if err != nil { + return nil, err + } + + match, fuzzy, err := getConditions(conditions) + if err != nil { + glog.Error(err) + return nil, err + } + + ctl := &controllers.ApplicationCtl{OpenpitrixAddr: options.ServerOptions.GetOpAddress()} + total, items, err := ctl.ListApplication(runtimeId, match, fuzzy, paging) + + if err != nil { + glog.Errorf("get application list failed, reason: %s", err) + return nil, err + } + + return &ResourceList{Total: total, Items: items, Page: pagingMap[page], Limit: pagingMap[limit]}, nil +}