Add test code to metrics client, and fix metrics items process

Signed-off-by: root <danma@yunify.com>
This commit is contained in:
root
2021-01-22 14:50:49 +00:00
parent 8c86c9e1a5
commit 3095fd9403
13 changed files with 765 additions and 71 deletions

View File

@@ -118,18 +118,14 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS
if s.MonitoringOptions == nil || len(s.MonitoringOptions.Endpoint) == 0 { if s.MonitoringOptions == nil || len(s.MonitoringOptions.Endpoint) == 0 {
return nil, fmt.Errorf("moinitoring service address in configuration MUST not be empty, please check configmap/kubesphere-config in kubesphere-system namespace") return nil, fmt.Errorf("moinitoring service address in configuration MUST not be empty, please check configmap/kubesphere-config in kubesphere-system namespace")
} else { } else {
prometheusClient, err := prometheus.NewPrometheus(s.MonitoringOptions) monitoringClient, err := prometheus.NewPrometheus(s.MonitoringOptions)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to connect to prometheus, please check prometheus status, error: %v", err) return nil, fmt.Errorf("failed to connect to prometheus, please check prometheus status, error: %v", err)
} }
apiServer.PrometheusClient = prometheusClient apiServer.MonitoringClient = monitoringClient
} }
metricsClient, err := metricsserver.NewMetricsServer(kubernetesClient.Kubernetes(), s.KubernetesOptions) apiServer.MetricsClient = metricsserver.NewMetricsClient(kubernetesClient.Kubernetes(), s.KubernetesOptions)
if err != nil {
return nil, fmt.Errorf("failed to connect to metrics-server, please check metrics-server status, error: %v", err)
}
apiServer.MetricsClient = metricsClient
if s.LoggingOptions.Host != "" { if s.LoggingOptions.Host != "" {
loggingClient, err := esclient.NewClient(s.LoggingOptions) loggingClient, err := esclient.NewClient(s.LoggingOptions)

View File

@@ -130,7 +130,7 @@ type APIServer struct {
CacheClient cache.Interface CacheClient cache.Interface
// monitoring client set // monitoring client set
PrometheusClient monitoring.Interface MonitoringClient monitoring.Interface
MetricsClient monitoring.Interface MetricsClient monitoring.Interface
@@ -214,7 +214,7 @@ func (s *APIServer) installKubeSphereAPIs() {
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config)) urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache)) urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.PrometheusClient, s.MetricsClient, s.InformerFactory, s.OpenpitrixClient)) urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.OpenpitrixClient))
urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.OpenpitrixClient)) urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.OpenpitrixClient))
urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes())) urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory, urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,

View File

