add kubectl/kubeconfig/quota/terminal api, but quota api is tempoary, will be changed as soon

This commit is contained in:
jenkins
2018-06-08 00:50:21 -04:00
committed by jeff
parent e9957a1aa7
commit 7140181e94
55 changed files with 8295 additions and 139 deletions

View File

@@ -22,14 +22,16 @@ import (
"kubesphere.io/kubesphere/pkg/apis/v1alpha/components"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/containers"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/iam"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/kubeconfig"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/kubectl"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/nodes"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/pods"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/quota"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/registries"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/routes"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/storage"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/terminal"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/users"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/volumes"
"kubesphere.io/kubesphere/pkg/apis/v1alpha/workloadstatus"
)
func init() {
@@ -37,8 +39,6 @@ func init() {
ws := new(restful.WebService)
ws.Path("/api/v1alpha1")
kubeconfig.Register(ws, "/namespaces/{namespace}/kubeconfig")
kubectl.Register(ws, "/namespaces/{namespace}/kubectl")
registries.Register(ws, "/registries")
storage.Register(ws, "/storage")
volumes.Register(ws, "/volumes")
@@ -47,10 +47,15 @@ func init() {
containers.Register(ws)
iam.Register(ws)
components.Register(ws, "/components")
routes.Register(ws)
user.Register(ws, "/users/{user}")
terminal.Register(ws, "/namespaces/{namespace}/pod/{pod}/shell/{container}")
workloadstatus.Register(ws, "/status")
quota.Register(ws, "/quota")
// add webservice to default container
restful.Add(ws)
// add websocket handler to default container
terminal.RegisterWebSocketHandler(restful.DefaultContainer, "/api/v1alpha1/sockjs/")
}

View File

@@ -1,54 +0,0 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubectl
import (
"net/http"
"github.com/emicklei/go-restful"
"kubesphere.io/kubesphere/pkg/models"
)
func Register(ws *restful.WebService, subPath string) {
ws.Route(ws.GET(subPath).Consumes("*/*").Produces(restful.MIME_JSON).To(handleKubectl).Doc("use to "+
"get a kubectl pod in specified namespaces").Param(ws.PathParameter("namespace",
"namespace").DataType("string")).Do(returns200, returns500))
}
func handleKubectl(req *restful.Request, resp *restful.Response) {
ns := req.PathParameter("namespace")
kubectlPod, err := models.GetKubectlPod(ns)
if err != nil {
resp.WriteError(http.StatusInternalServerError, err)
}
resp.WriteEntity(kubectlPod)
}
func returns200(b *restful.RouteBuilder) {
b.Returns(http.StatusOK, "OK", nil)
}
func returns500(b *restful.RouteBuilder) {
b.Returns(http.StatusInternalServerError, "fail", nil)
}

View File

@@ -0,0 +1,54 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package quota
import (
"net/http"
"github.com/emicklei/go-restful"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/models"
)
func Register(ws *restful.WebService, subPath string) {
ws.Route(ws.GET(subPath).To(getClusterQuota).Produces(restful.MIME_JSON))
ws.Route(ws.GET(subPath + "/namespaces/{namespace}").To(getNamespaceQuota).Produces(restful.MIME_JSON))
}
func getNamespaceQuota(req *restful.Request, resp *restful.Response) {
namespace := req.PathParameter("namespace")
quota, err := models.GetNamespaceQuota(namespace)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
}
resp.WriteEntity(quota)
}
func getClusterQuota(req *restful.Request, resp *restful.Response) {
quota, err := models.GetClusterQuota()
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
}
resp.WriteEntity(quota)
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package kubeconfig
package terminal
import (
"net/http"
@@ -22,30 +22,23 @@ import (
"kubesphere.io/kubesphere/pkg/models"
)
const DefaultServiceAccount = "default"
type Config struct {
Certificate string
Server string
User string
Token string
}
func Register(ws *restful.WebService, subPath string) {
ws.Route(ws.GET(subPath).To(handleKubeconfig))
ws.Route(ws.GET(subPath).To(handleExecShell))
}
func handleKubeconfig(req *restful.Request, resp *restful.Response) {
ns := req.PathParameter("namespace")
kubectlConfig, err := models.GetKubeConfig(ns)
func handleExecShell(req *restful.Request, resp *restful.Response) {
res, err := models.HandleExecShell(req)
if err != nil {
resp.WriteError(http.StatusInternalServerError, err)
}
resp.WriteEntity(kubectlConfig)
resp.WriteEntity(res)
}
func RegisterWebSocketHandler(container *restful.Container, path string) {
handler := models.CreateTerminalHandler(path[0 : len(path)-1])
container.Handle(path, handler)
}

View File

@@ -0,0 +1,103 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package user
import (
"net/http"
"github.com/emicklei/go-restful"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/models"
)
func Register(ws *restful.WebService, subPath string) {
ws.Route(ws.POST(subPath).To(createUser).Consumes("*/*").Produces(restful.MIME_JSON))
ws.Route(ws.DELETE(subPath).To(delUser).Produces(restful.MIME_JSON))
ws.Route(ws.GET(subPath + "/kubectl").To(getKubectl).Produces(restful.MIME_JSON))
ws.Route(ws.GET(subPath + "/kubeconfig").To(getKubeconfig).Produces(restful.MIME_JSON))
}
func createUser(req *restful.Request, resp *restful.Response) {
user := req.PathParameter("user")
err := models.CreateKubeConfig(user)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
return
}
err = models.CreateKubectlPod(user)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
return
}
resp.WriteEntity(constants.MessageResponse{Message: "successfully created"})
}
func delUser(req *restful.Request, resp *restful.Response) {
user := req.PathParameter("user")
err := models.DelKubectlPod(user)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
return
}
err = models.DelKubeConfig(user)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
return
}
resp.WriteEntity(constants.MessageResponse{Message: "successfully deleted"})
}
func getKubectl(req *restful.Request, resp *restful.Response) {
user := req.PathParameter("user")
kubectlPod, err := models.GetKubectlPod(user)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
return
}
resp.WriteEntity(kubectlPod)
}
func getKubeconfig(req *restful.Request, resp *restful.Response) {
user := req.PathParameter("user")
kubectlConfig, err := models.GetKubeConfig(user)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
return
}
resp.WriteEntity(kubectlConfig)
}

View File

@@ -0,0 +1,49 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloadstatus
import (
"net/http"
"github.com/emicklei/go-restful"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/models"
)
func Register(ws *restful.WebService, subPath string) {
ws.Route(ws.GET(subPath).To(getClusterStatus).Produces(restful.MIME_JSON))
ws.Route(ws.GET(subPath + "/namespaces/{namespace}").To(getNamespaceStatus).Produces(restful.MIME_JSON))
}
func getClusterStatus(req *restful.Request, resp *restful.Response) {
res, err := models.GetClusterResourceStatus()
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
}
resp.WriteEntity(res)
}
func getNamespaceStatus(req *restful.Request, resp *restful.Response) {
res, err := models.GetNamespacesResourceStatus(req.PathParameter("namespace"))
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
}
resp.WriteEntity(res)
}

View File

