Add resync function and support for viewing deployed applications

This commit is contained in:
richardxz
2018-07-30 23:13:42 -04:00
parent a4a48eb4fc
commit e803897d10
23 changed files with 1261 additions and 453 deletions

View File

@@ -31,7 +31,7 @@ func Register(ws *restful.WebService, subPath string) {
tags := []string{"resources"}
ws.Route(ws.GET(subPath+"/{resource}").To(listResource).Produces(restful.MIME_JSON).Metadata(restfulspec.KeyOpenAPITags, tags).Doc("Get resource" +
ws.Route(ws.GET(subPath+"/{resource}").To(handleResource).Produces(restful.MIME_JSON).Metadata(restfulspec.KeyOpenAPITags, tags).Doc("Get resource" +
" list").Param(ws.PathParameter("resource", "resource name").DataType(
"string")).Param(ws.QueryParameter("conditions", "search "+
"conditions").DataType("string")).Param(ws.QueryParameter("paging",
@@ -39,12 +39,15 @@ func Register(ws *restful.WebService, subPath string) {
}
func listResource(req *restful.Request, resp *restful.Response) {
func handleResource(req *restful.Request, resp *restful.Response) {
resource := req.PathParameter("resource")
if resource == "applications" {
handleApplication(req, resp)
return
}
conditions := req.QueryParameter("conditions")
paging := req.QueryParameter("paging")
res, err := models.ListResource(resource, conditions, paging)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
@@ -53,3 +56,30 @@ func listResource(req *restful.Request, resp *restful.Response) {
resp.WriteEntity(res)
}
// handleApplication serves application queries. When a cluster_id query
// parameter is present it returns that single application; otherwise it
// returns a paged application list filtered by runtime_id and conditions.
// Any backend failure is reported as HTTP 500 with the error message.
func handleApplication(req *restful.Request, resp *restful.Response) {
	clusterId := req.QueryParameter("cluster_id")
	runtimeId := req.QueryParameter("runtime_id")
	conditions := req.QueryParameter("conditions")
	paging := req.QueryParameter("paging")

	// Single-application lookup takes precedence over listing.
	if clusterId != "" {
		app, err := models.GetApplication(clusterId)
		if err != nil {
			resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
			return
		}
		resp.WriteEntity(app)
		return
	}

	list, err := models.ListApplication(runtimeId, conditions, paging)
	if err != nil {
		resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
		return
	}
	resp.WriteEntity(list)
}

View File

@@ -55,7 +55,7 @@ func createUser(req *restful.Request, resp *restful.Response) {
return
}
err = models.CreateKubectlPod(user)
err = models.CreateKubectlDeploy(user)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
@@ -69,7 +69,7 @@ func delUser(req *restful.Request, resp *restful.Response) {
user := req.PathParameter("user")
err := models.DelKubectlPod(user)
err := models.DelKubectlDeploy(user)
if err != nil && !apierrors.IsNotFound(err) {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})

View File

@@ -23,7 +23,7 @@ import (
"github.com/emicklei/go-restful"
"github.com/golang/glog"
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net"
"net/http"
@@ -31,9 +31,12 @@ import (
"github.com/emicklei/go-restful-openapi"
"github.com/go-openapi/spec"
"k8s.io/apimachinery/pkg/api/errors"
_ "kubesphere.io/kubesphere/pkg/apis/v1alpha"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/models"
"kubesphere.io/kubesphere/pkg/models/controllers"
"kubesphere.io/kubesphere/pkg/options"
)
@@ -64,17 +67,26 @@ func newKubeSphereServer(options *options.ServerRunOptions) *kubeSphereServer {
func preCheck() error {
k8sClient := client.NewK8sClient()
nsList, err := k8sClient.CoreV1().Namespaces().List(meta_v1.ListOptions{})
if err != nil {
_, err := k8sClient.CoreV1().Namespaces().Get(constants.KubeSphereControlNamespace, metaV1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
for _, ns := range nsList.Items {
if ns.Name == constants.KubeSphereControlNamespace {
return nil
if errors.IsNotFound(err) {
namespace := v1.Namespace{ObjectMeta: metaV1.ObjectMeta{Name: constants.KubeSphereControlNamespace}}
_, err = k8sClient.CoreV1().Namespaces().Create(&namespace)
if err != nil {
return err
}
}
namespace := v1.Namespace{ObjectMeta: meta_v1.ObjectMeta{Name: constants.KubeSphereControlNamespace}}
_, err = k8sClient.CoreV1().Namespaces().Create(&namespace)
_, err = k8sClient.AppsV1().Deployments(constants.KubeSphereControlNamespace).Get(constants.AdminUserName, metaV1.GetOptions{})
if errors.IsNotFound(err) {
models.CreateKubeConfig(constants.AdminUserName)
models.CreateKubectlDeploy(constants.AdminUserName)
return nil
}
return err
}

View File

@@ -34,8 +34,8 @@ const (
KubeSphereNamespace = "kubesphere-system"
KubeSphereControlNamespace = "kubesphere-controls-system"
IngressControllerNamespace = KubeSphereControlNamespace
DataHome = "/etc/kubesphere"
IngressControllerFolder = DataHome + "/ingress-controller"
IngressControllerPrefix = "kubesphere-router-"
AdminUserName = "admin"
DataHome = "/etc/kubesphere"
IngressControllerFolder = DataHome + "/ingress-controller"
IngressControllerPrefix = "kubesphere-router-"
)

View File

@@ -0,0 +1,498 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"kubesphere.io/kubesphere/pkg/client"
)
// Suffixes OpenPitrix appends to cluster role names to encode the kind of
// Kubernetes workload backing the role; consumed by GetWorkLoads.
const (
	unknown       = "-" // placeholder for values that cannot be resolved
	deploySurffix = "-Deployment"
	daemonSurffix = "-DaemonSet"
	stateSurffix  = "-StatefulSet"
)

// ApplicationCtl resolves deployed applications by querying an OpenPitrix
// API server and cross-referencing workloads in the local resource cache.
type ApplicationCtl struct {
	OpenpitrixAddr string // base address of the OpenPitrix HTTP API
}

// Application is the API representation of a deployed application.
// NOTE(review): tag styles are mixed (camelCase vs snake_case) — presumably
// intentional for frontend compatibility; confirm before normalizing.
type Application struct {
	Name      string `json:"name"`
	RepoName  string `json:"repoName"`
	Runtime   string `json:"namespace"` // runtime zone doubles as the namespace
	RuntimeId string `json:"runtime_id"`
	Version   string `json:"version"`
	VersionId string `json:"version_id"`
	Status    string `json:"status"`
	UpdateTime time.Time `json:"updateTime"`
	CreateTime time.Time `json:"createTime"`
	App       string `json:"app"`
	AppId     string `json:"app_id"`
	Creator   string `json:"creator,omitempty"`
	WorkLoads *workLoads `json:"workloads,omitempty"`
	Services  *[]Service `json:"services,omitempty"`
	Ingresses *[]ing     `json:"ingresses,omitempty"`
	ClusterID string     `json:"cluster_id"`
}

// ing is an ingress reduced to the rules relevant to one application.
type ing struct {
	Name  string        `json:"name"`
	Rules []ingressRule `json:"rules"`
}

// clusterRole mirrors OpenPitrix's cluster_role_set entry; Role encodes the
// workload name plus a kind suffix (see the const block above).
type clusterRole struct {
	ClusterID string `json:"cluster_id"`
	Role      string `json:"role"`
}

// cluster mirrors one entry of the OpenPitrix /v1/clusters response.
type cluster struct {
	ClusterID       string        `json:"cluster_id"`
	Name            string        `json:"name"`
	AppID           string        `json:"app_id"`
	VersionID       string        `json:"version_id"`
	Status          string        `json:"status"`
	UpdateTime      time.Time     `json:"status_time"`
	CreateTime      time.Time     `json:"createTime"`
	RunTimeId       string        `json:"runtime_id"`
	Description     string        `json:"description"` // JSON-encoded description, see getCreator
	ClusterRoleSets []clusterRole `json:"cluster_role_set"`
}

// clusters is the paged /v1/clusters response envelope.
type clusters struct {
	Total    int       `json:"total_count"`
	Clusters []cluster `json:"cluster_set"`
}

// versionList is the /v1/app_versions response envelope.
type versionList struct {
	Total    int       `json:"total_count"`
	Versions []version `json:"app_version_set"`
}

type version struct {
	Name      string `json:"name"`
	VersionID string `json:"version_id"`
}

type runtime struct {
	RuntimeID string `json:"runtime_id"`
	Zone      string `json:"zone"`
}

// runtimeList is the /v1/runtimes response envelope.
type runtimeList struct {
	Total    int       `json:"total_count"`
	Runtimes []runtime `json:"runtime_set"`
}

type app struct {
	AppId     string `json:"app_id"`
	Name      string `json:"name"`
	ChartName string `json:"chart_name"`
	RepoId    string `json:"repo_id"`
}

type repo struct {
	RepoId string `json:"repo_id"`
	Name   string `json:"name"`
	Url    string `json:"url"`
}

// workLoads groups the Kubernetes workloads that belong to one application.
type workLoads struct {
	Deployments  []Deployment  `json:"deployments,omitempty"`
	Statefulsets []Statefulset `json:"statefulsets,omitempty"`
	Daemonsets   []Daemonset   `json:"daemonsets,omitempty"`
}

// description is the JSON payload stored in cluster.Description.
type description struct {
	Creator string `json:"creator"`
}

// appList is the /v1/apps response envelope.
type appList struct {
	Total int   `json:"total_count"`
	Apps  []app `json:"app_set"`
}

// repoList is the /v1/repos response envelope.
type repoList struct {
	Total int    `json:"total_count"`
	Repos []repo `json:"repo_set"`
}
// GetAppInfo resolves an OpenPitrix app id to its (chart name, repo id,
// app id). Each unresolved value is returned as the "-" placeholder.
// An empty result set is not treated as an error, matching GetVersion
// and GetRuntime.
func (ctl *ApplicationCtl) GetAppInfo(appId string) (string, string, string, error) {
	url := fmt.Sprintf("%s/v1/apps?app_id=%s", ctl.OpenpitrixAddr, appId)
	resp, err := makeHttpRequest("GET", url, "")
	if err != nil {
		glog.Error(err)
		return unknown, unknown, unknown, err
	}
	var apps appList
	err = json.Unmarshal(resp, &apps)
	if err != nil {
		glog.Error(err)
		return unknown, unknown, unknown, err
	}
	if len(apps.Apps) == 0 {
		// Previously returned `err`, which is always nil here; make the
		// nil explicit so the contract is unambiguous.
		return unknown, unknown, unknown, nil
	}
	return apps.Apps[0].ChartName, apps.Apps[0].RepoId, apps.Apps[0].AppId, nil
}
// GetRepo resolves an OpenPitrix repo id to its display name, returning
// the "-" placeholder when the repo cannot be resolved.
func (ctl *ApplicationCtl) GetRepo(repoId string) (string, error) {
	requestUrl := fmt.Sprintf("%s/v1/repos?repo_id=%s", ctl.OpenpitrixAddr, repoId)
	body, err := makeHttpRequest("GET", requestUrl, "")
	if err != nil {
		glog.Error(err)
		return unknown, err
	}
	var list repoList
	if err = json.Unmarshal(body, &list); err != nil {
		glog.Error(err)
		return unknown, err
	}
	if len(list.Repos) == 0 {
		return unknown, nil
	}
	return list.Repos[0].Name, nil
}
// GetVersion resolves an OpenPitrix app version id to its name, returning
// the "-" placeholder when the version cannot be resolved.
func (ctl *ApplicationCtl) GetVersion(versionId string) (string, error) {
	requestUrl := fmt.Sprintf("%s/v1/app_versions?version_id=%s", ctl.OpenpitrixAddr, versionId)
	body, err := makeHttpRequest("GET", requestUrl, "")
	if err != nil {
		glog.Error(err)
		return unknown, err
	}
	var list versionList
	if err = json.Unmarshal(body, &list); err != nil {
		glog.Error(err)
		return unknown, err
	}
	if len(list.Versions) == 0 {
		return unknown, nil
	}
	return list.Versions[0].Name, nil
}
// GetRuntime resolves an OpenPitrix runtime id to its zone, returning
// the "-" placeholder when the runtime cannot be resolved.
func (ctl *ApplicationCtl) GetRuntime(runtimeId string) (string, error) {
	requestUrl := fmt.Sprintf("%s/v1/runtimes?runtime_id=%s", ctl.OpenpitrixAddr, runtimeId)
	body, err := makeHttpRequest("GET", requestUrl, "")
	if err != nil {
		glog.Error(err)
		return unknown, err
	}
	var list runtimeList
	if err = json.Unmarshal(body, &list); err != nil {
		glog.Error(err)
		return unknown, err
	}
	if len(list.Runtimes) == 0 {
		return unknown, nil
	}
	return list.Runtimes[0].Zone, nil
}
// GetWorkLoads collects the Kubernetes workloads backing an application.
// Each cluster role's name encodes "<workload-name><kind-suffix>" (e.g.
// "web-Deployment"); the suffix selects which resource controller is
// queried, and the matching cached objects are appended to the result.
// Roles without a recognized suffix are silently skipped.
// NOTE(review): lookup errors from ListWithConditions are discarded, and
// the items.([]T) assertions assume each controller returns exactly that
// slice type — confirm against the controller implementations.
func (ctl *ApplicationCtl) GetWorkLoads(namespace string, clusterRoles []clusterRole) *workLoads {
	var works workLoads
	for _, clusterRole := range clusterRoles {
		workLoadName := clusterRole.Role
		if len(workLoadName) > 0 {
			if strings.HasSuffix(workLoadName, deploySurffix) {
				// strings.Split(...)[0] strips the "-Deployment" suffix.
				name := strings.Split(workLoadName, deploySurffix)[0]
				ctl := ResourceControllers.Controllers[Deployments]
				_, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil)
				works.Deployments = append(works.Deployments, items.([]Deployment)...)
				continue
			}
			if strings.HasSuffix(workLoadName, daemonSurffix) {
				name := strings.Split(workLoadName, daemonSurffix)[0]
				ctl := ResourceControllers.Controllers[Daemonsets]
				_, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil)
				works.Daemonsets = append(works.Daemonsets, items.([]Daemonset)...)
				continue
			}
			if strings.HasSuffix(workLoadName, stateSurffix) {
				name := strings.Split(workLoadName, stateSurffix)[0]
				ctl := ResourceControllers.Controllers[Statefulsets]
				_, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil)
				works.Statefulsets = append(works.Statefulsets, items.([]Statefulset)...)
				continue
			}
		}
	}
	return &works
}
// getCreator extracts the creator from a cluster's JSON-encoded description,
// returning the "-" placeholder when the description cannot be parsed.
func (ctl *ApplicationCtl) getCreator(desc string) string {
	var d description
	if json.Unmarshal([]byte(desc), &d) != nil {
		return unknown
	}
	return d.Creator
}
// getLabels fetches the label sets of the application's workloads, which
// are later used to find matching services by label selector.
// Returns nil when workloads is nil; workloads that no longer exist (or
// fail to fetch) are skipped rather than contributing empty label sets.
func (ctl *ApplicationCtl) getLabels(namespace string, workloads *workLoads) *[]map[string]string {
	k8sClient := client.NewK8sClient()
	var workloadLables []map[string]string
	if workloads == nil {
		return nil
	}
	for _, workload := range workloads.Deployments {
		deploy, err := k8sClient.AppsV1().Deployments(namespace).Get(workload.Name, metaV1.GetOptions{})
		if err != nil {
			// Previously only NotFound was skipped; any other error fell
			// through and appended labels from a zero-valued object.
			if !errors.IsNotFound(err) {
				glog.Error(err)
			}
			continue
		}
		workloadLables = append(workloadLables, deploy.Labels)
	}
	for _, workload := range workloads.Daemonsets {
		daemonset, err := k8sClient.AppsV1().DaemonSets(namespace).Get(workload.Name, metaV1.GetOptions{})
		if err != nil {
			if !errors.IsNotFound(err) {
				glog.Error(err)
			}
			continue
		}
		workloadLables = append(workloadLables, daemonset.Labels)
	}
	for _, workload := range workloads.Statefulsets {
		statefulset, err := k8sClient.AppsV1().StatefulSets(namespace).Get(workload.Name, metaV1.GetOptions{})
		if err != nil {
			if !errors.IsNotFound(err) {
				glog.Error(err)
			}
			continue
		}
		workloadLables = append(workloadLables, statefulset.Labels)
	}
	return &workloadLables
}
// isExist reports whether svc (by name and namespace) is already present
// in svcs, so duplicate services are not appended twice.
func isExist(svcs []Service, svc v1.Service) bool {
	for i := range svcs {
		if svcs[i].Name == svc.Name && svcs[i].Namespace == svc.Namespace {
			return true
		}
	}
	return false
}
// getSvcs finds the services selected by each workload's label set,
// de-duplicating by (name, namespace). Returns nil when there are no
// labels to match.
func (ctl *ApplicationCtl) getSvcs(namespace string, workLoadLabels *[]map[string]string) *[]Service {
	// Guard the pointer before dereferencing; getLabels may return nil.
	if workLoadLabels == nil || len(*workLoadLabels) == 0 {
		return nil
	}
	k8sClient := client.NewK8sClient()
	var services []Service
	for _, label := range *workLoadLabels {
		labelSelector := labels.Set(label).AsSelector().String()
		svcs, err := k8sClient.CoreV1().Services(namespace).List(metaV1.ListOptions{LabelSelector: labelSelector})
		if err != nil {
			glog.Errorf("get app's svc failed, reason: %v", err)
			// Previously execution fell through and dereferenced the nil
			// list, panicking; skip this label set instead.
			continue
		}
		for _, item := range svcs.Items {
			if !isExist(services, item) {
				services = append(services, *generateSvcObject(item))
			}
		}
	}
	return &services
}
// getIng finds ingresses whose rules route to any of the application's
// services, keeping only the rules that target those services.
// Returns nil when services is nil or on lookup/parse failure.
func (ctl *ApplicationCtl) getIng(namespace string, services *[]Service) *[]ing {
	if services == nil {
		return nil
	}
	ingCtl := ResourceControllers.Controllers[Ingresses]
	var ings []ing
	for _, svc := range *services {
		_, items, err := ingCtl.ListWithConditions(fmt.Sprintf("namespace = '%s' and rules like '%%%s%%' ", namespace, svc.Name), nil)
		if err != nil {
			glog.Error(err)
			return nil
		}
		// (removed stray debug glog.Error(items) that logged normal data
		// at error level)
		for _, ingress := range items.([]Ingress) {
			var rules []ingressRule
			err := json.Unmarshal([]byte(ingress.Rules), &rules)
			if err != nil {
				// Log instead of failing silently; stored rules should
				// always be valid JSON written by IngressCtl.
				glog.Error(err)
				return nil
			}
			exist := false
			var tmpRules []ingressRule
			for _, rule := range rules {
				if rule.Service == svc.Name {
					exist = true
					tmpRules = append(tmpRules, rule)
				}
			}
			if exist {
				ings = append(ings, ing{Name: ingress.Name, Rules: tmpRules})
			}
		}
	}
	return &ings
}
// ListApplication returns a page of deployed applications from OpenPitrix.
// match/fuzzy carry parsed search conditions: fuzzy["name"] becomes the
// search_word parameter and match["status"] overrides the default status
// filter (active, stopped, pending, ceased). runtimeId, when set, scopes
// the list to one runtime. Returns (total count, []Application, error).
// NOTE(review): query parameter values are interpolated without URL
// escaping — confirm callers sanitize them. Each cluster also triggers
// extra version/runtime/app lookups (N+1 remote calls).
func (ctl *ApplicationCtl) ListApplication(runtimeId string, match, fuzzy map[string]string, paging *Paging) (int, interface{}, error) {
	limit := paging.Limit
	offset := paging.Offset
	// Normalize the base address so URL joining below is consistent.
	if strings.HasSuffix(ctl.OpenpitrixAddr, "/") {
		ctl.OpenpitrixAddr = strings.TrimSuffix(ctl.OpenpitrixAddr, "/")
	}
	defaultStatus := "status=active&status=stopped&status=pending&status=ceased"
	url := fmt.Sprintf("%s/v1/clusters?limit=%s&offset=%s", ctl.OpenpitrixAddr, strconv.Itoa(limit), strconv.Itoa(offset))
	if len(fuzzy["name"]) > 0 {
		url = fmt.Sprintf("%s&search_word=%s", url, fuzzy["name"])
	}
	if len(match["status"]) > 0 {
		url = fmt.Sprintf("%s&status=%s", url, match["status"])
	} else {
		url = fmt.Sprintf("%s&%s", url, defaultStatus)
	}
	if len(runtimeId) > 0 {
		url = fmt.Sprintf("%s&runtime_id=%s", url, runtimeId)
	}
	resp, err := makeHttpRequest("GET", url, "")
	if err != nil {
		glog.Errorf("request %s failed, reason: %s", url, err)
		return 0, nil, err
	}
	var clusterList clusters
	err = json.Unmarshal(resp, &clusterList)
	if err != nil {
		return 0, nil, err
	}
	var apps []Application
	// Enrich each cluster with resolved version/runtime/app names;
	// resolution errors are ignored and surface as "-" placeholders.
	for _, item := range clusterList.Clusters {
		var app Application
		app.Name = item.Name
		app.ClusterID = item.ClusterID
		app.UpdateTime = item.UpdateTime
		app.Status = item.Status
		versionInfo, _ := ctl.GetVersion(item.VersionID)
		app.Version = versionInfo
		app.VersionId = item.VersionID
		runtimeInfo, _ := ctl.GetRuntime(item.RunTimeId)
		app.Runtime = runtimeInfo
		app.RuntimeId = item.RunTimeId
		appInfo, _, appId, _ := ctl.GetAppInfo(item.AppID)
		app.App = appInfo
		app.AppId = appId
		apps = append(apps, app)
	}
	return clusterList.Total, apps, nil
}
// GetApp returns the full detail of one deployed application, identified
// by its OpenPitrix cluster id: base cluster info enriched with resolved
// version/runtime/app/repo names, creator, and the associated workloads,
// services, and ingresses discovered via the workloads' labels.
// Returns an error when the cluster id is unknown or the API call fails;
// name-resolution failures degrade to "-" placeholders instead.
func (ctl *ApplicationCtl) GetApp(clusterId string) (*Application, error) {
	// Normalize the base address so URL joining below is consistent.
	if strings.HasSuffix(ctl.OpenpitrixAddr, "/") {
		ctl.OpenpitrixAddr = strings.TrimSuffix(ctl.OpenpitrixAddr, "/")
	}
	url := fmt.Sprintf("%s/v1/clusters?cluster_id=%s", ctl.OpenpitrixAddr, clusterId)
	resp, err := makeHttpRequest("GET", url, "")
	if err != nil {
		glog.Error(err)
		return nil, err
	}
	var clusterList clusters
	err = json.Unmarshal(resp, &clusterList)
	if err != nil {
		glog.Error(err)
		return nil, err
	}
	if len(clusterList.Clusters) == 0 {
		return nil, fmt.Errorf("NotFound, clusterId:%s", clusterId)
	}
	item := clusterList.Clusters[0]
	var app Application
	app.Name = item.Name
	app.ClusterID = item.ClusterID
	app.UpdateTime = item.UpdateTime
	app.CreateTime = item.CreateTime
	app.Status = item.Status
	versionInfo, _ := ctl.GetVersion(item.VersionID)
	app.Version = versionInfo
	app.VersionId = item.VersionID
	runtimeInfo, _ := ctl.GetRuntime(item.RunTimeId)
	// The runtime zone is used as the Kubernetes namespace for the
	// workload/service/ingress lookups below.
	app.Runtime = runtimeInfo
	app.RuntimeId = item.RunTimeId
	appInfo, repoId, appId, _ := ctl.GetAppInfo(item.AppID)
	app.App = appInfo
	app.AppId = appId
	app.Creator = ctl.getCreator(item.Description)
	app.RepoName, _ = ctl.GetRepo(repoId)
	// Order matters: workloads -> their labels -> label-matched services
	// -> ingresses routing to those services.
	app.WorkLoads = ctl.GetWorkLoads(app.Runtime, item.ClusterRoleSets)
	workloadLabels := ctl.getLabels(app.Runtime, app.WorkLoads)
	app.Services = ctl.getSvcs(app.Runtime, workloadLabels)
	app.Ingresses = ctl.getIng(app.Runtime, app.Services)
	return &app, nil
}

View File

@@ -27,9 +27,11 @@ import (
"k8s.io/client-go/tools/cache"
)
const systemPrefix = "system:"
func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole {
name := item.Name
if strings.HasPrefix(name, "system:") {
if strings.HasPrefix(name, systemPrefix) {
return nil
}
@@ -43,30 +45,22 @@ func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole {
return object
}
func (ctl *ClusterRoleCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *ClusterRoleCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *ClusterRoleCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&ClusterRole{}) {
db.DropTable(&ClusterRole{})
}
db = db.CreateTable(&ClusterRole{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Rbac().V1().ClusterRoles().Informer()
lister := kubeInformerFactory.Rbac().V1().ClusterRoles().Lister()
ctl.initListerAndInformer()
list, err := lister.List(labels.Everything())
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -74,10 +68,38 @@ func (ctl *ClusterRoleCtl) listAndWatch() {
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
if obj != nil {
db.Create(obj)
}
}
ctl.informer.Run(stopChan)
}
// total counts cluster roles in the informer cache, excluding built-in
// "system:" roles to match the filter applied by generateObject when
// populating the database.
func (ctl *ClusterRoleCtl) total() int {
	list, err := ctl.lister.List(labels.Everything())
	if err != nil {
		// Fixed swapped format arguments (err was printed as the name)
		// and the "falied" typo.
		glog.Errorf("count %s failed, reason:%s", ctl.Name(), err)
		return 0
	}
	count := 0
	for _, item := range list {
		if !strings.HasPrefix(item.Name, systemPrefix) {
			count++
		}
	}
	return count
}
func (ctl *ClusterRoleCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Rbac().V1().ClusterRoles().Lister()
informer := informerFactory.Rbac().V1().ClusterRoles().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -102,13 +124,15 @@ func (ctl *ClusterRoleCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *ClusterRoleCtl) CountWithConditions(conditions string) int {
var object ClusterRole
if strings.Contains(conditions, "namespace") {
conditions = ""
}
return countWithConditions(ctl.DB, conditions, &object)
}
@@ -124,10 +148,3 @@ func (ctl *ClusterRoleCtl) ListWithConditions(conditions string, paging *Paging)
return total, list, nil
}
func (ctl *ClusterRoleCtl) Count(namespace string) int {
var count int
db := ctl.DB
db.Model(&ClusterRole{}).Count(&count)
return count
}

View File

@@ -16,7 +16,17 @@ limitations under the License.
package controllers
import "github.com/jinzhu/gorm"
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/golang/glog"
"github.com/jinzhu/gorm"
"github.com/pkg/errors"
)
func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) {
if len(conditions) == 0 {
@@ -50,3 +60,83 @@ func countWithConditions(db *gorm.DB, conditions string, object interface{}) int
}
return count
}
// makeHttpRequest performs an HTTP request and returns the response body.
// GET requests carry no body; all other methods send data as the body.
// Responses with status >= 400 return the body text as the error.
func makeHttpRequest(method, url, data string) ([]byte, error) {
	var req *http.Request
	var err error
	if method == "GET" {
		req, err = http.NewRequest(method, url, nil)
	} else {
		req, err = http.NewRequest(method, url, strings.NewReader(data))
	}
	if err != nil {
		glog.Error(err)
		return nil, err
	}
	// NOTE(review): client has no timeout; consider http.Client{Timeout: ...}
	httpClient := &http.Client{}
	resp, err := httpClient.Do(req)
	if err != nil {
		err := fmt.Errorf("Request to %s failed, method: %s, reason: %s ", url, method, err)
		glog.Error(err)
		return nil, err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// Previously a read error could be masked by the status-code
		// branch below overwriting err; report it directly.
		glog.Error(err)
		return nil, err
	}
	if resp.StatusCode >= http.StatusBadRequest {
		err = errors.New(string(body))
	}
	return body, err
}
// handleCrash is meant to be deferred by listAndWatch: it signals that the
// controller goroutine has exited by closing its alive channel, and logs
// (instead of propagating) any panic from the sync loop.
// recover() is effective here only because this function itself runs as a
// deferred call in the panicking goroutine.
func handleCrash(ctl Controller) {
	close(ctl.chanAlive())
	if err := recover(); err != nil {
		glog.Errorf("panic occur in %s controller's listAndWatch function, reason: %s", ctl.Name(), err)
		return
	}
}
// hasSynced reports whether the controller's database row count matches
// the object count in its Kubernetes informer cache; a mismatch means the
// mirror has drifted and a resync is needed.
func hasSynced(ctl Controller) bool {
	// Direct comparison instead of if/return true/return false.
	return ctl.CountWithConditions("") == ctl.total()
}
// checkAndResync periodically (every 30 minutes) verifies that the
// controller's database mirror matches the Kubernetes cache; on drift it
// stops the running sync goroutine and starts a fresh one. It returns
// when the controller's stop channel fires, closing the current sync
// goroutine's stop channel on the way out.
func checkAndResync(ctl Controller, stopChan chan struct{}) {
	// Close whatever channel is current at return time. A plain
	// `defer close(stopChan)` evaluates its argument immediately, so after
	// a resync replaced stopChan it would re-close the already-closed
	// original channel and panic ("close of closed channel").
	defer func() { close(stopChan) }()
	for {
		select {
		case <-ctl.chanStop():
			return
		case <-time.After(30 * time.Minute):
			// Using the timer inside select (instead of sleeping in a
			// default branch) lets the stop signal interrupt the wait.
			if !hasSynced(ctl) {
				glog.Errorf("the data in db and kubernetes is inconsistent, resync %s controller", ctl.Name())
				close(stopChan)
				stopChan = make(chan struct{})
				go ctl.sync(stopChan)
			}
		}
	}
}
// listAndWatch runs a controller's sync loop in a goroutine and supervises
// it with checkAndResync until the controller is stopped; handleCrash
// (deferred) reports panics and signals liveness loss via the alive channel.
func listAndWatch(ctl Controller) {
	defer handleCrash(ctl)
	stopChan := make(chan struct{})
	go ctl.sync(stopChan)
	checkAndResync(ctl, stopChan)
}

View File

@@ -61,30 +61,21 @@ func (ctl *DaemonsetCtl) generateObject(item v1.DaemonSet) *Daemonset {
return object
}
func (ctl *DaemonsetCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *DaemonsetCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *DaemonsetCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Daemonset{}) {
db.DropTable(&Daemonset{})
}
db = db.CreateTable(&Daemonset{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Apps().V1().DaemonSets().Informer()
lister := kubeInformerFactory.Apps().V1().DaemonSets().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -93,9 +84,27 @@ func (ctl *DaemonsetCtl) listAndWatch() {
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
ctl.informer.Run(stopChan)
}
// total returns the number of DaemonSets in the informer cache, used by
// the resync check to compare against the database row count.
func (ctl *DaemonsetCtl) total() int {
	list, err := ctl.lister.List(labels.Everything())
	if err != nil {
		// Fixed swapped format arguments (err was printed as the name)
		// and the "falied" typo.
		glog.Errorf("count %s failed, reason:%s", ctl.Name(), err)
		return 0
	}
	return len(list)
}
func (ctl *DaemonsetCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Apps().V1().DaemonSets().Lister()
informer := informerFactory.Apps().V1().DaemonSets().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -117,7 +126,7 @@ func (ctl *DaemonsetCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *DaemonsetCtl) CountWithConditions(conditions string) int {
@@ -138,13 +147,13 @@ func (ctl *DaemonsetCtl) ListWithConditions(conditions string, paging *Paging) (
return total, list, nil
}
func (ctl *DaemonsetCtl) Count(namespace string) int {
var count int
db := ctl.DB
if len(namespace) == 0 {
db.Model(&Daemonset{}).Count(&count)
} else {
db.Model(&Daemonset{}).Where("namespace = ?", namespace).Count(&count)
}
return count
}
//func (ctl *DaemonsetCtl) Count(namespace string) int {
// var count int
// db := ctl.DB
// if len(namespace) == 0 {
// db.Model(&Daemonset{}).Count(&count)
// } else {
// db.Model(&Daemonset{}).Where("namespace = ?", namespace).Count(&count)
// }
// return count
//}

View File

@@ -21,9 +21,8 @@ import (
"github.com/golang/glog"
"k8s.io/api/apps/v1"
"k8s.io/client-go/informers"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
@@ -65,15 +64,11 @@ func (ctl *DeploymentCtl) generateObject(item v1.Deployment) *Deployment {
App: app, UpdateTime: updateTime, Status: status, Annotation: Annotation{item.Annotations}}
}
func (ctl *DeploymentCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *DeploymentCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *DeploymentCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Deployment{}) {
db.DropTable(&Deployment{})
@@ -81,12 +76,8 @@ func (ctl *DeploymentCtl) listAndWatch() {
db = db.CreateTable(&Deployment{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Apps().V1().Deployments().Informer()
lister := kubeInformerFactory.Apps().V1().Deployments().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -95,9 +86,29 @@ func (ctl *DeploymentCtl) listAndWatch() {
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
ctl.informer.Run(stopChan)
}
// total returns the number of Deployments in the informer cache, used by
// the resync check to compare against the database row count.
func (ctl *DeploymentCtl) total() int {
	list, err := ctl.lister.List(labels.Everything())
	if err != nil {
		// Fixed the "falied" typo in the log message.
		glog.Errorf("count %s failed, reason:%s", ctl.Name(), err)
		return 0
	}
	return len(list)
}
func (ctl *DeploymentCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Apps().V1().Deployments().Lister()
informer := informerFactory.Apps().V1().Deployments().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -118,8 +129,7 @@ func (ctl *DeploymentCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *DeploymentCtl) CountWithConditions(conditions string) int {
@@ -139,14 +149,3 @@ func (ctl *DeploymentCtl) ListWithConditions(conditions string, paging *Paging)
return total, list, nil
}
func (ctl *DeploymentCtl) Count(namespace string) int {
var count int
db := ctl.DB
if len(namespace) == 0 {
db.Model(&Deployment{}).Count(&count)
} else {
db.Model(&Deployment{}).Where("namespace = ?", namespace).Count(&count)
}
return count
}

View File

@@ -20,6 +20,8 @@ import (
"strings"
"time"
"encoding/json"
"github.com/golang/glog"
"k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/labels"
@@ -46,35 +48,41 @@ func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress {
ip = strings.Join(ipList, ",")
}
object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}}
var ingRules []ingressRule
for _, rule := range item.Spec.Rules {
host := rule.Host
for _, path := range rule.HTTP.Paths {
var ingRule ingressRule
ingRule.Host = host
ingRule.Service = path.Backend.ServiceName
ingRule.Port = path.Backend.ServicePort.IntVal
ingRule.Path = path.Path
ingRules = append(ingRules, ingRule)
}
}
ruleStr, _ := json.Marshal(ingRules)
object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}, Rules: string(ruleStr)}
return object
}
func (ctl *IngressCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *IngressCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *IngressCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Ingress{}) {
db.DropTable(&Ingress{})
}
db = db.CreateTable(&Ingress{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Extensions().V1beta1().Ingresses().Informer()
lister := kubeInformerFactory.Extensions().V1beta1().Ingresses().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -83,9 +91,28 @@ func (ctl *IngressCtl) listAndWatch() {
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
ctl.informer.Run(stopChan)
}
// total returns the number of Ingresses in the informer cache, used by
// the resync check to compare against the database row count.
func (ctl *IngressCtl) total() int {
	list, err := ctl.lister.List(labels.Everything())
	if err != nil {
		// Fixed swapped format arguments (err was printed as the name)
		// and the "falied" typo.
		glog.Errorf("count %s failed, reason:%s", ctl.Name(), err)
		return 0
	}
	return len(list)
}
func (ctl *IngressCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Extensions().V1beta1().Ingresses().Lister()
informer := informerFactory.Extensions().V1beta1().Ingresses().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -107,7 +134,7 @@ func (ctl *IngressCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *IngressCtl) CountWithConditions(conditions string) int {
@@ -128,13 +155,13 @@ func (ctl *IngressCtl) ListWithConditions(conditions string, paging *Paging) (in
return total, list, nil
}
func (ctl *IngressCtl) Count(namespace string) int {
var count int
db := ctl.DB
if len(namespace) == 0 {
db.Model(&Ingress{}).Count(&count)
} else {
db.Model(&Ingress{}).Where("namespace = ?", namespace).Count(&count)
}
return count
}
//func (ctl *IngressCtl) Count(namespace string) int {
// var count int
// db := ctl.DB
// if len(namespace) == 0 {
// db.Model(&Ingress{}).Count(&count)
// } else {
// db.Model(&Ingress{}).Where("namespace = ?", namespace).Count(&count)
// }
// return count
//}

View File

@@ -19,9 +19,6 @@ package controllers
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/golang/glog"
@@ -30,12 +27,13 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/kubernetes/pkg/util/slice"
"k8s.io/client-go/informers"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/options"
@@ -69,25 +67,6 @@ type DeleteRunTime struct {
RuntimeId []string `json:"runtime_id"`
}
func makeHttpRequest(method, url, data string) ([]byte, error) {
req, err := http.NewRequest(method, url, strings.NewReader(data))
if err != nil {
glog.Error(err)
return nil, err
}
httpClient := &http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
glog.Error(err)
return nil, err
}
body, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
return body, err
}
func (ctl *NamespaceCtl) getKubeConfig(user string) (string, error) {
k8sClient := client.NewK8sClient()
configmap, err := k8sClient.CoreV1().ConfigMaps(kubectlNamespace).Get(user, metaV1.GetOptions{})
@@ -262,15 +241,11 @@ func (ctl *NamespaceCtl) generateObject(item v1.Namespace) *Namespace {
return object
}
func (ctl *NamespaceCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *NamespaceCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *NamespaceCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Namespace{}) {
@@ -279,12 +254,8 @@ func (ctl *NamespaceCtl) listAndWatch() {
db = db.CreateTable(&Namespace{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Core().V1().Namespaces().Informer()
lister := kubeInformerFactory.Core().V1().Namespaces().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -294,9 +265,28 @@ func (ctl *NamespaceCtl) listAndWatch() {
obj := ctl.generateObject(*item)
db.Create(obj)
ctl.createRoleAndRuntime(*item)
}
ctl.informer.Run(stopChan)
}
func (ctl *NamespaceCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
return len(list)
}
func (ctl *NamespaceCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Core().V1().Namespaces().Lister()
informer := informerFactory.Core().V1().Namespaces().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -321,7 +311,7 @@ func (ctl *NamespaceCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *NamespaceCtl) CountWithConditions(conditions string) int {
@@ -339,26 +329,21 @@ func (ctl *NamespaceCtl) ListWithConditions(conditions string, paging *Paging) (
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
for index := range list {
usage, err := ctl.GetNamespaceQuota(list[index].Name)
if err == nil {
list[index].Usaeg = usage
if paging != nil {
for index := range list {
usage, err := ctl.GetNamespaceQuota(list[index].Name)
if err == nil {
list[index].Usaeg = usage
}
}
}
return total, list, nil
}
func (ctl *NamespaceCtl) Count(namespace string) int {
var count int
db := ctl.DB
db.Model(&Namespace{}).Count(&count)
return count
}
func getUsage(namespace, resource string) int {
ctl := rec.controllers[resource]
return ctl.Count(namespace)
ctl := ResourceControllers.Controllers[resource]
return ctl.CountWithConditions(fmt.Sprintf("namespace = '%s' ", namespace))
}
func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, error) {
@@ -373,7 +358,7 @@ func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, e
usage[v1.ResourceName(resourceName)] = quantity
}
podCtl := rec.controllers[Pods]
podCtl := ResourceControllers.Controllers[Pods]
var quantity resource.Quantity
used := podCtl.CountWithConditions(fmt.Sprintf("status=\"%s\" And namespace=\"%s\"", "Running", namespace))
quantity.Set(int64(used))

View File

@@ -199,25 +199,24 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod {
return object
}
func (ctl *PodCtl) listAndWatch() {
func (ctl *PodCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *PodCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Pod{}) {
db.DropTable(&Pod{})
}
db = db.CreateTable(&Pod{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Core().V1().Pods().Informer()
lister := kubeInformerFactory.Core().V1().Pods().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
panic(err)
return
}
for _, item := range list {
@@ -225,6 +224,26 @@ func (ctl *PodCtl) listAndWatch() {
db.Create(obj)
}
ctl.informer.Run(stopChan)
}
func (ctl *PodCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
return len(list)
}
func (ctl *PodCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Core().V1().Pods().Lister()
informer := informerFactory.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.Pod)
@@ -249,7 +268,7 @@ func (ctl *PodCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *PodCtl) CountWithConditions(conditions string) int {
@@ -270,13 +289,13 @@ func (ctl *PodCtl) ListWithConditions(conditions string, paging *Paging) (int, i
return total, list, nil
}
func (ctl *PodCtl) Count(namespace string) int {
var count int
db := ctl.DB
if len(namespace) == 0 {
db.Model(&Pod{}).Count(&count)
} else {
db.Model(&Pod{}).Where("namespace = ?", namespace).Count(&count)
}
return count
}
//func (ctl *PodCtl) Count(namespace string) int {
// var count int
// db := ctl.DB
// if len(namespace) == 0 {
// db.Model(&Pod{}).Count(&count)
// } else {
// db.Model(&Pod{}).Where("namespace = ?", namespace).Count(&count)
// }
// return count
//}

View File

@@ -64,30 +64,21 @@ func (ctl *PvcCtl) generateObject(item *v1.PersistentVolumeClaim) *Pvc {
return object
}
func (ctl *PvcCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *PvcCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *PvcCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Pvc{}) {
db.DropTable(&Pvc{})
}
db = db.CreateTable(&Pvc{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer()
lister := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -96,9 +87,28 @@ func (ctl *PvcCtl) listAndWatch() {
for _, item := range list {
obj := ctl.generateObject(item)
db.Create(obj)
}
ctl.informer.Run(stopChan)
}
func (ctl *PvcCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
return len(list)
}
func (ctl *PvcCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Core().V1().PersistentVolumeClaims().Lister()
informer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -119,7 +129,7 @@ func (ctl *PvcCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *PvcCtl) CountWithConditions(conditions string) int {
@@ -153,13 +163,13 @@ func (ctl *PvcCtl) ListWithConditions(conditions string, paging *Paging) (int, i
return total, list, nil
}
func (ctl *PvcCtl) Count(namespace string) int {
var count int
db := ctl.DB
if len(namespace) == 0 {
db.Model(&Pvc{}).Count(&count)
} else {
db.Model(&Pvc{}).Where("namespace = ?", namespace).Count(&count)
}
return count
}
//func (ctl *PvcCtl) Count(namespace string) int {
// var count int
// db := ctl.DB
// if len(namespace) == 0 {
// db.Model(&Pvc{}).Count(&count)
// } else {
// db.Model(&Pvc{}).Where("namespace = ?", namespace).Count(&count)
// }
// return count
//}

View File

@@ -29,7 +29,7 @@ import (
func (ctl *RoleCtl) generateObject(item v1.Role) *Role {
name := item.Name
if strings.HasPrefix(name, "system:") {
if strings.HasPrefix(name, systemPrefix) {
return nil
}
namespace := item.Namespace
@@ -43,30 +43,21 @@ func (ctl *RoleCtl) generateObject(item v1.Role) *Role {
return object
}
func (ctl *RoleCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *RoleCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *RoleCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Role{}) {
db.DropTable(&Role{})
}
db = db.CreateTable(&Role{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Rbac().V1().Roles().Informer()
lister := kubeInformerFactory.Rbac().V1().Roles().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -74,10 +65,39 @@ func (ctl *RoleCtl) listAndWatch() {
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
if obj != nil {
db.Create(obj)
}
}
ctl.informer.Run(stopChan)
}
func (ctl *RoleCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
count := 0
for _, item := range list {
if !strings.HasPrefix(item.Name, systemPrefix) {
count++
}
}
return count
}
func (ctl *RoleCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Rbac().V1().Roles().Lister()
informer := informerFactory.Rbac().V1().Roles().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -103,7 +123,7 @@ func (ctl *RoleCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *RoleCtl) CountWithConditions(conditions string) int {
@@ -124,9 +144,9 @@ func (ctl *RoleCtl) ListWithConditions(conditions string, paging *Paging) (int,
return total, list, nil
}
func (ctl *RoleCtl) Count(namespace string) int {
var count int
db := ctl.DB
db.Model(&Role{}).Where("namespace = ?", namespace).Count(&count)
return count
}
//func (ctl *RoleCtl) Count(namespace string) int {
// var count int
// db := ctl.DB
// db.Model(&Role{}).Where("namespace = ?", namespace).Count(&count)
// return count
//}

View File

@@ -27,45 +27,46 @@ import (
)
type resourceControllers struct {
controllers map[string]Controller
Controllers map[string]Controller
k8sClient *kubernetes.Clientset
}
var stopChan chan struct{}
var rec resourceControllers
var ResourceControllers resourceControllers
func (rec *resourceControllers) runContoller(name string) {
var ctl Controller
attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})}
attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan,
aliveChan: make(chan struct{}), Name: name}
switch name {
case Deployments:
ctl = &DeploymentCtl{attr}
ctl = &DeploymentCtl{CommonAttribute: attr}
case Statefulsets:
ctl = &StatefulsetCtl{attr}
ctl = &StatefulsetCtl{CommonAttribute: attr}
case Daemonsets:
ctl = &DaemonsetCtl{attr}
ctl = &DaemonsetCtl{CommonAttribute: attr}
case Ingresses:
ctl = &IngressCtl{attr}
ctl = &IngressCtl{CommonAttribute: attr}
case PersistentVolumeClaim:
ctl = &PvcCtl{attr}
ctl = &PvcCtl{CommonAttribute: attr}
case Roles:
ctl = &RoleCtl{attr}
ctl = &RoleCtl{CommonAttribute: attr}
case ClusterRoles:
ctl = &ClusterRoleCtl{attr}
ctl = &ClusterRoleCtl{CommonAttribute: attr}
case Services:
ctl = &ServiceCtl{attr}
ctl = &ServiceCtl{CommonAttribute: attr}
case Pods:
ctl = &PodCtl{attr}
ctl = &PodCtl{CommonAttribute: attr}
case Namespaces:
ctl = &NamespaceCtl{attr}
ctl = &NamespaceCtl{CommonAttribute: attr}
case StorageClasses:
ctl = &StorageClassCtl{attr}
ctl = &StorageClassCtl{CommonAttribute: attr}
default:
return
}
rec.controllers[name] = ctl
go ctl.listAndWatch()
rec.Controllers[name] = ctl
go listAndWatch(ctl)
}
@@ -93,22 +94,23 @@ func Run() {
stopChan := make(chan struct{})
defer close(stopChan)
rec = resourceControllers{k8sClient: client.NewK8sClient(), controllers: make(map[string]Controller)}
k8sClient := client.NewK8sClient()
ResourceControllers = resourceControllers{k8sClient: k8sClient, Controllers: make(map[string]Controller)}
for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services,
Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses} {
rec.runContoller(item)
ResourceControllers.runContoller(item)
}
go dbHealthCheck(client.NewDBClient())
for {
for ctlName, controller := range rec.controllers {
for ctlName, controller := range ResourceControllers.Controllers {
select {
case _, isClose := <-controller.chanAlive():
if !isClose {
glog.Errorf("controller %s have stopped, restart it", ctlName)
rec.runContoller(ctlName)
ResourceControllers.runContoller(ctlName)
}
default:
time.Sleep(5 * time.Second)

View File

@@ -26,7 +26,7 @@ import (
"k8s.io/client-go/tools/cache"
)
func (ctl *ServiceCtl) loadBalancerStatusStringer(item v1.Service) string {
func loadBalancerStatusStringer(item v1.Service) string {
ingress := item.Status.LoadBalancer.Ingress
result := sets.NewString()
for i := range ingress {
@@ -41,7 +41,7 @@ func (ctl *ServiceCtl) loadBalancerStatusStringer(item v1.Service) string {
return r
}
func (ctl *ServiceCtl) getExternalIp(item v1.Service) string {
func getExternalIp(item v1.Service) string {
switch item.Spec.Type {
case "ClusterIP", "NodePort":
if len(item.Spec.ExternalIPs) > 0 {
@@ -51,7 +51,7 @@ func (ctl *ServiceCtl) getExternalIp(item v1.Service) string {
return item.Spec.ExternalName
case "LoadBalancer":
lbIps := ctl.loadBalancerStatusStringer(item)
lbIps := loadBalancerStatusStringer(item)
if len(item.Spec.ExternalIPs) > 0 {
results := []string{}
if len(lbIps) > 0 {
@@ -68,12 +68,11 @@ func (ctl *ServiceCtl) getExternalIp(item v1.Service) string {
return ""
}
func (ctl *ServiceCtl) generateObject(item v1.Service) *Service {
func generateSvcObject(item v1.Service) *Service {
name := item.Name
namespace := item.Namespace
createTime := item.CreationTimestamp.Time
externalIp := ctl.getExternalIp(item)
externalIp := getExternalIp(item)
serviceType := item.Spec.Type
vip := item.Spec.ClusterIP
ports := ""
@@ -129,17 +128,18 @@ func (ctl *ServiceCtl) generateObject(item v1.Service) *Service {
}
return object
}
func (ctl *ServiceCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *ServiceCtl) generateObject(item v1.Service) *Service {
return generateSvcObject(item)
}
func (ctl *ServiceCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *ServiceCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Service{}) {
@@ -148,12 +148,8 @@ func (ctl *ServiceCtl) listAndWatch() {
db = db.CreateTable(&Service{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Core().V1().Services().Informer()
lister := kubeInformerFactory.Core().V1().Services().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -164,6 +160,25 @@ func (ctl *ServiceCtl) listAndWatch() {
db.Create(obj)
}
ctl.informer.Run(stopChan)
}
func (ctl *ServiceCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
return len(list)
}
func (ctl *ServiceCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Core().V1().Services().Lister()
informer := informerFactory.Core().V1().Services().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -185,7 +200,7 @@ func (ctl *ServiceCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *ServiceCtl) CountWithConditions(conditions string) int {
@@ -205,14 +220,3 @@ func (ctl *ServiceCtl) ListWithConditions(conditions string, paging *Paging) (in
return total, list, nil
}
func (ctl *ServiceCtl) Count(namespace string) int {
var count int
db := ctl.DB
if len(namespace) == 0 {
db.Model(&Service{}).Count(&count)
} else {
db.Model(&Service{}).Where("namespace = ?", namespace).Count(&count)
}
return count
}

View File

@@ -62,27 +62,21 @@ func (ctl *StatefulsetCtl) generateObject(item v1.StatefulSet) *Statefulset {
return statefulSetObject
}
func (ctl *StatefulsetCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *StatefulsetCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *StatefulsetCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&Statefulset{}) {
db.DropTable(&Statefulset{})
}
db = db.CreateTable(&Statefulset{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Apps().V1().StatefulSets().Informer()
lister := kubeInformerFactory.Apps().V1().StatefulSets().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -91,9 +85,28 @@ func (ctl *StatefulsetCtl) listAndWatch() {
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
ctl.informer.Run(stopChan)
}
func (ctl *StatefulsetCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
return len(list)
}
func (ctl *StatefulsetCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Apps().V1().StatefulSets().Lister()
informer := informerFactory.Apps().V1().StatefulSets().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -115,7 +128,7 @@ func (ctl *StatefulsetCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *StatefulsetCtl) CountWithConditions(conditions string) int {
@@ -135,14 +148,3 @@ func (ctl *StatefulsetCtl) ListWithConditions(conditions string, paging *Paging)
return total, list, nil
}
func (ctl *StatefulsetCtl) Count(namespace string) int {
var count int
db := ctl.DB
if len(namespace) == 0 {
db.Model(&Statefulset{}).Count(&count)
} else {
db.Model(&Statefulset{}).Where("namespace = ?", namespace).Count(&count)
}
return count
}

View File

@@ -46,15 +46,11 @@ func (ctl *StorageClassCtl) generateObject(item v1.StorageClass) *StorageClass {
return object
}
func (ctl *StorageClassCtl) listAndWatch() {
defer func() {
close(ctl.aliveChan)
if err := recover(); err != nil {
glog.Error(err)
return
}
}()
func (ctl *StorageClassCtl) Name() string {
return ctl.CommonAttribute.Name
}
func (ctl *StorageClassCtl) sync(stopChan chan struct{}) {
db := ctl.DB
if db.HasTable(&StorageClass{}) {
@@ -63,12 +59,8 @@ func (ctl *StorageClassCtl) listAndWatch() {
db = db.CreateTable(&StorageClass{})
k8sClient := ctl.K8sClient
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
informer := kubeInformerFactory.Storage().V1().StorageClasses().Informer()
lister := kubeInformerFactory.Storage().V1().StorageClasses().Lister()
list, err := lister.List(labels.Everything())
ctl.initListerAndInformer()
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Error(err)
return
@@ -77,12 +69,30 @@ func (ctl *StorageClassCtl) listAndWatch() {
for _, item := range list {
obj := ctl.generateObject(*item)
db.Create(obj)
}
ctl.informer.Run(stopChan)
}
func (ctl *StorageClassCtl) total() int {
list, err := ctl.lister.List(labels.Everything())
if err != nil {
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
return 0
}
return len(list)
}
func (ctl *StorageClassCtl) initListerAndInformer() {
db := ctl.DB
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
ctl.lister = informerFactory.Storage().V1().StorageClasses().Lister()
informer := informerFactory.Storage().V1().StorageClasses().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := obj.(*v1.StorageClass)
mysqlObject := ctl.generateObject(*object)
db.Create(mysqlObject)
@@ -101,8 +111,7 @@ func (ctl *StorageClassCtl) listAndWatch() {
},
})
informer.Run(ctl.stopChan)
ctl.informer = informer
}
func (ctl *StorageClassCtl) CountWithConditions(conditions string) int {
@@ -122,17 +131,10 @@ func (ctl *StorageClassCtl) ListWithConditions(conditions string, paging *Paging
for index, storageClass := range list {
name := storageClass.Name
pvcCtl := PvcCtl{CommonAttribute{K8sClient: ctl.K8sClient, DB: ctl.DB}}
pvcCtl := ResourceControllers.Controllers[PersistentVolumeClaim]
list[index].Count = pvcCtl.CountWithConditions(fmt.Sprintf("storage_class=\"%s\"", name))
}
return total, list, nil
}
func (ctl *StorageClassCtl) Count(name string) int {
var count int
db := ctl.DB
db.Model(&StorageClass{}).Count(&count)
return count
}

View File

@@ -26,10 +26,16 @@ import (
"github.com/jinzhu/gorm"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
appV1 "k8s.io/client-go/listers/apps/v1"
coreV1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/listers/extensions/v1beta1"
rbacV1 "k8s.io/client-go/listers/rbac/v1"
storageV1 "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
)
const (
resyncCircle = 180
resyncCircle = 600
Stopped = "stopped"
PvcPending = "Pending"
Running = "running"
@@ -57,6 +63,7 @@ const (
ClusterRoles = "cluster-roles"
Services = "services"
StorageClasses = "storage-classes"
Applications = "applications"
)
type Annotation struct {
@@ -163,10 +170,18 @@ func (Pvc) TableName() string {
return tablePersistentVolumeClaim
}
type ingressRule struct {
Host string `json:"host"`
Path string `json:"path"`
Service string `json:"service"`
Port int32 `json:"port"`
}
type Ingress struct {
Name string `gorm:"primary_key" json:"name"`
Namespace string `gorm:"primary_key" json:"namespace"`
Ip string `json:"ip,omitempty"`
Rules string `json:"rules, omitempty"`
TlsTermination string `json:"tlsTermination,omitempty"`
Annotation Annotation `json:"annotations"`
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
@@ -273,16 +288,19 @@ type Paging struct {
}
type Controller interface {
listAndWatch()
chanStop() chan struct{}
chanAlive() chan struct{}
Count(namespace string) int
CountWithConditions(condition string) int
total() int
initListerAndInformer()
sync(stopChan chan struct{})
Name() string
ListWithConditions(condition string, paging *Paging) (int, interface{}, error)
}
type CommonAttribute struct {
K8sClient *kubernetes.Clientset
Name string
DB *gorm.DB
stopChan chan struct{}
aliveChan chan struct{}
@@ -300,44 +318,66 @@ func (ca *CommonAttribute) chanAlive() chan struct{} {
type DeploymentCtl struct {
CommonAttribute
lister appV1.DeploymentLister
informer cache.SharedIndexInformer
}
type StatefulsetCtl struct {
CommonAttribute
lister appV1.StatefulSetLister
informer cache.SharedIndexInformer
}
type DaemonsetCtl struct {
CommonAttribute
lister appV1.DaemonSetLister
informer cache.SharedIndexInformer
}
type ServiceCtl struct {
CommonAttribute
lister coreV1.ServiceLister
informer cache.SharedIndexInformer
}
type PvcCtl struct {
CommonAttribute
lister coreV1.PersistentVolumeClaimLister
informer cache.SharedIndexInformer
}
type PodCtl struct {
CommonAttribute
lister coreV1.PodLister
informer cache.SharedIndexInformer
}
type IngressCtl struct {
lister v1beta1.IngressLister
informer cache.SharedIndexInformer
CommonAttribute
}
type NamespaceCtl struct {
CommonAttribute
lister coreV1.NamespaceLister
informer cache.SharedIndexInformer
}
type StorageClassCtl struct {
lister storageV1.StorageClassLister
informer cache.SharedIndexInformer
CommonAttribute
}
type RoleCtl struct {
lister rbacV1.RoleLister
informer cache.SharedIndexInformer
CommonAttribute
}
type ClusterRoleCtl struct {
lister rbacV1.ClusterRoleLister
informer cache.SharedIndexInformer
CommonAttribute
}

View File

@@ -31,11 +31,12 @@ import (
"github.com/golang/glog"
"gopkg.in/yaml.v2"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/api/core/v1"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/options"
@@ -46,6 +47,7 @@ const (
keyPath = "/etc/kubernetes/pki/ca.key"
clusterName = "kubernetes"
kubectlConfigKey = "config"
defaultNamespace = "default"
)
type clusterInfo struct {
@@ -59,8 +61,9 @@ type cluster struct {
}
type contextInfo struct {
Cluster string `yaml:"cluster"`
User string `yaml:"user"`
Cluster string `yaml:"cluster"`
User string `yaml:"user"`
NameSpace string `yaml:"namespace"`
}
type contextObject struct {
@@ -186,14 +189,14 @@ func newCertificate(info CertInformation) *x509.Certificate {
}
func generateCaAndKey(user, caPath, keyPath string) (string, string, error) {
crtinfo := CertInformation{CommonName: user, IsCA: false}
crtInfo := CertInformation{CommonName: user, IsCA: false}
crt, pri, err := Parse(caPath, keyPath)
if err != nil {
glog.Error(err)
return "", "", err
}
cert, key, err := createCRT(crt, pri, crtinfo)
cert, key, err := createCRT(crt, pri, crtInfo)
if err != nil {
glog.Error(err)
return "", "", err
@@ -217,7 +220,7 @@ func createKubeConfig(userName string) (string, error) {
tmpKubeConfig.Clusters = append(tmpKubeConfig.Clusters, tmpCluster)
contextName := userName + "@" + clusterName
tmpContext := contextObject{Context: contextInfo{User: userName, Cluster: clusterName}, Name: contextName}
tmpContext := contextObject{Context: contextInfo{User: userName, Cluster: clusterName, NameSpace: defaultNamespace}, Name: contextName}
tmpKubeConfig.Contexts = append(tmpKubeConfig.Contexts, tmpContext)
cert, key, err := generateCaAndKey(userName, caPath, keyPath)
@@ -240,42 +243,48 @@ func createKubeConfig(userName string) (string, error) {
func CreateKubeConfig(user string) error {
k8sClient := client.NewK8sClient()
config, err := createKubeConfig(user)
if err != nil {
glog.Errorln(err)
return err
_, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metaV1.GetOptions{})
if errors.IsNotFound(err) {
config, err := createKubeConfig(user)
if err != nil {
glog.Errorln(err)
return err
}
data := map[string]string{"config": string(config)}
configMap := v1.ConfigMap{TypeMeta: metaV1.TypeMeta{Kind: "Configmap", APIVersion: "v1"}, ObjectMeta: metaV1.ObjectMeta{Name: user}, Data: data}
_, err = k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Create(&configMap)
if err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("create user %s's kubeConfig failed, reason:", user, err)
return err
}
}
data := map[string]string{"config": string(config)}
var configmap = v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "Configmap", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{Name: user}, Data: data}
_, err = k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Create(&configmap)
if err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("create user %s's kubeConfig failed, reason:", user, err)
return err
}
return nil
}
func GetKubeConfig(user string) (string, error) {
k8sClient := client.NewK8sClient()
configmap, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metav1.GetOptions{})
configMap, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metaV1.GetOptions{})
if err != nil {
glog.Errorf("cannot get user %s's kubeConfig, reason:", user, err)
return "", err
}
return configmap.Data[kubectlConfigKey], nil
return configMap.Data[kubectlConfigKey], nil
}
func DelKubeConfig(user string) error {
k8sClient := client.NewK8sClient()
_, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metav1.GetOptions{})
_, err := k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Get(user, metaV1.GetOptions{})
if errors.IsNotFound(err) {
return nil
}
deletePolicy := metav1.DeletePropagationBackground
err = k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Delete(user, &metav1.DeleteOptions{PropagationPolicy: &deletePolicy})
deletePolicy := metaV1.DeletePropagationBackground
err = k8sClient.CoreV1().ConfigMaps(constants.KubeSphereControlNamespace).Delete(user, &metaV1.DeleteOptions{PropagationPolicy: &deletePolicy})
if err != nil {
glog.Errorf("delete user %s's kubeConfig failed, reason:", user, err)
return err

View File

@@ -35,21 +35,20 @@ import (
const (
namespace = constants.KubeSphereControlNamespace
retry = 5
)
type kubectlPodInfo struct {
type KubectlPodInfo struct {
Namespace string `json:"namespace"`
Pod string `json:"pod"`
Container string `json:"container"`
}
func GetKubectlPod(user string) (kubectlPodInfo, error) {
func GetKubectlPod(user string) (KubectlPodInfo, error) {
k8sClient := client.NewK8sClient()
deploy, err := k8sClient.AppsV1beta2().Deployments(namespace).Get(user, metav1.GetOptions{})
if err != nil {
glog.Errorln(err)
return kubectlPodInfo{}, err
return KubectlPodInfo{}, err
}
selectors := deploy.Spec.Selector.MatchLabels
@@ -57,16 +56,16 @@ func GetKubectlPod(user string) (kubectlPodInfo, error) {
podList, err := k8sClient.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
glog.Errorln(err)
return kubectlPodInfo{}, err
return KubectlPodInfo{}, err
}
pod, err := selectCorrectPod(namespace, podList.Items)
if err != nil {
glog.Errorln(err)
return kubectlPodInfo{}, err
return KubectlPodInfo{}, err
}
info := kubectlPodInfo{Namespace: pod.Namespace, Pod: pod.Name, Container: pod.Status.ContainerStatuses[0].Name}
info := KubectlPodInfo{Namespace: pod.Namespace, Pod: pod.Name, Container: pod.Status.ContainerStatuses[0].Name}
return info, nil
@@ -91,7 +90,12 @@ func selectCorrectPod(namespace string, pods []v1.Pod) (kubectlPod v1.Pod, err e
return kubectPodList[random], nil
}
func CreateKubectlPod(user string) error {
func CreateKubectlDeploy(user string) error {
k8sClient := client.NewK8sClient()
_, err := k8sClient.AppsV1().Deployments(namespace).Get(user, metav1.GetOptions{})
if err == nil {
return nil
}
replica := int32(1)
selector := metav1.LabelSelector{MatchLabels: map[string]string{"user": user}}
@@ -122,17 +126,12 @@ func CreateKubectlPod(user string) error {
},
}
k8sClient := client.NewK8sClient()
_, err := k8sClient.AppsV1beta2().Deployments(namespace).Create(&deployment)
if errors.IsAlreadyExists(err) {
return nil
}
_, err = k8sClient.AppsV1beta2().Deployments(namespace).Create(&deployment)
return err
}
func DelKubectlPod(user string) error {
func DelKubectlDeploy(user string) error {
k8sClient := client.NewK8sClient()
_, err := k8sClient.AppsV1beta2().Deployments(namespace).Get(user, metav1.GetOptions{})
if errors.IsNotFound(err) {

View File

@@ -22,6 +22,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"fmt"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/models/controllers"
)
@@ -55,7 +57,12 @@ func getUsage(namespace, resource string) int {
if err != nil {
return 0
}
return ctl.Count(namespace)
if len(namespace) == 0 {
return ctl.CountWithConditions("")
}
return ctl.CountWithConditions(fmt.Sprintf("namespace = '%s' ", namespace))
}
func GetClusterQuota() (*ResourceQuota, error) {

View File

@@ -6,8 +6,15 @@ import (
"strconv"
"strings"
"kubesphere.io/kubesphere/pkg/client"
"github.com/golang/glog"
"kubesphere.io/kubesphere/pkg/models/controllers"
"kubesphere.io/kubesphere/pkg/options"
)
// Keys of the paging map produced by getPaging from the "paging"
// query parameter (e.g. "limit=10,page=1").
const (
	limit = "limit"
	page  = "page"
)
type ResourceList struct {
@@ -18,35 +25,16 @@ type ResourceList struct {
}
// getController resolves a resource name ("deployments", "pods", ...) to the
// shared controller registered for it in controllers.ResourceControllers.
// It returns an error for any resource name outside the supported set.
//
// NOTE: the fused old/new diff lines (duplicate case arms building per-type
// controllers and an unreachable trailing `return nil, nil`) have been
// collapsed into the post-commit registry-lookup implementation.
func getController(resource string) (controllers.Controller, error) {
	switch resource {
	case controllers.Deployments, controllers.Statefulsets, controllers.Daemonsets, controllers.Ingresses,
		controllers.PersistentVolumeClaim, controllers.Roles, controllers.ClusterRoles, controllers.Services,
		controllers.Pods, controllers.Namespaces, controllers.StorageClasses:
		// All supported resources share one registry keyed by resource name.
		return controllers.ResourceControllers.Controllers[resource], nil
	default:
		return nil, fmt.Errorf("invalid resource Name '%s'", resource)
	}
}
@@ -90,26 +78,44 @@ func getConditions(str string) (map[string]string, map[string]string, error) {
return match, fuzzy, nil
}
func getPaging(str string) (map[string]int, error) {
paging := make(map[string]int)
if len(str) == 0 {
return paging, nil
func getPaging(resourceName, pagingStr string) (*controllers.Paging, map[string]int, error) {
defaultPaging := &controllers.Paging{Limit: 10, Offset: 0}
defautlPagingMap := map[string]int{"page": 1, "limit": 10}
if resourceName == controllers.Namespaces {
defaultPaging = nil
defautlPagingMap = map[string]int{"page": 0, "limit": 0}
}
list := strings.Split(str, ",")
pagingMap := make(map[string]int)
if len(pagingStr) == 0 {
return defaultPaging, defautlPagingMap, nil
}
list := strings.Split(pagingStr, ",")
for _, item := range list {
kvs := strings.Split(item, "=")
if len(kvs) < 2 {
return nil, errors.New("invalid Paging input")
return nil, nil, errors.New("invalid Paging input")
}
value, err := strconv.Atoi(kvs[1])
if err != nil {
return nil, err
return nil, nil, errors.New("invalid Paging input")
}
paging[kvs[0]] = value
pagingMap[kvs[0]] = value
}
return paging, nil
if pagingMap[limit] <= 0 || pagingMap[page] <= 0 {
return nil, nil, errors.New("invalid Paging input")
}
if pagingMap[limit] > 0 && pagingMap[page] > 0 {
offset := (pagingMap[page] - 1) * pagingMap[limit]
return &controllers.Paging{Limit: pagingMap[limit], Offset: offset}, pagingMap, nil
}
return defaultPaging, defautlPagingMap, nil
}
func ListResource(resourceName, conditonSrt, pagingStr string) (*ResourceList, error) {
@@ -118,12 +124,12 @@ func ListResource(resourceName, conditonSrt, pagingStr string) (*ResourceList, e
return nil, err
}
pagingMap, err := getPaging(pagingStr)
paging, pagingMap, err := getPaging(resourceName, pagingStr)
if err != nil {
return nil, err
}
conditionStr, paging := generateConditionAndPaging(match, fuzzy, pagingMap)
conditionStr := generateConditionStr(match, fuzzy)
ctl, err := getController(resourceName)
if err != nil {
@@ -135,10 +141,10 @@ func ListResource(resourceName, conditonSrt, pagingStr string) (*ResourceList, e
return nil, err
}
return &ResourceList{Total: total, Items: items, Page: pagingMap["page"], Limit: pagingMap["limit"]}, nil
return &ResourceList{Total: total, Items: items, Page: pagingMap[page], Limit: pagingMap[limit]}, nil
}
func generateConditionAndPaging(match map[string]string, fuzzy map[string]string, paging map[string]int) (string, *controllers.Paging) {
func generateConditionStr(match map[string]string, fuzzy map[string]string) string {
conditionStr := ""
for k, v := range match {
@@ -157,12 +163,7 @@ func generateConditionAndPaging(match map[string]string, fuzzy map[string]string
}
}
if paging["limit"] > 0 && paging["page"] >= 0 {
offset := (paging["page"] - 1) * paging["limit"]
return conditionStr, &controllers.Paging{Limit: paging["limit"], Offset: offset}
}
return conditionStr, nil
return conditionStr
}
type workLoadStatus struct {
@@ -176,14 +177,14 @@ func GetNamespacesResourceStatus(namespace string) (*workLoadStatus, error) {
var status *ResourceList
var err error
for _, resource := range []string{controllers.Deployments, controllers.Statefulsets, controllers.Daemonsets, controllers.PersistentVolumeClaim} {
resourceStatus := controllers.Updating
notReadyStatus := controllers.Updating
if resource == controllers.PersistentVolumeClaim {
resourceStatus = controllers.PvcPending
notReadyStatus = controllers.PvcPending
}
if len(namespace) > 0 {
status, err = ListResource(resource, fmt.Sprintf("status=%s,namespace=%s", resourceStatus, namespace), "")
status, err = ListResource(resource, fmt.Sprintf("status=%s,namespace=%s", notReadyStatus, namespace), "")
} else {
status, err = ListResource(resource, fmt.Sprintf("status=%s", resourceStatus), "")
status, err = ListResource(resource, fmt.Sprintf("status=%s", notReadyStatus), "")
}
if err != nil {
@@ -191,9 +192,7 @@ func GetNamespacesResourceStatus(namespace string) (*workLoadStatus, error) {
}
count := status.Total
//items := status.Items
res.Count[resource] = count
//res.Items[resource] = items
}
return &res, nil
@@ -203,3 +202,31 @@ func GetClusterResourceStatus() (*workLoadStatus, error) {
return GetNamespacesResourceStatus("")
}
// GetApplication fetches a single deployed application, identified by its
// cluster id, from the OpenPitrix server configured in the server options.
func GetApplication(clusterId string) (interface{}, error) {
	appCtl := controllers.ApplicationCtl{OpenpitrixAddr: options.ServerOptions.GetOpAddress()}
	return appCtl.GetApp(clusterId)
}
// ListApplication returns one page of deployed applications for the given
// OpenPitrix runtime, applying the parsed search conditions and paging.
func ListApplication(runtimeId, conditions, pagingStr string) (*ResourceList, error) {
	// Parse the exact-match and fuzzy-match filters from the conditions string.
	match, fuzzy, err := getConditions(conditions)
	if err != nil {
		glog.Error(err)
		return nil, err
	}

	// Applications use the standard paging defaults (page 1, limit 10).
	paging, pagingMap, err := getPaging(controllers.Applications, pagingStr)
	if err != nil {
		return nil, err
	}

	appCtl := &controllers.ApplicationCtl{OpenpitrixAddr: options.ServerOptions.GetOpAddress()}
	total, items, err := appCtl.ListApplication(runtimeId, match, fuzzy, paging)
	if err != nil {
		glog.Errorf("get application list failed, reason: %s", err)
		return nil, err
	}

	return &ResourceList{
		Total: total,
		Items: items,
		Page:  pagingMap[page],
		Limit: pagingMap[limit],
	}, nil
}