@@ -35,8 +35,8 @@ type handler struct {
mo model.MonitoringOperator mo model.MonitoringOperator
} }
func newHandler(k kubernetes.Interface, prometheusClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, o openpitrix.Client) *handler { func newHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, o openpitrix.Client) *handler {
return &handler{k, model.NewMonitoringOperator(prometheusClient, metricsClient, k, f, o)} return &handler{k, model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, o)}
} }
func (h handler) handleKubeSphereMetricsQuery(req *restful.Request, resp *restful.Response) { func (h handler) handleKubeSphereMetricsQuery(req *restful.Request, resp *restful.Response) {

View File

@@ -39,10 +39,10 @@ const (
var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"} var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"}
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, prometheusClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, opClient openpitrix.Client) error { func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, opClient openpitrix.Client) error {
ws := runtime.NewWebService(GroupVersion) ws := runtime.NewWebService(GroupVersion)
h := newHandler(k8sClient, prometheusClient, metricsClient, factory, opClient) h := newHandler(k8sClient, monitoringClient, metricsClient, factory, opClient)
ws.Route(ws.GET("/kubesphere"). ws.Route(ws.GET("/kubesphere").
To(h.handleKubeSphereMetricsQuery). To(h.handleKubeSphereMetricsQuery).

View File

@@ -54,9 +54,9 @@ type monitoringOperator struct {
op openpitrix.Interface op openpitrix.Interface
} }
func NewMonitoringOperator(prometheusClient monitoring.Interface, metricsClient monitoring.Interface, k8s kubernetes.Interface, factory informers.InformerFactory, opClient opclient.Client) MonitoringOperator { func NewMonitoringOperator(monitoringClient monitoring.Interface, metricsClient monitoring.Interface, k8s kubernetes.Interface, factory informers.InformerFactory, opClient opclient.Client) MonitoringOperator {
return &monitoringOperator{ return &monitoringOperator{
prometheus: prometheusClient, prometheus: monitoringClient,
metricsserver: metricsClient, metricsserver: metricsClient,
k8s: k8s, k8s: k8s,
ks: factory.KubeSphereSharedInformerFactory(), ks: factory.KubeSphereSharedInformerFactory(),
@@ -96,6 +96,8 @@ func (mo monitoringOperator) GetMetricOverTime(expr, namespace string, start, en
func (mo monitoringOperator) GetNamedMetrics(metrics []string, time time.Time, opt monitoring.QueryOption) Metrics { func (mo monitoringOperator) GetNamedMetrics(metrics []string, time time.Time, opt monitoring.QueryOption) Metrics {
ress := mo.prometheus.GetNamedMetrics(metrics, time, opt) ress := mo.prometheus.GetNamedMetrics(metrics, time, opt)
if mo.metricsserver != nil {
mr := mo.metricsserver.GetNamedMetrics(metrics, time, opt) mr := mo.metricsserver.GetNamedMetrics(metrics, time, opt)
//Merge edge node metrics data //Merge edge node metrics data
@@ -109,12 +111,15 @@ func (mo monitoringOperator) GetNamedMetrics(metrics []string, time time.Time, o
ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...) ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)
} }
} }
}
return Metrics{Results: ress} return Metrics{Results: ress}
} }
func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics { func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics {
ress := mo.prometheus.GetNamedMetricsOverTime(metrics, start, end, step, opt) ress := mo.prometheus.GetNamedMetricsOverTime(metrics, start, end, step, opt)
if mo.metricsserver != nil {
mr := mo.metricsserver.GetNamedMetricsOverTime(metrics, start, end, step, opt) mr := mo.metricsserver.GetNamedMetricsOverTime(metrics, start, end, step, opt)
//Merge edge node metrics data //Merge edge node metrics data
@@ -128,6 +133,7 @@ func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, en
ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...) ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)
} }
} }
}
return Metrics{Results: ress} return Metrics{Results: ress}
} }

View File