@@ -19,13 +19,20 @@ package app
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "kubesphere.io/kubesphere/pkg/apis/v1alpha"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/models/jobs/cronjobs"
"kubesphere.io/kubesphere/pkg/models/jobs/resources"
"kubesphere.io/kubesphere/pkg/options"
)
@@ -53,7 +60,29 @@ func newKubeSphereServer(options *options.ServerRunOptions) *kubeSphereServer {
return &s
}
func preCheck() error {
k8sClient := client.NewK8sClient()
nsList, err := k8sClient.CoreV1().Namespaces().List(meta_v1.ListOptions{})
if err != nil {
return err
}
for _, ns := range nsList.Items {
if ns.Name == constants.NameSpace {
return nil
}
}
namespace := v1.Namespace{ObjectMeta: meta_v1.ObjectMeta{Name: constants.NameSpace}}
_, err = k8sClient.CoreV1().Namespaces().Create(&namespace)
return err
}
func (server *kubeSphereServer) run() {
err := preCheck()
if err != nil {
glog.Error(err)
return
}
go resources.Run()
go cronjobs.Run()
if len(server.certFile) > 0 && len(server.keyFile) > 0 {
servingCert, err := tls.LoadX509KeyPair(server.certFile, server.keyFile)

130
pkg/client/etcdclient.go Normal file
View File

@@ -0,0 +1,130 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"context"
"fmt"
"time"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/golang/glog"
"kubesphere.io/kubesphere/pkg/options"
)
type EtcdClient struct {
cli *clientv3.Client
}
func (cli EtcdClient) Put(key, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second)
_, err := cli.cli.Put(ctx, key, value)
cancel()
if err != nil {
switch err {
case context.Canceled:
glog.Errorf("ctx is canceled by another routine: %v\n", err)
case context.DeadlineExceeded:
glog.Errorf("ctx is attached with a deadline is exceeded: %v\n", err)
case rpctypes.ErrEmptyKey:
glog.Errorf("client-side error: %v\n", err)
default:
glog.Errorf("bad cluster endpoints, which are not etcd servers: %v\n", err)
}
return err
}
return nil
}
func (cli EtcdClient) Get(key string) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second)
resp, err := cli.cli.Get(ctx, key)
cancel()
if err != nil {
glog.Error(err)
return nil, err
}
if len(resp.Kvs) == 0 {
return nil, fmt.Errorf("empty value of key: %s", key)
}
return resp.Kvs[0].Value, nil
}
func (cli EtcdClient) Close() {
cli.cli.Close()
}
func newEtcdClientWithHttps(certFile, keyFile, caFile string, endpoints []string, dialTimeout int) (*clientv3.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
TrustedCAFile: caFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
glog.Errorln(err)
return nil, err
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Duration(dialTimeout) * time.Second,
TLS: tlsConfig,
})
if err != nil {
glog.Errorln(err)
return nil, err
}
return cli, nil // make sure to close the client
}
func newEtcdClient(endpoints []string, dialTimeout int) (*clientv3.Client, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Duration(dialTimeout) * time.Second,
})
if err != nil {
glog.Errorln(err)
return nil, err
}
return cli, nil // make sure to close the client
}
func NewEtcdClient() (*EtcdClient, error) {
var cli *clientv3.Client
var err error
cert := options.ServerOptions.GetEtcdCertFile()
key := options.ServerOptions.GetEtcdKeyFile()
ca := options.ServerOptions.GetEtcdCaFile()
endpoints := options.ServerOptions.GetEtcdEndPoints()
if len(cert) > 0 && len(key) > 0 && len(ca) > 0 {
cli, err = newEtcdClientWithHttps(cert, key, ca, endpoints, 20)
} else {
cli, err = newEtcdClient(endpoints, 20)
}
if err != nil {
return nil, err
}
return &EtcdClient{cli}, nil
}

View File

@@ -25,8 +25,14 @@ type PageableResponse struct {
TotalCount int `json:"total_count"`
}
const APIVERSION = "v1alpha1"
const KIND = "kubesphere"
const DATA_HOME = "/etc/kubesphere"
const INGRESS_CONTROLLER_FOLDER = DATA_HOME + "/ingress-controller"
const (
APIVERSION = "v1alpha1"
KIND = "kubesphere"
Root = "/kubesphere"
UpdateCircle = 60
QuotaKey = "resource-quota"
WorkloadStatusKey = "workload-status"
NameSpace = "kubesphere"
DATA_HOME = "/etc/kubesphere"
INGRESS_CONTROLLER_FOLDER = DATA_HOME + "/ingress-controller"
)

View File

@@ -0,0 +1,231 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cronjobs
import (
"encoding/json"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/models/jobs/resources"
)
const (
pods = "count/pods"
daemonsets = "count/daemonsets.apps"
deployments = "count/deployments.apps"
ingress = "count/ingresses.extensions"
roles = "count/roles.rbac.authorization.k8s.io"
services = "count/services"
statefulsets = "count/statefulsets.apps"
persistentvolumeclaims = "persistentvolumeclaims"
)
type resourceUsage struct {
NameSpace string
Data v1.ResourceQuotaStatus
UpdateTimeStamp int64
}
type resourceQuotaWorker struct {
k8sClient *kubernetes.Clientset
resChan chan dataType
stopChan chan struct{}
}
func (ru resourceUsage) namespace() string {
return ru.NameSpace
}
type workloadList map[string][]resources.WorkLoadObject
type otherResourceList map[string][]resources.OtherResourceObject
type workload struct {
ResourceType string `json:"type"`
ResourceList workloadList `json:"lists"`
UpdateTimeStamp int64 `json:"updateTimestamp"`
}
type otherResource struct {
ResourceType string `json:"type"`
ResourceList otherResourceList `json:"lists"`
UpdateTimeStamp int64 `json:"updateTimestamp"`
}
var workLoads = []string{"deployments", "daemonsets", "statefulsets"}
var resourceMap = map[string]string{daemonsets: "daemonsets", deployments: "deployments", ingress: "ingresses",
roles: "roles", services: "services", statefulsets: "statefulsets", persistentvolumeclaims: "persistent-volume-claim", pods: "pods"}
func contain(items []string, item string) bool {
for _, v := range items {
if v == item {
return false
}
}
return true
}
func (rw *resourceQuotaWorker) getResourceusage(namespace, resourceName string) (int, error) {
etcdcli, err := client.NewEtcdClient()
if err != nil {
glog.Error(err)
return 0, err
}
defer etcdcli.Close()
key := constants.Root + "/" + resourceName
value, err := etcdcli.Get(key)
if err != nil {
glog.Error(err)
}
if contain(workLoads, resourceName) {
resourceStatus := workload{ResourceList: make(workloadList)}
err := json.Unmarshal(value, &resourceStatus)
if err != nil {
glog.Error(err)
return 0, nil
}
return len(resourceStatus.ResourceList[namespace]), nil
} else {
resourceStatus := otherResource{ResourceList: make(otherResourceList)}
err := json.Unmarshal(value, &resourceStatus)
if err != nil {
glog.Error(err)
return 0, err
}
return len(resourceStatus.ResourceList[namespace]), nil
}
return 0, nil
}
func (rw *resourceQuotaWorker) updateNamespaceQuota(tmpResourceList, resourceList v1.ResourceList) {
if tmpResourceList == nil {
tmpResourceList = resourceList
}
for resource, usage := range resourceList {
tmpUsage, exist := tmpResourceList[resource]
if !exist {
tmpResourceList[resource] = usage
}
if tmpUsage.Cmp(usage) == 1 {
tmpResourceList[resource] = usage
}
}
}
func (rw *resourceQuotaWorker) getNamespaceResourceUsageByQuota(namespace string) (*v1.ResourceQuotaStatus, error) {
quotaList, err := rw.k8sClient.CoreV1().ResourceQuotas(namespace).List(meta_v1.ListOptions{})
if err != nil || len(quotaList.Items) == 0 {
return nil, err
}
quotaStatus := v1.ResourceQuotaStatus{Hard: make(v1.ResourceList), Used: make(v1.ResourceList)}
for _, quota := range quotaList.Items {
rw.updateNamespaceQuota(quotaStatus.Hard, quota.Status.Hard)
rw.updateNamespaceQuota(quotaStatus.Used, quota.Status.Used)
}
return &quotaStatus, nil
}
func (rw *resourceQuotaWorker) getNamespaceQuota(namespace string) (v1.ResourceQuotaStatus, error) {
quota, err := rw.getNamespaceResourceUsageByQuota(namespace)
if err != nil {
return v1.ResourceQuotaStatus{}, err
}
if quota == nil {
quota = new(v1.ResourceQuotaStatus)
quota.Used = make(v1.ResourceList)
}
for k, v := range resourceMap {
if _, exist := quota.Used[v1.ResourceName(k)]; !exist {
used, err := rw.getResourceusage(namespace, v)
if err != nil {
continue
}
var quantity resource.Quantity
quantity.Set(int64(used))
quota.Used[v1.ResourceName(k)] = quantity
}
}
return *quota, nil
}
func (rw *resourceQuotaWorker) workOnce() {
clusterQuota := new(v1.ResourceQuotaStatus)
clusterQuota.Used = make(v1.ResourceList)
namespaces, err := rw.k8sClient.CoreV1().Namespaces().List(meta_v1.ListOptions{})
if err != nil {
glog.Error(err)
return
}
for _, ns := range namespaces.Items {
namespace := ns.Name
nsquota, err := rw.getNamespaceQuota(namespace)
if err != nil {
glog.Error(err)
return
}
res := resourceUsage{NameSpace: namespace, Data: nsquota, UpdateTimeStamp: time.Now().Unix()}
rw.resChan <- res
for k, v := range nsquota.Used {
tmp := clusterQuota.Used[k]
tmp.Add(v)
clusterQuota.Used[k] = tmp
}
}
var quantity resource.Quantity
quantity.Set(int64(len(namespaces.Items)))
clusterQuota.Used["count/namespaces"] = quantity
res := resourceUsage{NameSpace: "\"\"", Data: *clusterQuota, UpdateTimeStamp: time.Now().Unix()}
rw.resChan <- res
}
func (rw *resourceQuotaWorker) chanStop() chan struct{} {
return rw.stopChan
}
func (rw *resourceQuotaWorker) chanRes() chan dataType {
return rw.resChan
}

