add pod level metrics for edge node

Signed-off-by: zhu733756 <talonzhu@yunify.com>
This commit is contained in:
zhu733756
2021-03-19 16:51:58 +08:00
committed by zhu733756
parent ea93f3832d
commit 96d60da98e
14 changed files with 1014 additions and 289 deletions

View File

@@ -19,9 +19,12 @@ package metricsserver
import (
"context"
"errors"
"regexp"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
@@ -103,12 +106,28 @@ func (m metricsServer) filterEdgeNodeNames(edgeNodes map[string]v1.Node, opts *m
return edgeNodeNamesFiltered
}
func (m metricsServer) parseEdgePods(opts *monitoring.QueryOptions) map[string]bool {
edgePods := make(map[string]bool)
r, _ := regexp.Compile(`\s*\|\s*|\$`)
filters := r.Split(opts.ResourceFilter, -1)
for _, p := range filters {
if p != "" {
edgePods[p] = true
}
}
return edgePods
}
// node metrics of edge nodes
func (m metricsServer) getNodeMetricsFromMetricsAPI() (*metricsapi.NodeMetricsList, error) {
var err error
versionedMetrics := &metricsV1beta1.NodeMetricsList{}
mc := m.metricsClient.MetricsV1beta1()
nm := mc.NodeMetricses()
versionedMetrics, err = nm.List(context.TODO(), metav1.ListOptions{LabelSelector: edgeNodeLabel})
versionedMetrics, err := nm.List(context.TODO(), metav1.ListOptions{LabelSelector: edgeNodeLabel})
if err != nil {
return nil, err
}
@@ -120,6 +139,87 @@ func (m metricsServer) getNodeMetricsFromMetricsAPI() (*metricsapi.NodeMetricsLi
return metrics, nil
}
// pods metrics of edge nodes
func (m metricsServer) getPodMetricsFromMetricsAPI(edgePods map[string]bool, podName string, ns string) ([]metricsapi.PodMetrics, error) {
mc := m.metricsClient.MetricsV1beta1()
// single pod request
if ns != "" && podName != "" {
pm := mc.PodMetricses(ns)
versionedMetrics, err := pm.Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
klog.Error("Get pod metrics on edge node error:", err)
return nil, err
}
metrics := &metricsapi.PodMetrics{}
err = metricsV1beta1.Convert_v1beta1_PodMetrics_To_metrics_PodMetrics(versionedMetrics, metrics, nil)
if err != nil {
klog.Error("Convert pod metrics on edge node error:", err)
return nil, err
}
return []metricsapi.PodMetrics{*metrics}, nil
}
if len(edgePods) == 0 {
return nil, nil
}
var isNamespacedEdgePod bool
for p, _ := range edgePods {
if ok := strings.Contains(p, "/"); ok {
isNamespacedEdgePod = true
}
break
}
combinedPodMetrics := []metricsapi.PodMetrics{}
// handle cases with when edgePodName contains namespaceName
if isNamespacedEdgePod {
for p, _ := range edgePods {
splitedPodName := strings.Split(p, "/")
ns, p = strings.ReplaceAll(splitedPodName[0], " ", ""), strings.ReplaceAll(splitedPodName[1], " ", "")
pm := mc.PodMetricses(ns)
versionedMetrics, err := pm.Get(context.TODO(), p, metav1.GetOptions{})
if err != nil {
klog.Error("Get pod metrics on edge node error:", err)
continue
}
metrics := &metricsapi.PodMetrics{}
err = metricsV1beta1.Convert_v1beta1_PodMetrics_To_metrics_PodMetrics(versionedMetrics, metrics, nil)
if err != nil {
klog.Error("Convert pod metrics on edge node error:", err)
continue
}
combinedPodMetrics = append(combinedPodMetrics, *metrics)
}
return combinedPodMetrics, nil
}
// use list request in other cases
pm := mc.PodMetricses(ns)
versionedMetricsList, err := pm.List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Error("List pod metrics on edge node error:", err)
return nil, err
}
podMetrics := &metricsapi.PodMetricsList{}
err = metricsV1beta1.Convert_v1beta1_PodMetricsList_To_metrics_PodMetricsList(versionedMetricsList, podMetrics, nil)
if err != nil {
klog.Error("Convert pod metrics on edge node error:", err)
return nil, err
}
for _, podMetric := range podMetrics.Items {
if _, ok := edgePods[podMetric.Name]; !ok {
continue
}
combinedPodMetrics = append(combinedPodMetrics, podMetric)
}
return combinedPodMetrics, nil
}
func NewMetricsClient(k kubernetes.Interface, options *k8s.KubernetesOptions) monitoring.Interface {
config, err := clientcmd.BuildConfigFromFlags("", options.KubeConfig)
if err != nil {
@@ -172,6 +272,7 @@ func (m metricsServer) GetMetricOverTime(expr string, start, end time.Time, step
return parsedResp
}
// node metrics definition
const (
metricsNodeCPUUsage = "node_cpu_usage"
metricsNodeCPUTotal = "node_cpu_total"
@@ -183,6 +284,20 @@ const (
var edgeNodeMetrics = []string{metricsNodeCPUUsage, metricsNodeCPUTotal, metricsNodeCPUUltilisation, metricsNodeMemoryUsageWoCache, metricsNodeMemoryTotal, metricsNodeMemoryUltilisation}
// pod metrics definition
const (
metricsPodCPUUsage = "pod_cpu_usage"
metricsPodMemoryUsage = "pod_memory_usage_wo_cache"
)
var (
edgePodMetrics = []string{metricsPodCPUUsage, metricsPodMemoryUsage}
MeasuredResources = []v1.ResourceName{
v1.ResourceCPU,
v1.ResourceMemory,
}
)
func (m metricsServer) parseErrorResp(metrics []string, err error) []monitoring.Metric {
var res []monitoring.Metric
@@ -199,118 +314,213 @@ func (m metricsServer) GetNamedMetrics(metrics []string, ts time.Time, o monitor
opts := monitoring.NewQueryOptions()
o.Apply(opts)
if opts.Level == monitoring.LevelNode {
if !m.metricsAPIAvailable {
klog.Warningf("Metrics API not available.")
return m.parseErrorResp(metrics, errors.New("Metrics API not available."))
if !m.metricsAPIAvailable {
klog.Warningf("Metrics API not available.")
return m.parseErrorResp(metrics, errors.New("Metrics API not available."))
}
switch opts.Level {
case monitoring.LevelNode:
return m.GetNodeLevelNamedMetrics(metrics, ts, opts)
case monitoring.LevelPod:
return m.GetPodLevelNamedMetrics(metrics, ts, opts)
default:
return res
}
}
func (m metricsServer) GetNodeLevelNamedMetrics(metrics []string, ts time.Time, opts *monitoring.QueryOptions) []monitoring.Metric {
var res []monitoring.Metric
edgeNodes, err := m.listEdgeNodes()
if err != nil {
klog.Errorf("List edge nodes error %v\n", err)
return m.parseErrorResp(metrics, err)
}
edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts)
if len(edgeNodeNamesFiltered) == 0 {
klog.V(4).Infof("No edge node metrics is requested")
return res
}
status := make(map[string]v1.NodeStatus)
for n, _ := range edgeNodeNamesFiltered {
status[n] = edgeNodes[n].Status
}
metricsResult, err := m.getNodeMetricsFromMetricsAPI()
if err != nil {
klog.Errorf("Get edge node metrics error %v\n", err)
return m.parseErrorResp(metrics, err)
}
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
}
nodeMetrics := make(map[string]*monitoring.MetricData)
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeVector}
}
}
var usage v1.ResourceList
var cap v1.ResourceList
for _, m := range metricsResult.Items {
_, ok := edgeNodeNamesFiltered[m.Name]
if !ok {
continue
}
edgeNodes, err := m.listEdgeNodes()
if err != nil {
klog.Errorf("List edge nodes error %v\n", err)
return m.parseErrorResp(metrics, err)
}
m.Usage.DeepCopyInto(&usage)
status[m.Name].Capacity.DeepCopyInto(&cap)
edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts)
if len(edgeNodeNamesFiltered) == 0 {
klog.V(4).Infof("No edge node metrics is requested")
return res
}
metricValues := make(map[string]*monitoring.MetricValue)
metricsResult, err := m.getNodeMetricsFromMetricsAPI()
if err != nil {
klog.Errorf("Get edge node metrics error %v\n", err)
return m.parseErrorResp(metrics, err)
}
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
}
status := make(map[string]v1.NodeStatus)
for n, _ := range edgeNodeNamesFiltered {
status[n] = edgeNodes[n].Status
}
nodeMetrics := make(map[string]*monitoring.MetricData)
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeVector}
metricValues[enm] = &monitoring.MetricValue{
Metadata: make(map[string]string),
}
metricValues[enm].Metadata["node"] = m.Name
metricValues[enm].Metadata["role"] = "edge"
}
var usage v1.ResourceList
var cap v1.ResourceList
for _, m := range metricsResult.Items {
_, ok := edgeNodeNamesFiltered[m.Name]
if !ok {
continue
}
m.Usage.DeepCopyInto(&usage)
status[m.Name].Capacity.DeepCopyInto(&cap)
metricValues := make(map[string]*monitoring.MetricValue)
for _, enm := range edgeNodeMetrics {
metricValues[enm] = &monitoring.MetricValue{
Metadata: make(map[string]string),
for k, v := range metricsMap {
switch k {
case metricsNodeCPUUsage:
if v {
metricValues[metricsNodeCPUUsage].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000}
}
metricValues[enm].Metadata["node"] = m.Name
metricValues[enm].Metadata["role"] = "edge"
}
for _, addr := range status[m.Name].Addresses {
if addr.Type == v1.NodeInternalIP {
for _, enm := range edgeNodeMetrics {
metricValues[enm].Metadata["host_ip"] = addr.Address
}
break
case metricsNodeCPUTotal:
if v {
metricValues[metricsNodeCPUTotal].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000}
}
}
_, ok = metricsMap[metricsNodeCPUUsage]
if ok {
metricValues[metricsNodeCPUUsage].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000}
}
_, ok = metricsMap[metricsNodeCPUTotal]
if ok {
metricValues[metricsNodeCPUTotal].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000}
}
_, ok = metricsMap[metricsNodeCPUUltilisation]
if ok {
metricValues[metricsNodeCPUUltilisation].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())}
}
_, ok = metricsMap[metricsNodeMemoryUsageWoCache]
if ok {
metricValues[metricsNodeMemoryUsageWoCache].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())}
}
_, ok = metricsMap[metricsNodeMemoryTotal]
if ok {
metricValues[metricsNodeMemoryTotal].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())}
}
_, ok = metricsMap[metricsNodeMemoryUltilisation]
if ok {
metricValues[metricsNodeMemoryUltilisation].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())}
}
for _, enm := range edgeNodeMetrics {
_, ok = metricsMap[enm]
if ok {
nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm])
case metricsNodeCPUUltilisation:
if v {
metricValues[metricsNodeCPUUltilisation].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())}
}
case metricsNodeMemoryUsageWoCache:
if v {
metricValues[metricsNodeMemoryUsageWoCache].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())}
}
case metricsNodeMemoryTotal:
if v {
metricValues[metricsNodeMemoryTotal].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())}
}
case metricsNodeMemoryUltilisation:
if v {
metricValues[metricsNodeMemoryUltilisation].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())}
}
}
}
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
_, ok = metricsMap[enm]
if ok {
res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]})
nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm])
}
}
}
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]})
}
}
return res
}
func (m metricsServer) GetPodLevelNamedMetrics(metrics []string, ts time.Time, opts *monitoring.QueryOptions) []monitoring.Metric {
var res []monitoring.Metric
edgePods := m.parseEdgePods(opts)
if len(edgePods) == 0 && opts.PodName == "" {
klog.Errorf("Edge node filter regexp error: %v\n", errors.New("no edge node pods metrics is requested or resource filter invalid"))
return res
}
podMetricsFromMetricsAPI, err := m.getPodMetricsFromMetricsAPI(edgePods, opts.PodName, opts.NamespaceName)
if err != nil {
klog.Errorf("Get pod metrics of edge nodes error %v\n", err)
return m.parseErrorResp(metrics, err)
}
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
}
// init
podMetrics := make(map[string]*monitoring.MetricData)
for _, epm := range edgePodMetrics {
_, ok := metricsMap[epm]
if ok {
podMetrics[epm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeVector}
}
}
for _, p := range podMetricsFromMetricsAPI {
metricValues := make(map[string]*monitoring.MetricValue)
for _, epm := range edgePodMetrics {
metricValues[epm] = &monitoring.MetricValue{
Metadata: make(map[string]string),
}
metricValues[epm].Metadata["pod"] = p.Name
metricValues[epm].Metadata["namespace"] = p.Namespace
}
podMetricsUsge := make(v1.ResourceList)
for _, res := range MeasuredResources {
podMetricsUsge[res], _ = resource.ParseQuantity("0")
}
for _, podContainer := range p.Containers {
for _, res := range MeasuredResources {
quantity := podMetricsUsge[res]
quantity.Add(podContainer.Usage[res])
podMetricsUsge[res] = quantity
}
}
for k, v := range metricsMap {
switch k {
case metricsPodCPUUsage:
if v {
cpuQuantity := podMetricsUsge[v1.ResourceCPU]
metricValues[metricsPodCPUUsage].Sample = &monitoring.Point{float64(p.Timestamp.Unix()), float64(cpuQuantity.MilliValue()) / 1000}
}
case metricsPodMemoryUsage:
if v {
memoryQuantity := podMetricsUsge[v1.ResourceMemory]
metricValues[metricsPodMemoryUsage].Sample = &monitoring.Point{float64(p.Timestamp.Unix()), float64(memoryQuantity.Value()) / (1024 * 1024)}
}
}
}
for _, epm := range edgePodMetrics {
_, ok := metricsMap[epm]
if ok {
podMetrics[epm].MetricValues = append(podMetrics[epm].MetricValues, *metricValues[epm])
}
}
}
for _, epm := range edgePodMetrics {
_, ok := metricsMap[epm]
if ok {
res = append(res, monitoring.Metric{MetricName: epm, MetricData: *podMetrics[epm]})
}
}
return res
}
@@ -319,106 +529,115 @@ func (m metricsServer) GetNamedMetricsOverTime(metrics []string, start, end time
opts := monitoring.NewQueryOptions()
o.Apply(opts)
if opts.Level == monitoring.LevelNode {
if !m.metricsAPIAvailable {
klog.Warningf("Metrics API not available.")
return m.parseErrorResp(metrics, errors.New("Metrics API not available."))
if !m.metricsAPIAvailable {
klog.Warningf("Metrics API not available.")
return m.parseErrorResp(metrics, errors.New("Metrics API not available."))
}
switch opts.Level {
case monitoring.LevelNode:
return m.GetNodeLevelNamedMetricsOverTime(metrics, start, end, step, opts)
case monitoring.LevelPod:
return m.GetPodLevelNamedMetricsOverTime(metrics, start, end, step, opts)
default:
return res
}
}
func (m metricsServer) GetNodeLevelNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opts *monitoring.QueryOptions) []monitoring.Metric {
var res []monitoring.Metric
edgeNodes, err := m.listEdgeNodes()
if err != nil {
klog.Errorf("List edge nodes error %v\n", err)
return m.parseErrorResp(metrics, err)
}
edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts)
if len(edgeNodeNamesFiltered) == 0 {
klog.V(4).Infof("No edge node metrics is requested")
return res
}
metricsResult, err := m.getNodeMetricsFromMetricsAPI()
if err != nil {
klog.Errorf("Get edge node metrics error %v\n", err)
return m.parseErrorResp(metrics, err)
}
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
}
status := make(map[string]v1.NodeStatus)
for n, _ := range edgeNodeNamesFiltered {
status[n] = edgeNodes[n].Status
}
nodeMetrics := make(map[string]*monitoring.MetricData)
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix}
}
}
var usage v1.ResourceList
var cap v1.ResourceList
for _, m := range metricsResult.Items {
_, ok := edgeNodeNamesFiltered[m.Name]
if !ok {
continue
}
edgeNodes, err := m.listEdgeNodes()
if err != nil {
klog.Errorf("List edge nodes error %v\n", err)
return m.parseErrorResp(metrics, err)
}
m.Usage.DeepCopyInto(&usage)
status[m.Name].Capacity.DeepCopyInto(&cap)
edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts)
if len(edgeNodeNamesFiltered) == 0 {
klog.V(4).Infof("No edge node metrics is requested")
return res
}
metricValues := make(map[string]*monitoring.MetricValue)
metricsResult, err := m.getNodeMetricsFromMetricsAPI()
if err != nil {
klog.Errorf("Get edge node metrics error %v\n", err)
return m.parseErrorResp(metrics, err)
}
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
}
status := make(map[string]v1.NodeStatus)
for n, _ := range edgeNodeNamesFiltered {
status[n] = edgeNodes[n].Status
}
nodeMetrics := make(map[string]*monitoring.MetricData)
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix}
metricValues[enm] = &monitoring.MetricValue{
Metadata: make(map[string]string),
}
metricValues[enm].Metadata["node"] = m.Name
metricValues[enm].Metadata["role"] = "edge"
}
for _, addr := range status[m.Name].Addresses {
if addr.Type == v1.NodeInternalIP {
for _, enm := range edgeNodeMetrics {
metricValues[enm].Metadata["host_ip"] = addr.Address
}
break
}
}
var usage v1.ResourceList
var cap v1.ResourceList
for _, m := range metricsResult.Items {
_, ok := edgeNodeNamesFiltered[m.Name]
if !ok {
continue
}
m.Usage.DeepCopyInto(&usage)
status[m.Name].Capacity.DeepCopyInto(&cap)
metricValues := make(map[string]*monitoring.MetricValue)
for _, enm := range edgeNodeMetrics {
metricValues[enm] = &monitoring.MetricValue{
Metadata: make(map[string]string),
for k, v := range metricsMap {
switch k {
case metricsNodeCPUUsage:
if v {
metricValues[metricsNodeCPUUsage].Series = append(metricValues[metricsNodeCPUUsage].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000})
}
metricValues[enm].Metadata["node"] = m.Name
metricValues[enm].Metadata["role"] = "edge"
}
for _, addr := range status[m.Name].Addresses {
if addr.Type == v1.NodeInternalIP {
for _, enm := range edgeNodeMetrics {
metricValues[enm].Metadata["host_ip"] = addr.Address
}
break
case metricsNodeCPUTotal:
if v {
metricValues[metricsNodeCPUTotal].Series = append(metricValues[metricsNodeCPUTotal].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000})
}
}
_, ok = metricsMap[metricsNodeCPUUsage]
if ok {
metricValues[metricsNodeCPUUsage].Series = append(metricValues[metricsNodeCPUUsage].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000})
}
_, ok = metricsMap[metricsNodeCPUTotal]
if ok {
metricValues[metricsNodeCPUTotal].Series = append(metricValues[metricsNodeCPUTotal].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000})
}
_, ok = metricsMap[metricsNodeCPUUltilisation]
if ok {
metricValues[metricsNodeCPUUltilisation].Series = append(metricValues[metricsNodeCPUUltilisation].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())})
}
_, ok = metricsMap[metricsNodeMemoryUsageWoCache]
if ok {
metricValues[metricsNodeMemoryUsageWoCache].Series = append(metricValues[metricsNodeMemoryUsageWoCache].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())})
}
_, ok = metricsMap[metricsNodeMemoryTotal]
if ok {
metricValues[metricsNodeMemoryTotal].Series = append(metricValues[metricsNodeMemoryTotal].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())})
}
_, ok = metricsMap[metricsNodeMemoryUltilisation]
if ok {
metricValues[metricsNodeMemoryUltilisation].Series = append(metricValues[metricsNodeMemoryUltilisation].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())})
}
for _, enm := range edgeNodeMetrics {
_, ok = metricsMap[enm]
if ok {
nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm])
case metricsNodeCPUUltilisation:
if v {
metricValues[metricsNodeCPUUltilisation].Series = append(metricValues[metricsNodeCPUUltilisation].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())})
}
case metricsNodeMemoryUsageWoCache:
if v {
metricValues[metricsNodeMemoryUsageWoCache].Series = append(metricValues[metricsNodeMemoryUsageWoCache].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())})
}
case metricsNodeMemoryTotal:
if v {
metricValues[metricsNodeMemoryTotal].Series = append(metricValues[metricsNodeMemoryTotal].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())})
}
case metricsNodeMemoryUltilisation:
if v {
metricValues[metricsNodeMemoryUltilisation].Series = append(metricValues[metricsNodeMemoryUltilisation].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())})
}
}
}
@@ -426,11 +645,104 @@ func (m metricsServer) GetNamedMetricsOverTime(metrics []string, start, end time
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]})
nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm])
}
}
}
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]})
}
}
return res
}
func (m metricsServer) GetPodLevelNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opts *monitoring.QueryOptions) []monitoring.Metric {
var res []monitoring.Metric
edgePods := m.parseEdgePods(opts)
if len(edgePods) == 0 && opts.PodName == "" {
klog.Errorf("Edge node filter regexp error: %v\n", errors.New("no edge node pods metrics is requested or resource filter invalid"))
return res
}
podMetricsFromMetricsAPI, err := m.getPodMetricsFromMetricsAPI(edgePods, opts.PodName, opts.NamespaceName)
if err != nil {
klog.Errorf("Get pod metrics of edge nodes error %v\n", err)
return m.parseErrorResp(metrics, err)
}
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
}
// init
podMetrics := make(map[string]*monitoring.MetricData)
for _, epm := range edgePodMetrics {
_, ok := metricsMap[epm]
if ok {
podMetrics[epm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix}
}
}
for _, p := range podMetricsFromMetricsAPI {
metricValues := make(map[string]*monitoring.MetricValue)
for _, epm := range edgePodMetrics {
metricValues[epm] = &monitoring.MetricValue{
Metadata: make(map[string]string),
}
metricValues[epm].Metadata["pod"] = p.Name
metricValues[epm].Metadata["namespace"] = p.Namespace
}
podMetricsUsge := make(v1.ResourceList)
for _, res := range MeasuredResources {
podMetricsUsge[res], _ = resource.ParseQuantity("0")
}
for _, podContainer := range p.Containers {
for _, res := range MeasuredResources {
quantity := podMetricsUsge[res]
quantity.Add(podContainer.Usage[res])
podMetricsUsge[res] = quantity
}
}
for k, v := range metricsMap {
switch k {
case metricsPodCPUUsage:
if v {
cpuQuantity := podMetricsUsge[v1.ResourceCPU]
metricValues[metricsPodCPUUsage].Series = append(metricValues[metricsPodCPUUsage].Series, monitoring.Point{float64(p.Timestamp.Unix()), float64(cpuQuantity.MilliValue()) / 1000})
}
case metricsPodMemoryUsage:
if v {
memoryQuantity := podMetricsUsge[v1.ResourceMemory]
metricValues[metricsPodMemoryUsage].Series = append(metricValues[metricsPodMemoryUsage].Series, monitoring.Point{float64(p.Timestamp.Unix()), float64(memoryQuantity.Value()) / (1024 * 1024)})
}
}
}
for _, epm := range edgePodMetrics {
_, ok := metricsMap[epm]
if ok {
podMetrics[epm].MetricValues = append(podMetrics[epm].MetricValues, *metricValues[epm])
}
}
}
for _, epm := range edgePodMetrics {
_, ok := metricsMap[epm]
if ok {
res = append(res, monitoring.Metric{MetricName: epm, MetricData: *podMetrics[epm]})
}
}
return res
}