From beb7efdac04047a6a0b4d7056a2abc9558c46651 Mon Sep 17 00:00:00 2001 From: Carman Zhang Date: Wed, 14 Nov 2018 16:04:24 +0800 Subject: [PATCH] fixed deployment-pods metrics --- .../v1alpha/monitoring/monitor_handler.go | 69 +++----- pkg/models/metrics/metrics.go | 166 +++++++++++++----- pkg/models/metrics/metricsrule.go | 2 +- pkg/models/metrics/util.go | 4 +- 4 files changed, 155 insertions(+), 86 deletions(-) diff --git a/pkg/apis/v1alpha/monitoring/monitor_handler.go b/pkg/apis/v1alpha/monitoring/monitor_handler.go index 36b580722..262b3d116 100755 --- a/pkg/apis/v1alpha/monitoring/monitor_handler.go +++ b/pkg/apis/v1alpha/monitoring/monitor_handler.go @@ -1,9 +1,12 @@ /* Copyright 2018 The KubeSphere Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -66,38 +69,31 @@ func (u Monitor) monitorWorkload(request *restful.Request, response *restful.Res } } -// merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin -func (u Monitor) monitorAllWorkspacesStatistics(request *restful.Request, response *restful.Response) { - res := metrics.MonitorAllWorkspacesStatistics() - response.WriteAsJson(res) -} - -// merge multiple metric: devops, roles, projects... -func (u Monitor) monitorOneWorkspaceStatistics(request *restful.Request, response *restful.Response) { - requestParams := client.ParseMonitoringRequestParams(request) - wsName := requestParams.WsName - res := metrics.MonitorOneWorkspaceStatistics(wsName) - response.WriteAsJson(res) -} - func (u Monitor) monitorAllWorkspaces(request *restful.Request, response *restful.Response) { requestParams := client.ParseMonitoringRequestParams(request) - rawMetrics := metrics.MonitorAllWorkspaces(requestParams) - // sorting - sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelWorkspace) - // paging - pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount) + if requestParams.Tp == "_statistics" { + // merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin + res := metrics.MonitorAllWorkspacesStatistics() - response.WriteAsJson(pagedMetrics) + response.WriteAsJson(res) + } else { + rawMetrics := metrics.MonitorAllWorkspaces(requestParams) + // sorting + sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelWorkspace) + // paging + pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount) + + response.WriteAsJson(pagedMetrics) + } } func (u Monitor) monitorOneWorkspace(request *restful.Request, response *restful.Response) { requestParams := client.ParseMonitoringRequestParams(request) tp := requestParams.Tp - if tp != "" { + if tp == "rank" { // multiple rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace) // sorting @@ -107,6 +103,12 @@ func (u Monitor) monitorOneWorkspace(request *restful.Request, response *restful response.WriteAsJson(pagedMetrics) + } else if tp == "_statistics" { + wsName := requestParams.WsName + + // merge multiple metric: devops, roles, projects... + res := metrics.MonitorOneWorkspaceStatistics(wsName) + response.WriteAsJson(res) } else { res := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace) response.WriteAsJson(res) @@ -350,17 +352,8 @@ func Register(ws *restful.WebService, subPath string) { Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) - // merge multiple metric: devops, roles, projects... - ws.Route(ws.GET(subPath+"/cluster_workspaces/{workspace_name}/_statistics").To(u.monitorOneWorkspaceStatistics). - Filter(route.RouteLogging). - Doc("monitor specific workspace level metrics"). - Param(ws.PathParameter("workspace_name", "workspace name").DataType("string").Required(true)). - Metadata(restfulspec.KeyOpenAPITags, tags)). - Consumes(restful.MIME_JSON, restful.MIME_XML). - Produces(restful.MIME_JSON) - // list all namespace in this workspace by selected metrics - ws.Route(ws.GET(subPath+"/cluster_workspaces/{workspace_name}").To(u.monitorOneWorkspace). + ws.Route(ws.GET(subPath+"/workspaces/{workspace_name}").To(u.monitorOneWorkspace). Filter(route.RouteLogging). Doc("monitor workspaces level metrics"). Param(ws.PathParameter("workspace_name", "workspace name").DataType("string").Required(true)). @@ -370,21 +363,12 @@ func Register(ws *restful.WebService, subPath string) { Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). - Param(ws.QueryParameter("type", "page number").DataType("string").Required(false).DefaultValue("1")). + Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")). Metadata(restfulspec.KeyOpenAPITags, tags)). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) - // metrics from system users or projects, merge multiple metric: all-devops, all-roles, all-projects... - ws.Route(ws.GET(subPath+"/cluster_workspaces/_statistics").To(u.monitorAllWorkspacesStatistics). - Filter(route.RouteLogging). - Doc("monitor specific workspace level metrics"). - Metadata(restfulspec.KeyOpenAPITags, tags)). - Consumes(restful.MIME_JSON, restful.MIME_XML). - Produces(restful.MIME_JSON) - - // metrics from prometheus - ws.Route(ws.GET(subPath+"/cluster_workspaces").To(u.monitorAllWorkspaces). + ws.Route(ws.GET(subPath+"/workspaces").To(u.monitorAllWorkspaces). Filter(route.RouteLogging). Doc("monitor workspaces level metrics"). Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("workspace_memory_utilisation")). @@ -393,6 +377,7 @@ func Register(ws *restful.WebService, subPath string) { Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). + Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")). Metadata(restfulspec.KeyOpenAPITags, tags)). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) diff --git a/pkg/models/metrics/metrics.go b/pkg/models/metrics/metrics.go index 0127dd5ab..5287ab71c 100644 --- a/pkg/models/metrics/metrics.go +++ b/pkg/models/metrics/metrics.go @@ -30,6 +30,9 @@ import ( "k8s.io/api/core/v1" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "runtime/debug" + "sort" + "kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/models" "kubesphere.io/kubesphere/pkg/models/workspaces" @@ -38,7 +41,7 @@ import ( var nodeStatusDelLables = []string{"endpoint", "instance", "job", "namespace", "pod", "service"} const ( - ChannelMaxCapacityWorkspaceMetric = 400 + ChannelMaxCapacityWorkspaceMetric = 800 ChannelMaxCapacity = 100 ) @@ -126,6 +129,7 @@ func getPodNameRegexInWorkload(res string) string { glog.Errorln("json parse failed", jsonErr) } var podNames []string + for _, item := range dat.Data.Result { podName := item.KubePodMetric.Pod podNames = append(podNames, podName) @@ -134,8 +138,66 @@ func getPodNameRegexInWorkload(res string) string { return podNamesFilter } +func unifyMetricHistoryTimeRange(fmtMetrics *FormatedMetric) { + + var timestampMap = make(map[float64]bool) + + if fmtMetrics.Data.ResultType == ResultTypeMatrix { + for i, _ := range fmtMetrics.Data.Result { + values, exist := fmtMetrics.Data.Result[i][ResultItemValues] + if exist { + valueArray, sure := values.([]interface{}) + if sure { + for j, _ := range valueArray { + timeAndValue := valueArray[j].([]interface{}) + timestampMap[timeAndValue[0].(float64)] = true + } + } + } + } + } + + timestampArray := make([]float64, len(timestampMap)) + i := 0 + for timestamp, _ := range timestampMap { + timestampArray[i] = timestamp + i++ + } + sort.Float64s(timestampArray) + + if fmtMetrics.Data.ResultType == ResultTypeMatrix { + for i := 0; i < len(fmtMetrics.Data.Result); i++ { + + values, exist := fmtMetrics.Data.Result[i][ResultItemValues] + if exist { + valueArray, sure := values.([]interface{}) + if sure { + + formatValueArray := make([][]interface{}, len(timestampArray)) + j := 0 + + for k, _ := range timestampArray { + valueItem, sure := valueArray[j].([]interface{}) + if sure && valueItem[0].(float64) == timestampArray[k] { + formatValueArray[k] = []interface{}{int64(timestampArray[k]), valueItem[1]} + j++ + } else { + formatValueArray[k] = []interface{}{int64(timestampArray[k]), "-1"} + } + } + fmtMetrics.Data.Result[i][ResultItemValues] = formatValueArray + } + } + } + } +} + func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) { - rule := MakeWorkloadRule(monitoringRequest.WorkloadKind, monitoringRequest.WorkloadName, monitoringRequest.NsName) + + nsName := monitoringRequest.NsName + wkName := monitoringRequest.WorkloadName + + rule := MakeWorkloadRule(monitoringRequest.WorkloadKind, wkName, nsName) paramValues := monitoringRequest.Params params := makeRequestParamString(rule, paramValues) @@ -144,7 +206,7 @@ func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringReque podNamesFilter := getPodNameRegexInWorkload(res) queryType := monitoringRequest.QueryType - rule = MakePodPromQL(metricName, monitoringRequest.NsName, "", "", podNamesFilter) + rule = MakePodPromQL(metricName, nsName, "", "", podNamesFilter) params = makeRequestParamString(rule, paramValues) return queryType, params @@ -167,7 +229,10 @@ func GetMetric(queryType, params, metricName string) *FormatedMetric { } func GetNodeAddressInfo() *map[string][]v1.NodeAddress { - nodeList, _ := client.NewK8sClient().CoreV1().Nodes().List(metaV1.ListOptions{}) + nodeList, err := client.NewK8sClient().CoreV1().Nodes().List(metaV1.ListOptions{}) + if err != nil { + glog.Errorln(err.Error()) + } var nodeAddress = make(map[string][]v1.NodeAddress) for _, node := range nodeList.Items { nodeAddress[node.Name] = node.Status.Addresses @@ -179,11 +244,13 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][] for i := 0; i < len(nodeMetric.Data.Result); i++ { metricDesc := nodeMetric.Data.Result[i][ResultItemMetric] - metricDescMap := metricDesc.(map[string]interface{}) - if nodeId, exist := metricDescMap["node"]; exist { - addr, exist := (*nodeAddress)[nodeId.(string)] - if exist { - metricDescMap["address"] = addr + metricDescMap, ensure := metricDesc.(map[string]interface{}) + if ensure { + if nodeId, exist := metricDescMap["node"]; exist { + addr, exist := (*nodeAddress)[nodeId.(string)] + if exist { + metricDescMap["address"] = addr + } } } } @@ -223,6 +290,13 @@ func AssembleWorkspaceMetricRequestInfo(monitoringRequest *client.MonitoringRequ func makeRequestParamString(rule string, paramValues url.Values) string { + defer func() { + if err := recover(); err != nil { + glog.Errorln(err) + debug.PrintStack() + } + }() + var values = make(url.Values) for key, v := range paramValues { values.Set(key, v[0]) @@ -265,7 +339,12 @@ func MonitorAllWorkspaces(monitoringRequest *client.MonitoringRequestParams) *Fo var wgAll sync.WaitGroup var wsAllch = make(chan *[]FormatedMetric, ChannelMaxCapacityWorkspaceMetric) - workspaceNamespaceMap, _, _ := workspaces.GetAllOrgAndProjList() + workspaceNamespaceMap, _, err := workspaces.GetAllOrgAndProjList() + + if err != nil { + glog.Errorln(err.Error()) + } + for ws, _ := range workspaceNamespaceMap { bol, err := regexp.MatchString(monitoringRequest.WsFilter, ws) if err == nil && bol { @@ -320,11 +399,6 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w for _, metricName := range filterMetricsName { wg.Add(1) go func(metricName string) { - defer func() { - if err := recover(); err != nil { - glog.Errorln(err) - } - }() queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) ch <- GetMetric(queryType, params, metricName) @@ -341,9 +415,11 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w if oneMetric != nil { // add "workspace" filed to oneMetric `metric` field for i := 0; i < len(oneMetric.Data.Result); i++ { - tmap := oneMetric.Data.Result[i]["metric"].(map[string]interface{}) - tmap[MetricLevelWorkspace] = ws - oneMetric.Data.Result[i]["metric"] = tmap + tmap, sure := oneMetric.Data.Result[i][ResultItemMetric].(map[string]interface{}) + if sure { + tmap[MetricLevelWorkspace] = ws + oneMetric.Data.Result[i][ResultItemMetric] = tmap + } } metricsArray = append(metricsArray, *oneMetric) } @@ -406,19 +482,7 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour } namespaceArray = filterNamespace(monitoringRequest.NsFilter, namespaceArray) - if monitoringRequest.Tp == "" { - for _, metricName := range WorkspaceMetricsNames { - bol, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && bol { - wg.Add(1) - go func(metricName string) { - queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) - ch <- GetMetric(queryType, params, metricName) - wg.Done() - }(metricName) - } - } - } else { + if monitoringRequest.Tp == "rank" { for _, metricName := range NamespaceMetricsNames { bol, err := regexp.MatchString(metricsFilter, metricName) ns := "^(" + strings.Join(namespaceArray, "|") + ")$" @@ -432,6 +496,19 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour }(metricName) } } + + } else { + for _, metricName := range WorkspaceMetricsNames { + bol, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && bol { + wg.Add(1) + go func(metricName string) { + queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) + ch <- GetMetric(queryType, params, metricName) + wg.Done() + }(metricName) + } + } } } case MetricLevelNamespace: @@ -457,7 +534,9 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour go func(metricName string) { metricName = strings.TrimLeft(metricName, "workload_") queryType, params := AssembleWorkloadMetricRequestInfo(monitoringRequest, metricName) - ch <- GetMetric(queryType, params, metricName) + fmtMetrics := GetMetric(queryType, params, metricName) + unifyMetricHistoryTimeRange(fmtMetrics) + ch <- fmtMetrics wg.Done() }(metricName) } @@ -563,11 +642,13 @@ func calcWorkspaceNamespace(metric *FormatedMetric) int { var workspaceNamespaceCount = 0 for _, result := range metric.Data.Result { - tmpMap := result[ResultItemMetric].(map[string]interface{}) - wsName, exist := tmpMap[WorkspaceJoinedKey] + tmpMap, sure := result[ResultItemMetric].(map[string]interface{}) + if sure { + wsName, exist := tmpMap[WorkspaceJoinedKey] - if exist && wsName != "" { - workspaceNamespaceCount += 1 + if exist && wsName != "" { + workspaceNamespaceCount += 1 + } } } @@ -788,11 +869,14 @@ func MonitorComponentStatus(monitoringRequest *client.MonitoringRequestParams) * var normalNodes []string var abnormalNodes []string for _, result := range nodeStatusMetric.Data.Result { - tmap := result[ResultItemMetric].(map[string]interface{}) - if tmap[MetricStatus].(string) == "false" { - abnormalNodes = append(abnormalNodes, tmap[MetricLevelNode].(string)) - } else { - normalNodes = append(normalNodes, tmap[MetricLevelNode].(string)) + tmap, sure := result[ResultItemMetric].(map[string]interface{}) + + if sure { + if tmap[MetricStatus].(string) == "false" { + abnormalNodes = append(abnormalNodes, tmap[MetricLevelNode].(string)) + } else { + normalNodes = append(normalNodes, tmap[MetricLevelNode].(string)) + } } } diff --git a/pkg/models/metrics/metricsrule.go b/pkg/models/metrics/metricsrule.go index b2321b6b1..f0ff0811e 100755 --- a/pkg/models/metrics/metricsrule.go +++ b/pkg/models/metrics/metricsrule.go @@ -29,7 +29,7 @@ func MakeWorkloadRule(wkKind, wkName, namespace string) string { case "deployment": wkKind = ReplicaSet if wkName != "" { - wkName = "~\"" + wkName + ".*\"" + wkName = "~\"^" + wkName + `-(\\w)+$"` } else { wkName = "~\".*\"" } diff --git a/pkg/models/metrics/util.go b/pkg/models/metrics/util.go index 78f62e46e..36b081461 100644 --- a/pkg/models/metrics/util.go +++ b/pkg/models/metrics/util.go @@ -230,8 +230,8 @@ func ReformatJson(metric string, metricsName string, needDelParams ...string) *F result := formatMetric.Data.Result for _, res := range result { metric, exist := res[ResultItemMetric] - metricMap := metric.(map[string]interface{}) - if exist { + metricMap, sure := metric.(map[string]interface{}) + if exist && sure { delete(metricMap, "__name__") } if len(needDelParams) > 0 {