View File

@@ -0,0 +1,136 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cronjobs
import (
"encoding/json"
"time"
"github.com/golang/glog"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
)
var etcdClient *client.EtcdClient
var stopChan = make(chan struct{})
type dataType interface {
namespace() string
}
type Worker interface {
workOnce()
chanRes() chan dataType
chanStop() chan struct{}
}
func registerWorker(workers map[string]Worker, name string) {
glog.Infof("Register cronjob: %s", name)
k8sClient := client.NewK8sClient()
switch name {
case constants.WorkloadStatusKey:
worker := workloadWorker{k8sClient: k8sClient, stopChan: stopChan, resChan: make(chan dataType, 10)}
workers[constants.WorkloadStatusKey] = &worker
case constants.QuotaKey:
worker := resourceQuotaWorker{k8sClient: k8sClient, stopChan: stopChan, resChan: make(chan dataType, 10)}
workers[constants.QuotaKey] = &worker
}
}
func run(worker Worker) {
defer func() {
if err := recover(); err != nil {
glog.Error(err)
close(worker.chanRes())
}
}()
for {
select {
case <-worker.chanStop():
return
default:
break
}
worker.workOnce()
time.Sleep(time.Duration(constants.UpdateCircle) * time.Second)
}
}
func startWorks(workers map[string]Worker) {
for wokername, woker := range workers {
glog.Infof("cronjob %s start to work", wokername)
go run(woker)
}
}
func receiveResourceStatus(workers map[string]Worker) {
defer func() {
close(stopChan)
}()
for {
for name, worker := range workers {
select {
case res, ok := <-worker.chanRes():
if !ok {
glog.Errorf("cronjob:%s have stopped", name)
registerWorker(workers, name)
run(workers[name])
} else {
value, err := json.Marshal(res)
if err != nil {
glog.Error(err)
continue
}
key := constants.Root + "/" + name + "/" + res.namespace()
err = etcdClient.Put(key, string(value))
if err != nil {
glog.Error(err)
}
}
default:
continue
}
}
}
}
func Run() {
glog.Info("Begin to run cronjob")
var err error
etcdClient, err = client.NewEtcdClient()
if err != nil {
glog.Error(err)
}
defer etcdClient.Close()
workers := make(map[string]Worker)
workerList := []string{constants.QuotaKey, constants.WorkloadStatusKey}
for _, name := range workerList {
registerWorker(workers, name)
}
startWorks(workers)
receiveResourceStatus(workers)
}

View File

@@ -0,0 +1,104 @@
package cronjobs
import (
"encoding/json"
"time"
"github.com/golang/glog"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
)
var workLoadList = []string{"deployments", "daemonsets", "statefulsets"}
type workLoadStatus struct {
NameSpace string
Data map[string]int
UpdateTimeStamp int64
}
func (ws workLoadStatus) namespace() string {
return ws.NameSpace
}
type workloadWorker struct {
k8sClient *kubernetes.Clientset
resChan chan dataType
stopChan chan struct{}
}
func (ww *workloadWorker) GetNamespacesResourceStatus(namespace string) (map[string]int, error) {
cli, err := client.NewEtcdClient()
if err != nil {
glog.Error(err)
return nil, err
}
defer cli.Close()
res := make(map[string]int)
for _, resourceName := range workLoadList {
key := constants.Root + "/" + resourceName
value, err := cli.Get(key)
if err != nil {
continue
}
resourceStatus := workload{ResourceList: make(workloadList)}
err = json.Unmarshal(value, &resourceStatus)
if err != nil {
glog.Error(err)
return nil, err
}
notReady := 0
for _, v := range resourceStatus.ResourceList[namespace] {
if !v.Ready {
notReady++
}
}
res[resourceName] = notReady
}
return res, nil
}
func (ww workloadWorker) workOnce() {
namespaces, err := ww.k8sClient.CoreV1().Namespaces().List(meta_v1.ListOptions{})
if err != nil {
glog.Error(err)
}
resourceStatus := make(map[string]int)
for _, item := range namespaces.Items {
namespace := item.Name
namespacesResourceStatus, err := ww.GetNamespacesResourceStatus(namespace)
if err != nil {
glog.Error(err)
}
var ws = workLoadStatus{UpdateTimeStamp: time.Now().Unix(), Data: namespacesResourceStatus, NameSpace: namespace}
ww.resChan <- ws
for k, v := range namespacesResourceStatus {
resourceStatus[k] = v + resourceStatus[k]
}
}
var ws = workLoadStatus{UpdateTimeStamp: time.Now().Unix(), Data: resourceStatus, NameSpace: "\"\""}
ww.resChan <- ws
}
func (ww workloadWorker) chanRes() chan dataType {
return ww.resChan
}
func (ww workloadWorker) chanStop() chan struct{} {
return ww.stopChan
}

View File

