refactor monitoring apis for high performance testing

This commit is contained in:
Carman Zhang
2018-12-03 15:15:53 +08:00
parent fa30f68102
commit c503efaa3b
5 changed files with 174 additions and 399 deletions

View File

@@ -17,7 +17,6 @@ limitations under the License.
package metrics
import (
"encoding/json"
"fmt"
"net/url"
"regexp"
@@ -26,18 +25,25 @@ import (
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"runtime/debug"
"sort"
"github.com/json-iterator/go"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
listerV1 "k8s.io/client-go/listers/core/v1"
"kubesphere.io/kubesphere/pkg/client"
"kubesphere.io/kubesphere/pkg/models"
"kubesphere.io/kubesphere/pkg/models/controllers"
"kubesphere.io/kubesphere/pkg/models/workspaces"
)
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
var nodeStatusDelLables = []string{"endpoint", "instance", "job", "namespace", "pod", "service"}
const (
@@ -120,26 +126,70 @@ type OneComponentStatus struct {
Error string `json:"error,omitempty"`
}
func getPodNameRegexInWorkload(res string) string {
func getAllWorkspaceNames(formatedMetric *FormatedMetric) map[string]int {
var wsMap = make(map[string]int)
for i := 0; i < len(formatedMetric.Data.Result); i++ {
metricDesc := formatedMetric.Data.Result[i][ResultItemMetric]
metricDescMap, ensure := metricDesc.(map[string]interface{})
if ensure {
if wsLabel, exist := metricDescMap[WorkspaceJoinedKey]; exist {
wsMap[wsLabel.(string)] = 1
}
}
}
return wsMap
}
func getAllWorkspaces() map[string]int {
paramValues := make(url.Values)
paramValues.Set("query", WorkspaceNamespaceLabelRule)
params := paramValues.Encode()
res := client.SendMonitoringRequest(client.DefaultQueryType, params)
metric := ReformatJson(res, "")
return getAllWorkspaceNames(metric)
}
func getPodNameRegexInWorkload(res, filter string) string {
data := []byte(res)
var dat CommonMetricsResult
jsonErr := json.Unmarshal(data, &dat)
jsonErr := jsonIter.Unmarshal(data, &dat)
if jsonErr != nil {
glog.Errorln("json parse failed", jsonErr)
glog.Errorln("json parse failed", jsonErr.Error(), res)
}
var podNames []string
for _, item := range dat.Data.Result {
podName := item.KubePodMetric.Pod
podNames = append(podNames, podName)
if filter != "" {
if bol, _ := regexp.MatchString(filter, podName); bol {
podNames = append(podNames, podName)
}
} else {
podNames = append(podNames, podName)
}
}
podNamesFilter := "^(" + strings.Join(podNames, "|") + ")$"
return podNamesFilter
}
func unifyMetricHistoryTimeRange(fmtMetrics *FormatedMetric) {
defer func() {
if err := recover(); err != nil {
glog.Errorln(err)
debug.PrintStack()
}
}()
var timestampMap = make(map[float64]bool)
if fmtMetrics.Data.ResultType == ResultTypeMatrix {
@@ -150,7 +200,7 @@ func unifyMetricHistoryTimeRange(fmtMetrics *FormatedMetric) {
if sure {
for j, _ := range valueArray {
timeAndValue := valueArray[j].([]interface{})
timestampMap[timeAndValue[0].(float64)] = true
timestampMap[float64(timeAndValue[0].(uint64))] = true
}
}
}
@@ -178,7 +228,7 @@ func unifyMetricHistoryTimeRange(fmtMetrics *FormatedMetric) {
for k, _ := range timestampArray {
valueItem, sure := valueArray[j].([]interface{})
if sure && valueItem[0].(float64) == timestampArray[k] {
if sure && float64(valueItem[0].(uint64)) == timestampArray[k] {
formatValueArray[k] = []interface{}{int64(timestampArray[k]), valueItem[1]}
j++
} else {
@@ -196,6 +246,7 @@ func AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest *client.Monitor
nsName := monitoringRequest.NsName
wkName := monitoringRequest.WorkloadName
podsFilter := monitoringRequest.PodsFilter
rule := MakeSpecificWorkloadRule(monitoringRequest.WorkloadKind, wkName, nsName)
paramValues := monitoringRequest.Params
@@ -203,7 +254,7 @@ func AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest *client.Monitor
res := client.SendMonitoringRequest(client.DefaultQueryType, params)
podNamesFilter := getPodNameRegexInWorkload(res)
podNamesFilter := getPodNameRegexInWorkload(res, podsFilter)
queryType := monitoringRequest.QueryType
rule = MakePodPromQL(metricName, nsName, "", "", podNamesFilter)
@@ -239,12 +290,20 @@ func GetMetric(queryType, params, metricName string) *FormatedMetric {
}
func GetNodeAddressInfo() *map[string][]v1.NodeAddress {
nodeList, err := client.NewK8sClient().CoreV1().Nodes().List(metaV1.ListOptions{})
lister, err := controllers.GetLister(controllers.Nodes)
if err != nil {
glog.Errorln(err)
}
nodeLister := lister.(listerV1.NodeLister)
nodes, err := nodeLister.List(labels.Everything())
if err != nil {
glog.Errorln(err.Error())
}
var nodeAddress = make(map[string][]v1.NodeAddress)
for _, node := range nodeList.Items {
for _, node := range nodes {
nodeAddress[node.Name] = node.Status.Addresses
}
return &nodeAddress
@@ -373,13 +432,9 @@ func MonitorAllWorkspaces(monitoringRequest *client.MonitoringRequestParams) *Fo
var wgAll sync.WaitGroup
var wsAllch = make(chan *[]FormatedMetric, ChannelMaxCapacityWorkspaceMetric)
workspaceNamespaceMap, _, err := workspaces.GetAllOrgAndProjList()
wsMap := getAllWorkspaces()
if err != nil {
glog.Errorln(err.Error())
}
for ws, _ := range workspaceNamespaceMap {
for ws := range wsMap {
bol, err := regexp.MatchString(monitoringRequest.WsFilter, ws)
if err == nil && bol {
// a workspace
@@ -552,18 +607,6 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
}
} else {
// sum all workspaces
_, namespaceWorkspaceMap, err := workspaces.GetAllOrgAndProjList()
if err != nil {
glog.Errorln(err.Error())
}
nsList := make([]string, 0)
for ns := range namespaceWorkspaceMap {
if namespaceWorkspaceMap[ns] == "" {
nsList = append(nsList, ns)
}
}
for _, metricName := range WorkspaceMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
@@ -574,14 +617,7 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
go func(metricName string) {
queryType, params := AssembleAllWorkspaceMetricRequestInfo(monitoringRequest, nil, metricName)
if metricName == MetricNameWorkspaceAllProjectCount {
res := GetMetric(queryType, params, metricName)
res = MonitorWorkspaceNamespaceHistory(res)
ch <- res
} else {
ch <- GetMetric(queryType, params, metricName)
}
ch <- GetMetric(queryType, params, metricName)
wg.Done()
}(metricName)
@@ -606,14 +642,14 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
case MetricLevelWorkload:
{
if monitoringRequest.Tp == "rank" {
for _, metricName := range WorkloadMetricsNames {
bol, err := regexp.MatchString(metricsFilter, metricName)
if err == nil && bol {
wg.Add(1)
go func(metricName string) {
queryType, params := AssembleAllWorkloadMetricRequestInfo(monitoringRequest, metricName)
ch <- GetMetric(queryType, params, metricName)
fmtMetrics := GetMetric(queryType, params, metricName)
ch <- fmtMetrics
wg.Done()
}(metricName)
}
@@ -688,82 +724,6 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
}
}
func MonitorWorkspaceNamespaceHistory(metric *FormatedMetric) *FormatedMetric {
resultType := metric.Data.ResultType
//metric.Status
metricName := metric.MetricName
for i := 0; i < len(metric.Data.Result); i++ {
metricItem := metric.Data.Result[i]
if resultType == ResultTypeVector {
timeAndValue, sure := metricItem[ResultItemValue].([]interface{})
if !sure {
return metric
}
metric := getNamespaceHistoryMetric(timeAndValue[0].(float64), metricName)
workspaceNamespaceCount := calcWorkspaceNamespace(metric)
timeAndValue[1] = fmt.Sprintf("%d", workspaceNamespaceCount)
} else if resultType == ResultTypeMatrix {
values, sure := metricItem[ResultItemValues].([]interface{})
if !sure {
return metric
}
for _, valueItem := range values {
timeAndValue, sure := valueItem.([]interface{})
if !sure {
return metric
}
metric := getNamespaceHistoryMetric(timeAndValue[0].(float64), metricName)
workspaceNamespaceCount := calcWorkspaceNamespace(metric)
timeAndValue[1] = fmt.Sprintf("%d", workspaceNamespaceCount)
}
}
}
return metric
}
func getNamespaceHistoryMetric(timestamp float64, metricName string) *FormatedMetric {
var timeRelatedParams = make(url.Values)
timeRelatedParams.Set("time", fmt.Sprintf("%f", timestamp))
timeRelatedParams.Set("query", NamespaceLabelRule)
params := timeRelatedParams.Encode()
metric := GetMetric(client.DefaultQueryType, params, metricName)
return metric
}
// calculate all namespace which belong to workspaces
func calcWorkspaceNamespace(metric *FormatedMetric) int {
if metric.Status == "error" {
glog.Errorf("failed when retrive namespace history, the metric is %v", metric.Data.Result)
return 0
}
var workspaceNamespaceCount = 0
for _, result := range metric.Data.Result {
tmpMap, sure := result[ResultItemMetric].(map[string]interface{})
if sure {
wsName, exist := tmpMap[WorkspaceJoinedKey]
if exist && wsName != "" {
workspaceNamespaceCount += 1
}
}
}
return workspaceNamespaceCount
}
func MonitorAllWorkspacesStatistics() *FormatedLevelMetric {
wg := sync.WaitGroup{}
@@ -772,13 +732,10 @@ func MonitorAllWorkspacesStatistics() *FormatedLevelMetric {
var orgResultItem *FormatedMetric
var devopsResultItem *FormatedMetric
var clusterProjResultItem *FormatedMetric
var workspaceProjResultItem *FormatedMetric
var accountResultItem *FormatedMetric
wsMap, projs, errProj := workspaces.GetAllOrgAndProjList()
wg.Add(5)
wg.Add(4)
go func() {
orgNums, errOrg := workspaces.GetAllOrgNums()
@@ -799,10 +756,7 @@ func MonitorAllWorkspacesStatistics() *FormatedLevelMetric {
}()
go func() {
var projNums = 0
for _, v := range wsMap {
projNums += len(v)
}
projNums, errProj := workspaces.GetAllProjectNums()
if errProj != nil {
glog.Errorln(errProj.Error())
}
@@ -810,15 +764,6 @@ func MonitorAllWorkspacesStatistics() *FormatedLevelMetric {
wg.Done()
}()
go func() {
projNums := len(projs)
if errProj != nil {
glog.Errorln(errProj.Error())
}
clusterProjResultItem = getSpecificMetricItem(timestamp, MetricNameClusterAllProjectCount, WorkspaceResourceKindNamespace, projNums, errProj)
wg.Done()
}()
go func() {
actNums, errAct := workspaces.GetAllAccountNums()
if errAct != nil {
@@ -830,7 +775,7 @@ func MonitorAllWorkspacesStatistics() *FormatedLevelMetric {
wg.Wait()
metricsArray = append(metricsArray, *orgResultItem, *devopsResultItem, *workspaceProjResultItem, *accountResultItem, *clusterProjResultItem)
metricsArray = append(metricsArray, *orgResultItem, *devopsResultItem, *workspaceProjResultItem, *accountResultItem)
return &FormatedLevelMetric{
MetricsLevel: MetricLevelWorkspace,
@@ -854,16 +799,7 @@ func MonitorOneWorkspaceStatistics(wsName string) *FormatedLevelMetric {
go func() {
// add namespaces(project) metric
namespaces, errNs := workspaces.WorkspaceNamespaces(wsName)
namespaces, noneExistentNs := getExistingNamespace(namespaces)
if len(noneExistentNs) != 0 {
nsStr := strings.Join(noneExistentNs, "|")
errStr := "the namespaces " + nsStr + " do not exist"
if errNs == nil {
errNs = errors.New(errStr)
} else {
errNs = errors.New(errNs.Error() + "\t" + errStr)
}
}
if errNs != nil {
glog.Errorln(errNs.Error())
}
@@ -1056,33 +992,6 @@ func makeMetricItems(timestamp int64, statusMap map[string]int, resourceType str
return &metricItems
}
// monitor k8s event, there are two status: Normal, Warning
func MonitorEvents(nsFilter string) *[]v1.Event {
namespaceMap, err := getAllNamespace()
if err != nil {
glog.Errorln(err.Error())
}
var nsList = make([]string, 0)
for ns, _ := range namespaceMap {
nsList = append(nsList, ns)
}
filterNS := filterNamespace(nsFilter, nsList)
var eventsList = make([]v1.Event, 0)
for _, ns := range filterNS {
events, err := client.NewK8sClient().CoreV1().Events(ns).List(metaV1.ListOptions{})
if err != nil {
glog.Errorln(err.Error())
} else {
for _, item := range events.Items {
eventsList = append(eventsList, item)
}
}
}
return &eventsList
}
func AssembleClusterMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) {
queryType := monitoringRequest.QueryType
paramValues := monitoringRequest.Params
@@ -1100,87 +1009,3 @@ func AssembleNodeMetricRequestInfo(monitoringRequest *client.MonitoringRequestPa
return queryType, params
}
func getExistingNamespace(namespaces []string) ([]string, []string) {
namespaceMap, err := getAllNamespace()
var existedNS []string
var noneExistedNS []string
if err != nil {
return namespaces, nil
}
for _, ns := range namespaces {
if _, exist := namespaceMap[ns]; exist {
existedNS = append(existedNS, ns)
} else {
noneExistedNS = append(noneExistedNS, ns)
}
}
return existedNS, noneExistedNS
}
func getAllNamespace() (map[string]int, error) {
k8sClient := client.NewK8sClient()
nsList, err := k8sClient.CoreV1().Namespaces().List(metaV1.ListOptions{})
if err != nil {
glog.Errorln(err.Error())
return nil, err
}
namespaceMap := make(map[string]int)
for _, item := range nsList.Items {
namespaceMap[item.Name] = 0
}
return namespaceMap, nil
}
func MonitorWorkloadCount(namespace string) *FormatedMetric {
quotaMetric, err := models.GetNamespaceQuota(namespace)
fMetric := convertQuota2MetricStruct(quotaMetric)
// whether the namespace in request parameters exists?
namespaceMap, err := getAllNamespace()
_, exist := namespaceMap[namespace]
if err != nil {
exist = true
}
if !exist || err != nil {
fMetric.Status = MetricStatusError
fMetric.Data.ResultType = ""
errInfo := make(map[string]interface{})
if err != nil {
errInfo["errormsg"] = err.Error()
} else {
errInfo["errormsg"] = "namespace " + namespace + " does not exist"
}
fMetric.Data.Result = []map[string]interface{}{errInfo}
}
return fMetric
}
func convertQuota2MetricStruct(quotaMetric *models.ResourceQuota) *FormatedMetric {
var fMetric FormatedMetric
fMetric.MetricName = MetricNameWorkloadCount
fMetric.Status = MetricStatusSuccess
fMetric.Data.ResultType = ResultTypeVector
timestamp := int64(time.Now().Unix())
var resultItems []map[string]interface{}
hardMap := make(map[string]string)
for resourceName, v := range quotaMetric.Data.Hard {
hardMap[resourceName.String()] = v.String()
}
for resourceName, v := range quotaMetric.Data.Used {
resultItem := make(map[string]interface{})
tmp := make(map[string]string)
tmp[ResultItemMetricResource] = resourceName.String()
resultItem[ResultItemMetric] = tmp
resultItem[ResultItemValue] = []interface{}{timestamp, hardMap[resourceName.String()], v.String()}
resultItems = append(resultItems, resultItem)
}
fMetric.Data.Result = resultItems
return &fMetric
}