@@ -18,6 +18,7 @@ package metricsserver
import ( import (
"context" "context"
"errors"
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
@@ -38,7 +39,7 @@ import (
// metricsServer implements monitoring interface backend by metrics-server // metricsServer implements monitoring interface backend by metrics-server
type metricsServer struct { type metricsServer struct {
metricsAPIAvailable bool metricsAPIAvailable bool
metricsClient *metricsclient.Clientset metricsClient metricsclient.Interface
k8s kubernetes.Interface k8s kubernetes.Interface
} }
@@ -119,38 +120,44 @@ func (m metricsServer) getNodeMetricsFromMetricsAPI() (*metricsapi.NodeMetricsLi
return metrics, nil return metrics, nil
} }
func NewMetricsServer(k kubernetes.Interface, options *k8s.KubernetesOptions) (monitoring.Interface, error) { func NewMetricsClient(k kubernetes.Interface, options *k8s.KubernetesOptions) monitoring.Interface {
var metricsServer metricsServer
config, err := clientcmd.BuildConfigFromFlags("", options.KubeConfig) config, err := clientcmd.BuildConfigFromFlags("", options.KubeConfig)
if err != nil { if err != nil {
klog.Error(err) klog.Error(err)
return metricsServer, err return nil
} }
metricsServer.k8s = k
discoveryClient := k.Discovery() discoveryClient := k.Discovery()
apiGroups, err := discoveryClient.ServerGroups() apiGroups, err := discoveryClient.ServerGroups()
if err != nil { if err != nil {
klog.Error(err) klog.Error(err)
return metricsServer, err return nil
} }
metricsServer.metricsAPIAvailable = metricsAPISupported(apiGroups) metricsAPIAvailable := metricsAPISupported(apiGroups)
if !metricsServer.metricsAPIAvailable { if !metricsAPIAvailable {
klog.Warningf("Metrics API not available.") klog.Warningf("Metrics API not available.")
return metricsServer, err return nil
} }
metricsClient, err := metricsclient.NewForConfig(config) metricsClient, err := metricsclient.NewForConfig(config)
if err != nil { if err != nil {
klog.Error(err) klog.Error(err)
return metricsServer, err return nil
} }
metricsServer.metricsClient = metricsClient return NewMetricsServer(k, metricsAPIAvailable, metricsClient)
return metricsServer, nil }
func NewMetricsServer(k kubernetes.Interface, a bool, m metricsclient.Interface) monitoring.Interface {
var metricsServer metricsServer
metricsServer.k8s = k
metricsServer.metricsAPIAvailable = a
metricsServer.metricsClient = m
return metricsServer
} }
func (m metricsServer) GetMetric(expr string, ts time.Time) monitoring.Metric { func (m metricsServer) GetMetric(expr string, ts time.Time) monitoring.Metric {
@@ -165,7 +172,27 @@ func (m metricsServer) GetMetricOverTime(expr string, start, end time.Time, step
return parsedResp return parsedResp
} }
var edgeNodeMetrics = []string{"node_cpu_usage", "node_cpu_total", "node_cpu_utilisation", "node_memory_usage_wo_cache", "node_memory_total", "node_memory_utilisation"} const (
metricsNodeCPUUsage = "node_cpu_usage"
metricsNodeCPUTotal = "node_cpu_total"
metricsNodeCPUUltilisation = "node_cpu_utilisation"
metricsNodeMemoryUsageWoCache = "node_memory_usage_wo_cache"
metricsNodeMemoryTotal = "node_memory_total"
metricsNodeMemoryUltilisation = "node_memory_utilisation"
)
var edgeNodeMetrics = []string{metricsNodeCPUUsage, metricsNodeCPUTotal, metricsNodeCPUUltilisation, metricsNodeMemoryUsageWoCache, metricsNodeMemoryTotal, metricsNodeMemoryUltilisation}
// parseErrorResp builds one error-carrying monitoring.Metric per requested
// metric name, so a failed query still yields a result entry (with Error set)
// for every metric instead of silently returning nothing.
func (m metricsServer) parseErrorResp(metrics []string, err error) []monitoring.Metric {
	res := make([]monitoring.Metric, 0, len(metrics))
	for _, metric := range metrics {
		parsedResp := monitoring.Metric{MetricName: metric}
		parsedResp.Error = err.Error()
		// Bug fix: the constructed metric was previously dropped on the floor,
		// so callers always received an empty slice; append it to the result.
		res = append(res, parsedResp)
	}
	return res
}
func (m metricsServer) GetNamedMetrics(metrics []string, ts time.Time, o monitoring.QueryOption) []monitoring.Metric { func (m metricsServer) GetNamedMetrics(metrics []string, ts time.Time, o monitoring.QueryOption) []monitoring.Metric {
var res []monitoring.Metric var res []monitoring.Metric
@@ -175,13 +202,13 @@ func (m metricsServer) GetNamedMetrics(metrics []string, ts time.Time, o monitor
if opts.Level == monitoring.LevelNode { if opts.Level == monitoring.LevelNode {
if !m.metricsAPIAvailable { if !m.metricsAPIAvailable {
klog.Warningf("Metrics API not available.") klog.Warningf("Metrics API not available.")
return res return m.parseErrorResp(metrics, errors.New("Metrics API not available."))
} }
edgeNodes, err := m.listEdgeNodes() edgeNodes, err := m.listEdgeNodes()
if err != nil { if err != nil {
klog.Errorf("List edge nodes error %v\n", err) klog.Errorf("List edge nodes error %v\n", err)
return res return m.parseErrorResp(metrics, err)
} }
edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts) edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts)
@@ -193,7 +220,12 @@ func (m metricsServer) GetNamedMetrics(metrics []string, ts time.Time, o monitor
metricsResult, err := m.getNodeMetricsFromMetricsAPI() metricsResult, err := m.getNodeMetricsFromMetricsAPI()
if err != nil { if err != nil {
klog.Errorf("Get edge node metrics error %v\n", err) klog.Errorf("Get edge node metrics error %v\n", err)
return res return m.parseErrorResp(metrics, err)
}
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
} }
status := make(map[string]v1.NodeStatus) status := make(map[string]v1.NodeStatus)
@@ -203,8 +235,11 @@ func (m metricsServer) GetNamedMetrics(metrics []string, ts time.Time, o monitor
nodeMetrics := make(map[string]*monitoring.MetricData) nodeMetrics := make(map[string]*monitoring.MetricData)
for _, enm := range edgeNodeMetrics { for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeVector} nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeVector}
} }
}
var usage v1.ResourceList var usage v1.ResourceList
var cap v1.ResourceList var cap v1.ResourceList
@@ -235,22 +270,46 @@ func (m metricsServer) GetNamedMetrics(metrics []string, ts time.Time, o monitor
} }
} }
metricValues["node_cpu_usage"].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000} _, ok = metricsMap[metricsNodeCPUUsage]
metricValues["node_cpu_total"].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000} if ok {
metricValues["node_cpu_utilisation"].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())} metricValues[metricsNodeCPUUsage].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000}
metricValues["node_memory_usage_wo_cache"].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())} }
metricValues["node_memory_total"].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())} _, ok = metricsMap[metricsNodeCPUTotal]
metricValues["node_memory_utilisation"].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())} if ok {
metricValues[metricsNodeCPUTotal].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000}
}
_, ok = metricsMap[metricsNodeCPUUltilisation]
if ok {
metricValues[metricsNodeCPUUltilisation].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())}
}
_, ok = metricsMap[metricsNodeMemoryUsageWoCache]
if ok {
metricValues[metricsNodeMemoryUsageWoCache].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())}
}
_, ok = metricsMap[metricsNodeMemoryTotal]
if ok {
metricValues[metricsNodeMemoryTotal].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())}
}
_, ok = metricsMap[metricsNodeMemoryUltilisation]
if ok {
metricValues[metricsNodeMemoryUltilisation].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())}
}
for _, enm := range edgeNodeMetrics { for _, enm := range edgeNodeMetrics {
_, ok = metricsMap[enm]
if ok {
nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm]) nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm])
} }
} }
}
for _, enm := range edgeNodeMetrics { for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]}) res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]})
} }
} }
}
return res return res
} }
@@ -263,13 +322,13 @@ func (m metricsServer) GetNamedMetricsOverTime(metrics []string, start, end time
if opts.Level == monitoring.LevelNode { if opts.Level == monitoring.LevelNode {
if !m.metricsAPIAvailable { if !m.metricsAPIAvailable {
klog.Warningf("Metrics API not available.") klog.Warningf("Metrics API not available.")
return res return m.parseErrorResp(metrics, errors.New("Metrics API not available."))
} }
edgeNodes, err := m.listEdgeNodes() edgeNodes, err := m.listEdgeNodes()
if err != nil { if err != nil {
klog.Errorf("List edge nodes error %v\n", err) klog.Errorf("List edge nodes error %v\n", err)
return res return m.parseErrorResp(metrics, err)
} }
edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts) edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts)
@@ -281,7 +340,12 @@ func (m metricsServer) GetNamedMetricsOverTime(metrics []string, start, end time
metricsResult, err := m.getNodeMetricsFromMetricsAPI() metricsResult, err := m.getNodeMetricsFromMetricsAPI()
if err != nil { if err != nil {
klog.Errorf("Get edge node metrics error %v\n", err) klog.Errorf("Get edge node metrics error %v\n", err)
return res return m.parseErrorResp(metrics, err)
}
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
} }
status := make(map[string]v1.NodeStatus) status := make(map[string]v1.NodeStatus)
@@ -291,8 +355,11 @@ func (m metricsServer) GetNamedMetricsOverTime(metrics []string, start, end time
nodeMetrics := make(map[string]*monitoring.MetricData) nodeMetrics := make(map[string]*monitoring.MetricData)
for _, enm := range edgeNodeMetrics { for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix} nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix}
} }
}
var usage v1.ResourceList var usage v1.ResourceList
var cap v1.ResourceList var cap v1.ResourceList
@@ -323,22 +390,46 @@ func (m metricsServer) GetNamedMetricsOverTime(metrics []string, start, end time
} }
} }
metricValues["node_cpu_usage"].Series = append(metricValues["node_cpu_usage"].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000}) _, ok = metricsMap[metricsNodeCPUUsage]
metricValues["node_cpu_total"].Series = append(metricValues["node_cpu_total"].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000}) if ok {
metricValues["node_cpu_utilisation"].Series = append(metricValues["node_cpu_utilisation"].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())}) metricValues[metricsNodeCPUUsage].Series = append(metricValues[metricsNodeCPUUsage].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000})
metricValues["node_memory_usage_wo_cache"].Series = append(metricValues["node_memory_usage_wo_cache"].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())}) }
metricValues["node_memory_total"].Series = append(metricValues["node_memory_total"].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())}) _, ok = metricsMap[metricsNodeCPUTotal]
metricValues["node_memory_utilisation"].Series = append(metricValues["node_memory_utilisation"].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())}) if ok {
metricValues[metricsNodeCPUTotal].Series = append(metricValues[metricsNodeCPUTotal].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000})
}
_, ok = metricsMap[metricsNodeCPUUltilisation]
if ok {
metricValues[metricsNodeCPUUltilisation].Series = append(metricValues[metricsNodeCPUUltilisation].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())})
}
_, ok = metricsMap[metricsNodeMemoryUsageWoCache]
if ok {
metricValues[metricsNodeMemoryUsageWoCache].Series = append(metricValues[metricsNodeMemoryUsageWoCache].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())})
}
_, ok = metricsMap[metricsNodeMemoryTotal]
if ok {
metricValues[metricsNodeMemoryTotal].Series = append(metricValues[metricsNodeMemoryTotal].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())})
}
_, ok = metricsMap[metricsNodeMemoryUltilisation]
if ok {
metricValues[metricsNodeMemoryUltilisation].Series = append(metricValues[metricsNodeMemoryUltilisation].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())})
}
for _, enm := range edgeNodeMetrics { for _, enm := range edgeNodeMetrics {
_, ok = metricsMap[enm]
if ok {
nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm]) nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm])
} }
} }
}
for _, enm := range edgeNodeMetrics { for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]}) res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]})
} }
} }
}
return res return res
} }