@@ -0,0 +1,108 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/apps/v1beta2"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type daemonset struct {
k8sClient *kubernetes.Clientset
}
func (ds *daemonset) list() (interface{}, error) {
daemonsetList, err := ds.k8sClient.AppsV1beta2().DaemonSets("").List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return daemonsetList.Items, nil
}
func (ds *daemonset) getWatcher() (watch.Interface, error) {
watcher, err := ds.k8sClient.AppsV1beta2().DaemonSets("").Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (ds *daemonset) generateObject(item v1beta2.DaemonSet) WorkLoadObject {
var app string
var ready bool
name := item.Name
namespace := item.Namespace
availablePodNum := item.Status.CurrentNumberScheduled
desirePodNum := item.Status.DesiredNumberScheduled
createTime := item.CreationTimestamp
release := item.ObjectMeta.Labels["release"]
nodeSelector := item.Spec.Template.Spec.NodeSelector
chart := item.ObjectMeta.Labels["chart"]
if len(release) > 0 && len(chart) > 0 {
app = release + "/" + chart
} else {
app = "-"
}
if availablePodNum >= desirePodNum {
ready = true
} else {
ready = false
}
workloadObject := WorkLoadObject{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum,
App: app, CreateTime: createTime, Ready: ready, NodeSelector: nodeSelector}
return workloadObject
}
func (ds *daemonset) updateWithObject(status *ResourceStatus, item v1beta2.DaemonSet) {
namespace := item.Namespace
dsObject := ds.generateObject(item)
status.ResourceList.update(namespace, dsObject)
}
func (ds *daemonset) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1beta2.DaemonSet)
for _, item := range items {
ds.updateWithObject(status, item)
}
}
func (ds *daemonset) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1beta2.DaemonSet)
namespace := object.Namespace
daemonsetObject := ds.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, daemonsetObject)
return
}
ds.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,113 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/apps/v1beta2"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type deployment struct {
k8sClient *kubernetes.Clientset
}
func (deploy *deployment) list() (interface{}, error) {
deoloyList, err := deploy.k8sClient.AppsV1beta2().Deployments("").List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return deoloyList.Items, nil
}
func (deploy *deployment) getWatcher() (watch.Interface, error) {
watcher, err := deploy.k8sClient.AppsV1beta2().Deployments("").Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (deploy *deployment) generateObject(item v1beta2.Deployment) WorkLoadObject {
var app string
var ready bool
var updateTime meta_v1.Time
name := item.Name
namespace := item.Namespace
availablePodNum := item.Status.AvailableReplicas
desirePodNum := *item.Spec.Replicas
release := item.ObjectMeta.Labels["release"]
chart := item.ObjectMeta.Labels["chart"]
if len(release) > 0 && len(chart) > 0 {
app = release + "/" + chart
} else {
app = "-"
}
for _, conditon := range item.Status.Conditions {
if conditon.Type == "Progressing" {
updateTime = conditon.LastUpdateTime
}
}
if availablePodNum >= desirePodNum {
ready = true
} else {
ready = false
}
deployObject := WorkLoadObject{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum,
App: app, UpdateTime: updateTime, Ready: ready}
return deployObject
}
func (deploy *deployment) updateWithObject(status *ResourceStatus, item v1beta2.Deployment) {
namespace := item.Namespace
deployObject := deploy.generateObject(item)
status.ResourceList.update(namespace, deployObject)
}
func (deploy *deployment) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1beta2.Deployment)
for _, item := range items {
deploy.updateWithObject(status, item)
}
}
func (deploy *deployment) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1beta2.Deployment)
namespace := object.Namespace
deployObject := deploy.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, deployObject)
return
}
deploy.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,87 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/extensions/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type ingress struct {
k8sClient *kubernetes.Clientset
}
func (ing *ingress) list() (interface{}, error) {
list, err := ing.k8sClient.ExtensionsV1beta1().Ingresses("").List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return list.Items, nil
}
func (ing *ingress) getWatcher() (watch.Interface, error) {
watcher, err := ing.k8sClient.ExtensionsV1beta1().Ingresses("").Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (ing *ingress) generateObject(item v1beta1.Ingress) OtherResourceObject {
name := item.Name
ns := item.Namespace
object := OtherResourceObject{Namespace: ns, Name: name}
return object
}
func (ing *ingress) updateWithObject(status *ResourceStatus, item v1beta1.Ingress) {
namespace := item.Namespace
object := ing.generateObject(item)
status.ResourceList.update(namespace, object)
}
func (ing *ingress) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1beta1.Ingress)
for _, item := range items {
ing.updateWithObject(status, item)
}
}
func (ing *ingress) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1beta1.Ingress)
namespace := object.Namespace
tmpObject := ing.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, tmpObject)
return
}
ing.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,87 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type namespace struct {
k8sClient *kubernetes.Clientset
}
func (ns *namespace) list() (interface{}, error) {
nsList, err := ns.k8sClient.CoreV1().Namespaces().List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return nsList.Items, nil
}
func (ns *namespace) getWatcher() (watch.Interface, error) {
watcher, err := ns.k8sClient.CoreV1().Namespaces().Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (ns *namespace) generateObject(item v1.Namespace) OtherResourceObject {
name := item.Name
nsp := item.Namespace
object := OtherResourceObject{Namespace: nsp, Name: name}
return object
}
func (ns *namespace) updateWithObject(status *ResourceStatus, item v1.Namespace) {
namespace := item.Namespace
object := ns.generateObject(item)
status.ResourceList.update(namespace, object)
}
func (ns *namespace) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1.Namespace)
for _, item := range items {
ns.updateWithObject(status, item)
}
}
func (ns *namespace) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1.Namespace)
namespace := object.Namespace
tmpObject := ns.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, tmpObject)
return
}
ns.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,87 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type pod struct {
k8sClient *kubernetes.Clientset
}
func (po *pod) list() (interface{}, error) {
list, err := po.k8sClient.CoreV1().Pods("").List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return list.Items, nil
}
func (po *pod) getWatcher() (watch.Interface, error) {
watcher, err := po.k8sClient.CoreV1().Pods("").Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (po *pod) generateObject(item v1.Pod) OtherResourceObject {
name := item.Name
ns := item.Namespace
Object := OtherResourceObject{Namespace: ns, Name: name}
return Object
}
func (po *pod) updateWithObject(status *ResourceStatus, item v1.Pod) {
namespace := item.Namespace
object := po.generateObject(item)
status.ResourceList.update(namespace, object)
}
func (po *pod) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1.Pod)
for _, item := range items {
po.updateWithObject(status, item)
}
}
func (po *pod) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1.Pod)
namespace := object.Namespace
tmpObject := po.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, tmpObject)
return
}
po.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,87 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type persistmentVolume struct {
k8sClient *kubernetes.Clientset
}
func (pvc *persistmentVolume) list() (interface{}, error) {
list, err := pvc.k8sClient.CoreV1().PersistentVolumeClaims("").List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return list.Items, nil
}
func (pvc *persistmentVolume) getWatcher() (watch.Interface, error) {
watcher, err := pvc.k8sClient.CoreV1().PersistentVolumeClaims("").Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (pvc *persistmentVolume) generateObject(item v1.PersistentVolumeClaim) OtherResourceObject {
name := item.Name
ns := item.Namespace
object := OtherResourceObject{Namespace: ns, Name: name}
return object
}
func (pvc *persistmentVolume) updateWithObject(status *ResourceStatus, item v1.PersistentVolumeClaim) {
namespace := item.Namespace
object := pvc.generateObject(item)
status.ResourceList.update(namespace, object)
}
func (pvc *persistmentVolume) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1.PersistentVolumeClaim)
for _, item := range items {
pvc.updateWithObject(status, item)
}
}
func (pvc *persistmentVolume) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1.PersistentVolumeClaim)
namespace := object.Namespace
tmpObject := pvc.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, tmpObject)
return
}
pvc.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,87 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/rbac/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type role struct {
k8sClient *kubernetes.Clientset
}
func (r *role) list() (interface{}, error) {
list, err := r.k8sClient.RbacV1().Roles("").List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return list.Items, nil
}
func (r *role) getWatcher() (watch.Interface, error) {
watcher, err := r.k8sClient.RbacV1().Roles("").Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (r *role) generateObject(item v1.Role) OtherResourceObject {
name := item.Name
ns := item.Namespace
object := OtherResourceObject{Namespace: ns, Name: name}
return object
}
func (r *role) updateWithObject(status *ResourceStatus, item v1.Role) {
namespace := item.Namespace
object := r.generateObject(item)
status.ResourceList.update(namespace, object)
}
func (r *role) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1.Role)
for _, item := range items {
r.updateWithObject(status, item)
}
}
func (r *role) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1.Role)
namespace := object.Namespace
tmpObject := r.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, tmpObject)
return
}
r.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,178 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"encoding/json"
"time"
"github.com/golang/glog"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
)
var etcdClient *client.EtcdClient
var stopChan = make(chan struct{})
const (
pods = "pods"
deployments = "deployments"
daemonsets = "daemonsets"
statefulsets = "statefulsets"
namespaces = "namespaces"
ingresses = "ingresses"
persistentVolumeClaim = "persistent-volume-claim"
roles = "roles"
services = "services"
)
func registerResource(resourceChans map[string]ResourceChan, resourceType string) {
resourceChan := ResourceChan{Type: resourceType, StatusChan: make(chan *ResourceStatus), StopChan: stopChan}
resourceChans[resourceType] = resourceChan
}
func updateStatus(resource Resource, resourceChan ResourceChan) {
defer func() {
if err := recover(); err != nil {
glog.Error(err)
close(resourceChan.StatusChan)
}
}()
var clusterStatus ResourceStatus
clusterStatus.UpdateTimeStamp = time.Now().Unix()
clusterStatus.ResourceType = resourceChan.Type
items, err := resource.list()
if err != nil {
glog.Errorln(err)
return
}
resource.updateWithObjects(&clusterStatus, items)
watcher, err := resource.getWatcher()
if err != nil {
glog.Error(err)
return
}
for {
select {
case <-resourceChan.StopChan:
return
case event := <-watcher.ResultChan():
resource.updateWithEvent(&clusterStatus, event)
break
default:
break
}
if time.Now().Unix()-clusterStatus.UpdateTimeStamp > constants.UpdateCircle {
clusterStatus.UpdateTimeStamp = time.Now().Unix()
resourceChan.StatusChan <- &clusterStatus
}
}
}
func updateResourceStatus(resourceChan ResourceChan) {
glog.Infof("updateResourceStatus:%s", resourceChan.Type)
client := client.NewK8sClient()
switch resourceChan.Type {
case deployments:
deploy := deployment{k8sClient: client}
go updateStatus(&deploy, resourceChan)
case daemonsets:
ds := daemonset{k8sClient: client}
go updateStatus(&ds, resourceChan)
case statefulsets:
ss := statefulset{k8sClient: client}
go updateStatus(&ss, resourceChan)
case namespaces:
ns := namespace{k8sClient: client}
go updateStatus(&ns, resourceChan)
case ingresses:
ing := ingress{k8sClient: client}
go updateStatus(&ing, resourceChan)
case persistentVolumeClaim:
pvc := persistmentVolume{k8sClient: client}
go updateStatus(&pvc, resourceChan)
case roles:
r := role{k8sClient: client}
go updateStatus(&r, resourceChan)
case services:
svc := service{k8sClient: client}
go updateStatus(&svc, resourceChan)
case pods:
po := pod{k8sClient: client}
go updateStatus(&po, resourceChan)
}
}
func updateAllResourceStatus(resourceChans map[string]ResourceChan) {
for _, resourceChan := range resourceChans {
updateResourceStatus(resourceChan)
}
}
func receiveResourceStatus(resourceChans map[string]ResourceChan) {
defer func() {
close(stopChan)
}()
for {
for _, resourceChan := range resourceChans {
select {
case res, ok := <-resourceChan.StatusChan:
if !ok {
glog.Errorf("job:calculate %s' status have stopped", resourceChan.Type)
registerResource(resourceChans, resourceChan.Type)
updateResourceStatus(resourceChans[resourceChan.Type])
} else {
value, _ := json.Marshal(res)
key := constants.Root + "/" + res.ResourceType
etcdClient.Put(key, string(value))
}
default:
continue
}
}
}
}
func Run() {
glog.Info("Begin to submit resource status")
var err error
etcdClient, err = client.NewEtcdClient()
defer etcdClient.Close()
if err != nil {
glog.Error(err)
}
resourceChans := make(map[string]ResourceChan)
resourceList := []string{statefulsets, deployments, daemonsets, namespaces, ingresses, services, roles, persistentVolumeClaim, pods}
for _, resource := range resourceList {
registerResource(resourceChans, resource)
}
updateAllResourceStatus(resourceChans)
receiveResourceStatus(resourceChans)
}

