Add metering api.

Signed-off-by: yunkunrao <yunkunrao@yunify.com>
This commit is contained in:
yunkunrao
2020-10-16 11:01:50 +08:00
committed by Rao Yunkun
parent 2f5202f38a
commit e9073f0486
31 changed files with 4794 additions and 101 deletions

View File

@@ -20,14 +20,17 @@ package v1alpha3
import (
"errors"
"regexp"
"strings"
"github.com/emicklei/go-restful"
"k8s.io/client-go/kubernetes"
"kubesphere.io/kubesphere/pkg/api"
"kubesphere.io/kubesphere/pkg/informers"
model "kubesphere.io/kubesphere/pkg/models/monitoring"
resourcev1alpha3 "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/resource"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
"regexp"
)
type handler struct {
@@ -35,8 +38,8 @@ type handler struct {
mo model.MonitoringOperator
}
func newHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, o openpitrix.Client) *handler {
return &handler{k, model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, o)}
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, o openpitrix.Client, resourceGetter *resourcev1alpha3.ResourceGetter) *handler {
return &handler{k, model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, o, resourceGetter)}
}
func (h handler) handleKubeSphereMetricsQuery(req *restful.Request, resp *restful.Response) {
@@ -186,6 +189,10 @@ func (h handler) handleNamedMetricsQuery(resp *restful.Response, q queryOptions)
var metrics []string
for _, metric := range q.namedMetrics {
if strings.HasPrefix(metric, model.MetricMeterPrefix) {
// skip meter metric
continue
}
ok, _ := regexp.MatchString(q.metricFilter, metric)
if ok {
metrics = append(metrics, metric)

View File

@@ -17,14 +17,21 @@ limitations under the License.
package v1alpha3
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/emicklei/go-restful"
"github.com/pkg/errors"
corev1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"kubesphere.io/kubesphere/pkg/api"
model "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
"strconv"
"time"
)
const (
@@ -34,47 +41,56 @@ const (
DefaultPage = 1
DefaultLimit = 5
OperationQuery = "query"
OperationExport = "export"
ComponentEtcd = "etcd"
ComponentAPIServer = "apiserver"
ComponentScheduler = "scheduler"
ErrNoHit = "'end' or 'time' must be after the namespace creation time."
ErrParamConflict = "'time' and the combination of 'start' and 'end' are mutually exclusive."
ErrInvalidStartEnd = "'start' must be before 'end'."
ErrInvalidPage = "Invalid parameter 'page'."
ErrInvalidLimit = "Invalid parameter 'limit'."
ErrNoHit = "'end' or 'time' must be after the namespace creation time."
ErrParamConflict = "'time' and the combination of 'start' and 'end' are mutually exclusive."
ErrInvalidStartEnd = "'start' must be before 'end'."
ErrInvalidPage = "Invalid parameter 'page'."
ErrInvalidLimit = "Invalid parameter 'limit'."
ErrParameterNotfound = "Parmameter [%s] not found"
)
type reqParams struct {
time string
start string
end string
step string
target string
order string
page string
limit string
metricFilter string
namespacedResourcesFilter string
resourceFilter string
nodeName string
workspaceName string
namespaceName string
workloadKind string
workloadName string
podName string
containerName string
pvcName string
storageClassName string
componentType string
expression string
metric string
operation string
time string
start string
end string
step string
target string
order string
page string
limit string
metricFilter string
resourceFilter string
nodeName string
workspaceName string
namespaceName string
workloadKind string
workloadName string
podName string
containerName string
pvcName string
storageClassName string
componentType string
expression string
metric string
applications string
services string
pvcFilter string
}
type queryOptions struct {
metricFilter string
namedMetrics []string
Operation string
start time.Time
end time.Time
time time.Time
@@ -99,6 +115,7 @@ func (q queryOptions) shouldSort() bool {
func parseRequestParams(req *restful.Request) reqParams {
var r reqParams
r.operation = req.QueryParameter("operation")
r.time = req.QueryParameter("time")
r.start = req.QueryParameter("start")
r.end = req.QueryParameter("end")
@@ -108,12 +125,24 @@ func parseRequestParams(req *restful.Request) reqParams {
r.page = req.QueryParameter("page")
r.limit = req.QueryParameter("limit")
r.metricFilter = req.QueryParameter("metrics_filter")
r.namespacedResourcesFilter = req.QueryParameter("namespaced_resources_filter")
r.resourceFilter = req.QueryParameter("resources_filter")
r.nodeName = req.PathParameter("node")
r.workspaceName = req.PathParameter("workspace")
r.namespaceName = req.PathParameter("namespace")
r.workloadKind = req.PathParameter("kind")
if req.QueryParameter("node") != "" {
r.nodeName = req.QueryParameter("node")
} else {
// compatible with monitoring request
r.nodeName = req.PathParameter("node")
}
if req.QueryParameter("kind") != "" {
r.workloadKind = req.QueryParameter("kind")
} else {
// compatible with monitoring request
r.workloadKind = req.PathParameter("kind")
}
r.workloadName = req.PathParameter("workload")
r.podName = req.PathParameter("pod")
r.containerName = req.PathParameter("container")
@@ -122,6 +151,10 @@ func parseRequestParams(req *restful.Request) reqParams {
r.componentType = req.PathParameter("component")
r.expression = req.QueryParameter("expr")
r.metric = req.QueryParameter("metric")
r.applications = req.QueryParameter("applications")
r.services = req.QueryParameter("services")
r.pvcFilter = req.QueryParameter("pvc_filter")
return r
}
@@ -135,70 +168,109 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
q.metricFilter = DefaultFilter
}
q.Operation = r.operation
if r.operation == "" {
q.Operation = OperationQuery
}
switch lvl {
case monitoring.LevelCluster:
q.option = monitoring.ClusterOption{}
q.namedMetrics = model.ClusterMetrics
case monitoring.LevelNode:
q.identifier = model.IdentifierNode
q.namedMetrics = model.NodeMetrics
q.option = monitoring.NodeOption{
ResourceFilter: r.resourceFilter,
NodeName: r.nodeName,
ResourceFilter: r.resourceFilter,
NodeName: r.nodeName,
PVCFilter: r.pvcFilter, // metering pvc
StorageClassName: r.storageClassName, // metering pvc
}
q.namedMetrics = model.NodeMetrics
case monitoring.LevelWorkspace:
q.identifier = model.IdentifierWorkspace
q.namedMetrics = model.WorkspaceMetrics
q.option = monitoring.WorkspaceOption{
ResourceFilter: r.resourceFilter,
WorkspaceName: r.workspaceName,
ResourceFilter: r.resourceFilter,
WorkspaceName: r.workspaceName,
PVCFilter: r.pvcFilter, // metering pvc
StorageClassName: r.storageClassName, // metering pvc
}
q.namedMetrics = model.WorkspaceMetrics
case monitoring.LevelNamespace:
q.identifier = model.IdentifierNamespace
q.namedMetrics = model.NamespaceMetrics
q.option = monitoring.NamespaceOption{
ResourceFilter: r.resourceFilter,
WorkspaceName: r.workspaceName,
NamespaceName: r.namespaceName,
ResourceFilter: r.resourceFilter,
WorkspaceName: r.workspaceName,
NamespaceName: r.namespaceName,
PVCFilter: r.pvcFilter, // metering pvc
StorageClassName: r.storageClassName, // metering pvc
}
q.namedMetrics = model.NamespaceMetrics
case monitoring.LevelApplication:
q.identifier = model.IdentifierApplication
if r.namespaceName == "" {
return q, errors.New(fmt.Sprintf(ErrParameterNotfound, "namespace"))
}
q.option = monitoring.ApplicationsOption{
NamespaceName: r.namespaceName,
Applications: strings.Split(r.applications, "|"),
StorageClassName: r.storageClassName, // metering pvc
}
q.namedMetrics = model.ApplicationMetrics
case monitoring.LevelWorkload:
q.identifier = model.IdentifierWorkload
q.namedMetrics = model.WorkloadMetrics
q.option = monitoring.WorkloadOption{
ResourceFilter: r.resourceFilter,
NamespaceName: r.namespaceName,
WorkloadKind: r.workloadKind,
}
q.namedMetrics = model.WorkloadMetrics
case monitoring.LevelPod:
q.identifier = model.IdentifierPod
q.namedMetrics = model.PodMetrics
q.option = monitoring.PodOption{
NamespacedResourcesFilter: r.namespacedResourcesFilter,
ResourceFilter: r.resourceFilter,
NodeName: r.nodeName,
NamespaceName: r.namespaceName,
WorkloadKind: r.workloadKind,
WorkloadName: r.workloadName,
PodName: r.podName,
ResourceFilter: r.resourceFilter,
NodeName: r.nodeName,
NamespaceName: r.namespaceName,
WorkloadKind: r.workloadKind,
WorkloadName: r.workloadName,
PodName: r.podName,
}
q.namedMetrics = model.PodMetrics
case monitoring.LevelService:
q.identifier = model.IdentifierService
q.option = monitoring.ServicesOption{
NamespaceName: r.namespaceName,
Services: strings.Split(r.services, "|"),
}
q.namedMetrics = model.ServiceMetrics
case monitoring.LevelContainer:
q.identifier = model.IdentifierContainer
q.namedMetrics = model.ContainerMetrics
q.option = monitoring.ContainerOption{
ResourceFilter: r.resourceFilter,
NamespaceName: r.namespaceName,
PodName: r.podName,
ContainerName: r.containerName,
}
q.namedMetrics = model.ContainerMetrics
case monitoring.LevelPVC:
q.identifier = model.IdentifierPVC
q.namedMetrics = model.PVCMetrics
q.option = monitoring.PVCOption{
ResourceFilter: r.resourceFilter,
NamespaceName: r.namespaceName,
StorageClassName: r.storageClassName,
PersistentVolumeClaimName: r.pvcName,
}
q.namedMetrics = model.PVCMetrics
case monitoring.LevelComponent:
q.option = monitoring.ComponentOption{}
switch r.componentType {
@@ -304,3 +376,35 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
return q, nil
}
func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
resp.Header().Set(restful.HEADER_ContentType, "text/plain")
resp.Header().Set("Content-Disposition", "attachment")
for i, _ := range metrics.Results {
ret := metrics.Results[i]
for j, _ := range ret.MetricValues {
ret.MetricValues[j].TransferToExportedMetricValue()
}
}
resBytes, err := json.MarshalIndent(metrics, "", " ")
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
output := new(bytes.Buffer)
_, err = output.Write(resBytes)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
_, err = io.Copy(resp, output)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
return
}

View File

@@ -18,6 +18,9 @@ package v1alpha3
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -25,8 +28,6 @@ import (
"kubesphere.io/kubesphere/pkg/informers"
model "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
"testing"
"time"
)
func TestIsRangeQuery(t *testing.T) {
@@ -84,6 +85,7 @@ func TestParseRequestParams(t *testing.T) {
metricFilter: ".*",
namedMetrics: model.ClusterMetrics,
option: monitoring.ClusterOption{},
Operation: OperationQuery,
},
expectedErr: false,
},
@@ -114,6 +116,7 @@ func TestParseRequestParams(t *testing.T) {
ResourceFilter: ".*",
NamespaceName: "default",
},
Operation: OperationQuery,
},
expectedErr: false,
},
@@ -181,6 +184,7 @@ func TestParseRequestParams(t *testing.T) {
metricFilter: "etcd_server_list",
namedMetrics: model.EtcdMetrics,
option: monitoring.ComponentOption{},
Operation: OperationQuery,
},
expectedErr: false,
},
@@ -208,6 +212,7 @@ func TestParseRequestParams(t *testing.T) {
order: "desc",
page: 1,
limit: 10,
Operation: OperationQuery,
},
expectedErr: false,
},
@@ -217,7 +222,7 @@ func TestParseRequestParams(t *testing.T) {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
client := fake.NewSimpleClientset(&tt.namespace)
fakeInformerFactory := informers.NewInformerFactories(client, nil, nil, nil, nil, nil)
handler := newHandler(client, nil, nil, fakeInformerFactory, nil)
handler := NewHandler(client, nil, nil, fakeInformerFactory, nil, nil)
result, err := handler.makeQueryOptions(tt.params, tt.lvl)
if err != nil {

View File

@@ -0,0 +1,327 @@
package v1alpha3
import (
"regexp"
"strings"
"github.com/emicklei/go-restful"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api"
model "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
func (h handler) HandleClusterMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelCluster)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func getMetricPosMap(metrics []monitoring.Metric) map[string]int {
var metricMap = make(map[string]int)
for i, m := range metrics {
metricMap[m.MetricName] = i
}
return metricMap
}
func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Response, q queryOptions) {
var metricMap = make(map[string]int)
var res model.Metrics
var current_res model.Metrics
var err error
aso, ok := q.option.(monitoring.ApplicationsOption)
if !ok {
klog.Error("invalid application option")
return
}
componentsMap := h.mo.GetAppComponentsMap(aso.NamespaceName, aso.Applications)
for k, _ := range componentsMap {
opt := monitoring.ApplicationOption{
NamespaceName: aso.NamespaceName,
Application: k,
ApplicationComponents: componentsMap[k],
StorageClassName: aso.StorageClassName,
}
if q.isRangeQuery() {
current_res, err = h.mo.GetNamedMetersOverTime(meters, q.start, q.end, q.step, opt)
} else {
current_res, err = h.mo.GetNamedMeters(meters, q.time, opt)
}
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
if res.Results == nil {
res = current_res
metricMap = getMetricPosMap(res.Results)
} else {
for _, cur_res := range current_res.Results {
pos, ok := metricMap[cur_res.MetricName]
if ok {
res.Results[pos].MetricValues = append(res.Results[pos].MetricValues, cur_res.MetricValues...)
} else {
res.Results = append(res.Results, cur_res)
}
}
}
}
if !q.isRangeQuery() && q.shouldSort() {
res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
return
}
resp.WriteAsJson(res)
}
func (h handler) handleServiceMetersQuery(meters []string, resp *restful.Response, q queryOptions) {
var metricMap = make(map[string]int)
var res model.Metrics
var current_res model.Metrics
var err error
sso, ok := q.option.(monitoring.ServicesOption)
if !ok {
klog.Error("invalid service option")
return
}
svcPodsMap := h.mo.GetSerivePodsMap(sso.NamespaceName, sso.Services)
for k, _ := range svcPodsMap {
opt := monitoring.ServiceOption{
NamespaceName: sso.NamespaceName,
ServiceName: k,
PodNames: svcPodsMap[k],
}
if q.isRangeQuery() {
current_res, err = h.mo.GetNamedMetersOverTime(meters, q.start, q.end, q.step, opt)
} else {
current_res, err = h.mo.GetNamedMeters(meters, q.time, opt)
}
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
if res.Results == nil {
res = current_res
metricMap = getMetricPosMap(res.Results)
} else {
for _, cur_res := range current_res.Results {
pos, ok := metricMap[cur_res.MetricName]
if ok {
res.Results[pos].MetricValues = append(res.Results[pos].MetricValues, cur_res.MetricValues...)
} else {
res.Results = append(res.Results, cur_res)
}
}
}
}
if !q.isRangeQuery() && q.shouldSort() {
res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
return
}
resp.WriteAsJson(res)
}
func (h handler) handleNamedMetersQuery(resp *restful.Response, q queryOptions) {
var res model.Metrics
var err error
var meters []string
for _, meter := range q.namedMetrics {
if !strings.HasPrefix(meter, model.MetricMeterPrefix) {
// skip non-meter metric
continue
}
ok, _ := regexp.MatchString(q.metricFilter, meter)
if ok {
meters = append(meters, meter)
}
}
if len(meters) == 0 {
klog.Info("no meters found")
resp.WriteAsJson(res)
return
}
_, ok := q.option.(monitoring.ApplicationsOption)
if ok {
h.handleApplicationMetersQuery(meters, resp, q)
return
}
_, ok = q.option.(monitoring.ServicesOption)
if ok {
h.handleServiceMetersQuery(meters, resp, q)
return
}
if q.isRangeQuery() {
res, err = h.mo.GetNamedMetersOverTime(meters, q.start, q.end, q.step, q.option)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
} else {
res, err = h.mo.GetNamedMeters(meters, q.time, q.option)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
if q.shouldSort() {
res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
}
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
return
}
resp.WriteAsJson(res)
}
func (h handler) HandleNodeMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelNode)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleWorkspaceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelWorkspace)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleNamespaceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelNamespace)
if err != nil {
if err.Error() == ErrNoHit {
res := handleNoHit(opt.namedMetrics)
resp.WriteAsJson(res)
return
}
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleWorkloadMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelWorkload)
if err != nil {
if err.Error() == ErrNoHit {
res := handleNoHit(opt.namedMetrics)
resp.WriteAsJson(res)
return
}
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleApplicationMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelApplication)
if err != nil {
if err.Error() == ErrNoHit {
res := handleNoHit(opt.namedMetrics)
resp.WriteAsJson(res)
return
}
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandlePodMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelPod)
if err != nil {
if err.Error() == ErrNoHit {
res := handleNoHit(opt.namedMetrics)
resp.WriteAsJson(res)
return
}
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleServiceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelService)
if err != nil {
if err.Error() == ErrNoHit {
res := handleNoHit(opt.namedMetrics)
resp.WriteAsJson(res)
return
}
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandlePVCMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelPVC)
if err != nil {
if err.Error() == ErrNoHit {
res := handleNoHit(opt.namedMetrics)
resp.WriteAsJson(res)
return
}
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}

View File

@@ -42,7 +42,7 @@ var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"}
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, opClient openpitrix.Client) error {
ws := runtime.NewWebService(GroupVersion)
h := newHandler(k8sClient, monitoringClient, metricsClient, factory, opClient)
h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, opClient, nil)
ws.Route(ws.GET("/kubesphere").
To(h.handleKubeSphereMetricsQuery).