View File

@@ -0,0 +1,273 @@
package metricsserver
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/json-iterator/go"
"io/ioutil"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
fakek8s "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
metricsV1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
// mergeResourceLists merges resource lists. When two lists contain the same
// resource name, the value from the last list wins in the result.
func mergeResourceLists(resourceLists ...corev1.ResourceList) corev1.ResourceList {
	result := corev1.ResourceList{}
	for _, rl := range resourceLists {
		// "name" instead of "resource": the original loop variable shadowed
		// the imported k8s.io/apimachinery/pkg/api/resource package.
		for name, quantity := range rl {
			result[name] = quantity
		}
	}
	return result
}
// getResourceList builds a ResourceList from the given cpu and memory
// quantity strings; an empty string leaves that resource unset.
func getResourceList(cpu, memory string) corev1.ResourceList {
	list := corev1.ResourceList{}
	if cpu != "" {
		list[corev1.ResourceCPU] = resource.MustParse(cpu)
	}
	if memory != "" {
		list[corev1.ResourceMemory] = resource.MustParse(memory)
	}
	return list
}
var nodeCapacity = mergeResourceLists(getResourceList("8", "8Gi"))
var node1 = &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "edgenode-1",
Labels: map[string]string{
"node-role.kubernetes.io/edge": "",
},
},
Status: corev1.NodeStatus{
Capacity: nodeCapacity,
},
}
var node2 = &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "edgenode-2",
Labels: map[string]string{
"node-role.kubernetes.io/edge": "",
},
},
Status: corev1.NodeStatus{
Capacity: nodeCapacity,
},
}
// TestGetNamedMetrics verifies GetNamedMetrics against a fake Kubernetes
// clientset and a fake metrics-server client. Each case requests a subset of
// the edge-node metrics with a node-name filter and compares the result
// against a golden JSON file under ./testdata.
func TestGetNamedMetrics(t *testing.T) {
	tests := []struct {
		metrics  []string // metric names to query
		filter   string   // node-name regexp passed as ResourceFilter
		expected string   // golden file under ./testdata
	}{
		{
			metrics:  []string{"node_cpu_usage", "node_memory_usage_wo_cache"},
			filter:   ".*",
			expected: "metrics-vector-1.json",
		},
		{
			metrics:  []string{"node_cpu_usage", "node_cpu_utilisation"},
			filter:   "edgenode-2",
			expected: "metrics-vector-2.json",
		},
		{
			metrics:  []string{"node_memory_usage_wo_cache", "node_memory_utilisation"},
			filter:   "edgenode-1|edgenode-2",
			expected: "metrics-vector-3.json",
		},
	}

	// Two fake edge nodes, registered both in the clientset and the informer cache.
	fakeK8sClient := fakek8s.NewSimpleClientset(node1, node2)
	informer := informers.NewSharedInformerFactory(fakeK8sClient, 0)
	informer.Core().V1().Nodes().Informer().GetIndexer().Add(node1)
	informer.Core().V1().Nodes().Informer().GetIndexer().Add(node2)

	fakeMetricsclient := &fakemetricsclient.Clientset{}
	// Fixed timestamp so the generated samples match the golden files.
	layout := "2006-01-02T15:04:05.000Z"
	str := "2021-01-25T12:34:56.789Z"
	metricsTime, _ := time.Parse(layout, str)

	// Stub the metrics API "list nodes" call with deterministic usage samples.
	fakeMetricsclient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
		metrics := &metricsV1beta1.NodeMetricsList{}
		nodeMetric1 := metricsV1beta1.NodeMetrics{
			ObjectMeta: metav1.ObjectMeta{
				Name: "edgenode-1",
				Labels: map[string]string{
					"node-role.kubernetes.io/edge": "",
				},
			},
			Timestamp: metav1.Time{Time: metricsTime},
			Window:    metav1.Duration{Duration: time.Minute},
			Usage: v1.ResourceList{
				v1.ResourceCPU: *resource.NewMilliQuantity(
					int64(1000),
					resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(
					int64(1024*1024),
					resource.BinarySI),
			},
		}
		nodeMetric2 := metricsV1beta1.NodeMetrics{
			ObjectMeta: metav1.ObjectMeta{
				Name: "edgenode-2",
				Labels: map[string]string{
					"node-role.kubernetes.io/edge": "",
				},
			},
			Timestamp: metav1.Time{Time: metricsTime},
			Window:    metav1.Duration{Duration: time.Minute},
			Usage: v1.ResourceList{
				v1.ResourceCPU: *resource.NewMilliQuantity(
					int64(2000),
					resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(
					int64(2*1024*1024),
					resource.BinarySI),
			},
		}
		metrics.Items = append(metrics.Items, nodeMetric1)
		metrics.Items = append(metrics.Items, nodeMetric2)
		return true, metrics, nil
	})

	for i, tt := range tests {
		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
			expected := make([]monitoring.Metric, 0)
			err := jsonFromFile(tt.expected, &expected)
			if err != nil {
				t.Fatal(err)
			}
			client := NewMetricsServer(fakeK8sClient, true, fakeMetricsclient)
			result := client.GetNamedMetrics(tt.metrics, time.Now(), monitoring.NodeOption{ResourceFilter: tt.filter})
			if diff := cmp.Diff(result, expected); diff != "" {
				t.Fatalf("%T differ (-got, +want): %s", expected, diff)
			}
		})
	}
}
// TestGetNamedMetricsOverTime verifies GetNamedMetricsOverTime (matrix/range
// results) against a fake Kubernetes clientset and a fake metrics-server
// client, comparing each case with a golden JSON file under ./testdata.
func TestGetNamedMetricsOverTime(t *testing.T) {
	tests := []struct {
		metrics  []string // metric names to query
		filter   string   // node-name regexp passed as ResourceFilter
		expected string   // golden file under ./testdata
	}{
		{
			metrics:  []string{"node_cpu_usage", "node_memory_usage_wo_cache"},
			filter:   ".*",
			expected: "metrics-matrix-1.json",
		},
		{
			metrics:  []string{"node_cpu_usage", "node_cpu_utilisation"},
			filter:   "edgenode-2",
			expected: "metrics-matrix-2.json",
		},
		{
			metrics:  []string{"node_memory_usage_wo_cache", "node_memory_utilisation"},
			filter:   "edgenode-1|edgenode-2",
			expected: "metrics-matrix-3.json",
		},
	}

	// Two fake edge nodes, registered both in the clientset and the informer cache.
	fakeK8sClient := fakek8s.NewSimpleClientset(node1, node2)
	informer := informers.NewSharedInformerFactory(fakeK8sClient, 0)
	informer.Core().V1().Nodes().Informer().GetIndexer().Add(node1)
	informer.Core().V1().Nodes().Informer().GetIndexer().Add(node2)

	fakeMetricsclient := &fakemetricsclient.Clientset{}
	// Fixed timestamp so the generated samples match the golden files.
	layout := "2006-01-02T15:04:05.000Z"
	str := "2021-01-25T12:34:56.789Z"
	metricsTime, _ := time.Parse(layout, str)

	// Stub the metrics API "list nodes" call with deterministic usage samples.
	fakeMetricsclient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
		metrics := &metricsV1beta1.NodeMetricsList{}
		nodeMetric1 := metricsV1beta1.NodeMetrics{
			ObjectMeta: metav1.ObjectMeta{
				Name: "edgenode-1",
				Labels: map[string]string{
					"node-role.kubernetes.io/edge": "",
				},
			},
			Timestamp: metav1.Time{Time: metricsTime},
			Window:    metav1.Duration{Duration: time.Minute},
			Usage: v1.ResourceList{
				v1.ResourceCPU: *resource.NewMilliQuantity(
					int64(1000),
					resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(
					int64(1024*1024),
					resource.BinarySI),
			},
		}
		nodeMetric2 := metricsV1beta1.NodeMetrics{
			ObjectMeta: metav1.ObjectMeta{
				Name: "edgenode-2",
				Labels: map[string]string{
					"node-role.kubernetes.io/edge": "",
				},
			},
			Timestamp: metav1.Time{Time: metricsTime},
			Window:    metav1.Duration{Duration: time.Minute},
			Usage: v1.ResourceList{
				v1.ResourceCPU: *resource.NewMilliQuantity(
					int64(2000),
					resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(
					int64(2*1024*1024),
					resource.BinarySI),
			},
		}
		metrics.Items = append(metrics.Items, nodeMetric1)
		metrics.Items = append(metrics.Items, nodeMetric2)
		return true, metrics, nil
	})

	for i, tt := range tests {
		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
			expected := make([]monitoring.Metric, 0)
			err := jsonFromFile(tt.expected, &expected)
			if err != nil {
				t.Fatal(err)
			}
			client := NewMetricsServer(fakeK8sClient, true, fakeMetricsclient)
			result := client.GetNamedMetricsOverTime(tt.metrics, time.Now().Add(-time.Minute*3), time.Now(), time.Minute, monitoring.NodeOption{ResourceFilter: tt.filter})
			if diff := cmp.Diff(result, expected); diff != "" {
				t.Fatalf("%T differ (-got, +want): %s", expected, diff)
			}
		})
	}
}
func jsonFromFile(expectedFile string, expectedJsonPtr interface{}) error {
json, err := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", expectedFile))
if err != nil {
return err
}
err = jsoniter.Unmarshal(json, expectedJsonPtr)
if err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,66 @@
[
{
"metric_name": "node_cpu_usage",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"values": [
[
1611578096,
"1"
]
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"2"
]
]
}
]
}
},
{
"metric_name": "node_memory_usage_wo_cache",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"values": [
[
1611578096,
"1048576"
]
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"2097152"
]
]
}
]
}
}
]