View File

@@ -0,0 +1,87 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type service struct {
k8sClient *kubernetes.Clientset
}
func (svc *service) list() (interface{}, error) {
list, err := svc.k8sClient.CoreV1().Services("").List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return list.Items, nil
}
func (svc *service) getWatcher() (watch.Interface, error) {
watcher, err := svc.k8sClient.CoreV1().Services("").Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (svc *service) generateObject(item v1.Service) OtherResourceObject {
name := item.Name
ns := item.Namespace
object := OtherResourceObject{Namespace: ns, Name: name}
return object
}
func (svc *service) updateWithObject(status *ResourceStatus, item v1.Service) {
namespace := item.Namespace
object := svc.generateObject(item)
status.ResourceList.update(namespace, object)
}
func (svc *service) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1.Service)
for _, item := range items {
svc.updateWithObject(status, item)
}
}
func (svc *service) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1.Service)
namespace := object.Namespace
tmpObject := svc.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, tmpObject)
return
}
svc.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,107 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"k8s.io/api/apps/v1beta2"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
type statefulset struct {
k8sClient *kubernetes.Clientset
}
func (ss *statefulset) list() (interface{}, error) {
daemonsetList, err := ss.k8sClient.AppsV1beta2().StatefulSets("").List(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return daemonsetList.Items, nil
}
func (ss *statefulset) getWatcher() (watch.Interface, error) {
watcher, err := ss.k8sClient.AppsV1beta2().StatefulSets("").Watch(meta_v1.ListOptions{})
if err != nil {
return nil, err
}
return watcher, nil
}
func (ss *statefulset) generateObject(item v1beta2.StatefulSet) WorkLoadObject {
var app string
var ready bool
name := item.Name
namespace := item.Namespace
availablePodNum := item.Status.ReadyReplicas
desirePodNum := *item.Spec.Replicas
createTime := item.CreationTimestamp
release := item.ObjectMeta.Labels["release"]
chart := item.ObjectMeta.Labels["chart"]
if len(release) > 0 && len(chart) > 0 {
app = release + "/" + chart
} else {
app = "-"
}
if availablePodNum >= desirePodNum {
ready = true
} else {
ready = false
}
statefulSetObject := WorkLoadObject{Namespace: namespace, Name: name, Available: availablePodNum, Desire: desirePodNum,
App: app, CreateTime: createTime, Ready: ready}
return statefulSetObject
}
func (ss *statefulset) updateWithObject(status *ResourceStatus, item v1beta2.StatefulSet) {
namespace := item.Namespace
ssObject := ss.generateObject(item)
status.ResourceList.update(namespace, ssObject)
}
func (ss *statefulset) updateWithObjects(status *ResourceStatus, objects interface{}) {
if status.ResourceList == nil {
status.ResourceList = make(Resources)
}
items := objects.([]v1beta2.StatefulSet)
for _, item := range items {
ss.updateWithObject(status, item)
}
}
func (ss *statefulset) updateWithEvent(status *ResourceStatus, event watch.Event) {
object := event.Object.(*v1beta2.StatefulSet)
namespace := object.Namespace
ssObject := ss.generateObject(*object)
if event.Type == watch.Deleted {
status.ResourceList.del(namespace, ssObject)
return
}
ss.updateWithObject(status, *object)
}

View File

@@ -0,0 +1,105 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)
type resource interface {
equal(object resource) bool
}
type resourcetList interface {
update(key string, value resource)
del(key string, value resource)
}
type ResourceStatus struct {
ResourceType string `json:"type"`
ResourceList resourcetList `json:"lists"`
UpdateTimeStamp int64 `json:"updateTimestamp"`
}
type ResourceChan struct {
Type string
StatusChan chan *ResourceStatus
StopChan chan struct{}
}
type Resource interface {
list() (interface{}, error)
getWatcher() (watch.Interface, error)
updateWithObjects(workload *ResourceStatus, objects interface{})
updateWithEvent(workload *ResourceStatus, event watch.Event)
}
type WorkLoadObject struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
App string `json:"app"`
Available int32 `json:"available"`
Desire int32 `json:"desire"`
Ready bool `json:"ready"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
UpdateTime meta_v1.Time `json:"updateTime,omitempty"`
CreateTime meta_v1.Time `json:"createTime,omitempty"`
}
type OtherResourceObject struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
}
type Resources map[string][]resource
func (resources Resources) update(namespace string, object resource) {
for index, tmpObject := range resources[namespace] {
if tmpObject.equal(object) {
resources[namespace][index] = object
return
}
}
resources[namespace] = append(resources[namespace], object)
}
func (resources Resources) del(namespace string, object resource) {
for index, tmpObject := range resources[namespace] {
if tmpObject.equal(object) {
resources[namespace] = append(resources[namespace][:index], resources[namespace][index+1:]...)
return
}
}
}
func (workLoadObject WorkLoadObject) equal(object resource) bool {
tmp := object.(WorkLoadObject)
if workLoadObject.Name == tmp.Name && workLoadObject.Namespace == tmp.Namespace {
return true
}
return false
}
func (otherResourceObject OtherResourceObject) equal(object resource) bool {
tmp := object.(OtherResourceObject)
if otherResourceObject.Name == tmp.Name && otherResourceObject.Namespace == tmp.Namespace {
return true
}
return false
}

View File

@@ -18,84 +18,259 @@ package models
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"text/template"
"encoding/pem"
"io/ioutil"
"math/big"
rd "math/rand"
"time"
"github.com/golang/glog"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"gopkg.in/yaml.v2"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/options"
)
var kubeconfigTemp = "apiVersion: v1\n" +
"clusters:\n" +
"- cluster:\n" +
" certificate-authority-data: {{.Certificate}}\n" +
" server: {{.Server}}\n" +
" name: kubernetes\n" +
"contexts:\n" +
"- context:\n" +
" cluster: kubernetes\n" +
" user: {{.User}}\n" +
" namespace: {{.User}}\n" +
" name: default\n" +
"current-context: default\n" +
"kind: Config\n" +
"preferences: {}\n" +
"users:\n" +
"- name: {{.User}}\n" +
" user:\n" +
" token: {{.Token}}\n"
const (
caPath = "/etc/kubernetes/pki/ca.crt"
keyPath = "/etc/kubernetes/pki/ca.key"
clusterName = "kubernetes"
kubectlNamespace = "kubesphere"
kubectlConfigKey = "config"
)
const DefaultServiceAccount = "default"
type Config struct {
Certificate string
Server string
User string
Token string
type clusterInfo struct {
CertificateAuthorityData string `yaml:"certificate-authority-data"`
Server string `yaml:"server"`
}
func GetKubeConfig(namespace string) (string, error) {
tmpl, err := template.New("").Parse(kubeconfigTemp)
type cluster struct {
Cluster clusterInfo `yaml:"cluster"`
Name string `yaml:"name"`
}
type contextInfo struct {
Cluster string `yaml:"cluster"`
User string `yaml:"user"`
}
type contextObject struct {
Context contextInfo `yaml:"context"`
Name string `yaml:"name"`
}
type userInfo struct {
CaData string `yaml:"client-certificate-data"`
KeyData string `yaml:"client-key-data"`
}
type user struct {
Name string `yaml:"name"`
User userInfo `yaml:"user"`
}
type kubeConfig struct {
ApiVersion string `yaml:"apiVersion"`
Clusters []cluster `yaml:"clusters"`
Contexts []contextObject `yaml:"contexts"`
CurrentContext string `yaml:"current-context"`
Kind string `yaml:"kind"`
Preferences map[string]string `yaml:"preferences"`
Users []user `yaml:"users"`
}
type CertInformation struct {
Country []string
Organization []string
OrganizationalUnit []string
EmailAddress []string
Province []string
Locality []string
CommonName string
CrtName, KeyName string
IsCA bool
Names []pkix.AttributeTypeAndValue
}
func createCRT(RootCa *x509.Certificate, RootKey *rsa.PrivateKey, info CertInformation) ([]byte, []byte, error) {
var cert, key bytes.Buffer
Crt := newCertificate(info)
Key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
glog.Error(err)
return nil, nil, err
}
var buf []byte
buf, err = x509.CreateCertificate(rand.Reader, Crt, RootCa, &Key.PublicKey, RootKey)
if err != nil {
glog.Error(err)
return nil, nil, err
}
pem.Encode(&cert, &pem.Block{Type: "CERTIFICATE", Bytes: buf})
if err != nil {
glog.Error(err)
return nil, nil, err
}
buf = x509.MarshalPKCS1PrivateKey(Key)
pem.Encode(&key, &pem.Block{Type: "PRIVATE KEY", Bytes: buf})
return cert.Bytes(), key.Bytes(), nil
}
func Parse(crtPath, keyPath string) (rootcertificate *x509.Certificate, rootPrivateKey *rsa.PrivateKey, err error) {
rootcertificate, err = parseCrt(crtPath)
if err != nil {
glog.Error(err)
return nil, nil, err
}
rootPrivateKey, err = parseKey(keyPath)
return rootcertificate, rootPrivateKey, nil
}
func parseCrt(path string) (*x509.Certificate, error) {
buf, err := ioutil.ReadFile(path)
if err != nil {
glog.Error(err)
return nil, err
}
p := &pem.Block{}
p, buf = pem.Decode(buf)
return x509.ParseCertificate(p.Bytes)
}
func parseKey(path string) (*rsa.PrivateKey, error) {
buf, err := ioutil.ReadFile(path)
if err != nil {
glog.Error(err)
return nil, err
}
p, buf := pem.Decode(buf)
return x509.ParsePKCS1PrivateKey(p.Bytes)
}
func newCertificate(info CertInformation) *x509.Certificate {
rd.Seed(time.Now().UnixNano())
return &x509.Certificate{
SerialNumber: big.NewInt(rd.Int63()),
Subject: pkix.Name{
Country: info.Country,
Organization: info.Organization,
OrganizationalUnit: info.OrganizationalUnit,
Province: info.Province,
CommonName: info.CommonName,
Locality: info.Locality,
ExtraNames: info.Names,
},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(20, 0, 0),
BasicConstraintsValid: true,
IsCA: info.IsCA,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
EmailAddresses: info.EmailAddress,
}
}
func generateCaAndKey(user, caPath, keyPath string) (string, string, error) {
crtinfo := CertInformation{CommonName: user, IsCA: false}
crt, pri, err := Parse(caPath, keyPath)
if err != nil {
glog.Error(err)
return "", "", err
}
cert, key, err := createCRT(crt, pri, crtinfo)
if err != nil {
glog.Error(err)
return "", "", err
}
base64Cert := base64.StdEncoding.EncodeToString(cert)
base64Key := base64.StdEncoding.EncodeToString(key)
return base64Cert, base64Key, nil
}
func createKubeConfig(userName string) (string, error) {
tmpKubeConfig := kubeConfig{ApiVersion: "v1", Kind: "Config"}
serverCa, err := ioutil.ReadFile(caPath)
if err != nil {
glog.Errorln(err)
return "", err
}
base64ServerCa := base64.StdEncoding.EncodeToString(serverCa)
tmpClusterInfo := clusterInfo{CertificateAuthorityData: base64ServerCa, Server: options.ServerOptions.GetApiServerHost()}
tmpCluster := cluster{Cluster: tmpClusterInfo, Name: clusterName}
tmpKubeConfig.Clusters = append(tmpKubeConfig.Clusters, tmpCluster)
kubeConfig, err := getKubeConfig(namespace, options.ServerOptions.GetApiServerHost())
contextName := userName + "@" + clusterName
tmpContext := contextObject{Context: contextInfo{User: userName, Cluster: clusterName}, Name: contextName}
tmpKubeConfig.Contexts = append(tmpKubeConfig.Contexts, tmpContext)
cert, key, err := generateCaAndKey(userName, caPath, keyPath)
buf := bytes.NewBufferString("")
err = tmpl.Execute(buf, kubeConfig)
if err != nil {
glog.Errorln(err)
return "", err
}
return buf.String(), nil
tmpUser := user{User: userInfo{CaData: cert, KeyData: key}, Name: userName}
tmpKubeConfig.Users = append(tmpKubeConfig.Users, tmpUser)
tmpKubeConfig.CurrentContext = contextName
config, err := yaml.Marshal(tmpKubeConfig)
if err != nil {
return "", err
}
return string(config), nil
}
func getKubeConfig(namespace, apiserverHost string) (*Config, error) {
func CreateKubeConfig(user string) error {
k8sClient := client.NewK8sClient()
saInfo, err := k8sClient.CoreV1().ServiceAccounts(namespace).Get(DefaultServiceAccount, meta_v1.GetOptions{})
config, err := createKubeConfig(user)
if err != nil {
glog.Errorln(err)
return nil, err
return err
}
secretName := saInfo.Secrets[0].Name
secretInfo, err := k8sClient.CoreV1().Secrets(namespace).Get(secretName, meta_v1.GetOptions{})
data := map[string]string{"config": string(config)}
var configmap = v1.ConfigMap{metav1.TypeMeta{Kind: "Configmap", APIVersion: "v1"}, metav1.ObjectMeta{Name: user}, data}
_, err = k8sClient.CoreV1().ConfigMaps(kubectlNamespace).Create(&configmap)
if err != nil {
glog.Errorln(err)
return nil, err
return err
}
return nil
secretData := secretInfo.Data
certificate := string(secretData["ca.crt"])
certificate = base64.StdEncoding.EncodeToString([]byte(certificate))
server := apiserverHost
token := string(secretData["token"])
user := string(secretData["namespace"])
return &Config{Certificate: certificate, Server: server, Token: token, User: user}, nil
}
func GetKubeConfig(user string) (string, error) {
k8sClient := client.NewK8sClient()
configmap, err := k8sClient.CoreV1().ConfigMaps(kubectlNamespace).Get(user, metav1.GetOptions{})
if err != nil {
glog.Errorln(err)
return "", err
}
return configmap.Data[kubectlConfigKey], nil
}
func DelKubeConfig(user string) error {
k8sClient := client.NewK8sClient()
err := k8sClient.CoreV1().ConfigMaps(kubectlNamespace).Delete(user, &metav1.DeleteOptions{})
if err != nil {
glog.Errorln(err)
return err
}
return nil
}

View File

@@ -21,26 +21,27 @@ import (
"math/rand"
"github.com/golang/glog"
"k8s.io/api/apps/v1beta2"
"k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/options"
)
const (
deploymentName = "kubectl"
)
const namespace = constants.NameSpace
type kubectlPodInfo struct {
Namespace string `json: "namespace"`
Pod string `json: "podname"`
Container string `json: "container"`
Namespace string `json:"namespace"`
Pod string `json:"pod"`
Container string `json:"container"`
}
func GetKubectlPod(namespace string) (kubectlPodInfo, error) {
func GetKubectlPod(user string) (kubectlPodInfo, error) {
k8sClient := client.NewK8sClient()
deploy, err := k8sClient.AppsV1beta2().Deployments(namespace).Get(deploymentName, meta_v1.GetOptions{})
deploy, err := k8sClient.AppsV1beta2().Deployments(namespace).Get(user, meta_v1.GetOptions{})
if err != nil {
glog.Errorln(err)
return kubectlPodInfo{}, err
@@ -84,3 +85,53 @@ func selectCorrectPod(namespace string, pods []v1.Pod) (kubectlPod v1.Pod, err e
random := rand.Intn(len(kubectPodList))
return kubectPodList[random], nil
}
func CreateKubectlPod(user string) error {
replica := int32(1)
selector := meta_v1.LabelSelector{MatchLabels: map[string]string{"user": user}}
config := v1.ConfigMapVolumeSource{Items: []v1.KeyToPath{{Key: "config", Path: "config"}}, LocalObjectReference: v1.LocalObjectReference{Name: user}}
deployment := v1beta2.Deployment{
ObjectMeta: meta_v1.ObjectMeta{
Name: user,
},
Spec: v1beta2.DeploymentSpec{
Replicas: &replica,
Selector: &selector,
Template: v1.PodTemplateSpec{
ObjectMeta: meta_v1.ObjectMeta{
Labels: map[string]string{
"user": user,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Name: "kubectl",
Image: options.ServerOptions.GetKubectlImage(),
VolumeMounts: []v1.VolumeMount{{Name: "kubeconfig", MountPath: "/root/.kube"}},
},
},
Volumes: []v1.Volume{{Name: "kubeconfig", VolumeSource: v1.VolumeSource{ConfigMap: &config}}},
},
},
},
}
k8sClient := client.NewK8sClient()
_, err := k8sClient.AppsV1beta2().Deployments(namespace).Create(&deployment)
return err
}
func DelKubectlPod(user string) error {
k8sClient := client.NewK8sClient()
deploy, err := k8sClient.AppsV1beta2().Deployments(namespace).Get(user, meta_v1.GetOptions{})
if err != nil {
return err
}
replicas := int32(0)
deploy.Spec.Replicas = &replicas
k8sClient.AppsV1beta2().Deployments(namespace).Update(deploy)
err = k8sClient.AppsV1beta2().Deployments(namespace).Delete(user, &meta_v1.DeleteOptions{})
return err
}

66
pkg/models/quota.go Normal file
View File

@@ -0,0 +1,66 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package models
import (
"k8s.io/api/core/v1"
"encoding/json"
"errors"
"time"
"github.com/golang/glog"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
)
type resourceQuota struct {
NameSpace string `json:"namespace"`
Data v1.ResourceQuotaStatus `json:"data"`
UpdateTimeStamp int64 `json:"updateTimeStamp"`
}
func GetNamespaceQuota(namespace string) (*resourceQuota, error) {
cli, err := client.NewEtcdClient()
if err != nil {
glog.Error(err)
}
defer cli.Close()
key := constants.Root + "/" + constants.QuotaKey + "/" + namespace
value, err := cli.Get(key)
var data = v1.ResourceQuotaStatus{make(v1.ResourceList), make(v1.ResourceList)}
var res = resourceQuota{Data: data}
err = json.Unmarshal(value, &res)
if time.Now().Unix()-res.UpdateTimeStamp > 5*constants.UpdateCircle {
err = errors.New("internal server error")
return nil, err
}
if err != nil {
return nil, err
}
return &res, nil
}
func GetClusterQuota() (*resourceQuota, error) {
return GetNamespaceQuota("\"\"")
}

315
pkg/models/terminal.go Normal file
View File

@@ -0,0 +1,315 @@
// Copyright 2018 The Kubesphere Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
restful "github.com/emicklei/go-restful"
"github.com/golang/glog"
"gopkg.in/igm/sockjs-go.v2/sockjs"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/options"
)
// TerminalResponse is sent by handleExecShell. The Id is a random session id that binds the original REST request and the SockJS connection.
// Any clientapi in possession of this Id can hijack the terminal session.
type TerminalResponse struct {
Id string `json:"id"`
}
// PtyHandler is what remotecommand expects from a pty
type PtyHandler interface {
io.Reader
io.Writer
remotecommand.TerminalSizeQueue
}
// TerminalSession implements PtyHandler (using a SockJS connection)
type TerminalSession struct {
id string
bound chan error
sockJSSession sockjs.Session
sizeChan chan remotecommand.TerminalSize
}
// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
//
// OP DIRECTION FIELD(S) USED DESCRIPTION
// ---------------------------------------------------------------------
// bind fe->be SessionID Id sent back from TerminalResponse
// stdin fe->be Data Keystrokes/paste buffer
// resize fe->be Rows, Cols New terminal size
// stdout be->fe Data Output from the process
// toast be->fe Data OOB message to be shown to the user
type TerminalMessage struct {
Op, Data, SessionID string
Rows, Cols uint16
}
// TerminalSize handles pty->process resize events
// Called in a loop from remotecommand as long as the process is running
func (t TerminalSession) Next() *remotecommand.TerminalSize {
select {
case size := <-t.sizeChan:
return &size
}
}
// Read handles pty->process messages (stdin, resize)
// Called in a loop from remotecommand as long as the process is running
func (t TerminalSession) Read(p []byte) (int, error) {
m, err := t.sockJSSession.Recv()
if err != nil {
return 0, err
}
var msg TerminalMessage
if err := json.Unmarshal([]byte(m), &msg); err != nil {
return 0, err
}
switch msg.Op {
case "stdin":
return copy(p, msg.Data), nil
case "resize":
t.sizeChan <- remotecommand.TerminalSize{msg.Cols, msg.Rows}
return 0, nil
default:
return 0, fmt.Errorf("unknown message type '%s'", msg.Op)
}
}
// Write handles process->pty stdout
// Called from remotecommand whenever there is any output
func (t TerminalSession) Write(p []byte) (int, error) {
msg, err := json.Marshal(TerminalMessage{
Op: "stdout",
Data: string(p),
})
if err != nil {
return 0, err
}
if err = t.sockJSSession.Send(string(msg)); err != nil {
return 0, err
}
return len(p), nil
}
// Toast can be used to send the user any OOB messages
// hterm puts these in the center of the terminal
func (t TerminalSession) Toast(p string) error {
msg, err := json.Marshal(TerminalMessage{
Op: "toast",
Data: p,
})
if err != nil {
return err
}
if err = t.sockJSSession.Send(string(msg)); err != nil {
return err
}
return nil
}
// Close shuts down the SockJS connection and sends the status code and reason to the client
// Can happen if the process exits or if there is an error starting up the process
// For now the status code is unused and reason is shown to the user (unless "")
func (t TerminalSession) Close(status uint32, reason string) {
t.sockJSSession.Close(status, reason)
}
// terminalSessions stores a map of all TerminalSession objects
// FIXME: this structure needs locking
var terminalSessions = make(map[string]TerminalSession)
// handleTerminalSession is Called by net/http for any new /api/sockjs connections
func handleTerminalSession(session sockjs.Session) {
glog.Infof("handleTerminalSession, ID:%s", session.ID())
var (
buf string
err error
msg TerminalMessage
terminalSession TerminalSession
ok bool
)
if buf, err = session.Recv(); err != nil {
glog.Errorf("handleTerminalSession: can't Recv: %v", err)
return
}
if err = json.Unmarshal([]byte(buf), &msg); err != nil {
glog.Errorf("handleTerminalSession: can't UnMarshal (%v): %s", err, buf)
return
}
if msg.Op != "bind" {
glog.Errorf("handleTerminalSession: expected 'bind' message, got: %s", buf)
return
}
if terminalSession, ok = terminalSessions[msg.SessionID]; !ok {
glog.Errorf("handleTerminalSession: can't find session '%s'", msg.SessionID)
return
}
terminalSession.sockJSSession = session
terminalSessions[msg.SessionID] = terminalSession
terminalSession.bound <- nil
}
// CreateAttachHandler is called from main for /api/sockjs
func CreateTerminalHandler(path string) http.Handler {
return sockjs.NewHandler(path, sockjs.DefaultOptions, handleTerminalSession)
}
// startProcess is called by handleAttach
// Executed cmd in the container specified in request and connects it up with the ptyHandler (a session)
func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config, request *restful.Request, cmd []string, ptyHandler PtyHandler) error {
namespace := request.PathParameter("namespace")
podName := request.PathParameter("pod")
containerName := request.PathParameter("container")
req := k8sClient.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec")
req.VersionedParams(&v1.PodExecOptions{
Container: containerName,
Command: cmd,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL())
if err != nil {
return err
}
err = exec.Stream(remotecommand.StreamOptions{
Stdin: ptyHandler,
Stdout: ptyHandler,
Stderr: ptyHandler,
TerminalSizeQueue: ptyHandler,
Tty: true,
})
if err != nil {
return err
}
return nil
}
// genTerminalSessionId generates a random session ID string. The format is not really interesting.
// This ID is used to identify the session when the client opens the SockJS connection.
// Not the same as the SockJS session id! We can't use that as that is generated
// on the client side and we don't have it yet at this point.
func genTerminalSessionId() (string, error) {
bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
id := make([]byte, hex.EncodedLen(len(bytes)))
hex.Encode(id, bytes)
glog.Infof("genTerminalSessionId, id:" + string(id))
return string(id), nil
}
// isValidShell checks if the shell is an allowed one
func isValidShell(validShells []string, shell string) bool {
for _, validShell := range validShells {
if validShell == shell {
return true
}
}
return false
}
// WaitForTerminal is called from apihandler.handleAttach as a goroutine
// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *restful.Request, sessionId string) {
glog.Infof("WaitForTerminal, ID:%s", sessionId)
shell := request.QueryParameter("shell")
select {
case <-terminalSessions[sessionId].bound:
close(terminalSessions[sessionId].bound)
var err error
validShells := []string{"bash", "sh"}
if isValidShell(validShells, shell) {
cmd := []string{shell}
err = startProcess(k8sClient, cfg, request, cmd, terminalSessions[sessionId])
} else {
// No shell given or it was not valid: try some shells until one succeeds or all fail
// FIXME: if the first shell fails then the first keyboard event is lost
for _, testShell := range validShells {
cmd := []string{testShell}
if err = startProcess(k8sClient, cfg, request, cmd, terminalSessions[sessionId]); err == nil {
break
}
}
}
if err != nil {
terminalSessions[sessionId].Close(2, err.Error())
return
}
terminalSessions[sessionId].Close(1, "Process exited")
}
}
// Handles execute shell API call
func HandleExecShell(request *restful.Request) (*TerminalResponse, error) {
sessionId, err := genTerminalSessionId()
if err != nil {
return nil, err
}
terminalSessions[sessionId] = TerminalSession{
id: sessionId,
bound: make(chan error),
sizeChan: make(chan remotecommand.TerminalSize),
}
kubeconfig, err := options.ServerOptions.GetKubeConfig()
if err != nil {
return nil, err
}
go WaitForTerminal(client.NewK8sClient(), kubeconfig, request, sessionId)
return &TerminalResponse{Id: sessionId}, nil
}

View File

@@ -0,0 +1,70 @@
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package models
import (
"encoding/json"
"errors"
"time"
"github.com/golang/glog"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/constants"
)
type workLoadStatus struct {
NameSpace string `json:"namespace"`
Data map[string]int `json:"data"`
UpdateTimeStamp int64 `json:"updateTimeStamp"`
}
var resourceList = []string{"deployments", "daemonsets", "statefulsets"}
func GetNamespacesResourceStatus(namespace string) (*workLoadStatus, error) {
cli, err := client.NewEtcdClient()
if err != nil {
glog.Error(err)
return nil, err
}
defer cli.Close()
res := workLoadStatus{Data: make(map[string]int)}
key := constants.Root + "/" + constants.WorkloadStatusKey + "/" + namespace
value, err := cli.Get(key)
if err != nil {
return nil, err
}
err = json.Unmarshal(value, &res)
if err != nil {
return nil, err
}
if time.Now().Unix()-res.UpdateTimeStamp > 5*constants.UpdateCircle {
err = errors.New("data in etcd is too old")
return nil, err
}
return &res, nil
}
func GetClusterResourceStatus() (*workLoadStatus, error) {
return GetNamespacesResourceStatus("\"\"")
}

View File

@@ -20,8 +20,20 @@ package options
import (
goflag "flag"
"net"
"strings"
"github.com/spf13/pflag"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
const (
// High enough QPS to fit all expected use cases. QPS=0 is not set here, because
// client code is overriding it.
DefaultQPS = 1e6
// High enough Burst to fit all expected use cases. Burst=0 is not set here, because
// client code is overriding it.
DefaultBurst = 1e6
)
// ServerRunOptions runs a kubernetes api server.
@@ -34,6 +46,11 @@ type ServerRunOptions struct {
certFile string
keyFile string
kubeConfigFile string
etcdEndpoints string
etcdCertFile string
etcdKeyFile string
etcdCaFile string
kubectlImage string
}
// NewServerRunOptions creates a new ServerRunOptions object with default parameters
@@ -66,6 +83,20 @@ func (s *ServerRunOptions) addFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.kubeConfigFile, "kubeconfig", "",
"Path to kubeconfig file with authorization and master location information.")
fs.StringVar(&s.etcdEndpoints, "etcd-endpoints", "",
"Server addresses of etcd")
fs.StringVar(&s.etcdCertFile, "etcd-tls-cert-file", "",
"Cert File use to connect etcd in https mode.")
fs.StringVar(&s.etcdKeyFile, "etcd-tls-key-file", "",
"Privatekey File use to connect etcd in https mode.")
fs.StringVar(&s.etcdCaFile, "etcd-tls-ca-file", "",
"CA Fileuse to connect etcd in https mode.")
fs.StringVar(&s.kubectlImage, "kubectl-image", "kubectl:1.0",
"kubectl pod's image")
}
func (s *ServerRunOptions) GetApiServerHost() string {
@@ -100,6 +131,56 @@ func (s *ServerRunOptions) GetKubeConfigFile() string {
return s.kubeConfigFile
}
func (s *ServerRunOptions) GetKubeConfig() (kubeConfig *rest.Config, err error) {
kubeConfigFile := s.kubeConfigFile
if len(kubeConfigFile) > 0 {
kubeConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfigFile)
if err != nil {
return nil, err
}
} else {
kubeConfig, err = rest.InClusterConfig()
if err != nil {
return nil, err
}
}
kubeConfig.QPS = DefaultQPS
kubeConfig.Burst = DefaultBurst
return kubeConfig, nil
}
func (s *ServerRunOptions) GetEtcdEndPoints() []string {
endpoints := strings.Split(s.etcdEndpoints, ",")
for k, v := range endpoints {
endpoints[k] = strings.TrimSpace(v)
}
return endpoints
}
func (s *ServerRunOptions) GetEtcdCertFile() string {
return s.etcdCertFile
}
func (s *ServerRunOptions) GetEtcdKeyFile() string {
return s.etcdKeyFile
}
func (s *ServerRunOptions) GetEtcdCaFile() string {
return s.etcdCaFile
}
func (s *ServerRunOptions) GetKubectlImage() string {
return s.kubectlImage
}
var ServerOptions = NewServerRunOptions()
func AddFlags(fs *pflag.FlagSet) {