fixed deployment-pods metrics

This commit is contained in:
Carman Zhang
2018-11-14 16:04:24 +08:00
parent fbf053306b
commit beb7efdac0
4 changed files with 155 additions and 86 deletions

View File

@@ -30,6 +30,9 @@ import (
"k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"runtime/debug"
"sort"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/models"
"kubesphere.io/kubesphere/pkg/models/workspaces"
@@ -38,7 +41,7 @@ import (
var nodeStatusDelLables = []string{"endpoint", "instance", "job", "namespace", "pod", "service"}
const (
ChannelMaxCapacityWorkspaceMetric = 400
ChannelMaxCapacityWorkspaceMetric = 800
ChannelMaxCapacity = 100
)
@@ -126,6 +129,7 @@ func getPodNameRegexInWorkload(res string) string {
glog.Errorln("json parse failed", jsonErr)
}
var podNames []string
for _, item := range dat.Data.Result {
podName := item.KubePodMetric.Pod
podNames = append(podNames, podName)
@@ -134,8 +138,66 @@ func getPodNameRegexInWorkload(res string) string {
return podNamesFilter
}
func unifyMetricHistoryTimeRange(fmtMetrics *FormatedMetric) {
var timestampMap = make(map[float64]bool)
if fmtMetrics.Data.ResultType == ResultTypeMatrix {
for i, _ := range fmtMetrics.Data.Result {
values, exist := fmtMetrics.Data.Result[i][ResultItemValues]
if exist {
valueArray, sure := values.([]interface{})
if sure {
for j, _ := range valueArray {
timeAndValue := valueArray[j].([]interface{})
timestampMap[timeAndValue[0].(float64)] = true
}
}
}
}
}
timestampArray := make([]float64, len(timestampMap))
i := 0
for timestamp, _ := range timestampMap {
timestampArray[i] = timestamp
i++
}
sort.Float64s(timestampArray)
if fmtMetrics.Data.ResultType == ResultTypeMatrix {
for i := 0; i < len(fmtMetrics.Data.Result); i++ {
values, exist := fmtMetrics.Data.Result[i][ResultItemValues]
if exist {
valueArray, sure := values.([]interface{})
if sure {
formatValueArray := make([][]interface{}, len(timestampArray))
j := 0
for k, _ := range timestampArray {
valueItem, sure := valueArray[j].([]interface{})
if sure && valueItem[0].(float64) == timestampArray[k] {
formatValueArray[k] = []interface{}{int64(timestampArray[k]), valueItem[1]}
j++
} else {
formatValueArray[k] = []interface{}{int64(timestampArray[k]), "-1"}
}
}
fmtMetrics.Data.Result[i][ResultItemValues] = formatValueArray
}
}
}
}
}
func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) {
rule := MakeWorkloadRule(monitoringRequest.WorkloadKind, monitoringRequest.WorkloadName, monitoringRequest.NsName)
nsName := monitoringRequest.NsName
wkName := monitoringRequest.WorkloadName
rule := MakeWorkloadRule(monitoringRequest.WorkloadKind, wkName, nsName)
paramValues := monitoringRequest.Params
params := makeRequestParamString(rule, paramValues)
@@ -144,7 +206,7 @@ func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringReque
podNamesFilter := getPodNameRegexInWorkload(res)
queryType := monitoringRequest.QueryType
rule = MakePodPromQL(metricName, monitoringRequest.NsName, "", "", podNamesFilter)
rule = MakePodPromQL(metricName, nsName, "", "", podNamesFilter)
params = makeRequestParamString(rule, paramValues)
return queryType, params
@@ -167,7 +229,10 @@ func GetMetric(queryType, params, metricName string) *FormatedMetric {
}
func GetNodeAddressInfo() *map[string][]v1.NodeAddress {
nodeList, _ := client.NewK8sClient().CoreV1().Nodes().List(metaV1.ListOptions{})
nodeList, err := client.NewK8sClient().CoreV1().Nodes().List(metaV1.ListOptions{})
if err != nil {
glog.Errorln(err.Error())
}
var nodeAddress = make(map[string][]v1.NodeAddress)
for _, node := range nodeList.Items {
nodeAddress[node.Name] = node.Status.Addresses
@@ -179,11 +244,13 @@ func AddNodeAddressMetric(nodeMetric *FormatedMetric, nodeAddress *map[string][]
for i := 0; i < len(nodeMetric.Data.Result); i++ {
metricDesc := nodeMetric.Data.Result[i][ResultItemMetric]
metricDescMap := metricDesc.(map[string]interface{})
if nodeId, exist := metricDescMap["node"]; exist {
addr, exist := (*nodeAddress)[nodeId.(string)]
if exist {
metricDescMap["address"] = addr
metricDescMap, ensure := metricDesc.(map[string]interface{})
if ensure {
if nodeId, exist := metricDescMap["node"]; exist {
addr, exist := (*nodeAddress)[nodeId.(string)]
if exist {
metricDescMap["address"] = addr
}
}
}
}
@@ -223,6 +290,13 @@ func AssembleWorkspaceMetricRequestInfo(monitoringRequest *client.MonitoringRequ
func makeRequestParamString(rule string, paramValues url.Values) string {
defer func() {
if err := recover(); err != nil {
glog.Errorln(err)
debug.PrintStack()
}
}()
var values = make(url.Values)
for key, v := range paramValues {
values.Set(key, v[0])
@@ -265,7 +339,12 @@ func MonitorAllWorkspaces(monitoringRequest *client.MonitoringRequestParams) *Fo
var wgAll sync.WaitGroup
var wsAllch = make(chan *[]FormatedMetric, ChannelMaxCapacityWorkspaceMetric)
workspaceNamespaceMap, _, _ := workspaces.GetAllOrgAndProjList()
workspaceNamespaceMap, _, err := workspaces.GetAllOrgAndProjList()
if err != nil {
glog.Errorln(err.Error())
}
for ws, _ := range workspaceNamespaceMap {
bol, err := regexp.MatchString(monitoringRequest.WsFilter, ws)
if err == nil && bol {
@@ -320,11 +399,6 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
for _, metricName := range filterMetricsName {
wg.Add(1)
go func(metricName string) {
defer func() {
if err := recover(); err != nil {
glog.Errorln(err)
}
}()
queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
ch <- GetMetric(queryType, params, metricName)
@@ -341,9 +415,11 @@ func collectWorkspaceMetric(monitoringRequest *client.MonitoringRequestParams, w
if oneMetric != nil {
// add "workspace" filed to oneMetric `metric` field
for i := 0; i < len(oneMetric.Data.Result); i++ {
tmap := oneMetric.Data.Result[i]["metric"].(map[string]interface{})
tmap[MetricLevelWorkspace] = ws
oneMetric.Data.Result[i]["metric"] = tmap
tmap, sure := oneMetric.Data.Result[i][ResultItemMetric].(map[string]interface{})
if sure {
tmap[MetricLevelWorkspace] = ws
oneMetric.Data.Result[i][ResultItemMetric] = tmap
}
}
metricsArray = append(metricsArray, *oneMetric)
}
@@ -406,19 +482,7 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
}
namespaceArray = filterNamespace(monitoringRequest.NsFilter, namespaceArray)
if monitoringRequest.Tp == "" {
for _, metricName := range WorkspaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
ch <- GetMetric(queryType, params, metricName)
wg.Done()
}(metricName)
}
}
} else {
if monitoringRequest.Tp == "rank" {
for _, metricName := range NamespaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
ns := "^(" + strings.Join(namespaceArray, "|") + ")$"
@@ -432,6 +496,19 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
}(metricName)
}
}
} else {
for _, metricName := range WorkspaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleWorkspaceMetricRequestInfo(monitoringRequest, namespaceArray, metricName)
ch <- GetMetric(queryType, params, metricName)
wg.Done()
}(metricName)
}
}
}
}
case MetricLevelNamespace:
@@ -457,7 +534,9 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
go func(metricName string) {
metricName = strings.TrimLeft(metricName, "workload_")
queryType, params := AssembleWorkloadMetricRequestInfo(monitoringRequest, metricName)
ch <- GetMetric(queryType, params, metricName)
fmtMetrics := GetMetric(queryType, params, metricName)
unifyMetricHistoryTimeRange(fmtMetrics)
ch <- fmtMetrics
wg.Done()
}(metricName)
}
@@ -563,11 +642,13 @@ func calcWorkspaceNamespace(metric *FormatedMetric) int {
var workspaceNamespaceCount = 0
for _, result := range metric.Data.Result {
tmpMap := result[ResultItemMetric].(map[string]interface{})
wsName, exist := tmpMap[WorkspaceJoinedKey]
tmpMap, sure := result[ResultItemMetric].(map[string]interface{})
if sure {
wsName, exist := tmpMap[WorkspaceJoinedKey]
if exist && wsName != "" {
workspaceNamespaceCount += 1
if exist && wsName != "" {
workspaceNamespaceCount += 1
}
}
}
@@ -788,11 +869,14 @@ func MonitorComponentStatus(monitoringRequest *client.MonitoringRequestParams) *
var normalNodes []string
var abnormalNodes []string
for _, result := range nodeStatusMetric.Data.Result {
tmap := result[ResultItemMetric].(map[string]interface{})
if tmap[MetricStatus].(string) == "false" {
abnormalNodes = append(abnormalNodes, tmap[MetricLevelNode].(string))
} else {
normalNodes = append(normalNodes, tmap[MetricLevelNode].(string))
tmap, sure := result[ResultItemMetric].(map[string]interface{})
if sure {
if tmap[MetricStatus].(string) == "false" {
abnormalNodes = append(abnormalNodes, tmap[MetricLevelNode].(string))
} else {
normalNodes = append(normalNodes, tmap[MetricLevelNode].(string))
}
}
}