View File

@@ -0,0 +1,42 @@
[
{
"metric_name": "node_cpu_usage",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"2"
]
]
}
]
}
},
{
"metric_name": "node_cpu_utilisation",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"0.25"
]
]
}
]
}
}
]

View File

@@ -0,0 +1,66 @@
[
{
"metric_name": "node_memory_usage_wo_cache",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"values": [
[
1611578096,
"1048576"
]
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"2097152"
]
]
}
]
}
},
{
"metric_name": "node_memory_utilisation",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"values": [
[
1611578096,
"0.0001220703125"
]
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"0.000244140625"
]
]
}
]
}
}
]

View File

@@ -0,0 +1,58 @@
[
{
"metric_name": "node_cpu_usage",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"value": [
1611578096,
"1"
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"2"
]
}
]
}
},
{
"metric_name": "node_memory_usage_wo_cache",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"value": [
1611578096,
"1048576"
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"2097152"
]
}
]
}
}
]

View File

@@ -0,0 +1,38 @@
[
{
"metric_name": "node_cpu_usage",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"2"
]
}
]
}
},
{
"metric_name": "node_cpu_utilisation",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"0.25"
]
}
]
}
}
]

View File

@@ -0,0 +1,58 @@
[
{
"metric_name": "node_memory_usage_wo_cache",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"value": [
1611578096,
"1048576"
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"2097152"
]
}
]
}
},
{
"metric_name": "node_memory_utilisation",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"value": [
1611578096,
"0.0001220703125"
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"0.000244140625"
]
}
]
}
}
]