refactor monitoring modules

Signed-off-by: huanggze <loganhuang@yunify.com>
This commit is contained in:
huanggze
2019-04-28 13:43:09 +08:00
committed by zryfish
parent 5d8fd5c6ac
commit 20a4525d58
5 changed files with 464 additions and 297 deletions

View File

@@ -33,13 +33,13 @@ func MonitorPod(request *restful.Request, response *restful.Response) {
var res *metrics.FormatedMetric var res *metrics.FormatedMetric
if !nullRule { if !nullRule {
metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params) metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params)
res = metrics.ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""}) res = metrics.ReformatJson(metricsStr, metricName, map[string]string{metrics.MetricLevelPodName: ""})
} }
response.WriteAsJson(res) response.WriteAsJson(res)
} else { } else {
// multiple // multiple
rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelPod) rawMetrics := metrics.GetPodLevelMetrics(requestParams)
// sorting // sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging // paging
@@ -52,7 +52,7 @@ func MonitorContainer(request *restful.Request, response *restful.Response) {
requestParams := prometheus.ParseMonitoringRequestParams(request) requestParams := prometheus.ParseMonitoringRequestParams(request)
metricName := requestParams.MetricsName metricName := requestParams.MetricsName
if requestParams.MetricsFilter != "" { if requestParams.MetricsFilter != "" {
rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelContainer) rawMetrics := metrics.GetContainerLevelMetrics(requestParams)
// sorting // sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging // paging
@@ -70,7 +70,7 @@ func MonitorContainer(request *restful.Request, response *restful.Response) {
func MonitorWorkload(request *restful.Request, response *restful.Response) { func MonitorWorkload(request *restful.Request, response *restful.Response) {
requestParams := prometheus.ParseMonitoringRequestParams(request) requestParams := prometheus.ParseMonitoringRequestParams(request)
rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkload) rawMetrics := metrics.GetWorkloadLevelMetrics(requestParams)
// sorting // sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
@@ -89,7 +89,7 @@ func MonitorAllWorkspaces(request *restful.Request, response *restful.Response)
tp := requestParams.Tp tp := requestParams.Tp
if tp == "statistics" { if tp == "statistics" {
// merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin // merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin
res := metrics.MonitorAllWorkspacesStatistics() res := metrics.GetAllWorkspacesStatistics()
response.WriteAsJson(res) response.WriteAsJson(res)
} else if tp == "rank" { } else if tp == "rank" {
@@ -114,7 +114,7 @@ func MonitorOneWorkspace(request *restful.Request, response *restful.Response) {
tp := requestParams.Tp tp := requestParams.Tp
if tp == "rank" { if tp == "rank" {
// multiple // multiple
rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace) rawMetrics := metrics.GetWorkspaceLevelMetrics(requestParams)
// sorting // sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
@@ -130,7 +130,7 @@ func MonitorOneWorkspace(request *restful.Request, response *restful.Response) {
res := metrics.MonitorOneWorkspaceStatistics(wsName) res := metrics.MonitorOneWorkspaceStatistics(wsName)
response.WriteAsJson(res) response.WriteAsJson(res)
} else { } else {
res := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace) res := metrics.GetWorkspaceLevelMetrics(requestParams)
response.WriteAsJson(res) response.WriteAsJson(res)
} }
} }
@@ -138,7 +138,7 @@ func MonitorOneWorkspace(request *restful.Request, response *restful.Response) {
func MonitorNamespace(request *restful.Request, response *restful.Response) { func MonitorNamespace(request *restful.Request, response *restful.Response) {
requestParams := prometheus.ParseMonitoringRequestParams(request) requestParams := prometheus.ParseMonitoringRequestParams(request)
// multiple // multiple
rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelNamespace) rawMetrics := metrics.GetNamespaceLevelMetrics(requestParams)
// sorting // sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics) sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
@@ -155,12 +155,12 @@ func MonitorCluster(request *restful.Request, response *restful.Response) {
// single // single
queryType, params := metrics.AssembleClusterMetricRequestInfo(requestParams, metricName) queryType, params := metrics.AssembleClusterMetricRequestInfo(requestParams, metricName)
metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params) metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params)
res := metrics.ReformatJson(metricsStr, metricName, map[string]string{"cluster": "local"}) res := metrics.ReformatJson(metricsStr, metricName, map[string]string{metrics.MetricLevelCluster: "local"})
response.WriteAsJson(res) response.WriteAsJson(res)
} else { } else {
// multiple // multiple
res := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelCluster) res := metrics.GetClusterLevelMetrics(requestParams)
response.WriteAsJson(res) response.WriteAsJson(res)
} }
} }
@@ -173,7 +173,7 @@ func MonitorNode(request *restful.Request, response *restful.Response) {
// single // single
queryType, params := metrics.AssembleNodeMetricRequestInfo(requestParams, metricName) queryType, params := metrics.AssembleNodeMetricRequestInfo(requestParams, metricName)
metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params) metricsStr := prometheus.SendMonitoringRequest(prometheus.PrometheusEndpoint, queryType, params)
res := metrics.ReformatJson(metricsStr, metricName, map[string]string{"node": ""}) res := metrics.ReformatJson(metricsStr, metricName, map[string]string{metrics.MetricLevelNode: ""})
// The raw node-exporter result doesn't include ip address information // The raw node-exporter result doesn't include ip address information
// Thereby, append node ip address to .data.result[].metric // Thereby, append node ip address to .data.result[].metric
@@ -183,7 +183,7 @@ func MonitorNode(request *restful.Request, response *restful.Response) {
response.WriteAsJson(res) response.WriteAsJson(res)
} else { } else {
// multiple // multiple
rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelNode) rawMetrics := metrics.GetNodeLevelMetrics(requestParams)
nodeAddress := metrics.GetNodeAddressInfo() nodeAddress := metrics.GetNodeAddressInfo()
for i := 0; i < len(rawMetrics.Results); i++ { for i := 0; i < len(rawMetrics.Results); i++ {
@@ -206,7 +206,7 @@ func MonitorComponent(request *restful.Request, response *restful.Response) {
requestParams.MetricsFilter = requestParams.ComponentName + "_.*" requestParams.MetricsFilter = requestParams.ComponentName + "_.*"
} }
rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelComponent) rawMetrics := metrics.GetComponentLevelMetrics(requestParams)
response.WriteAsJson(rawMetrics) response.WriteAsJson(rawMetrics)
} }

