monitoring model bug fix

Signed-off-by: huanggze <loganhuang@yunify.com>
This commit is contained in:
huanggze
2019-04-05 20:26:44 +08:00
committed by zryfish
parent 5d8a907cae
commit 9a9664f06c
7 changed files with 334 additions and 192 deletions

View File

@@ -19,17 +19,16 @@ limitations under the License.
package metrics
import (
"github.com/golang/glog"
"kubesphere.io/kubesphere/pkg/informers"
"net/url"
"regexp"
"runtime/debug"
"sort"
"strings"
"sync"
"time"
"github.com/golang/glog"
"runtime/debug"
"sort"
"github.com/json-iterator/go"
"k8s.io/api/core/v1"
@@ -119,6 +118,7 @@ func getAllWorkspaceNames(formatedMetric *FormatedMetric) map[string]int {
var wsMap = make(map[string]int)
for i := 0; i < len(formatedMetric.Data.Result); i++ {
// metricDesc needs clear naming
metricDesc := formatedMetric.Data.Result[i][ResultItemMetric]
metricDescMap, ensure := metricDesc.(map[string]interface{})
if ensure {
@@ -256,7 +256,8 @@ func AssembleAllWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRe
paramValues := monitoringRequest.Params
rule := MakeWorkloadPromQL(metricName, monitoringRequest.NsName, monitoringRequest.ResourcesFilter)
rule := MakeWorkloadPromQL(metricName, monitoringRequest.NsName, monitoringRequest.ResourcesFilter, monitoringRequest.WorkloadKind)
params := makeRequestParamString(rule, paramValues)
return queryType, params
}
@@ -293,7 +294,7 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][]
metricDesc := nodeMetric.Data.Result[i][ResultItemMetric]
metricDescMap, ensure := metricDesc.(map[string]interface{})
if ensure {
if nodeId, exist := metricDescMap["node"]; exist {
if nodeId, exist := metricDescMap["resource_name"]; exist {
addr, exist := (*nodeAddress)[nodeId.(string)]
if exist {
metricDescMap["address"] = addr
@@ -330,6 +331,17 @@ func AssembleNamespaceMetricRequestInfo(monitoringRequest *client.MonitoringRequ
return queryType, params
}
func AssembleNamespaceMetricRequestInfoByNamesapce(monitoringRequest *client.MonitoringRequestParams, namespace string, metricName string) (string, string) {
queryType := monitoringRequest.QueryType
paramValues := monitoringRequest.Params
rule := MakeNamespacePromQL(namespace, monitoringRequest.ResourcesFilter, metricName)
params := makeRequestParamString(rule, paramValues)
return queryType, params
}
func AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, namespaceList []string, metricName string) (string, string) {
nsFilter := "^(" + strings.Join(namespaceList, "|") + ")$"
@@ -414,6 +426,7 @@ func MonitorAllWorkspaces(monitoringRequest *client.MonitoringRequestParams) *Fo
wsMap := getAllWorkspaces()
for ws := range wsMap {
// Only execute Prometheus queries for specific metrics on specific workspaces
bol, err := regexp.MatchString(monitoringRequest.ResourcesFilter, ws)
if err == nil && bol {
// a workspace
@@ -461,8 +474,9 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
var ch = make(chan *FormatedMetric, ChannelMaxCapacity)
namespaceArray, err := workspaces.WorkspaceNamespaces(ws)
if err != nil {
glog.Errorln(err.Error())
glog.Errorln(err)
}
// add by namespace
for _, metricName := range filterMetricsName {
wg.Add(1)
@@ -470,7 +484,7 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
metricsStr := client.SendMonitoringRequest(queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"namespace": ""})
ch <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": ws}) // It's adding "resource_name" field
wg.Done()
}(metricName)
@@ -482,7 +496,7 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
var metricsArray []FormatedMetric
for oneMetric := range ch {
if oneMetric != nil {
// add "workspace" filed to oneMetric `metric` field
// add "workspace" to oneMetric "metric" field
for i := 0; i < len(oneMetric.Data.Result); i++ {
tmap, sure := oneMetric.Data.Result[i][ResultItemMetric].(map[string]interface{})
if sure {
@@ -510,8 +524,8 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
case MetricLevelCluster:
{
for _, metricName := range ClusterMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleClusterMetricRequestInfo(monitoringRequest, metricName)
@@ -525,8 +539,8 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
case MetricLevelNode:
{
for _, metricName := range NodeMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleNodeMetricRequestInfo(monitoringRequest, metricName)
@@ -553,36 +567,70 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
continue
}
bol, err := regexp.MatchString(metricsFilter, metricName)
ns := "^(" + strings.Join(namespaceArray, "|") + ")$"
monitoringRequest.ResourcesFilter = ns
if err == nil && bol {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleNamespaceMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"workspace": "workspace"})
wg.Done()
}(metricName)
matched, err := regexp.MatchString(metricsFilter, metricName)
if err != nil || !matched {
continue
}
wg.Add(1)
go func(metricName string) {
var chForOneMetric = make(chan *FormatedMetric, ChannelMaxCapacity)
var wgForOneMetric sync.WaitGroup
for _, ns := range namespaceArray {
wgForOneMetric.Add(1)
go func(metricName string, namespace string) {
queryType, params := AssembleNamespaceMetricRequestInfoByNamesapce(monitoringRequest, namespace, metricName)
metricsStr := client.SendMonitoringRequest(queryType, params)
chForOneMetric <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": namespace})
wgForOneMetric.Done()
}(metricName, ns)
}
wgForOneMetric.Wait()
close(chForOneMetric)
// ranking is for vector type result only
aggregatedResult := FormatedMetric{MetricName: metricName, Status: "success", Data: FormatedMetricData{Result: []map[string]interface{}{}, ResultType: ResultTypeVector}}
for oneMetric := range chForOneMetric {
if oneMetric != nil {
// wrapper layer 1: append .data.result[0]
if len(oneMetric.Data.Result) > 0 {
aggregatedResult.Data.Result = append(aggregatedResult.Data.Result, oneMetric.Data.Result[0])
}
}
}
ch <- &aggregatedResult
wg.Done()
}(metricName)
}
} else {
workspace := monitoringRequest.WsName
for _, metricName := range WorkspaceMetricsNames {
if metricName == MetricNameWorkspaceAllProjectCount {
continue
}
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
go func(metricName string, workspace string) {
queryType, params := AssembleSpecificWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
metricsStr := client.SendMonitoringRequest(queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"workspace": "workspace"})
ch <- ReformatJson(metricsStr, metricName, map[string]string{"resource_name": workspace})
wg.Done()
}(metricName)
}(metricName, workspace)
}
}
}
@@ -590,8 +638,8 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
// sum all workspaces
for _, metricName := range WorkspaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
@@ -599,6 +647,7 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
queryType, params := AssembleAllWorkspaceMetricRequestInfo(monitoringRequest, nil, metricName)
metricsStr := client.SendMonitoringRequest(queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"workspace": "workspaces"})
wg.Done()
}(metricName)
}
@@ -608,13 +657,17 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
case MetricLevelNamespace:
{
for _, metricName := range NamespaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleNamespaceMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"namespace": ""})
rawResult := ReformatJson(metricsStr, metricName, map[string]string{"namespace": ""})
ch <- rawResult
wg.Done()
}(metricName)
}
@@ -624,13 +677,15 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
{
if monitoringRequest.WorkloadName == "" {
for _, metricName := range WorkloadMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleAllWorkloadMetricRequestInfo(monitoringRequest, metricName)
metricsStr := client.SendMonitoringRequest(queryType, params)
ch <- ReformatJson(metricsStr, metricName, map[string]string{"workload": ""})
reformattedResult := ReformatJson(metricsStr, metricName, map[string]string{"workload": ""})
// no need to append a null result
ch <- reformattedResult
wg.Done()
}(metricName)
}
@@ -658,8 +713,8 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
case MetricLevelPod:
{
for _, metricName := range PodMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params, nullRule := AssemblePodMetricRequestInfo(monitoringRequest, metricName)
@@ -677,8 +732,8 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
case MetricLevelContainer:
{
for _, metricName := range ContainerMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
matched, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && matched {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleContainerMetricRequestInfo(monitoringRequest, metricName)
@@ -702,6 +757,7 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
}
}
// wrapper layer 2:
return &FormatedLevelMetric{
MetricsLevel: resourceType,
Results: metricsArray,