add resync function and support to view deployed applications
This commit is contained in:
498
pkg/models/controllers/applications.go
Normal file
498
pkg/models/controllers/applications.go
Normal file
@@ -0,0 +1,498 @@
|
||||
/*
|
||||
Copyright 2018 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/client"
|
||||
)
|
||||
|
||||
const (
	// unknown is the placeholder value shown when a lookup against
	// OpenPitrix or kubernetes cannot be resolved.
	unknown = "-"
	// Suffixes OpenPitrix appends to a cluster-role name to encode the kind
	// of workload it maps to; see GetWorkLoads. (NOTE(review): "Surffix" is
	// a typo for "Suffix", kept as-is since these are referenced elsewhere
	// in this file.)
	deploySurffix = "-Deployment"
	daemonSurffix = "-DaemonSet"
	stateSurffix  = "-StatefulSet"
)
|
||||
|
||||
// ApplicationCtl talks to an OpenPitrix instance to list and inspect
// applications deployed through it.
type ApplicationCtl struct {
	// OpenpitrixAddr is the base address of the OpenPitrix API; a trailing
	// "/" is tolerated (it is trimmed before use).
	OpenpitrixAddr string
}

// Application is the API view of one deployed application, assembled from an
// OpenPitrix "cluster" record plus workload/service/ingress data resolved
// from kubernetes. Pointer/slice fields are only populated by GetApp, not by
// ListApplication.
type Application struct {
	Name     string `json:"name"`
	RepoName string `json:"repoName"`
	// NOTE(review): the json tag is "namespace" although the field holds the
	// runtime zone returned by GetRuntime — downstream code appears to treat
	// zone == namespace; confirm before renaming either side.
	Runtime    string     `json:"namespace"`
	RuntimeId  string     `json:"runtime_id"`
	Version    string     `json:"version"`
	VersionId  string     `json:"version_id"`
	Status     string     `json:"status"`
	UpdateTime time.Time  `json:"updateTime"`
	CreateTime time.Time  `json:"createTime"`
	App        string     `json:"app"`
	AppId      string     `json:"app_id"`
	Creator    string     `json:"creator,omitempty"`
	WorkLoads  *workLoads `json:"workloads,omitempty"`
	Services   *[]Service `json:"services,omitempty"`
	Ingresses  *[]ing     `json:"ingresses,omitempty"`
	ClusterID  string     `json:"cluster_id"`
}

// ing is a trimmed ingress view: the ingress name plus only those rules that
// route to one of the application's services (see getIng).
type ing struct {
	Name  string        `json:"name"`
	Rules []ingressRule `json:"rules"`
}
|
||||
|
||||
// clusterRole mirrors one entry of the OpenPitrix cluster_role_set: the role
// name encodes the kind of workload via a "-Deployment"/"-DaemonSet"/
// "-StatefulSet" suffix (see GetWorkLoads).
type clusterRole struct {
	ClusterID string `json:"cluster_id"`
	Role      string `json:"role"`
}

// cluster mirrors an OpenPitrix cluster record, which represents one
// deployed application instance.
type cluster struct {
	ClusterID string `json:"cluster_id"`
	Name      string `json:"name"`
	AppID     string `json:"app_id"`
	VersionID string `json:"version_id"`
	Status    string `json:"status"`
	// NOTE(review): the API field is "status_time"; it is surfaced here as
	// the update time.
	UpdateTime      time.Time     `json:"status_time"`
	CreateTime      time.Time     `json:"createTime"`
	RunTimeId       string        `json:"runtime_id"`
	Description     string        `json:"description"`
	ClusterRoleSets []clusterRole `json:"cluster_role_set"`
}

// clusters is the paged response envelope of the OpenPitrix /v1/clusters
// endpoint.
type clusters struct {
	Total    int       `json:"total_count"`
	Clusters []cluster `json:"cluster_set"`
}
|
||||
|
||||
// versionList is the response envelope of the OpenPitrix /v1/app_versions
// endpoint.
type versionList struct {
	Total    int       `json:"total_count"`
	Versions []version `json:"app_version_set"`
}

// version is the subset of an OpenPitrix app version this package needs.
type version struct {
	Name      string `json:"name"`
	VersionID string `json:"version_id"`
}

// runtime is the subset of an OpenPitrix runtime this package needs; Zone is
// used as the namespace of the deployed application.
type runtime struct {
	RuntimeID string `json:"runtime_id"`
	Zone      string `json:"zone"`
}

// runtimeList is the response envelope of the OpenPitrix /v1/runtimes
// endpoint.
type runtimeList struct {
	Total    int       `json:"total_count"`
	Runtimes []runtime `json:"runtime_set"`
}
|
||||
|
||||
// app is the subset of an OpenPitrix app record this package needs.
type app struct {
	AppId     string `json:"app_id"`
	Name      string `json:"name"`
	ChartName string `json:"chart_name"`
	RepoId    string `json:"repo_id"`
}

// repo is the subset of an OpenPitrix repo record this package needs.
type repo struct {
	RepoId string `json:"repo_id"`
	Name   string `json:"name"`
	Url    string `json:"url"`
}

// workLoads groups the kubernetes workloads that make up one application.
type workLoads struct {
	Deployments  []Deployment  `json:"deployments,omitempty"`
	Statefulsets []Statefulset `json:"statefulsets,omitempty"`
	Daemonsets   []Daemonset   `json:"daemonsets,omitempty"`
}

// description models the JSON document OpenPitrix stores in a cluster's
// description field; only the creator is extracted (see getCreator).
type description struct {
	Creator string `json:"creator"`
}

// appList is the paged response envelope of the OpenPitrix /v1/apps
// endpoint.
type appList struct {
	Total int   `json:"total_count"`
	Apps  []app `json:"app_set"`
}

// repoList is the paged response envelope of the OpenPitrix /v1/repos
// endpoint.
type repoList struct {
	Total int    `json:"total_count"`
	Repos []repo `json:"repo_set"`
}
|
||||
|
||||
func (ctl *ApplicationCtl) GetAppInfo(appId string) (string, string, string, error) {
|
||||
url := fmt.Sprintf("%s/v1/apps?app_id=%s", ctl.OpenpitrixAddr, appId)
|
||||
resp, err := makeHttpRequest("GET", url, "")
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return unknown, unknown, unknown, err
|
||||
}
|
||||
|
||||
var apps appList
|
||||
err = json.Unmarshal(resp, &apps)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return unknown, unknown, unknown, err
|
||||
}
|
||||
|
||||
if len(apps.Apps) == 0 {
|
||||
return unknown, unknown, unknown, err
|
||||
}
|
||||
|
||||
return apps.Apps[0].ChartName, apps.Apps[0].RepoId, apps.Apps[0].AppId, nil
|
||||
}
|
||||
|
||||
func (ctl *ApplicationCtl) GetRepo(repoId string) (string, error) {
|
||||
url := fmt.Sprintf("%s/v1/repos?repo_id=%s", ctl.OpenpitrixAddr, repoId)
|
||||
resp, err := makeHttpRequest("GET", url, "")
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return unknown, err
|
||||
}
|
||||
|
||||
var repos repoList
|
||||
err = json.Unmarshal(resp, &repos)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return unknown, err
|
||||
}
|
||||
|
||||
if len(repos.Repos) == 0 {
|
||||
return unknown, err
|
||||
}
|
||||
|
||||
return repos.Repos[0].Name, nil
|
||||
}
|
||||
|
||||
func (ctl *ApplicationCtl) GetVersion(versionId string) (string, error) {
|
||||
versionUrl := fmt.Sprintf("%s/v1/app_versions?version_id=%s", ctl.OpenpitrixAddr, versionId)
|
||||
resp, err := makeHttpRequest("GET", versionUrl, "")
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return unknown, err
|
||||
}
|
||||
|
||||
var versions versionList
|
||||
err = json.Unmarshal(resp, &versions)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return unknown, err
|
||||
}
|
||||
|
||||
if len(versions.Versions) == 0 {
|
||||
return unknown, nil
|
||||
}
|
||||
return versions.Versions[0].Name, nil
|
||||
}
|
||||
|
||||
func (ctl *ApplicationCtl) GetRuntime(runtimeId string) (string, error) {
|
||||
|
||||
versionUrl := fmt.Sprintf("%s/v1/runtimes?runtime_id=%s", ctl.OpenpitrixAddr, runtimeId)
|
||||
resp, err := makeHttpRequest("GET", versionUrl, "")
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return unknown, err
|
||||
}
|
||||
|
||||
var runtimes runtimeList
|
||||
err = json.Unmarshal(resp, &runtimes)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return unknown, err
|
||||
}
|
||||
|
||||
if len(runtimes.Runtimes) == 0 {
|
||||
return unknown, nil
|
||||
}
|
||||
|
||||
return runtimes.Runtimes[0].Zone, nil
|
||||
}
|
||||
|
||||
func (ctl *ApplicationCtl) GetWorkLoads(namespace string, clusterRoles []clusterRole) *workLoads {
|
||||
|
||||
var works workLoads
|
||||
for _, clusterRole := range clusterRoles {
|
||||
workLoadName := clusterRole.Role
|
||||
if len(workLoadName) > 0 {
|
||||
if strings.HasSuffix(workLoadName, deploySurffix) {
|
||||
name := strings.Split(workLoadName, deploySurffix)[0]
|
||||
ctl := ResourceControllers.Controllers[Deployments]
|
||||
_, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil)
|
||||
works.Deployments = append(works.Deployments, items.([]Deployment)...)
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.HasSuffix(workLoadName, daemonSurffix) {
|
||||
name := strings.Split(workLoadName, daemonSurffix)[0]
|
||||
ctl := ResourceControllers.Controllers[Daemonsets]
|
||||
_, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil)
|
||||
works.Daemonsets = append(works.Daemonsets, items.([]Daemonset)...)
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.HasSuffix(workLoadName, stateSurffix) {
|
||||
name := strings.Split(workLoadName, stateSurffix)[0]
|
||||
ctl := ResourceControllers.Controllers[Statefulsets]
|
||||
_, items, _ := ctl.ListWithConditions(fmt.Sprintf("namespace='%s' and name = '%s'", namespace, name), nil)
|
||||
works.Statefulsets = append(works.Statefulsets, items.([]Statefulset)...)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return &works
|
||||
}
|
||||
|
||||
func (ctl *ApplicationCtl) getCreator(desc string) string {
|
||||
var dc description
|
||||
err := json.Unmarshal([]byte(desc), &dc)
|
||||
if err != nil {
|
||||
return unknown
|
||||
}
|
||||
return dc.Creator
|
||||
}
|
||||
|
||||
func (ctl *ApplicationCtl) getLabels(namespace string, workloads *workLoads) *[]map[string]string {
|
||||
k8sClient := client.NewK8sClient()
|
||||
|
||||
var workloadLables []map[string]string
|
||||
if workloads == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, workload := range workloads.Deployments {
|
||||
deploy, err := k8sClient.AppsV1().Deployments(namespace).Get(workload.Name, metaV1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
workloadLables = append(workloadLables, deploy.Labels)
|
||||
}
|
||||
|
||||
for _, workload := range workloads.Daemonsets {
|
||||
daemonset, err := k8sClient.AppsV1().DaemonSets(namespace).Get(workload.Name, metaV1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
workloadLables = append(workloadLables, daemonset.Labels)
|
||||
}
|
||||
|
||||
for _, workload := range workloads.Statefulsets {
|
||||
statefulset, err := k8sClient.AppsV1().StatefulSets(namespace).Get(workload.Name, metaV1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
workloadLables = append(workloadLables, statefulset.Labels)
|
||||
}
|
||||
|
||||
return &workloadLables
|
||||
}
|
||||
|
||||
func isExist(svcs []Service, svc v1.Service) bool {
|
||||
for _, item := range svcs {
|
||||
if item.Name == svc.Name && item.Namespace == svc.Namespace {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (ctl *ApplicationCtl) getSvcs(namespace string, workLoadLabels *[]map[string]string) *[]Service {
|
||||
if len(*workLoadLabels) == 0 {
|
||||
return nil
|
||||
}
|
||||
k8sClient := client.NewK8sClient()
|
||||
var services []Service
|
||||
for _, label := range *workLoadLabels {
|
||||
labelSelector := labels.Set(label).AsSelector().String()
|
||||
svcs, err := k8sClient.CoreV1().Services(namespace).List(metaV1.ListOptions{LabelSelector: labelSelector})
|
||||
if err != nil {
|
||||
glog.Errorf("get app's svc failed, reason: %v", err)
|
||||
}
|
||||
for _, item := range svcs.Items {
|
||||
if !isExist(services, item) {
|
||||
services = append(services, *generateSvcObject(item))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &services
|
||||
}
|
||||
|
||||
func (ctl *ApplicationCtl) getIng(namespace string, services *[]Service) *[]ing {
|
||||
if services == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
ingCtl := ResourceControllers.Controllers[Ingresses]
|
||||
var ings []ing
|
||||
for _, svc := range *services {
|
||||
_, items, err := ingCtl.ListWithConditions(fmt.Sprintf("namespace = '%s' and rules like '%%%s%%' ", namespace, svc.Name), nil)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.Error(items)
|
||||
for _, ingress := range items.([]Ingress) {
|
||||
var rules []ingressRule
|
||||
err := json.Unmarshal([]byte(ingress.Rules), &rules)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
exist := false
|
||||
var tmpRules []ingressRule
|
||||
for _, rule := range rules {
|
||||
if rule.Service == svc.Name {
|
||||
exist = true
|
||||
tmpRules = append(tmpRules, rule)
|
||||
}
|
||||
}
|
||||
|
||||
if exist {
|
||||
ings = append(ings, ing{Name: ingress.Name, Rules: tmpRules})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &ings
|
||||
}
|
||||
|
||||
// ListApplication lists deployed applications (OpenPitrix "clusters"),
// optionally filtered by runtime id, a fuzzy name match (fuzzy["name"]) and
// an exact status match (match["status"]), with limit/offset paging.
// It returns the total count reported by OpenPitrix plus one page of
// Application records. NOTE(review): query values are interpolated into the
// URL without escaping — confirm upstream callers sanitize them.
func (ctl *ApplicationCtl) ListApplication(runtimeId string, match, fuzzy map[string]string, paging *Paging) (int, interface{}, error) {
	limit := paging.Limit
	offset := paging.Offset
	// Normalize the configured address so the URL below never contains "//".
	// (This mutates the controller's field; subsequent calls see the trimmed
	// address.)
	if strings.HasSuffix(ctl.OpenpitrixAddr, "/") {
		ctl.OpenpitrixAddr = strings.TrimSuffix(ctl.OpenpitrixAddr, "/")
	}

	// Unless the caller asks for a specific status, show every non-deleted
	// state.
	defaultStatus := "status=active&status=stopped&status=pending&status=ceased"

	url := fmt.Sprintf("%s/v1/clusters?limit=%s&offset=%s", ctl.OpenpitrixAddr, strconv.Itoa(limit), strconv.Itoa(offset))

	if len(fuzzy["name"]) > 0 {
		url = fmt.Sprintf("%s&search_word=%s", url, fuzzy["name"])
	}

	if len(match["status"]) > 0 {
		url = fmt.Sprintf("%s&status=%s", url, match["status"])
	} else {
		url = fmt.Sprintf("%s&%s", url, defaultStatus)
	}

	if len(runtimeId) > 0 {
		url = fmt.Sprintf("%s&runtime_id=%s", url, runtimeId)
	}

	resp, err := makeHttpRequest("GET", url, "")
	if err != nil {
		glog.Errorf("request %s failed, reason: %s", url, err)
		return 0, nil, err
	}

	var clusterList clusters
	err = json.Unmarshal(resp, &clusterList)

	if err != nil {
		return 0, nil, err
	}

	var apps []Application

	// Enrich each cluster with the human-readable version, runtime and app
	// information resolved from OpenPitrix; individual lookup failures are
	// ignored and degrade to the "-" placeholder.
	for _, item := range clusterList.Clusters {
		var app Application

		app.Name = item.Name
		app.ClusterID = item.ClusterID
		app.UpdateTime = item.UpdateTime
		app.Status = item.Status
		versionInfo, _ := ctl.GetVersion(item.VersionID)
		app.Version = versionInfo
		app.VersionId = item.VersionID
		runtimeInfo, _ := ctl.GetRuntime(item.RunTimeId)
		app.Runtime = runtimeInfo
		app.RuntimeId = item.RunTimeId
		appInfo, _, appId, _ := ctl.GetAppInfo(item.AppID)
		app.App = appInfo
		app.AppId = appId

		apps = append(apps, app)
	}

	return clusterList.Total, apps, nil
}
|
||||
|
||||
// GetApp returns the full detail view of one deployed application: the
// OpenPitrix cluster metadata enriched with its workloads, the services
// selected by those workloads' labels, and the ingresses routing to those
// services. Returns a NotFound error when the cluster id is unknown.
func (ctl *ApplicationCtl) GetApp(clusterId string) (*Application, error) {
	// Normalize the configured address so the URL below never contains "//".
	if strings.HasSuffix(ctl.OpenpitrixAddr, "/") {
		ctl.OpenpitrixAddr = strings.TrimSuffix(ctl.OpenpitrixAddr, "/")
	}

	url := fmt.Sprintf("%s/v1/clusters?cluster_id=%s", ctl.OpenpitrixAddr, clusterId)

	resp, err := makeHttpRequest("GET", url, "")
	if err != nil {
		glog.Error(err)
		return nil, err
	}

	var clusterList clusters
	err = json.Unmarshal(resp, &clusterList)

	if err != nil {
		glog.Error(err)
		return nil, err
	}

	if len(clusterList.Clusters) == 0 {
		return nil, fmt.Errorf("NotFound, clusterId:%s", clusterId)
	}

	// Querying by id returns at most one cluster.
	item := clusterList.Clusters[0]
	var app Application

	app.Name = item.Name
	app.ClusterID = item.ClusterID
	app.UpdateTime = item.UpdateTime
	app.CreateTime = item.CreateTime
	app.Status = item.Status
	versionInfo, _ := ctl.GetVersion(item.VersionID)
	app.Version = versionInfo
	app.VersionId = item.VersionID

	runtimeInfo, _ := ctl.GetRuntime(item.RunTimeId)
	app.Runtime = runtimeInfo
	app.RuntimeId = item.RunTimeId
	appInfo, repoId, appId, _ := ctl.GetAppInfo(item.AppID)
	app.App = appInfo
	app.AppId = appId
	app.Creator = ctl.getCreator(item.Description)

	app.RepoName, _ = ctl.GetRepo(repoId)
	// NOTE(review): app.Runtime (the runtime zone) is used as the kubernetes
	// namespace for all of the lookups below — presumably zone == namespace;
	// confirm against the runtime configuration.
	app.WorkLoads = ctl.GetWorkLoads(app.Runtime, item.ClusterRoleSets)
	workloadLabels := ctl.getLabels(app.Runtime, app.WorkLoads)
	app.Services = ctl.getSvcs(app.Runtime, workloadLabels)
	app.Ingresses = ctl.getIng(app.Runtime, app.Services)

	return &app, nil
}
|
||||
@@ -27,9 +27,11 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
const systemPrefix = "system:"
|
||||
|
||||
func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole {
|
||||
name := item.Name
|
||||
if strings.HasPrefix(name, "system:") {
|
||||
if strings.HasPrefix(name, systemPrefix) {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -43,30 +45,22 @@ func (ctl *ClusterRoleCtl) generateObject(item v1.ClusterRole) *ClusterRole {
|
||||
return object
|
||||
}
|
||||
|
||||
func (ctl *ClusterRoleCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *ClusterRoleCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *ClusterRoleCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&ClusterRole{}) {
|
||||
db.DropTable(&ClusterRole{})
|
||||
|
||||
}
|
||||
|
||||
db = db.CreateTable(&ClusterRole{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Rbac().V1().ClusterRoles().Informer()
|
||||
lister := kubeInformerFactory.Rbac().V1().ClusterRoles().Lister()
|
||||
ctl.initListerAndInformer()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -74,10 +68,38 @@ func (ctl *ClusterRoleCtl) listAndWatch() {
|
||||
|
||||
for _, item := range list {
|
||||
obj := ctl.generateObject(*item)
|
||||
db.Create(obj)
|
||||
|
||||
if obj != nil {
|
||||
db.Create(obj)
|
||||
}
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *ClusterRoleCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
|
||||
count := 0
|
||||
for _, item := range list {
|
||||
if !strings.HasPrefix(item.Name, systemPrefix) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
func (ctl *ClusterRoleCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
ctl.lister = informerFactory.Rbac().V1().ClusterRoles().Lister()
|
||||
|
||||
informer := informerFactory.Rbac().V1().ClusterRoles().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -102,13 +124,15 @@ func (ctl *ClusterRoleCtl) listAndWatch() {
|
||||
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *ClusterRoleCtl) CountWithConditions(conditions string) int {
|
||||
var object ClusterRole
|
||||
|
||||
if strings.Contains(conditions, "namespace") {
|
||||
conditions = ""
|
||||
}
|
||||
return countWithConditions(ctl.DB, conditions, &object)
|
||||
}
|
||||
|
||||
@@ -124,10 +148,3 @@ func (ctl *ClusterRoleCtl) ListWithConditions(conditions string, paging *Paging)
|
||||
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *ClusterRoleCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
db.Model(&ClusterRole{}).Count(&count)
|
||||
return count
|
||||
}
|
||||
|
||||
@@ -16,7 +16,17 @@ limitations under the License.
|
||||
|
||||
package controllers
|
||||
|
||||
import "github.com/jinzhu/gorm"
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/jinzhu/gorm"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func listWithConditions(db *gorm.DB, total *int, object, list interface{}, conditions string, paging *Paging, order string) {
|
||||
if len(conditions) == 0 {
|
||||
@@ -50,3 +60,83 @@ func countWithConditions(db *gorm.DB, conditions string, object interface{}) int
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func makeHttpRequest(method, url, data string) ([]byte, error) {
|
||||
var req *http.Request
|
||||
|
||||
var err error
|
||||
if method == "GET" {
|
||||
req, err = http.NewRequest(method, url, nil)
|
||||
} else {
|
||||
req, err = http.NewRequest(method, url, strings.NewReader(data))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
httpClient := &http.Client{}
|
||||
resp, err := httpClient.Do(req)
|
||||
|
||||
if err != nil {
|
||||
err := fmt.Errorf("Request to %s failed, method: %s, reason: %s ", url, method, err)
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
err = errors.New(string(body))
|
||||
}
|
||||
return body, err
|
||||
}
|
||||
|
||||
// handleCrash is intended to be deferred by a controller's listAndWatch
// loop. It closes the controller's alive channel so the watchdog notices the
// controller has exited, and recovers from any panic so one controller
// cannot take down the whole process. recover only takes effect because
// handleCrash itself is the directly deferred function.
func handleCrash(ctl Controller) {
	close(ctl.chanAlive())
	if err := recover(); err != nil {
		glog.Errorf("panic occur in %s controller's listAndWatch function, reason: %s", ctl.Name(), err)
		return
	}
}
|
||||
|
||||
func hasSynced(ctl Controller) bool {
|
||||
totalInDb := ctl.CountWithConditions("")
|
||||
totalInK8s := ctl.total()
|
||||
|
||||
if totalInDb == totalInK8s {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func checkAndResync(ctl Controller, stopChan chan struct{}) {
|
||||
defer close(stopChan)
|
||||
for {
|
||||
select {
|
||||
case <-ctl.chanStop():
|
||||
return
|
||||
default:
|
||||
time.Sleep(30 * time.Minute)
|
||||
|
||||
if !hasSynced(ctl) {
|
||||
glog.Errorf("the data in db and kubernetes is inconsistent, resync %s controller", ctl.Name())
|
||||
close(stopChan)
|
||||
stopChan = make(chan struct{})
|
||||
go ctl.sync(stopChan)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// listAndWatch drives one controller: it starts the initial sync in the
// background and then blocks in checkAndResync, which restarts the sync
// whenever db and kubernetes drift apart. handleCrash is deferred so a
// panicking controller is logged and its alive channel closed.
func listAndWatch(ctl Controller) {
	defer handleCrash(ctl)

	stopChan := make(chan struct{})

	go ctl.sync(stopChan)

	checkAndResync(ctl, stopChan)
}
|
||||
|
||||
@@ -61,30 +61,21 @@ func (ctl *DaemonsetCtl) generateObject(item v1.DaemonSet) *Daemonset {
|
||||
return object
|
||||
}
|
||||
|
||||
func (ctl *DaemonsetCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *DaemonsetCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *DaemonsetCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&Daemonset{}) {
|
||||
db.DropTable(&Daemonset{})
|
||||
|
||||
}
|
||||
|
||||
db = db.CreateTable(&Daemonset{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Apps().V1().DaemonSets().Informer()
|
||||
lister := kubeInformerFactory.Apps().V1().DaemonSets().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -93,9 +84,27 @@ func (ctl *DaemonsetCtl) listAndWatch() {
|
||||
for _, item := range list {
|
||||
obj := ctl.generateObject(*item)
|
||||
db.Create(obj)
|
||||
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *DaemonsetCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *DaemonsetCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
ctl.lister = informerFactory.Apps().V1().DaemonSets().Lister()
|
||||
|
||||
informer := informerFactory.Apps().V1().DaemonSets().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -117,7 +126,7 @@ func (ctl *DaemonsetCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *DaemonsetCtl) CountWithConditions(conditions string) int {
|
||||
@@ -138,13 +147,13 @@ func (ctl *DaemonsetCtl) ListWithConditions(conditions string, paging *Paging) (
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *DaemonsetCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
if len(namespace) == 0 {
|
||||
db.Model(&Daemonset{}).Count(&count)
|
||||
} else {
|
||||
db.Model(&Daemonset{}).Where("namespace = ?", namespace).Count(&count)
|
||||
}
|
||||
return count
|
||||
}
|
||||
//func (ctl *DaemonsetCtl) Count(namespace string) int {
|
||||
// var count int
|
||||
// db := ctl.DB
|
||||
// if len(namespace) == 0 {
|
||||
// db.Model(&Daemonset{}).Count(&count)
|
||||
// } else {
|
||||
// db.Model(&Daemonset{}).Where("namespace = ?", namespace).Count(&count)
|
||||
// }
|
||||
// return count
|
||||
//}
|
||||
|
||||
@@ -21,9 +21,8 @@ import (
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/apps/v1"
|
||||
"k8s.io/client-go/informers"
|
||||
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
@@ -65,15 +64,11 @@ func (ctl *DeploymentCtl) generateObject(item v1.Deployment) *Deployment {
|
||||
App: app, UpdateTime: updateTime, Status: status, Annotation: Annotation{item.Annotations}}
|
||||
}
|
||||
|
||||
func (ctl *DeploymentCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *DeploymentCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *DeploymentCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
if db.HasTable(&Deployment{}) {
|
||||
db.DropTable(&Deployment{})
|
||||
@@ -81,12 +76,8 @@ func (ctl *DeploymentCtl) listAndWatch() {
|
||||
|
||||
db = db.CreateTable(&Deployment{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Apps().V1().Deployments().Informer()
|
||||
lister := kubeInformerFactory.Apps().V1().Deployments().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -95,9 +86,29 @@ func (ctl *DeploymentCtl) listAndWatch() {
|
||||
for _, item := range list {
|
||||
obj := ctl.generateObject(*item)
|
||||
db.Create(obj)
|
||||
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *DeploymentCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", ctl.Name(), err)
|
||||
return 0
|
||||
}
|
||||
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *DeploymentCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
|
||||
ctl.lister = informerFactory.Apps().V1().Deployments().Lister()
|
||||
|
||||
informer := informerFactory.Apps().V1().Deployments().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -118,8 +129,7 @@ func (ctl *DeploymentCtl) listAndWatch() {
|
||||
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *DeploymentCtl) CountWithConditions(conditions string) int {
|
||||
@@ -139,14 +149,3 @@ func (ctl *DeploymentCtl) ListWithConditions(conditions string, paging *Paging)
|
||||
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *DeploymentCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
if len(namespace) == 0 {
|
||||
db.Model(&Deployment{}).Count(&count)
|
||||
} else {
|
||||
db.Model(&Deployment{}).Where("namespace = ?", namespace).Count(&count)
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
@@ -20,6 +20,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@@ -46,35 +48,41 @@ func (ctl *IngressCtl) generateObject(item v1beta1.Ingress) *Ingress {
|
||||
ip = strings.Join(ipList, ",")
|
||||
}
|
||||
|
||||
object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}}
|
||||
var ingRules []ingressRule
|
||||
for _, rule := range item.Spec.Rules {
|
||||
host := rule.Host
|
||||
for _, path := range rule.HTTP.Paths {
|
||||
var ingRule ingressRule
|
||||
ingRule.Host = host
|
||||
ingRule.Service = path.Backend.ServiceName
|
||||
ingRule.Port = path.Backend.ServicePort.IntVal
|
||||
ingRule.Path = path.Path
|
||||
ingRules = append(ingRules, ingRule)
|
||||
}
|
||||
}
|
||||
|
||||
ruleStr, _ := json.Marshal(ingRules)
|
||||
|
||||
object := &Ingress{Namespace: namespace, Name: name, TlsTermination: tls, Ip: ip, CreateTime: createTime, Annotation: Annotation{item.Annotations}, Rules: string(ruleStr)}
|
||||
|
||||
return object
|
||||
}
|
||||
|
||||
func (ctl *IngressCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *IngressCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *IngressCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&Ingress{}) {
|
||||
db.DropTable(&Ingress{})
|
||||
|
||||
}
|
||||
|
||||
db = db.CreateTable(&Ingress{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Extensions().V1beta1().Ingresses().Informer()
|
||||
lister := kubeInformerFactory.Extensions().V1beta1().Ingresses().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -83,9 +91,28 @@ func (ctl *IngressCtl) listAndWatch() {
|
||||
for _, item := range list {
|
||||
obj := ctl.generateObject(*item)
|
||||
db.Create(obj)
|
||||
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *IngressCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *IngressCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
|
||||
ctl.lister = informerFactory.Extensions().V1beta1().Ingresses().Lister()
|
||||
|
||||
informer := informerFactory.Extensions().V1beta1().Ingresses().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -107,7 +134,7 @@ func (ctl *IngressCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *IngressCtl) CountWithConditions(conditions string) int {
|
||||
@@ -128,13 +155,13 @@ func (ctl *IngressCtl) ListWithConditions(conditions string, paging *Paging) (in
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *IngressCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
if len(namespace) == 0 {
|
||||
db.Model(&Ingress{}).Count(&count)
|
||||
} else {
|
||||
db.Model(&Ingress{}).Where("namespace = ?", namespace).Count(&count)
|
||||
}
|
||||
return count
|
||||
}
|
||||
//func (ctl *IngressCtl) Count(namespace string) int {
|
||||
// var count int
|
||||
// db := ctl.DB
|
||||
// if len(namespace) == 0 {
|
||||
// db.Model(&Ingress{}).Count(&count)
|
||||
// } else {
|
||||
// db.Model(&Ingress{}).Where("namespace = ?", namespace).Count(&count)
|
||||
// }
|
||||
// return count
|
||||
//}
|
||||
|
||||
@@ -19,9 +19,6 @@ package controllers
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
@@ -30,12 +27,13 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
|
||||
"k8s.io/client-go/informers"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/client"
|
||||
"kubesphere.io/kubesphere/pkg/constants"
|
||||
"kubesphere.io/kubesphere/pkg/options"
|
||||
@@ -69,25 +67,6 @@ type DeleteRunTime struct {
|
||||
RuntimeId []string `json:"runtime_id"`
|
||||
}
|
||||
|
||||
func makeHttpRequest(method, url, data string) ([]byte, error) {
|
||||
req, err := http.NewRequest(method, url, strings.NewReader(data))
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
httpClient := &http.Client{}
|
||||
resp, err := httpClient.Do(req)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
defer resp.Body.Close()
|
||||
return body, err
|
||||
}
|
||||
|
||||
func (ctl *NamespaceCtl) getKubeConfig(user string) (string, error) {
|
||||
k8sClient := client.NewK8sClient()
|
||||
configmap, err := k8sClient.CoreV1().ConfigMaps(kubectlNamespace).Get(user, metaV1.GetOptions{})
|
||||
@@ -262,15 +241,11 @@ func (ctl *NamespaceCtl) generateObject(item v1.Namespace) *Namespace {
|
||||
return object
|
||||
}
|
||||
|
||||
func (ctl *NamespaceCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *NamespaceCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *NamespaceCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&Namespace{}) {
|
||||
@@ -279,12 +254,8 @@ func (ctl *NamespaceCtl) listAndWatch() {
|
||||
|
||||
db = db.CreateTable(&Namespace{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Core().V1().Namespaces().Informer()
|
||||
lister := kubeInformerFactory.Core().V1().Namespaces().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -294,9 +265,28 @@ func (ctl *NamespaceCtl) listAndWatch() {
|
||||
obj := ctl.generateObject(*item)
|
||||
db.Create(obj)
|
||||
ctl.createRoleAndRuntime(*item)
|
||||
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *NamespaceCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *NamespaceCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
|
||||
ctl.lister = informerFactory.Core().V1().Namespaces().Lister()
|
||||
|
||||
informer := informerFactory.Core().V1().Namespaces().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -321,7 +311,7 @@ func (ctl *NamespaceCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *NamespaceCtl) CountWithConditions(conditions string) int {
|
||||
@@ -339,26 +329,21 @@ func (ctl *NamespaceCtl) ListWithConditions(conditions string, paging *Paging) (
|
||||
|
||||
listWithConditions(ctl.DB, &total, &object, &list, conditions, paging, order)
|
||||
|
||||
for index := range list {
|
||||
usage, err := ctl.GetNamespaceQuota(list[index].Name)
|
||||
if err == nil {
|
||||
list[index].Usaeg = usage
|
||||
if paging != nil {
|
||||
for index := range list {
|
||||
usage, err := ctl.GetNamespaceQuota(list[index].Name)
|
||||
if err == nil {
|
||||
list[index].Usaeg = usage
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *NamespaceCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
db.Model(&Namespace{}).Count(&count)
|
||||
return count
|
||||
}
|
||||
|
||||
func getUsage(namespace, resource string) int {
|
||||
ctl := rec.controllers[resource]
|
||||
return ctl.Count(namespace)
|
||||
ctl := ResourceControllers.Controllers[resource]
|
||||
return ctl.CountWithConditions(fmt.Sprintf("namespace = '%s' ", namespace))
|
||||
}
|
||||
|
||||
func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, error) {
|
||||
@@ -373,7 +358,7 @@ func (ctl *NamespaceCtl) GetNamespaceQuota(namespace string) (v1.ResourceList, e
|
||||
usage[v1.ResourceName(resourceName)] = quantity
|
||||
}
|
||||
|
||||
podCtl := rec.controllers[Pods]
|
||||
podCtl := ResourceControllers.Controllers[Pods]
|
||||
var quantity resource.Quantity
|
||||
used := podCtl.CountWithConditions(fmt.Sprintf("status=\"%s\" And namespace=\"%s\"", "Running", namespace))
|
||||
quantity.Set(int64(used))
|
||||
|
||||
@@ -199,25 +199,24 @@ func (ctl *PodCtl) generateObject(item v1.Pod) *Pod {
|
||||
return object
|
||||
}
|
||||
|
||||
func (ctl *PodCtl) listAndWatch() {
|
||||
func (ctl *PodCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *PodCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&Pod{}) {
|
||||
db.DropTable(&Pod{})
|
||||
|
||||
}
|
||||
|
||||
db = db.CreateTable(&Pod{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Core().V1().Pods().Informer()
|
||||
lister := kubeInformerFactory.Core().V1().Pods().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, item := range list {
|
||||
@@ -225,6 +224,26 @@ func (ctl *PodCtl) listAndWatch() {
|
||||
db.Create(obj)
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *PodCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *PodCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
|
||||
ctl.lister = informerFactory.Core().V1().Pods().Lister()
|
||||
|
||||
informer := informerFactory.Core().V1().Pods().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
object := obj.(*v1.Pod)
|
||||
@@ -249,7 +268,7 @@ func (ctl *PodCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *PodCtl) CountWithConditions(conditions string) int {
|
||||
@@ -270,13 +289,13 @@ func (ctl *PodCtl) ListWithConditions(conditions string, paging *Paging) (int, i
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *PodCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
if len(namespace) == 0 {
|
||||
db.Model(&Pod{}).Count(&count)
|
||||
} else {
|
||||
db.Model(&Pod{}).Where("namespace = ?", namespace).Count(&count)
|
||||
}
|
||||
return count
|
||||
}
|
||||
//func (ctl *PodCtl) Count(namespace string) int {
|
||||
// var count int
|
||||
// db := ctl.DB
|
||||
// if len(namespace) == 0 {
|
||||
// db.Model(&Pod{}).Count(&count)
|
||||
// } else {
|
||||
// db.Model(&Pod{}).Where("namespace = ?", namespace).Count(&count)
|
||||
// }
|
||||
// return count
|
||||
//}
|
||||
|
||||
@@ -64,30 +64,21 @@ func (ctl *PvcCtl) generateObject(item *v1.PersistentVolumeClaim) *Pvc {
|
||||
return object
|
||||
}
|
||||
|
||||
func (ctl *PvcCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *PvcCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *PvcCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&Pvc{}) {
|
||||
db.DropTable(&Pvc{})
|
||||
|
||||
}
|
||||
|
||||
db = db.CreateTable(&Pvc{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer()
|
||||
lister := kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -96,9 +87,28 @@ func (ctl *PvcCtl) listAndWatch() {
|
||||
for _, item := range list {
|
||||
obj := ctl.generateObject(item)
|
||||
db.Create(obj)
|
||||
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *PvcCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *PvcCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
|
||||
ctl.lister = informerFactory.Core().V1().PersistentVolumeClaims().Lister()
|
||||
|
||||
informer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -119,7 +129,7 @@ func (ctl *PvcCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *PvcCtl) CountWithConditions(conditions string) int {
|
||||
@@ -153,13 +163,13 @@ func (ctl *PvcCtl) ListWithConditions(conditions string, paging *Paging) (int, i
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *PvcCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
if len(namespace) == 0 {
|
||||
db.Model(&Pvc{}).Count(&count)
|
||||
} else {
|
||||
db.Model(&Pvc{}).Where("namespace = ?", namespace).Count(&count)
|
||||
}
|
||||
return count
|
||||
}
|
||||
//func (ctl *PvcCtl) Count(namespace string) int {
|
||||
// var count int
|
||||
// db := ctl.DB
|
||||
// if len(namespace) == 0 {
|
||||
// db.Model(&Pvc{}).Count(&count)
|
||||
// } else {
|
||||
// db.Model(&Pvc{}).Where("namespace = ?", namespace).Count(&count)
|
||||
// }
|
||||
// return count
|
||||
//}
|
||||
|
||||
@@ -29,7 +29,7 @@ import (
|
||||
|
||||
func (ctl *RoleCtl) generateObject(item v1.Role) *Role {
|
||||
name := item.Name
|
||||
if strings.HasPrefix(name, "system:") {
|
||||
if strings.HasPrefix(name, systemPrefix) {
|
||||
return nil
|
||||
}
|
||||
namespace := item.Namespace
|
||||
@@ -43,30 +43,21 @@ func (ctl *RoleCtl) generateObject(item v1.Role) *Role {
|
||||
return object
|
||||
}
|
||||
|
||||
func (ctl *RoleCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *RoleCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *RoleCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&Role{}) {
|
||||
db.DropTable(&Role{})
|
||||
|
||||
}
|
||||
|
||||
db = db.CreateTable(&Role{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Rbac().V1().Roles().Informer()
|
||||
lister := kubeInformerFactory.Rbac().V1().Roles().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -74,10 +65,39 @@ func (ctl *RoleCtl) listAndWatch() {
|
||||
|
||||
for _, item := range list {
|
||||
obj := ctl.generateObject(*item)
|
||||
db.Create(obj)
|
||||
|
||||
if obj != nil {
|
||||
db.Create(obj)
|
||||
}
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *RoleCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
|
||||
count := 0
|
||||
for _, item := range list {
|
||||
if !strings.HasPrefix(item.Name, systemPrefix) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
func (ctl *RoleCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
|
||||
ctl.lister = informerFactory.Rbac().V1().Roles().Lister()
|
||||
|
||||
informer := informerFactory.Rbac().V1().Roles().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -103,7 +123,7 @@ func (ctl *RoleCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *RoleCtl) CountWithConditions(conditions string) int {
|
||||
@@ -124,9 +144,9 @@ func (ctl *RoleCtl) ListWithConditions(conditions string, paging *Paging) (int,
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *RoleCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
db.Model(&Role{}).Where("namespace = ?", namespace).Count(&count)
|
||||
return count
|
||||
}
|
||||
//func (ctl *RoleCtl) Count(namespace string) int {
|
||||
// var count int
|
||||
// db := ctl.DB
|
||||
// db.Model(&Role{}).Where("namespace = ?", namespace).Count(&count)
|
||||
// return count
|
||||
//}
|
||||
|
||||
@@ -27,45 +27,46 @@ import (
|
||||
)
|
||||
|
||||
type resourceControllers struct {
|
||||
controllers map[string]Controller
|
||||
Controllers map[string]Controller
|
||||
k8sClient *kubernetes.Clientset
|
||||
}
|
||||
|
||||
var stopChan chan struct{}
|
||||
var rec resourceControllers
|
||||
var ResourceControllers resourceControllers
|
||||
|
||||
func (rec *resourceControllers) runContoller(name string) {
|
||||
var ctl Controller
|
||||
attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan, aliveChan: make(chan struct{})}
|
||||
attr := CommonAttribute{DB: client.NewDBClient(), K8sClient: rec.k8sClient, stopChan: stopChan,
|
||||
aliveChan: make(chan struct{}), Name: name}
|
||||
switch name {
|
||||
case Deployments:
|
||||
ctl = &DeploymentCtl{attr}
|
||||
ctl = &DeploymentCtl{CommonAttribute: attr}
|
||||
case Statefulsets:
|
||||
ctl = &StatefulsetCtl{attr}
|
||||
ctl = &StatefulsetCtl{CommonAttribute: attr}
|
||||
case Daemonsets:
|
||||
ctl = &DaemonsetCtl{attr}
|
||||
ctl = &DaemonsetCtl{CommonAttribute: attr}
|
||||
case Ingresses:
|
||||
ctl = &IngressCtl{attr}
|
||||
ctl = &IngressCtl{CommonAttribute: attr}
|
||||
case PersistentVolumeClaim:
|
||||
ctl = &PvcCtl{attr}
|
||||
ctl = &PvcCtl{CommonAttribute: attr}
|
||||
case Roles:
|
||||
ctl = &RoleCtl{attr}
|
||||
ctl = &RoleCtl{CommonAttribute: attr}
|
||||
case ClusterRoles:
|
||||
ctl = &ClusterRoleCtl{attr}
|
||||
ctl = &ClusterRoleCtl{CommonAttribute: attr}
|
||||
case Services:
|
||||
ctl = &ServiceCtl{attr}
|
||||
ctl = &ServiceCtl{CommonAttribute: attr}
|
||||
case Pods:
|
||||
ctl = &PodCtl{attr}
|
||||
ctl = &PodCtl{CommonAttribute: attr}
|
||||
case Namespaces:
|
||||
ctl = &NamespaceCtl{attr}
|
||||
ctl = &NamespaceCtl{CommonAttribute: attr}
|
||||
case StorageClasses:
|
||||
ctl = &StorageClassCtl{attr}
|
||||
ctl = &StorageClassCtl{CommonAttribute: attr}
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
rec.controllers[name] = ctl
|
||||
go ctl.listAndWatch()
|
||||
rec.Controllers[name] = ctl
|
||||
go listAndWatch(ctl)
|
||||
|
||||
}
|
||||
|
||||
@@ -93,22 +94,23 @@ func Run() {
|
||||
stopChan := make(chan struct{})
|
||||
defer close(stopChan)
|
||||
|
||||
rec = resourceControllers{k8sClient: client.NewK8sClient(), controllers: make(map[string]Controller)}
|
||||
k8sClient := client.NewK8sClient()
|
||||
ResourceControllers = resourceControllers{k8sClient: k8sClient, Controllers: make(map[string]Controller)}
|
||||
|
||||
for _, item := range []string{Deployments, Statefulsets, Daemonsets, PersistentVolumeClaim, Pods, Services,
|
||||
Ingresses, Roles, ClusterRoles, Namespaces, StorageClasses} {
|
||||
rec.runContoller(item)
|
||||
ResourceControllers.runContoller(item)
|
||||
}
|
||||
|
||||
go dbHealthCheck(client.NewDBClient())
|
||||
|
||||
for {
|
||||
for ctlName, controller := range rec.controllers {
|
||||
for ctlName, controller := range ResourceControllers.Controllers {
|
||||
select {
|
||||
case _, isClose := <-controller.chanAlive():
|
||||
if !isClose {
|
||||
glog.Errorf("controller %s have stopped, restart it", ctlName)
|
||||
rec.runContoller(ctlName)
|
||||
ResourceControllers.runContoller(ctlName)
|
||||
}
|
||||
default:
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
func (ctl *ServiceCtl) loadBalancerStatusStringer(item v1.Service) string {
|
||||
func loadBalancerStatusStringer(item v1.Service) string {
|
||||
ingress := item.Status.LoadBalancer.Ingress
|
||||
result := sets.NewString()
|
||||
for i := range ingress {
|
||||
@@ -41,7 +41,7 @@ func (ctl *ServiceCtl) loadBalancerStatusStringer(item v1.Service) string {
|
||||
return r
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) getExternalIp(item v1.Service) string {
|
||||
func getExternalIp(item v1.Service) string {
|
||||
switch item.Spec.Type {
|
||||
case "ClusterIP", "NodePort":
|
||||
if len(item.Spec.ExternalIPs) > 0 {
|
||||
@@ -51,7 +51,7 @@ func (ctl *ServiceCtl) getExternalIp(item v1.Service) string {
|
||||
return item.Spec.ExternalName
|
||||
|
||||
case "LoadBalancer":
|
||||
lbIps := ctl.loadBalancerStatusStringer(item)
|
||||
lbIps := loadBalancerStatusStringer(item)
|
||||
if len(item.Spec.ExternalIPs) > 0 {
|
||||
results := []string{}
|
||||
if len(lbIps) > 0 {
|
||||
@@ -68,12 +68,11 @@ func (ctl *ServiceCtl) getExternalIp(item v1.Service) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) generateObject(item v1.Service) *Service {
|
||||
|
||||
func generateSvcObject(item v1.Service) *Service {
|
||||
name := item.Name
|
||||
namespace := item.Namespace
|
||||
createTime := item.CreationTimestamp.Time
|
||||
externalIp := ctl.getExternalIp(item)
|
||||
externalIp := getExternalIp(item)
|
||||
serviceType := item.Spec.Type
|
||||
vip := item.Spec.ClusterIP
|
||||
ports := ""
|
||||
@@ -129,17 +128,18 @@ func (ctl *ServiceCtl) generateObject(item v1.Service) *Service {
|
||||
}
|
||||
|
||||
return object
|
||||
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *ServiceCtl) generateObject(item v1.Service) *Service {
|
||||
return generateSvcObject(item)
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&Service{}) {
|
||||
@@ -148,12 +148,8 @@ func (ctl *ServiceCtl) listAndWatch() {
|
||||
|
||||
db = db.CreateTable(&Service{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Core().V1().Services().Informer()
|
||||
lister := kubeInformerFactory.Core().V1().Services().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -164,6 +160,25 @@ func (ctl *ServiceCtl) listAndWatch() {
|
||||
db.Create(obj)
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
ctl.lister = informerFactory.Core().V1().Services().Lister()
|
||||
|
||||
informer := informerFactory.Core().V1().Services().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -185,7 +200,7 @@ func (ctl *ServiceCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) CountWithConditions(conditions string) int {
|
||||
@@ -205,14 +220,3 @@ func (ctl *ServiceCtl) ListWithConditions(conditions string, paging *Paging) (in
|
||||
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *ServiceCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
if len(namespace) == 0 {
|
||||
db.Model(&Service{}).Count(&count)
|
||||
} else {
|
||||
db.Model(&Service{}).Where("namespace = ?", namespace).Count(&count)
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
@@ -62,27 +62,21 @@ func (ctl *StatefulsetCtl) generateObject(item v1.StatefulSet) *Statefulset {
|
||||
return statefulSetObject
|
||||
}
|
||||
|
||||
func (ctl *StatefulsetCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *StatefulsetCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *StatefulsetCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&Statefulset{}) {
|
||||
db.DropTable(&Statefulset{})
|
||||
}
|
||||
|
||||
db = db.CreateTable(&Statefulset{})
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Apps().V1().StatefulSets().Informer()
|
||||
lister := kubeInformerFactory.Apps().V1().StatefulSets().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -91,9 +85,28 @@ func (ctl *StatefulsetCtl) listAndWatch() {
|
||||
for _, item := range list {
|
||||
obj := ctl.generateObject(*item)
|
||||
db.Create(obj)
|
||||
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *StatefulsetCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *StatefulsetCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
|
||||
ctl.lister = informerFactory.Apps().V1().StatefulSets().Lister()
|
||||
|
||||
informer := informerFactory.Apps().V1().StatefulSets().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
@@ -115,7 +128,7 @@ func (ctl *StatefulsetCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *StatefulsetCtl) CountWithConditions(conditions string) int {
|
||||
@@ -135,14 +148,3 @@ func (ctl *StatefulsetCtl) ListWithConditions(conditions string, paging *Paging)
|
||||
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *StatefulsetCtl) Count(namespace string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
if len(namespace) == 0 {
|
||||
db.Model(&Statefulset{}).Count(&count)
|
||||
} else {
|
||||
db.Model(&Statefulset{}).Where("namespace = ?", namespace).Count(&count)
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
@@ -46,15 +46,11 @@ func (ctl *StorageClassCtl) generateObject(item v1.StorageClass) *StorageClass {
|
||||
return object
|
||||
}
|
||||
|
||||
func (ctl *StorageClassCtl) listAndWatch() {
|
||||
defer func() {
|
||||
close(ctl.aliveChan)
|
||||
if err := recover(); err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
func (ctl *StorageClassCtl) Name() string {
|
||||
return ctl.CommonAttribute.Name
|
||||
}
|
||||
|
||||
func (ctl *StorageClassCtl) sync(stopChan chan struct{}) {
|
||||
db := ctl.DB
|
||||
|
||||
if db.HasTable(&StorageClass{}) {
|
||||
@@ -63,12 +59,8 @@ func (ctl *StorageClassCtl) listAndWatch() {
|
||||
|
||||
db = db.CreateTable(&StorageClass{})
|
||||
|
||||
k8sClient := ctl.K8sClient
|
||||
kubeInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*resyncCircle)
|
||||
informer := kubeInformerFactory.Storage().V1().StorageClasses().Informer()
|
||||
lister := kubeInformerFactory.Storage().V1().StorageClasses().Lister()
|
||||
|
||||
list, err := lister.List(labels.Everything())
|
||||
ctl.initListerAndInformer()
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return
|
||||
@@ -77,12 +69,30 @@ func (ctl *StorageClassCtl) listAndWatch() {
|
||||
for _, item := range list {
|
||||
obj := ctl.generateObject(*item)
|
||||
db.Create(obj)
|
||||
|
||||
}
|
||||
|
||||
ctl.informer.Run(stopChan)
|
||||
}
|
||||
|
||||
func (ctl *StorageClassCtl) total() int {
|
||||
list, err := ctl.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("count %s falied, reason:%s", err, ctl.Name())
|
||||
return 0
|
||||
}
|
||||
return len(list)
|
||||
}
|
||||
|
||||
func (ctl *StorageClassCtl) initListerAndInformer() {
|
||||
db := ctl.DB
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(ctl.K8sClient, time.Second*resyncCircle)
|
||||
|
||||
ctl.lister = informerFactory.Storage().V1().StorageClasses().Lister()
|
||||
|
||||
informer := informerFactory.Storage().V1().StorageClasses().Informer()
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
|
||||
object := obj.(*v1.StorageClass)
|
||||
mysqlObject := ctl.generateObject(*object)
|
||||
db.Create(mysqlObject)
|
||||
@@ -101,8 +111,7 @@ func (ctl *StorageClassCtl) listAndWatch() {
|
||||
},
|
||||
})
|
||||
|
||||
informer.Run(ctl.stopChan)
|
||||
|
||||
ctl.informer = informer
|
||||
}
|
||||
|
||||
func (ctl *StorageClassCtl) CountWithConditions(conditions string) int {
|
||||
@@ -122,17 +131,10 @@ func (ctl *StorageClassCtl) ListWithConditions(conditions string, paging *Paging
|
||||
|
||||
for index, storageClass := range list {
|
||||
name := storageClass.Name
|
||||
pvcCtl := PvcCtl{CommonAttribute{K8sClient: ctl.K8sClient, DB: ctl.DB}}
|
||||
pvcCtl := ResourceControllers.Controllers[PersistentVolumeClaim]
|
||||
|
||||
list[index].Count = pvcCtl.CountWithConditions(fmt.Sprintf("storage_class=\"%s\"", name))
|
||||
}
|
||||
|
||||
return total, list, nil
|
||||
}
|
||||
|
||||
func (ctl *StorageClassCtl) Count(name string) int {
|
||||
var count int
|
||||
db := ctl.DB
|
||||
db.Model(&StorageClass{}).Count(&count)
|
||||
return count
|
||||
}
|
||||
|
||||
@@ -26,10 +26,16 @@ import (
|
||||
"github.com/jinzhu/gorm"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
appV1 "k8s.io/client-go/listers/apps/v1"
|
||||
coreV1 "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/listers/extensions/v1beta1"
|
||||
rbacV1 "k8s.io/client-go/listers/rbac/v1"
|
||||
storageV1 "k8s.io/client-go/listers/storage/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
const (
|
||||
resyncCircle = 180
|
||||
resyncCircle = 600
|
||||
Stopped = "stopped"
|
||||
PvcPending = "Pending"
|
||||
Running = "running"
|
||||
@@ -57,6 +63,7 @@ const (
|
||||
ClusterRoles = "cluster-roles"
|
||||
Services = "services"
|
||||
StorageClasses = "storage-classes"
|
||||
Applications = "applications"
|
||||
)
|
||||
|
||||
type Annotation struct {
|
||||
@@ -163,10 +170,18 @@ func (Pvc) TableName() string {
|
||||
return tablePersistentVolumeClaim
|
||||
}
|
||||
|
||||
type ingressRule struct {
|
||||
Host string `json:"host"`
|
||||
Path string `json:"path"`
|
||||
Service string `json:"service"`
|
||||
Port int32 `json:"port"`
|
||||
}
|
||||
|
||||
type Ingress struct {
|
||||
Name string `gorm:"primary_key" json:"name"`
|
||||
Namespace string `gorm:"primary_key" json:"namespace"`
|
||||
Ip string `json:"ip,omitempty"`
|
||||
Rules string `json:"rules, omitempty"`
|
||||
TlsTermination string `json:"tlsTermination,omitempty"`
|
||||
Annotation Annotation `json:"annotations"`
|
||||
CreateTime time.Time `gorm:"column:createTime" json:"createTime,omitempty"`
|
||||
@@ -273,16 +288,19 @@ type Paging struct {
|
||||
}
|
||||
|
||||
type Controller interface {
|
||||
listAndWatch()
|
||||
chanStop() chan struct{}
|
||||
chanAlive() chan struct{}
|
||||
Count(namespace string) int
|
||||
CountWithConditions(condition string) int
|
||||
total() int
|
||||
initListerAndInformer()
|
||||
sync(stopChan chan struct{})
|
||||
Name() string
|
||||
ListWithConditions(condition string, paging *Paging) (int, interface{}, error)
|
||||
}
|
||||
|
||||
type CommonAttribute struct {
|
||||
K8sClient *kubernetes.Clientset
|
||||
Name string
|
||||
DB *gorm.DB
|
||||
stopChan chan struct{}
|
||||
aliveChan chan struct{}
|
||||
@@ -300,44 +318,66 @@ func (ca *CommonAttribute) chanAlive() chan struct{} {
|
||||
|
||||
type DeploymentCtl struct {
|
||||
CommonAttribute
|
||||
lister appV1.DeploymentLister
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
type StatefulsetCtl struct {
|
||||
CommonAttribute
|
||||
lister appV1.StatefulSetLister
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
type DaemonsetCtl struct {
|
||||
CommonAttribute
|
||||
lister appV1.DaemonSetLister
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
type ServiceCtl struct {
|
||||
CommonAttribute
|
||||
lister coreV1.ServiceLister
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
type PvcCtl struct {
|
||||
CommonAttribute
|
||||
lister coreV1.PersistentVolumeClaimLister
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
type PodCtl struct {
|
||||
CommonAttribute
|
||||
lister coreV1.PodLister
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
type IngressCtl struct {
|
||||
lister v1beta1.IngressLister
|
||||
informer cache.SharedIndexInformer
|
||||
CommonAttribute
|
||||
}
|
||||
|
||||
type NamespaceCtl struct {
|
||||
CommonAttribute
|
||||
lister coreV1.NamespaceLister
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
type StorageClassCtl struct {
|
||||
lister storageV1.StorageClassLister
|
||||
informer cache.SharedIndexInformer
|
||||
CommonAttribute
|
||||
}
|
||||
|
||||
type RoleCtl struct {
|
||||
lister rbacV1.RoleLister
|
||||
informer cache.SharedIndexInformer
|
||||
CommonAttribute
|
||||
}
|
||||
|
||||
type ClusterRoleCtl struct {
|
||||
lister rbacV1.ClusterRoleLister
|
||||
informer cache.SharedIndexInformer
|
||||
CommonAttribute
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user