View File

@@ -294,7 +294,7 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][]
metricDesc := nodeMetric.Data.Result[i][ResultItemMetric] metricDesc := nodeMetric.Data.Result[i][ResultItemMetric]
metricDescMap, ensure := metricDesc.(map[string]interface{}) metricDescMap, ensure := metricDesc.(map[string]interface{})
if ensure { if ensure {
if nodeId, exist := metricDescMap["resource_name"]; exist { if nodeId, exist := metricDescMap[ResultItemMetricResourceName]; exist {
addr, exist := (*nodeAddress)[nodeId.(string)] addr, exist := (*nodeAddress)[nodeId.(string)]
if exist { if exist {
metricDescMap["address"] = addr metricDescMap["address"] = addr
@@ -307,7 +307,7 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][]
func MonitorContainer(monitoringRequest *client.MonitoringRequestParams, metricName string) *FormatedMetric { func MonitorContainer(monitoringRequest *client.MonitoringRequestParams, metricName string) *FormatedMetric {
queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName) queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
res := ReformatJson(metricsStr, metricName, map[string]string{"container_name": ""}) res := ReformatJson(metricsStr, metricName, map[string]string{MetricLevelContainerName: ""})
return res return res
} }
@@ -481,8 +481,7 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": ws}) // It's adding "resource_name" field ch <- ReformatJson(metricsStr, metricName, map[string]string{ResultItemMetricResourceName: ws})
wg.Done() wg.Done()
}(metricName) }(metricName)
} }
@@ -508,7 +507,7 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
wsAllch <- &metricsArray wsAllch <- &metricsArray
} }
func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resourceType string) *FormatedLevelMetric { func GetClusterLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric {
metricsFilter := monitoringRequest.MetricsFilter metricsFilter := monitoringRequest.MetricsFilter
if metricsFilter == "" { if metricsFilter == "" {
metricsFilter = ".*" metricsFilter = ".*"
@@ -517,275 +516,180 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
var ch = make(chan *FormatedMetric, ChannelMaxCapacity) var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
var wg sync.WaitGroup var wg sync.WaitGroup
switch resourceType { for _, metricName := range ClusterMetricsNames {
case MetricLevelCluster: matched, err := regexp.MatchString(metricsFilter, metricName)
{ if err == nil && matched {
for _, metricName := range ClusterMetricsNames { wg.Add(1)
matched, err := regexp.MatchString(metricsFilter, metricName) go func(metricName string) {
if err == nil && matched { queryType, params := AssembleClusterMetricRequestInfo(monitoringRequest, metricName)
wg.Add(1) metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
go func(metricName string) { ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelCluster: "local"})
queryType, params := AssembleClusterMetricRequestInfo(monitoringRequest, metricName) wg.Done()
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) }(metricName)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"cluster": "local"})
wg.Done()
}(metricName)
}
}
} }
case MetricLevelNode: }
{
for _, metricName := range NodeMetricsNames { wg.Wait()
matched, err := regexp.MatchString(metricsFilter, metricName) close(ch)
if err == nil && matched {
wg.Add(1) var metricsArray []FormatedMetric
go func(metricName string) {
queryType, params := AssembleNodeMetricRequestInfo(monitoringRequest, metricName) for oneMetric := range ch {
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) if oneMetric != nil {
ch <- ReformatJson(metricsStr, metricName, map[string]string{"node": ""}) metricsArray = append(metricsArray, *oneMetric)
wg.Done()
}(metricName)
}
}
} }
case MetricLevelWorkspace: }
{
// a specific workspace's metrics
if monitoringRequest.WsName != "" {
namespaceArray, err := workspaces.WorkspaceNamespaces(monitoringRequest.WsName)
if err != nil {
glog.Errorln(err.Error())
}
namespaceArray = filterNamespace(monitoringRequest.ResourcesFilter, namespaceArray)
if monitoringRequest.Tp == "rank" { return &FormatedLevelMetric{
for _, metricName := range NamespaceMetricsNames { MetricsLevel: MetricLevelCluster,
if metricName == MetricNameWorkspaceAllProjectCount { Results: metricsArray,
continue }
} }
matched, err := regexp.MatchString(metricsFilter, metricName) func GetNodeLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric {
if err != nil || !matched { metricsFilter := monitoringRequest.MetricsFilter
continue if metricsFilter == "" {
} metricsFilter = ".*"
}
wg.Add(1) var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
go func(metricName string) { var wg sync.WaitGroup
var chForOneMetric = make(chan *FormatedMetric, ChannelMaxCapacity) for _, metricName := range NodeMetricsNames {
var wgForOneMetric sync.WaitGroup matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
for _, ns := range namespaceArray { wg.Add(1)
wgForOneMetric.Add(1) go func(metricName string) {
go func(metricName string, namespace string) { queryType, params := AssembleNodeMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
queryType, params := AssembleNamespaceMetricRequestInfoByNamesapce(monitoringRequest, namespace, metricName) ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelNode: ""})
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) wg.Done()
chForOneMetric <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": namespace}) }(metricName)
wgForOneMetric.Done()
}(metricName, ns)
}
wgForOneMetric.Wait()
close(chForOneMetric)
// ranking is for vector type result only
aggregatedResult := FormatedMetric{MetricName: metricName, Status: "success", Data: FormatedMetricData{Result: []map[string]interface{}{}, ResultType: ResultTypeVector}}
for oneMetric := range chForOneMetric {
if oneMetric != nil {
// wrapper layer 1: append .data.result[0]
if len(oneMetric.Data.Result) > 0 {
aggregatedResult.Data.Result = append(aggregatedResult.Data.Result, oneMetric.Data.Result[0])
}
}
}
ch <- &aggregatedResult
wg.Done()
}(metricName)
}
} else {
workspace := monitoringRequest.WsName
for _, metricName := range WorkspaceMetricsNames {
if metricName == MetricNameWorkspaceAllProjectCount {
continue
}
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string, workspace string) {
queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": workspace})
wg.Done()
}(metricName, workspace)
}
}
}
} else {
// sum all workspaces
for _, metricName := range WorkspaceMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleAllWorkspaceMetricRequestInfo(monitoringRequest, nil, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"workspace": "workspaces"})
wg.Done()
}(metricName)
}
}
}
} }
case MetricLevelNamespace: }
{
wg.Wait()
close(ch)
var metricsArray []FormatedMetric
for oneMetric := range ch {
if oneMetric != nil {
metricsArray = append(metricsArray, *oneMetric)
}
}
return &FormatedLevelMetric{
MetricsLevel: MetricLevelNode,
Results: metricsArray,
}
}
func GetWorkspaceLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric {
metricsFilter := monitoringRequest.MetricsFilter
if metricsFilter == "" {
metricsFilter = ".*"
}
var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
var wg sync.WaitGroup
// a specific workspace's metrics
if monitoringRequest.WsName != "" {
namespaceArray, err := workspaces.WorkspaceNamespaces(monitoringRequest.WsName)
if err != nil {
glog.Errorln(err.Error())
}
namespaceArray = filterNamespace(monitoringRequest.ResourcesFilter, namespaceArray)
if monitoringRequest.Tp == "rank" {
for _, metricName := range NamespaceMetricsNames { for _, metricName := range NamespaceMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName) if metricName == MetricNameWorkspaceAllProjectCount {
if err == nil && matched { continue
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleNamespaceMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
rawResult := ReformatJson(metricsStr, metricName, map[string]string{"namespace": ""})
ch <- rawResult
wg.Done()
}(metricName)
} }
}
} matched, err := regexp.MatchString(metricsFilter, metricName)
case MetricLevelWorkload: if err != nil || !matched {
{ continue
if monitoringRequest.WorkloadName == "" { }
for _, metricName := range WorkloadMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName) wg.Add(1)
if err == nil && matched { go func(metricName string) {
wg.Add(1)
go func(metricName string) { var chForOneMetric = make(chan *FormatedMetric, ChannelMaxCapacity)
queryType, params := AssembleAllWorkloadMetricRequestInfo(monitoringRequest, metricName) var wgForOneMetric sync.WaitGroup
for _, ns := range namespaceArray {
wgForOneMetric.Add(1)
go func(metricName string, namespace string) {
queryType, params := AssembleNamespaceMetricRequestInfoByNamesapce(monitoringRequest, namespace, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params) metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
reformattedResult := ReformatJson(metricsStr, metricName, map[string]string{"workload": ""}) chForOneMetric <- ReformatJson(metricsStr, metricName, map[string]string{ResultItemMetricResourceName: namespace})
// no need to append a null result wgForOneMetric.Done()
ch <- reformattedResult }(metricName, ns)
wg.Done()
}(metricName)
} }
}
} else {
for _, metricName := range WorkloadMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
wg.Add(1)
go func(metricName string) {
metricName = strings.TrimLeft(metricName, "workload_")
queryType, params, nullRule := AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest, metricName)
if !nullRule {
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
fmtMetrics := ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""})
unifyMetricHistoryTimeRange(fmtMetrics)
ch <- fmtMetrics
}
wg.Done()
}(metricName)
}
}
}
}
case MetricLevelPod:
{
for _, metricName := range PodMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params, nullRule := AssemblePodMetricRequestInfo(monitoringRequest, metricName)
if !nullRule {
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"pod_name": ""})
} else {
ch <- nil
}
wg.Done()
}(metricName)
}
}
}
case MetricLevelContainer:
{
for _, metricName := range ContainerMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"container_name": ""})
wg.Done()
}(metricName)
}
}
}
case MetricLevelComponent:
{
for _, metricName := range ComponentMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleComponentRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.SecondaryPrometheusEndpoint, queryType, params)
formattedJson := ReformatJson(metricsStr, metricName, map[string]string{"resource_name": monitoringRequest.ComponentName})
if metricName == "etcd_server_list" { wgForOneMetric.Wait()
close(chForOneMetric)
nodeMap := make(map[string]string, 0) // ranking is for vector type result only
aggregatedResult := FormatedMetric{MetricName: metricName, Status: MetricStatusSuccess, Data: FormatedMetricData{Result: []map[string]interface{}{}, ResultType: ResultTypeVector}}
nodeAddress := GetNodeAddressInfo() for oneMetric := range chForOneMetric {
for nodeName, nodeInfo := range *nodeAddress {
var nodeIp string if oneMetric != nil {
for _, item := range nodeInfo {
if item.Type == v1.NodeInternalIP {
nodeIp = item.Address
break
}
}
nodeMap[nodeIp] = nodeName // append .data.result[0]
} if len(oneMetric.Data.Result) > 0 {
aggregatedResult.Data.Result = append(aggregatedResult.Data.Result, oneMetric.Data.Result[0])
// add node_name label to metrics
for i := 0; i < len(formattedJson.Data.Result); i++ {
metricDesc := formattedJson.Data.Result[i][ResultItemMetric]
metricDescMap, ensure := metricDesc.(map[string]interface{})
if ensure {
if nodeIp, exist := metricDescMap[ResultItemMetricNodeIp]; exist {
metricDescMap[ResultItemMetricNodeName] = nodeMap[nodeIp.(string)]
}
}
} }
} }
}
ch <- formattedJson ch <- &aggregatedResult
wg.Done() wg.Done()
}(metricName) }(metricName)
}
} else {
workspace := monitoringRequest.WsName
for _, metricName := range WorkspaceMetricsNames {
if metricName == MetricNameWorkspaceAllProjectCount {
continue
} }
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string, workspace string) {
queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{ResultItemMetricResourceName: workspace})
wg.Done()
}(metricName, workspace)
}
}
}
} else {
// sum all workspaces
for _, metricName := range WorkspaceMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleAllWorkspaceMetricRequestInfo(monitoringRequest, nil, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelWorkspace: "workspaces"})
wg.Done()
}(metricName)
} }
} }
} }
@@ -801,14 +705,270 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
} }
} }
// wrapper layer 2:
return &FormatedLevelMetric{ return &FormatedLevelMetric{
MetricsLevel: resourceType, MetricsLevel: MetricLevelWorkspace,
Results: metricsArray, Results: metricsArray,
} }
} }
func MonitorAllWorkspacesStatistics() *FormatedLevelMetric { func GetNamespaceLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric {
metricsFilter := monitoringRequest.MetricsFilter
if metricsFilter == "" {
metricsFilter = ".*"
}
var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
var wg sync.WaitGroup
for _, metricName := range NamespaceMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleNamespaceMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
rawResult := ReformatJson(metricsStr, metricName, map[string]string{MetricLevelNamespace: ""})
ch <- rawResult
wg.Done()
}(metricName)
}
}
wg.Wait()
close(ch)
var metricsArray []FormatedMetric
for oneMetric := range ch {
if oneMetric != nil {
metricsArray = append(metricsArray, *oneMetric)
}
}
return &FormatedLevelMetric{
MetricsLevel: MetricLevelNamespace,
Results: metricsArray,
}
}
func GetWorkloadLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric {
metricsFilter := monitoringRequest.MetricsFilter
if metricsFilter == "" {
metricsFilter = ".*"
}
var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
var wg sync.WaitGroup
if monitoringRequest.WorkloadName == "" {
for _, metricName := range WorkloadMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleAllWorkloadMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
reformattedResult := ReformatJson(metricsStr, metricName, map[string]string{MetricLevelWorkload: ""})
// no need to append a null result
ch <- reformattedResult
wg.Done()
}(metricName)
}
}
} else {
for _, metricName := range WorkloadMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
wg.Add(1)
go func(metricName string) {
metricName = strings.TrimLeft(metricName, "workload_")
queryType, params, nullRule := AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest, metricName)
if !nullRule {
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
fmtMetrics := ReformatJson(metricsStr, metricName, map[string]string{MetricLevelPodName: ""})
unifyMetricHistoryTimeRange(fmtMetrics)
ch <- fmtMetrics
}
wg.Done()
}(metricName)
}
}
}
wg.Wait()
close(ch)
var metricsArray []FormatedMetric
for oneMetric := range ch {
if oneMetric != nil {
metricsArray = append(metricsArray, *oneMetric)
}
}
return &FormatedLevelMetric{
MetricsLevel: MetricLevelWorkload,
Results: metricsArray,
}
}
func GetPodLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric {
metricsFilter := monitoringRequest.MetricsFilter
if metricsFilter == "" {
metricsFilter = ".*"
}
var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
var wg sync.WaitGroup
for _, metricName := range PodMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params, nullRule := AssemblePodMetricRequestInfo(monitoringRequest, metricName)
if !nullRule {
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelPodName: ""})
} else {
ch <- nil
}
wg.Done()
}(metricName)
}
}
wg.Wait()
close(ch)
var metricsArray []FormatedMetric
for oneMetric := range ch {
if oneMetric != nil {
metricsArray = append(metricsArray, *oneMetric)
}
}
return &FormatedLevelMetric{
MetricsLevel: MetricLevelPod,
Results: metricsArray,
}
}
func GetContainerLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric {
metricsFilter := monitoringRequest.MetricsFilter
if metricsFilter == "" {
metricsFilter = ".*"
}
var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
var wg sync.WaitGroup
for _, metricName := range ContainerMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.PrometheusEndpoint, queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{MetricLevelContainerName: ""})
wg.Done()
}(metricName)
}
}
wg.Wait()
close(ch)
var metricsArray []FormatedMetric
for oneMetric := range ch {
if oneMetric != nil {
metricsArray = append(metricsArray, *oneMetric)
}
}
return &FormatedLevelMetric{
MetricsLevel: MetricLevelContainer,
Results: metricsArray,
}
}
func GetComponentLevelMetrics(monitoringRequest *client.MonitoringRequestParams) *FormatedLevelMetric {
metricsFilter := monitoringRequest.MetricsFilter
if metricsFilter == "" {
metricsFilter = ".*"
}
var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
var wg sync.WaitGroup
for _, metricName := range ComponentMetricsNames {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleComponentRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(client.SecondaryPrometheusEndpoint, queryType, params)
formattedJson := ReformatJson(metricsStr, metricName, map[string]string{ResultItemMetricResourceName: monitoringRequest.ComponentName})
if metricName == EtcdServerList {
nodeMap := make(map[string]string, 0)
nodeAddress := GetNodeAddressInfo()
for nodeName, nodeInfo := range *nodeAddress {
var nodeIp string
for _, item := range nodeInfo {
if item.Type == v1.NodeInternalIP {
nodeIp = item.Address
break
}
}
nodeMap[nodeIp] = nodeName
}
// add node_name label to metrics
for i := 0; i < len(formattedJson.Data.Result); i++ {
metricDesc := formattedJson.Data.Result[i][ResultItemMetric]
metricDescMap, ensure := metricDesc.(map[string]interface{})
if ensure {
if nodeIp, exist := metricDescMap[ResultItemMetricNodeIp]; exist {
metricDescMap[ResultItemMetricNodeName] = nodeMap[nodeIp.(string)]
}
}
}
}
ch <- formattedJson
wg.Done()
}(metricName)
}
}
wg.Wait()
close(ch)
var metricsArray []FormatedMetric
for oneMetric := range ch {
if oneMetric != nil {
metricsArray = append(metricsArray, *oneMetric)
}
}
return &FormatedLevelMetric{
MetricsLevel: MetricLevelComponent,
Results: metricsArray,
}
}
func GetAllWorkspacesStatistics() *FormatedLevelMetric {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
var metricsArray []FormatedMetric var metricsArray []FormatedMetric

View File

@@ -14,19 +14,20 @@ limitations under the License.
package metrics package metrics
const ( const (
ResultTypeVector = "vector" ResultTypeVector = "vector"
ResultTypeMatrix = "matrix" ResultTypeMatrix = "matrix"
MetricStatus = "status" MetricStatus = "status"
MetricStatusError = "error" MetricStatusError = "error"
MetricStatusSuccess = "success" MetricStatusSuccess = "success"
ResultItemMetric = "metric" ResultItemMetric = "metric"
ResultItemMetricResource = "resource" ResultItemMetricResource = "resource"
ResultItemMetricNodeIp = "node_ip" ResultItemMetricResourceName = "resource_name"
ResultItemMetricNodeName = "node_name" ResultItemMetricNodeIp = "node_ip"
ResultItemValue = "value" ResultItemMetricNodeName = "node_name"
ResultItemValues = "values" ResultItemValue = "value"
ResultSortTypeDesc = "desc" ResultItemValues = "values"
ResultSortTypeAsc = "asc" ResultSortTypeDesc = "desc"
ResultSortTypeAsc = "asc"
) )
const ( const (
@@ -90,6 +91,12 @@ const (
WorkspaceJoinedKey = "label_kubesphere_io_workspace" WorkspaceJoinedKey = "label_kubesphere_io_workspace"
) )
// The metrics need to include extra info out of prometheus
// eg. add node name info to the etcd_server_list metric
const (
EtcdServerList = "etcd_server_list"
)
type MetricMap map[string]string type MetricMap map[string]string
var ClusterMetricsNames = []string{ var ClusterMetricsNames = []string{

View File

@@ -41,14 +41,14 @@ func GetNamespacesWithMetrics(namespaces []*v1.Namespace) []*v1.Namespace {
MetricsFilter: "namespace_cpu_usage|namespace_memory_usage_wo_cache|namespace_pod_count", MetricsFilter: "namespace_cpu_usage|namespace_memory_usage_wo_cache|namespace_pod_count",
} }
rawMetrics := MonitorAllMetrics(&params, MetricLevelNamespace) rawMetrics := GetNamespaceLevelMetrics(&params)
for _, result := range rawMetrics.Results { for _, result := range rawMetrics.Results {
for _, data := range result.Data.Result { for _, data := range result.Data.Result {
metricDescMap, ok := data["metric"].(map[string]interface{}) metricDescMap, ok := data[ResultItemMetric].(map[string]interface{})
if ok { if ok {
if ns, exist := metricDescMap["resource_name"]; exist { if ns, exist := metricDescMap[ResultItemMetricResourceName]; exist {
timeAndValue, ok := data["value"].([]interface{}) timeAndValue, ok := data[ResultItemValue].([]interface{})
if ok && len(timeAndValue) == 2 { if ok && len(timeAndValue) == 2 {
for i := 0; i < len(namespaces); i++ { for i := 0; i < len(namespaces); i++ {
if namespaces[i].Name == ns { if namespaces[i].Name == ns {

View File

@@ -89,8 +89,8 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
v1, _ := strconv.ParseFloat(value1[len(value1)-1].(string), 64) v1, _ := strconv.ParseFloat(value1[len(value1)-1].(string), 64)
v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64) v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64)
if v1 == v2 { if v1 == v2 {
resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})["resource_name"] resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})["resource_name"] resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
return resourceName1.(string) < resourceName2.(string) return resourceName1.(string) < resourceName2.(string)
} }
@@ -105,8 +105,8 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64) v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64)
if v1 == v2 { if v1 == v2 {
resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})["resource_name"] resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})["resource_name"] resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
return resourceName1.(string) > resourceName2.(string) return resourceName1.(string) > resourceName2.(string)
} }
@@ -116,8 +116,8 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
for _, r := range metricItem.Data.Result { for _, r := range metricItem.Data.Result {
// record the ordering of resource_name to indexMap // record the ordering of resource_name to indexMap
// example: {"metric":{"resource_name": "Deployment:xxx"},"value":[1541142931.731,"3"]} // example: {"metric":{ResultItemMetricResourceName: "Deployment:xxx"},"value":[1541142931.731,"3"]}
resourceName, exist := r[ResultItemMetric].(map[string]interface{})["resource_name"] resourceName, exist := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
if exist { if exist {
if _, exist := indexMap[resourceName.(string)]; !exist { if _, exist := indexMap[resourceName.(string)]; !exist {
indexMap[resourceName.(string)] = i indexMap[resourceName.(string)] = i
@@ -129,7 +129,7 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
// iterator all metric to find max metricItems length // iterator all metric to find max metricItems length
for _, r := range metricItem.Data.Result { for _, r := range metricItem.Data.Result {
k, ok := r[ResultItemMetric].(map[string]interface{})["resource_name"] k, ok := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
if ok { if ok {
currentResourceMap[k.(string)] = 1 currentResourceMap[k.(string)] = 1
} }
@@ -158,7 +158,7 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
sortedMetric := make([]map[string]interface{}, len(indexMap)) sortedMetric := make([]map[string]interface{}, len(indexMap))
for j := 0; j < len(re.Data.Result); j++ { for j := 0; j < len(re.Data.Result); j++ {
r := re.Data.Result[j] r := re.Data.Result[j]
k, exist := r[ResultItemMetric].(map[string]interface{})["resource_name"] k, exist := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
if exist { if exist {
index, exist := indexMap[k.(string)] index, exist := indexMap[k.(string)]
if exist { if exist {
@@ -290,9 +290,9 @@ func ReformatJson(metric string, metricsName string, needAddParams map[string]st
for n := range needAddParams { for n := range needAddParams {
if v, ok := metricMap[n]; ok { if v, ok := metricMap[n]; ok {
delete(metricMap, n) delete(metricMap, n)
metricMap["resource_name"] = v metricMap[ResultItemMetricResourceName] = v
} else { } else {
metricMap["resource_name"] = needAddParams[n] metricMap[ResultItemMetricResourceName] = needAddParams[n]
} }
} }
} }