From 20a4525d587c27768be36a472b3e506356a5759c Mon Sep 17 00:00:00 2001 From: huanggze Date: Sun, 28 Apr 2019 13:43:09 +0800 Subject: [PATCH] refactor monitoring modules Signed-off-by: huanggze --- pkg/apiserver/monitoring/monitoring.go | 26 +- pkg/models/metrics/metrics.go | 674 +++++++++++++++---------- pkg/models/metrics/metricsruleconst.go | 33 +- pkg/models/metrics/namespaces.go | 8 +- pkg/models/metrics/util.go | 20 +- 5 files changed, 464 insertions(+), 297 deletions(-) diff --git a/pkg/apiserver/monitoring/monitoring.go b/pkg/apiserver/monitoring/monitoring.go index d1a9887b3..5b6f5c8a3 100644 --- a/pkg/apiserver/monitoring/monitoring.go +++ b/pkg/apiserver/monitoring/monitoring.go @@ -33,13 +33,13 @@ func MonitorPod(request *restful.Request, response *restful.Response) { var res *metrics.FormatedMetric if !nullRule { metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params) - res = metrics.ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""}) + res = metrics.ReformatJson(metricsStr, metricName, map[string]string{metrics.MetricLevelPodName: ""}) } response.WriteAsJson(res) } else { // multiple - rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelPod) + rawMetrics := metrics.GetPodLevelMetrics(requestParams) // sorting sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) // paging @@ -52,7 +52,7 @@ func MonitorContainer(request *restful.Request, response *restful.Response) { requestParams := prometheus.ParseMonitoringRequestParams(request) metricName := requestParams.MetricsName if requestParams.MetricsFilter != "" { - rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelContainer) + rawMetrics := metrics.GetContainerLevelMetrics(requestParams) // sorting sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) // paging @@ -70,7 +70,7 @@ func MonitorContainer(request *restful.Request, response *restful.Response) { func MonitorWorkload(request *restful.Request, response *restful.Response) { requestParams := prometheus.ParseMonitoringRequestParams(request) - rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkload) + rawMetrics := metrics.GetWorkloadLevelMetrics(requestParams) // sorting sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) @@ -89,7 +89,7 @@ func MonitorAllWorkspaces(request *restful.Request, response *restful.Response) tp := requestParams.Tp if tp == "statistics" { // merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin - res := metrics.MonitorAllWorkspacesStatistics() + res := metrics.GetAllWorkspacesStatistics() response.WriteAsJson(res) } else if tp == "rank" { @@ -114,7 +114,7 @@ func MonitorOneWorkspace(request *restful.Request, response *restful.Response) { tp := requestParams.Tp if tp == "rank" { // multiple - rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace) + rawMetrics := metrics.GetWorkspaceLevelMetrics(requestParams) // sorting sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) @@ -130,7 +130,7 @@ func MonitorOneWorkspace(request *restful.Request, response *restful.Response) { res := metrics.MonitorOneWorkspaceStatistics(wsName) response.WriteAsJson(res) } else { - res := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace) + res := metrics.GetWorkspaceLevelMetrics(requestParams) response.WriteAsJson(res) } } @@ -138,7 +138,7 @@ func MonitorOneWorkspace(request *restful.Request, response *restful.Response) { func MonitorNamespace(request *restful.Request, response *restful.Response) { requestParams := prometheus.ParseMonitoringRequestParams(request) // multiple - rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelNamespace) + rawMetrics := metrics.GetNamespaceLevelMetrics(requestParams) // sorting sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) @@ -155,12 +155,12 @@ func MonitorCluster(request *restful.Request, response *restful.Response) { // single queryType, params := metrics.AssembleClusterMetricRequestInfo(requestParams, metricName) metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params) - res := metrics.ReformatJson(metricsStr, metricName, map[string]string{"cluster": "local"}) + res := metrics.ReformatJson(metricsStr, metricName, map[string]string{metrics.MetricLevelCluster: "local"}) response.WriteAsJson(res) } else { // multiple - res := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelCluster) + res := metrics.GetClusterLevelMetrics(requestParams) response.WriteAsJson(res) } } @@ -173,7 +173,7 @@ func MonitorNode(request *restful.Request, response *restful.Response) { // single queryType, params := metrics.AssembleNodeMetricRequestInfo(requestParams, metricName) metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params) - res := metrics.ReformatJson(metricsStr, metricName, map[string]string{"node": ""}) + res := metrics.ReformatJson(metricsStr, metricName, map[string]string{metrics.MetricLevelNode: ""}) // The raw node-exporter result doesn't include ip address information // Thereby, append node ip address to .data.result[].metric @@ -183,7 +183,7 @@ func MonitorNode(request *restful.Request, response *restful.Response) { response.WriteAsJson(res) } else { // multiple - rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelNode) + rawMetrics := metrics.GetNodeLevelMetrics(requestParams) nodeAddress := metrics.GetNodeAddressInfo() for i := 0; i < len(rawMetrics.Results); i++ { @@ -206,7 +206,7 @@ func MonitorComponent(request *restful.Request, response *restful.Response) { requestParams.MetricsFilter = requestParams.ComponentName + "_.*" } - rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelComponent) + rawMetrics := metrics.GetComponentLevelMetrics(requestParams) response.WriteAsJson(rawMetrics) } diff --git a/pkg/models/metrics/metrics.go b/pkg/models/metrics/metrics.go index a250ab582..82c4b5ed1 100644 --- a/pkg/models/metrics/metrics.go +++ b/pkg/models/metrics/metrics.go @@ -294,7 +294,7 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][] metricDesc := nodeMetric.Data.Result[i][ResultItemMetric] metricDescMap, ensure := metricDesc.(map[string]interface{}) if ensure { - if nodeId, exist := metricDescMap["resource_name"]; exist { + if nodeId, exist := metricDescMap[ResultItemMetricResourceName]; exist { addr, exist := (*nodeAddress)[nodeId.(string)] if exist { metricDescMap["address"] = addr @@ -307,7 +307,7 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][] func MonitorContainer(monitoringRequest *client.MonitoringRequestParams, metricName string) *FormatedMetric { queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName) metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - res := ReformatJson(metricsStr, metricName, map[string]string{"container_name": ""}) + res := ReformatJson(metricsStr, metricName, map[string]string{MetricLevelContainerName: ""}) return res } @@ -481,8 +481,7 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - ch <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": ws}) // It's adding "resource_name" field - + ch <- ReformatJson(metricsStr, metricName, map[string]string{ResultItemMetricResourceName: ws}) wg.Done() }(metricName) } @@ -508,7 +507,7 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w wsAllch <- &metricsArray } -func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resourceType string) *FormatedLevelMetric { +func GetClusterLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { metricsFilter := monitoringRequest.MetricsFilter if metricsFilter == "" { metricsFilter = ".*" @@ -517,275 +516,180 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour var ch = make(chan *FormatedMetric, ChannelMaxCapacity) var wg sync.WaitGroup - switch resourceType { - case MetricLevelCluster: - { - for _, metricName := range ClusterMetricsNames { - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - wg.Add(1) - go func(metricName string) { - queryType, params := AssembleClusterMetricRequestInfo(monitoringRequest, metricName) - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - ch <- ReformatJson(metricsStr, metricName, map[string]string{"cluster": "local"}) - wg.Done() - }(metricName) - } - } + for _, metricName := range ClusterMetricsNames { + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + wg.Add(1) + go func(metricName string) { + queryType, params := AssembleClusterMetricRequestInfo(monitoringRequest, metricName) + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelCluster: "local"}) + wg.Done() + }(metricName) } - case MetricLevelNode: - { - for _, metricName := range NodeMetricsNames { - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - wg.Add(1) - go func(metricName string) { - queryType, params := AssembleNodeMetricRequestInfo(monitoringRequest, metricName) - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - ch <- ReformatJson(metricsStr, metricName, map[string]string{"node": ""}) - wg.Done() - }(metricName) - } - } + } + + wg.Wait() + close(ch) + + var metricsArray []FormatedMetric + + for oneMetric := range ch { + if oneMetric != nil { + metricsArray = append(metricsArray, *oneMetric) } - case MetricLevelWorkspace: - { - // a specific workspace's metrics - if monitoringRequest.WsName != "" { - namespaceArray, err := workspaces.WorkspaceNamespaces(monitoringRequest.WsName) - if err != nil { - glog.Errorln(err.Error()) - } - namespaceArray = filterNamespace(monitoringRequest.ResourcesFilter, namespaceArray) + } - if monitoringRequest.Tp == "rank" { - for _, metricName := range NamespaceMetricsNames { - if metricName == MetricNameWorkspaceAllProjectCount { - continue - } + return &FormatedLevelMetric{ + MetricsLevel: MetricLevelCluster, + Results: metricsArray, + } +} - matched, err := regexp.MatchString(metricsFilter, metricName) - if err != nil || !matched { - continue - } +func GetNodeLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { + metricsFilter := monitoringRequest.MetricsFilter + if metricsFilter == "" { + metricsFilter = ".*" + } - wg.Add(1) - go func(metricName string) { + var ch = make(chan *FormatedMetric, ChannelMaxCapacity) + var wg sync.WaitGroup - var chForOneMetric = make(chan *FormatedMetric, ChannelMaxCapacity) - var wgForOneMetric sync.WaitGroup - - for _, ns := range namespaceArray { - wgForOneMetric.Add(1) - go func(metricName string, namespace string) { - - queryType, params := AssembleNamespaceMetricRequestInfoByNamesapce(monitoringRequest, namespace, metricName) - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - chForOneMetric <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": namespace}) - wgForOneMetric.Done() - }(metricName, ns) - } - - wgForOneMetric.Wait() - close(chForOneMetric) - - // ranking is for vector type result only - aggregatedResult := FormatedMetric{MetricName: metricName, Status: "success", Data: FormatedMetricData{Result: []map[string]interface{}{}, ResultType: ResultTypeVector}} - - for oneMetric := range chForOneMetric { - - if oneMetric != nil { - - // wrapper layer 1: append .data.result[0] - if len(oneMetric.Data.Result) > 0 { - aggregatedResult.Data.Result = append(aggregatedResult.Data.Result, oneMetric.Data.Result[0]) - } - } - } - - ch <- &aggregatedResult - wg.Done() - }(metricName) - - } - - } else { - - workspace := monitoringRequest.WsName - - for _, metricName := range WorkspaceMetricsNames { - - if metricName == MetricNameWorkspaceAllProjectCount { - continue - } - - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - wg.Add(1) - go func(metricName string, workspace string) { - queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - ch <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": workspace}) - wg.Done() - }(metricName, workspace) - } - } - } - } else { - // sum all workspaces - - for _, metricName := range WorkspaceMetricsNames { - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - - wg.Add(1) - - go func(metricName string) { - queryType, params := AssembleAllWorkspaceMetricRequestInfo(monitoringRequest, nil, metricName) - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - ch <- ReformatJson(metricsStr, metricName, map[string]string{"workspace": "workspaces"}) - - wg.Done() - }(metricName) - } - } - } + for _, metricName := range NodeMetricsNames { + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + wg.Add(1) + go func(metricName string) { + queryType, params := AssembleNodeMetricRequestInfo(monitoringRequest, metricName) + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelNode: ""}) + wg.Done() + }(metricName) } - case MetricLevelNamespace: - { + } + + wg.Wait() + close(ch) + + var metricsArray []FormatedMetric + + for oneMetric := range ch { + if oneMetric != nil { + metricsArray = append(metricsArray, *oneMetric) + } + } + + return &FormatedLevelMetric{ + MetricsLevel: MetricLevelNode, + Results: metricsArray, + } +} + +func GetWorkspaceLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { + metricsFilter := monitoringRequest.MetricsFilter + if metricsFilter == "" { + metricsFilter = ".*" + } + + var ch = make(chan *FormatedMetric, ChannelMaxCapacity) + var wg sync.WaitGroup + + // a specific workspace's metrics + if monitoringRequest.WsName != "" { + namespaceArray, err := workspaces.WorkspaceNamespaces(monitoringRequest.WsName) + if err != nil { + glog.Errorln(err.Error()) + } + namespaceArray = filterNamespace(monitoringRequest.ResourcesFilter, namespaceArray) + + if monitoringRequest.Tp == "rank" { for _, metricName := range NamespaceMetricsNames { - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - wg.Add(1) - go func(metricName string) { - - queryType, params := AssembleNamespaceMetricRequestInfo(monitoringRequest, metricName) - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - - rawResult := ReformatJson(metricsStr, metricName, map[string]string{"namespace": ""}) - ch <- rawResult - - wg.Done() - }(metricName) + if metricName == MetricNameWorkspaceAllProjectCount { + continue } - } - } - case MetricLevelWorkload: - { - if monitoringRequest.WorkloadName == "" { - for _, metricName := range WorkloadMetricsNames { - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - wg.Add(1) - go func(metricName string) { - queryType, params := AssembleAllWorkloadMetricRequestInfo(monitoringRequest, metricName) + + matched, err := regexp.MatchString(metricsFilter, metricName) + if err != nil || !matched { + continue + } + + wg.Add(1) + go func(metricName string) { + + var chForOneMetric = make(chan *FormatedMetric, ChannelMaxCapacity) + var wgForOneMetric sync.WaitGroup + + for _, ns := range namespaceArray { + wgForOneMetric.Add(1) + go func(metricName string, namespace string) { + + queryType, params := AssembleNamespaceMetricRequestInfoByNamesapce(monitoringRequest, namespace, metricName) metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - reformattedResult := ReformatJson(metricsStr, metricName, map[string]string{"workload": ""}) - // no need to append a null result - ch <- reformattedResult - wg.Done() - }(metricName) + chForOneMetric <- ReformatJson(metricsStr, metricName, map[string]string{ResultItemMetricResourceName: namespace}) + wgForOneMetric.Done() + }(metricName, ns) } - } - } else { - for _, metricName := range WorkloadMetricsNames { - bol, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && bol { - wg.Add(1) - go func(metricName string) { - metricName = strings.TrimLeft(metricName, "workload_") - queryType, params, nullRule := AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest, metricName) - if !nullRule { - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - fmtMetrics := ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""}) - unifyMetricHistoryTimeRange(fmtMetrics) - ch <- fmtMetrics - } - wg.Done() - }(metricName) - } - } - } - } - case MetricLevelPod: - { - for _, metricName := range PodMetricsNames { - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - wg.Add(1) - go func(metricName string) { - queryType, params, nullRule := AssemblePodMetricRequestInfo(monitoringRequest, metricName) - if !nullRule { - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - ch <- ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""}) - } else { - ch <- nil - } - wg.Done() - }(metricName) - } - } - } - case MetricLevelContainer: - { - for _, metricName := range ContainerMetricsNames { - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - wg.Add(1) - go func(metricName string) { - queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName) - metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) - ch <- ReformatJson(metricsStr, metricName, map[string]string{"container_name": ""}) - wg.Done() - }(metricName) - } - } - } - case MetricLevelComponent: - { - for _, metricName := range ComponentMetricsNames { - matched, err := regexp.MatchString(metricsFilter, metricName) - if err == nil && matched { - wg.Add(1) - go func(metricName string) { - queryType, params := AssembleComponentRequestInfo(monitoringRequest, metricName) - metricsStr := client.SendMonitoringRequest(client.SecondaryPrometheusEndpoint, queryType, params) - formattedJson := ReformatJson(metricsStr, metricName, map[string]string{"resource_name": monitoringRequest.ComponentName}) - if metricName == "etcd_server_list" { + wgForOneMetric.Wait() + close(chForOneMetric) - nodeMap := make(map[string]string, 0) + // ranking is for vector type result only + aggregatedResult := FormatedMetric{MetricName: metricName, Status: MetricStatusSuccess, Data: FormatedMetricData{Result: []map[string]interface{}{}, ResultType: ResultTypeVector}} - nodeAddress := GetNodeAddressInfo() - for nodeName, nodeInfo := range *nodeAddress { + for oneMetric := range chForOneMetric { - var nodeIp string - for _, item := range nodeInfo { - if item.Type == v1.NodeInternalIP { - nodeIp = item.Address - break - } - } + if oneMetric != nil { - nodeMap[nodeIp] = nodeName - } - - // add node_name label to metrics - for i := 0; i < len(formattedJson.Data.Result); i++ { - metricDesc := formattedJson.Data.Result[i][ResultItemMetric] - metricDescMap, ensure := metricDesc.(map[string]interface{}) - if ensure { - if nodeIp, exist := metricDescMap[ResultItemMetricNodeIp]; exist { - metricDescMap[ResultItemMetricNodeName] = nodeMap[nodeIp.(string)] - } - } + // append .data.result[0] + if len(oneMetric.Data.Result) > 0 { + aggregatedResult.Data.Result = append(aggregatedResult.Data.Result, oneMetric.Data.Result[0]) } } + } - ch <- formattedJson - wg.Done() - }(metricName) + ch <- &aggregatedResult + wg.Done() + }(metricName) + + } + + } else { + + workspace := monitoringRequest.WsName + + for _, metricName := range WorkspaceMetricsNames { + + if metricName == MetricNameWorkspaceAllProjectCount { + continue } + + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + wg.Add(1) + go func(metricName string, workspace string) { + queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{ResultItemMetricResourceName: workspace}) + wg.Done() + }(metricName, workspace) + } + } + } + } else { + // sum all workspaces + for _, metricName := range WorkspaceMetricsNames { + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + + wg.Add(1) + + go func(metricName string) { + queryType, params := AssembleAllWorkspaceMetricRequestInfo(monitoringRequest, nil, metricName) + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelWorkspace: "workspaces"}) + + wg.Done() + }(metricName) } } } @@ -801,14 +705,270 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour } } - // wrapper layer 2: return &FormatedLevelMetric{ - MetricsLevel: resourceType, + MetricsLevel: MetricLevelWorkspace, Results: metricsArray, } } -func MonitorAllWorkspacesStatistics() *FormatedLevelMetric { +func GetNamespaceLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { + metricsFilter := monitoringRequest.MetricsFilter + if metricsFilter == "" { + metricsFilter = ".*" + } + + var ch = make(chan *FormatedMetric, ChannelMaxCapacity) + var wg sync.WaitGroup + + for _, metricName := range NamespaceMetricsNames { + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + wg.Add(1) + go func(metricName string) { + + queryType, params := AssembleNamespaceMetricRequestInfo(monitoringRequest, metricName) + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + + rawResult := ReformatJson(metricsStr, metricName, map[string]string{MetricLevelNamespace: ""}) + ch <- rawResult + + wg.Done() + }(metricName) + } + } + + wg.Wait() + close(ch) + + var metricsArray []FormatedMetric + + for oneMetric := range ch { + if oneMetric != nil { + metricsArray = append(metricsArray, *oneMetric) + } + } + + return &FormatedLevelMetric{ + MetricsLevel: MetricLevelNamespace, + Results: metricsArray, + } +} + +func GetWorkloadLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { + metricsFilter := monitoringRequest.MetricsFilter + if metricsFilter == "" { + metricsFilter = ".*" + } + + var ch = make(chan *FormatedMetric, ChannelMaxCapacity) + var wg sync.WaitGroup + + if monitoringRequest.WorkloadName == "" { + for _, metricName := range WorkloadMetricsNames { + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + wg.Add(1) + go func(metricName string) { + queryType, params := AssembleAllWorkloadMetricRequestInfo(monitoringRequest, metricName) + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + reformattedResult := ReformatJson(metricsStr, metricName, map[string]string{MetricLevelWorkload: ""}) + // no need to append a null result + ch <- reformattedResult + wg.Done() + }(metricName) + } + } + } else { + for _, metricName := range WorkloadMetricsNames { + bol, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && bol { + wg.Add(1) + go func(metricName string) { + metricName = strings.TrimLeft(metricName, "workload_") + queryType, params, nullRule := AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest, metricName) + if !nullRule { + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + fmtMetrics := ReformatJson(metricsStr, metricName, map[string]string{MetricLevelPodName: ""}) + unifyMetricHistoryTimeRange(fmtMetrics) + ch <- fmtMetrics + } + wg.Done() + }(metricName) + } + } + } + + wg.Wait() + close(ch) + + var metricsArray []FormatedMetric + + for oneMetric := range ch { + if oneMetric != nil { + metricsArray = append(metricsArray, *oneMetric) + } + } + + return &FormatedLevelMetric{ + MetricsLevel: MetricLevelWorkload, + Results: metricsArray, + } +} + +func GetPodLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { + metricsFilter := monitoringRequest.MetricsFilter + if metricsFilter == "" { + metricsFilter = ".*" + } + + var ch = make(chan *FormatedMetric, ChannelMaxCapacity) + var wg sync.WaitGroup + + for _, metricName := range PodMetricsNames { + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + wg.Add(1) + go func(metricName string) { + queryType, params, nullRule := AssemblePodMetricRequestInfo(monitoringRequest, metricName) + if !nullRule { + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelPodName: ""}) + } else { + ch <- nil + } + wg.Done() + }(metricName) + } + } + + wg.Wait() + close(ch) + + var metricsArray []FormatedMetric + + for oneMetric := range ch { + if oneMetric != nil { + metricsArray = append(metricsArray, *oneMetric) + } + } + + return &FormatedLevelMetric{ + MetricsLevel: MetricLevelPod, + Results: metricsArray, + } +} + +func GetContainerLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { + metricsFilter := monitoringRequest.MetricsFilter + if metricsFilter == "" { + metricsFilter = ".*" + } + + var ch = make(chan *FormatedMetric, ChannelMaxCapacity) + var wg sync.WaitGroup + + for _, metricName := range ContainerMetricsNames { + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + wg.Add(1) + go func(metricName string) { + queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName) + metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) + ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelContainerName: ""}) + wg.Done() + }(metricName) + } + } + + wg.Wait() + close(ch) + + var metricsArray []FormatedMetric + + for oneMetric := range ch { + if oneMetric != nil { + metricsArray = append(metricsArray, *oneMetric) + } + } + + return &FormatedLevelMetric{ + MetricsLevel: MetricLevelContainer, + Results: metricsArray, + } +} + +func GetComponentLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric { + metricsFilter := monitoringRequest.MetricsFilter + if metricsFilter == "" { + metricsFilter = ".*" + } + + var ch = make(chan *FormatedMetric, ChannelMaxCapacity) + var wg sync.WaitGroup + + for _, metricName := range ComponentMetricsNames { + matched, err := regexp.MatchString(metricsFilter, metricName) + if err == nil && matched { + wg.Add(1) + go func(metricName string) { + queryType, params := AssembleComponentRequestInfo(monitoringRequest, metricName) + metricsStr := client.SendMonitoringRequest(client.SecondaryPrometheusEndpoint, queryType, params) + formattedJson := ReformatJson(metricsStr, metricName, map[string]string{ResultItemMetricResourceName: monitoringRequest.ComponentName}) + + if metricName == EtcdServerList { + + nodeMap := make(map[string]string, 0) + + nodeAddress := GetNodeAddressInfo() + for nodeName, nodeInfo := range *nodeAddress { + + var nodeIp string + for _, item := range nodeInfo { + if item.Type == v1.NodeInternalIP { + nodeIp = item.Address + break + } + } + + nodeMap[nodeIp] = nodeName + } + + // add node_name label to metrics + for i := 0; i < len(formattedJson.Data.Result); i++ { + metricDesc := formattedJson.Data.Result[i][ResultItemMetric] + metricDescMap, ensure := metricDesc.(map[string]interface{}) + if ensure { + if nodeIp, exist := metricDescMap[ResultItemMetricNodeIp]; exist { + metricDescMap[ResultItemMetricNodeName] = nodeMap[nodeIp.(string)] + } + } + } + } + + ch <- formattedJson + wg.Done() + }(metricName) + } + } + + wg.Wait() + close(ch) + + var metricsArray []FormatedMetric + + for oneMetric := range ch { + if oneMetric != nil { + metricsArray = append(metricsArray, *oneMetric) + } + } + + return &FormatedLevelMetric{ + MetricsLevel: MetricLevelComponent, + Results: metricsArray, + } +} + +func GetAllWorkspacesStatistics() *FormatedLevelMetric { wg := sync.WaitGroup{} var metricsArray []FormatedMetric diff --git a/pkg/models/metrics/metricsruleconst.go b/pkg/models/metrics/metricsruleconst.go index 069b5ad3f..4147c6f1d 100644 --- a/pkg/models/metrics/metricsruleconst.go +++ b/pkg/models/metrics/metricsruleconst.go @@ -14,19 +14,20 @@ limitations under the License. package metrics const ( - ResultTypeVector = "vector" - ResultTypeMatrix = "matrix" - MetricStatus = "status" - MetricStatusError = "error" - MetricStatusSuccess = "success" - ResultItemMetric = "metric" - ResultItemMetricResource = "resource" - ResultItemMetricNodeIp = "node_ip" - ResultItemMetricNodeName = "node_name" - ResultItemValue = "value" - ResultItemValues = "values" - ResultSortTypeDesc = "desc" - ResultSortTypeAsc = "asc" + ResultTypeVector = "vector" + ResultTypeMatrix = "matrix" + MetricStatus = "status" + MetricStatusError = "error" + MetricStatusSuccess = "success" + ResultItemMetric = "metric" + ResultItemMetricResource = "resource" + ResultItemMetricResourceName = "resource_name" + ResultItemMetricNodeIp = "node_ip" + ResultItemMetricNodeName = "node_name" + ResultItemValue = "value" + ResultItemValues = "values" + ResultSortTypeDesc = "desc" + ResultSortTypeAsc = "asc" ) const ( @@ -90,6 +91,12 @@ const ( WorkspaceJoinedKey = "label_kubesphere_io_workspace" ) +// The metrics need to include extra info out of prometheus +// eg. add node name info to the etcd_server_list metric +const ( + EtcdServerList = "etcd_server_list" +) + type MetricMap map[string]string var ClusterMetricsNames = []string{ diff --git a/pkg/models/metrics/namespaces.go b/pkg/models/metrics/namespaces.go index 3db24b027..ca48b4b14 100644 --- a/pkg/models/metrics/namespaces.go +++ b/pkg/models/metrics/namespaces.go @@ -41,14 +41,14 @@ func GetNamespacesWithMetrics(namespaces []*v1.Namespace) []*v1.Namespace { MetricsFilter: "namespace_cpu_usage|namespace_memory_usage_wo_cache|namespace_pod_count", } - rawMetrics := MonitorAllMetrics(¶ms, MetricLevelNamespace) + rawMetrics := GetNamespaceLevelMetrics(¶ms) for _, result := range rawMetrics.Results { for _, data := range result.Data.Result { - metricDescMap, ok := data["metric"].(map[string]interface{}) + metricDescMap, ok := data[ResultItemMetric].(map[string]interface{}) if ok { - if ns, exist := metricDescMap["resource_name"]; exist { - timeAndValue, ok := data["value"].([]interface{}) + if ns, exist := metricDescMap[ResultItemMetricResourceName]; exist { + timeAndValue, ok := data[ResultItemValue].([]interface{}) if ok && len(timeAndValue) == 2 { for i := 0; i < len(namespaces); i++ { if namespaces[i].Name == ns { diff --git a/pkg/models/metrics/util.go b/pkg/models/metrics/util.go index 75d72f568..ad6662ceb 100644 --- a/pkg/models/metrics/util.go +++ b/pkg/models/metrics/util.go @@ -89,8 +89,8 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri v1, _ := strconv.ParseFloat(value1[len(value1)-1].(string), 64) v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64) if v1 == v2 { - resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})["resource_name"] - resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})["resource_name"] + resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName] + resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName] return resourceName1.(string) < resourceName2.(string) } @@ -105,8 +105,8 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64) if v1 == v2 { - resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})["resource_name"] - resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})["resource_name"] + resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName] + resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName] return resourceName1.(string) > resourceName2.(string) } @@ -116,8 +116,8 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri for _, r := range metricItem.Data.Result { // record the ordering of resource_name to indexMap - // example: {"metric":{"resource_name": "Deployment:xxx"},"value":[1541142931.731,"3"]} - resourceName, exist := r[ResultItemMetric].(map[string]interface{})["resource_name"] + // example: {"metric":{ResultItemMetricResourceName: "Deployment:xxx"},"value":[1541142931.731,"3"]} + resourceName, exist := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName] if exist { if _, exist := indexMap[resourceName.(string)]; !exist { indexMap[resourceName.(string)] = i @@ -129,7 +129,7 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri // iterator all metric to find max metricItems length for _, r := range metricItem.Data.Result { - k, ok := r[ResultItemMetric].(map[string]interface{})["resource_name"] + k, ok := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName] if ok { currentResourceMap[k.(string)] = 1 } @@ -158,7 +158,7 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri sortedMetric := make([]map[string]interface{}, len(indexMap)) for j := 0; j < len(re.Data.Result); j++ { r := re.Data.Result[j] - k, exist := r[ResultItemMetric].(map[string]interface{})["resource_name"] + k, exist := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName] if exist { index, exist := indexMap[k.(string)] if exist { @@ -290,9 +290,9 @@ func ReformatJson(metric string, metricsName string, needAddParams map[string]st for n := range needAddParams { if v, ok := metricMap[n]; ok { delete(metricMap, n) - metricMap["resource_name"] = v + metricMap[ResultItemMetricResourceName] = v } else { - metricMap["resource_name"] = needAddParams[n] + metricMap[ResultItemMetricResourceName] = needAddParams[n] } } }