Merge pull request #204 from carmanzhang/monitor

fixed deployment-pods metrics
This commit is contained in:
zryfish
2018-11-15 11:24:56 +08:00
committed by GitHub
4 changed files with 155 additions and 86 deletions

View File

@@ -1,9 +1,12 @@
/* /*
Copyright 2018 The KubeSphere Authors. Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
You may obtain a copy of the License at You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -66,24 +69,16 @@ func (u Monitor) monitorWorkload(request *restful.Request, response *restful.Res
} }
} }
// merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin
func (u Monitor) monitorAllWorkspacesStatistics(request *restful.Request, response *restful.Response) {
res := metrics.MonitorAllWorkspacesStatistics()
response.WriteAsJson(res)
}
// merge multiple metric: devops, roles, projects...
func (u Monitor) monitorOneWorkspaceStatistics(request *restful.Request, response *restful.Response) {
requestParams := client.ParseMonitoringRequestParams(request)
wsName := requestParams.WsName
res := metrics.MonitorOneWorkspaceStatistics(wsName)
response.WriteAsJson(res)
}
func (u Monitor) monitorAllWorkspaces(request *restful.Request, response *restful.Response) { func (u Monitor) monitorAllWorkspaces(request *restful.Request, response *restful.Response) {
requestParams := client.ParseMonitoringRequestParams(request) requestParams := client.ParseMonitoringRequestParams(request)
if requestParams.Tp == "_statistics" {
// merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin
res := metrics.MonitorAllWorkspacesStatistics()
response.WriteAsJson(res)
} else {
rawMetrics := metrics.MonitorAllWorkspaces(requestParams) rawMetrics := metrics.MonitorAllWorkspaces(requestParams)
// sorting // sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelWorkspace) sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelWorkspace)
@@ -92,12 +87,13 @@ func (u Monitor) monitorAllWorkspaces(request *restful.Request, response *restfu
response.WriteAsJson(pagedMetrics) response.WriteAsJson(pagedMetrics)
} }
}
func (u Monitor) monitorOneWorkspace(request *restful.Request, response *restful.Response) { func (u Monitor) monitorOneWorkspace(request *restful.Request, response *restful.Response) {
requestParams := client.ParseMonitoringRequestParams(request) requestParams := client.ParseMonitoringRequestParams(request)
tp := requestParams.Tp tp := requestParams.Tp
if tp != "" { if tp == "rank" {
// multiple // multiple
rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace) rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace)
// sorting // sorting
@@ -107,6 +103,12 @@ func (u Monitor) monitorOneWorkspace(request *restful.Request, response *restful
response.WriteAsJson(pagedMetrics) response.WriteAsJson(pagedMetrics)
} else if tp == "_statistics" {
wsName := requestParams.WsName
// merge multiple metric: devops, roles, projects...
res := metrics.MonitorOneWorkspaceStatistics(wsName)
response.WriteAsJson(res)
} else { } else {
res := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace) res := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkspace)
response.WriteAsJson(res) response.WriteAsJson(res)
@@ -350,17 +352,8 @@ func Register(ws *restful.WebService, subPath string) {
Consumes(restful.MIME_JSON, restful.MIME_XML). Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON) Produces(restful.MIME_JSON)
// merge multiple metric: devops, roles, projects...
ws.Route(ws.GET(subPath+"/cluster_workspaces/{workspace_name}/_statistics").To(u.monitorOneWorkspaceStatistics).
Filter(route.RouteLogging).
Doc("monitor specific workspace level metrics").
Param(ws.PathParameter("workspace_name", "workspace name").DataType("string").Required(true)).
Metadata(restfulspec.KeyOpenAPITags, tags)).
Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON)
// list all namespace in this workspace by selected metrics // list all namespace in this workspace by selected metrics
ws.Route(ws.GET(subPath+"/cluster_workspaces/{workspace_name}").To(u.monitorOneWorkspace). ws.Route(ws.GET(subPath+"/workspaces/{workspace_name}").To(u.monitorOneWorkspace).
Filter(route.RouteLogging). Filter(route.RouteLogging).
Doc("monitor workspaces level metrics"). Doc("monitor workspaces level metrics").
Param(ws.PathParameter("workspace_name", "workspace name").DataType("string").Required(true)). Param(ws.PathParameter("workspace_name", "workspace name").DataType("string").Required(true)).
@@ -370,21 +363,12 @@ func Register(ws *restful.WebService, subPath string) {
Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)).
Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")).
Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")).
Param(ws.QueryParameter("type", "page number").DataType("string").Required(false).DefaultValue("1")). Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")).
Metadata(restfulspec.KeyOpenAPITags, tags)). Metadata(restfulspec.KeyOpenAPITags, tags)).
Consumes(restful.MIME_JSON, restful.MIME_XML). Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON) Produces(restful.MIME_JSON)
// metrics from system users or projects, merge multiple metric: all-devops, all-roles, all-projects... ws.Route(ws.GET(subPath+"/workspaces").To(u.monitorAllWorkspaces).
ws.Route(ws.GET(subPath+"/cluster_workspaces/_statistics").To(u.monitorAllWorkspacesStatistics).
Filter(route.RouteLogging).
Doc("monitor specific workspace level metrics").
Metadata(restfulspec.KeyOpenAPITags, tags)).
Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON)
// metrics from prometheus
ws.Route(ws.GET(subPath+"/cluster_workspaces").To(u.monitorAllWorkspaces).
Filter(route.RouteLogging). Filter(route.RouteLogging).
Doc("monitor workspaces level metrics"). Doc("monitor workspaces level metrics").
Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("workspace_memory_utilisation")). Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("workspace_memory_utilisation")).
@@ -393,6 +377,7 @@ func Register(ws *restful.WebService, subPath string) {
Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)). Param(ws.QueryParameter("sort_type", "ascending descending order").DataType("string").Required(false)).
Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")). Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")).
Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")). Param(ws.QueryParameter("limit", "metrics name cpu memory...in re2 regex").DataType("string").Required(false).DefaultValue("4")).
Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")).
Metadata(restfulspec.KeyOpenAPITags, tags)). Metadata(restfulspec.KeyOpenAPITags, tags)).
Consumes(restful.MIME_JSON, restful.MIME_XML). Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON) Produces(restful.MIME_JSON)

View File

@@ -30,6 +30,9 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"runtime/debug"
"sort"
"kubesphere.io/kubesphere/pkg/client" "kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/models" "kubesphere.io/kubesphere/pkg/models"
"kubesphere.io/kubesphere/pkg/models/workspaces" "kubesphere.io/kubesphere/pkg/models/workspaces"
@@ -38,7 +41,7 @@ import (
var nodeStatusDelLables = []string{"endpoint", "instance", "job", "namespace", "pod", "service"} var nodeStatusDelLables = []string{"endpoint", "instance", "job", "namespace", "pod", "service"}
const ( const (
ChannelMaxCapacityWorkspaceMetric = 400 ChannelMaxCapacityWorkspaceMetric = 800
ChannelMaxCapacity = 100 ChannelMaxCapacity = 100
) )
@@ -126,6 +129,7 @@ func getPodNameRegexInWorkload(res string) string {
glog.Errorln("json parse failed", jsonErr) glog.Errorln("json parse failed", jsonErr)
} }
var podNames []string var podNames []string
for _, item := range dat.Data.Result { for _, item := range dat.Data.Result {
podName := item.KubePodMetric.Pod podName := item.KubePodMetric.Pod
podNames = append(podNames, podName) podNames = append(podNames, podName)
@@ -134,8 +138,66 @@ func getPodNameRegexInWorkload(res string) string {
return podNamesFilter return podNamesFilter
} }
func unifyMetricHistoryTimeRange(fmtMetrics *FormatedMetric) {
var timestampMap = make(map[float64]bool)
if fmtMetrics.Data.ResultType == ResultTypeMatrix {
for i, _ := range fmtMetrics.Data.Result {
values, exist := fmtMetrics.Data.Result[i][ResultItemValues]
if exist {
valueArray, sure := values.([]interface{})
if sure {
for j, _ := range valueArray {
timeAndValue := valueArray[j].([]interface{})
timestampMap[timeAndValue[0].(float64)] = true
}
}
}
}
}
timestampArray := make([]float64, len(timestampMap))
i := 0
for timestamp, _ := range timestampMap {
timestampArray[i] = timestamp
i++
}
sort.Float64s(timestampArray)
if fmtMetrics.Data.ResultType == ResultTypeMatrix {
for i := 0; i < len(fmtMetrics.Data.Result); i++ {
values, exist := fmtMetrics.Data.Result[i][ResultItemValues]
if exist {
valueArray, sure := values.([]interface{})
if sure {
formatValueArray := make([][]interface{}, len(timestampArray))
j := 0
for k, _ := range timestampArray {
valueItem, sure := valueArray[j].([]interface{})
if sure && valueItem[0].(float64) == timestampArray[k] {
formatValueArray[k] = []interface{}{int64(timestampArray[k]), valueItem[1]}
j++
} else {
formatValueArray[k] = []interface{}{int64(timestampArray[k]), "-1"}
}
}
fmtMetrics.Data.Result[i][ResultItemValues] = formatValueArray
}
}
}
}
}
func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) { func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) {
rule := MakeWorkloadRule(monitoringRequest.WorkloadKind, monitoringRequest.WorkloadName, monitoringRequest.NsName)
nsName := monitoringRequest.NsName
wkName := monitoringRequest.WorkloadName
rule := MakeWorkloadRule(monitoringRequest.WorkloadKind, wkName, nsName)
paramValues := monitoringRequest.Params paramValues := monitoringRequest.Params
params := makeRequestParamString(rule, paramValues) params := makeRequestParamString(rule, paramValues)
@@ -144,7 +206,7 @@ func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringReque
podNamesFilter := getPodNameRegexInWorkload(res) podNamesFilter := getPodNameRegexInWorkload(res)
queryType := monitoringRequest.QueryType queryType := monitoringRequest.QueryType
rule = MakePodPromQL(metricName, monitoringRequest.NsName, "", "", podNamesFilter) rule = MakePodPromQL(metricName, nsName, "", "", podNamesFilter)
params = makeRequestParamString(rule, paramValues) params = makeRequestParamString(rule, paramValues)
return queryType, params return queryType, params
@@ -167,7 +229,10 @@ func GetMetric(queryType, params, metricName string) *FormatedMetric {
} }
func GetNodeAddressInfo() *map[string][]v1.NodeAddress { func GetNodeAddressInfo() *map[string][]v1.NodeAddress {
nodeList, _ := client.NewK8sClient().CoreV1().Nodes().List(metaV1.ListOptions{}) nodeList, err := client.NewK8sClient().CoreV1().Nodes().List(metaV1.ListOptions{})
if err != nil {
glog.Errorln(err.Error())
}
var nodeAddress = make(map[string][]v1.NodeAddress) var nodeAddress = make(map[string][]v1.NodeAddress)
for _, node := range nodeList.Items { for _, node := range nodeList.Items {
nodeAddress[node.Name] = node.Status.Addresses nodeAddress[node.Name] = node.Status.Addresses
@@ -179,7 +244,8 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][]
for i := 0; i < len(nodeMetric.Data.Result); i++ { for i := 0; i < len(nodeMetric.Data.Result); i++ {
metricDesc := nodeMetric.Data.Result[i][ResultItemMetric] metricDesc := nodeMetric.Data.Result[i][ResultItemMetric]
metricDescMap := metricDesc.(map[string]interface{}) metricDescMap, ensure := metricDesc.(map[string]interface{})
if ensure {
if nodeId, exist := metricDescMap["node"]; exist { if nodeId, exist := metricDescMap["node"]; exist {
addr, exist := (*nodeAddress)[nodeId.(string)] addr, exist := (*nodeAddress)[nodeId.(string)]
if exist { if exist {
@@ -188,6 +254,7 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][]
} }
} }
} }
}
func MonitorContainer(monitoringRequest *client.MonitoringRequestParams) *FormatedMetric { func MonitorContainer(monitoringRequest *client.MonitoringRequestParams) *FormatedMetric {
queryType := monitoringRequest.QueryType queryType := monitoringRequest.QueryType
@@ -223,6 +290,13 @@ func AssembleWorkspaceMetricRequestInfo(monitoringRequest *client.MonitoringRequ
func makeRequestParamString(rule string, paramValues url.Values) string { func makeRequestParamString(rule string, paramValues url.Values) string {
defer func() {
if err := recover(); err != nil {
glog.Errorln(err)
debug.PrintStack()
}
}()
var values = make(url.Values) var values = make(url.Values)
for key, v := range paramValues { for key, v := range paramValues {
values.Set(key, v[0]) values.Set(key, v[0])
@@ -265,7 +339,12 @@ func MonitorAllWorkspaces(monitoringRequest *client.MonitoringRequestParams) *Fo
var wgAll sync.WaitGroup var wgAll sync.WaitGroup
var wsAllch = make(chan *[]FormatedMetric, ChannelMaxCapacityWorkspaceMetric) var wsAllch = make(chan *[]FormatedMetric, ChannelMaxCapacityWorkspaceMetric)
workspaceNamespaceMap, _, _ := workspaces.GetAllOrgAndProjList() workspaceNamespaceMap, _, err := workspaces.GetAllOrgAndProjList()
if err != nil {
glog.Errorln(err.Error())
}
for ws, _ := range workspaceNamespaceMap { for ws, _ := range workspaceNamespaceMap {
bol, err := regexp.MatchString(monitoringRequest.WsFilter, ws) bol, err := regexp.MatchString(monitoringRequest.WsFilter, ws)
if err == nil && bol { if err == nil && bol {
@@ -320,11 +399,6 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
for _, metricName := range filterMetricsName { for _, metricName := range filterMetricsName {
wg.Add(1) wg.Add(1)
go func(metricName string) { go func(metricName string) {
defer func() {
if err := recover(); err != nil {
glog.Errorln(err)
}
}()
queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName) queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
ch <- GetMetric(queryType, params, metricName) ch <- GetMetric(queryType, params, metricName)
@@ -341,9 +415,11 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
if oneMetric != nil { if oneMetric != nil {
// add "workspace" filed to oneMetric `metric` field // add "workspace" filed to oneMetric `metric` field
for i := 0; i < len(oneMetric.Data.Result); i++ { for i := 0; i < len(oneMetric.Data.Result); i++ {
tmap := oneMetric.Data.Result[i]["metric"].(map[string]interface{}) tmap, sure := oneMetric.Data.Result[i][ResultItemMetric].(map[string]interface{})
if sure {
tmap[MetricLevelWorkspace] = ws tmap[MetricLevelWorkspace] = ws
oneMetric.Data.Result[i]["metric"] = tmap oneMetric.Data.Result[i][ResultItemMetric] = tmap
}
} }
metricsArray = append(metricsArray, *oneMetric) metricsArray = append(metricsArray, *oneMetric)
} }
@@ -406,19 +482,7 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
} }
namespaceArray = filterNamespace(monitoringRequest.NsFilter, namespaceArray) namespaceArray = filterNamespace(monitoringRequest.NsFilter, namespaceArray)
if monitoringRequest.Tp == "" { if monitoringRequest.Tp == "rank" {
for _, metricName := range WorkspaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
ch <- GetMetric(queryType, params, metricName)
wg.Done()
}(metricName)
}
}
} else {
for _, metricName := range NamespaceMetricsNames { for _, metricName := range NamespaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName) bol, err := regexp.MatchString(metricsFilter, metricName)
ns := "^(" + strings.Join(namespaceArray, "|") + ")$" ns := "^(" + strings.Join(namespaceArray, "|") + ")$"
@@ -432,6 +496,19 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
}(metricName) }(metricName)
} }
} }
} else {
for _, metricName := range WorkspaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
ch <- GetMetric(queryType, params, metricName)
wg.Done()
}(metricName)
}
}
} }
} }
case MetricLevelNamespace: case MetricLevelNamespace:
@@ -457,7 +534,9 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
go func(metricName string) { go func(metricName string) {
metricName = strings.TrimLeft(metricName, "workload_") metricName = strings.TrimLeft(metricName, "workload_")
queryType, params := AssembleWorkloadMetricRequestInfo(monitoringRequest, metricName) queryType, params := AssembleWorkloadMetricRequestInfo(monitoringRequest, metricName)
ch <- GetMetric(queryType, params, metricName) fmtMetrics := GetMetric(queryType, params, metricName)
unifyMetricHistoryTimeRange(fmtMetrics)
ch <- fmtMetrics
wg.Done() wg.Done()
}(metricName) }(metricName)
} }
@@ -563,13 +642,15 @@ func calcWorkspaceNamespace(metric *FormatedMetric) int {
var workspaceNamespaceCount = 0 var workspaceNamespaceCount = 0
for _, result := range metric.Data.Result { for _, result := range metric.Data.Result {
tmpMap := result[ResultItemMetric].(map[string]interface{}) tmpMap, sure := result[ResultItemMetric].(map[string]interface{})
if sure {
wsName, exist := tmpMap[WorkspaceJoinedKey] wsName, exist := tmpMap[WorkspaceJoinedKey]
if exist && wsName != "" { if exist && wsName != "" {
workspaceNamespaceCount += 1 workspaceNamespaceCount += 1
} }
} }
}
return workspaceNamespaceCount return workspaceNamespaceCount
} }
@@ -788,13 +869,16 @@ func MonitorComponentStatus(monitoringRequest *client.MonitoringRequestParams) *
var normalNodes []string var normalNodes []string
var abnormalNodes []string var abnormalNodes []string
for _, result := range nodeStatusMetric.Data.Result { for _, result := range nodeStatusMetric.Data.Result {
tmap := result[ResultItemMetric].(map[string]interface{}) tmap, sure := result[ResultItemMetric].(map[string]interface{})
if sure {
if tmap[MetricStatus].(string) == "false" { if tmap[MetricStatus].(string) == "false" {
abnormalNodes = append(abnormalNodes, tmap[MetricLevelNode].(string)) abnormalNodes = append(abnormalNodes, tmap[MetricLevelNode].(string))
} else { } else {
normalNodes = append(normalNodes, tmap[MetricLevelNode].(string)) normalNodes = append(normalNodes, tmap[MetricLevelNode].(string))
} }
} }
}
Components, err := models.GetAllComponentsStatus() Components, err := models.GetAllComponentsStatus()

View File

@@ -29,7 +29,7 @@ func MakeWorkloadRule(wkKind, wkName, namespace string) string {
case "deployment": case "deployment":
wkKind = ReplicaSet wkKind = ReplicaSet
if wkName != "" { if wkName != "" {
wkName = "~\"" + wkName + ".*\"" wkName = "~\"^" + wkName + `-(\\w)+$"`
} else { } else {
wkName = "~\".*\"" wkName = "~\".*\""
} }

View File

@@ -230,8 +230,8 @@ func ReformatJson(metric string, metricsName string, needDelParams ...string) *F
result := formatMetric.Data.Result result := formatMetric.Data.Result
for _, res := range result { for _, res := range result {
metric, exist := res[ResultItemMetric] metric, exist := res[ResultItemMetric]
metricMap := metric.(map[string]interface{}) metricMap, sure := metric.(map[string]interface{})
if exist { if exist && sure {
delete(metricMap, "__name__") delete(metricMap, "__name__")
} }
if len(needDelParams) > 0 { if len(needDelParams) > 0 {