fix metrics query bug for pods with duplicate names on one node

Signed-off-by: junotx <junotx@126.com>
Author: junotx
Date:   2020-12-23 10:50:36 +08:00
Parent: f2e96bce7f
Commit: 9304e839b1

7 changed files with 139 additions and 62 deletions
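Background for the change: when pods are selected across namespaces, the old code matched on the pod label alone, so two pods sharing a name (for example, a "redis-0" in two different namespaces, scheduled to the same node) were merged into one query result. The fix threads a NamespacedResourcesFilter of the form namespace/pod|namespace/pod through both query construction and response parsing. A single PromQL vector selector cannot express a disjunction of exact (namespace, pod) pairs, so the selector is widened to two regexes; with the filter logging/elasticsearch-0|ks/redis from the test fixtures below, the generated matcher is

	namespace=~"logging|ks", pod=~"elasticsearch-0|redis"

which also matches the unwanted cross-product combinations logging/redis and ks/elasticsearch-0. Those extra series are dropped client-side by the new genMetricFilter.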


@@ -19,13 +19,15 @@ package prometheus
 import (
 	"context"
 	"fmt"
+	"strings"
+	"sync"
+	"time"
+
 	"github.com/prometheus/client_golang/api"
 	apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/common/model"
 	"k8s.io/klog"
 	"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
-	"sync"
-	"time"
 )
 
 // prometheus implements monitoring interface backed by Prometheus
@@ -49,7 +51,7 @@ func (p prometheus) GetMetric(expr string, ts time.Time) monitoring.Metric {
 	if err != nil {
 		parsedResp.Error = err.Error()
 	} else {
-		parsedResp.MetricData = parseQueryResp(value)
+		parsedResp.MetricData = parseQueryResp(value, nil)
 	}
 
 	return parsedResp
@@ -68,7 +70,7 @@ func (p prometheus) GetMetricOverTime(expr string, start, end time.Time, step ti
 	if err != nil {
 		parsedResp.Error = err.Error()
 	} else {
-		parsedResp.MetricData = parseQueryRangeResp(value)
+		parsedResp.MetricData = parseQueryRangeResp(value, nil)
 	}
 	return parsedResp
 }
@@ -90,7 +92,7 @@ func (p prometheus) GetNamedMetrics(metrics []string, ts time.Time, o monitoring
 	if err != nil {
 		parsedResp.Error = err.Error()
 	} else {
-		parsedResp.MetricData = parseQueryResp(value)
+		parsedResp.MetricData = parseQueryResp(value, genMetricFilter(o))
 	}
 
 	mtx.Lock()
@@ -129,7 +131,7 @@ func (p prometheus) GetNamedMetricsOverTime(metrics []string, start, end time.Ti
 	if err != nil {
 		parsedResp.Error = err.Error()
 	} else {
-		parsedResp.MetricData = parseQueryRangeResp(value)
+		parsedResp.MetricData = parseQueryRangeResp(value, genMetricFilter(o))
 	}
 
 	mtx.Lock()
@@ -200,12 +202,15 @@ func (p prometheus) GetMetricLabelSet(expr string, start, end time.Time) []map[s
 	return res
 }
 
-func parseQueryRangeResp(value model.Value) monitoring.MetricData {
+func parseQueryRangeResp(value model.Value, metricFilter func(metric model.Metric) bool) monitoring.MetricData {
 	res := monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix}
 	data, _ := value.(model.Matrix)
 
 	for _, v := range data {
+		if metricFilter != nil && !metricFilter(v.Metric) {
+			continue
+		}
 		mv := monitoring.MetricValue{
 			Metadata: make(map[string]string),
 		}
@@ -224,12 +229,15 @@ func parseQueryRangeResp(value model.Value) monitoring.MetricData {
 	return res
 }
 
-func parseQueryResp(value model.Value) monitoring.MetricData {
+func parseQueryResp(value model.Value, metricFilter func(metric model.Metric) bool) monitoring.MetricData {
 	res := monitoring.MetricData{MetricType: monitoring.MetricTypeVector}
 	data, _ := value.(model.Vector)
 
 	for _, v := range data {
+		if metricFilter != nil && !metricFilter(v.Metric) {
+			continue
+		}
 		mv := monitoring.MetricValue{
 			Metadata: make(map[string]string),
 		}
@@ -245,3 +253,26 @@ func parseQueryResp(value model.Value) monitoring.MetricData {
 	return res
 }
+
+func genMetricFilter(o monitoring.QueryOption) func(metric model.Metric) bool {
+	if o != nil {
+		if po, ok := o.(monitoring.PodOption); ok {
+			if po.NamespacedResourcesFilter != "" {
+				namespacedPodsMap := make(map[string]struct{})
+				for _, s := range strings.Split(po.NamespacedResourcesFilter, "|") {
+					namespacedPodsMap[s] = struct{}{}
+				}
+				return func(metric model.Metric) bool {
+					if len(metric) == 0 {
+						return false
+					}
+					_, ok := namespacedPodsMap[string(metric["namespace"])+"/"+string(metric["pod"])]
+					return ok
+				}
+			}
+		}
+	}
+	return func(metric model.Metric) bool {
+		return true
+	}
+}
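To make the trimming concrete, here is a minimal sketch of how the new genMetricFilter behaves, written as a hypothetical test in the same package (it assumes the package's imports of testing, github.com/prometheus/common/model, and the monitoring package; the PodOption value mirrors the fixture used further below):

	func TestGenMetricFilter(t *testing.T) {
		filter := genMetricFilter(monitoring.PodOption{
			NamespacedResourcesFilter: "logging/elasticsearch-0|ks/redis",
		})

		// exact namespace/pod pairs that were requested pass through
		if !filter(model.Metric{"namespace": "logging", "pod": "elasticsearch-0"}) {
			t.Error("expected logging/elasticsearch-0 to pass")
		}
		if !filter(model.Metric{"namespace": "ks", "pod": "redis"}) {
			t.Error("expected ks/redis to pass")
		}

		// cross-product artifacts of the widened regex selector are dropped
		if filter(model.Metric{"namespace": "logging", "pod": "redis"}) {
			t.Error("expected logging/redis to be filtered out")
		}
	}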


@@ -15,8 +15,9 @@ package prometheus
 import (
 	"fmt"
-	"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
 	"strings"
+
+	"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
 )
const (
@@ -349,12 +350,6 @@ func makePodMetricExpr(tmpl string, o monitoring.QueryOptions) string {
 		}
 	}
 
-	// For monitoring pods in the whole cluster
-	// Get /pods
-	if o.NamespaceName == "" && o.NodeName == "" {
-		podSelector = fmt.Sprintf(`pod=~"%s"`, o.ResourceFilter)
-	}
-
 	// For monitoring pods in the specific namespace
 	// GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods or
 	// GET /namespaces/{namespace}/pods/{pod} or
@@ -365,17 +360,59 @@ func makePodMetricExpr(tmpl string, o monitoring.QueryOptions) string {
 		} else {
 			podSelector = fmt.Sprintf(`pod=~"%s", namespace="%s"`, o.ResourceFilter, o.NamespaceName)
 		}
-	}
-
-	// For monitoring pods on the specific node
-	// GET /nodes/{node}/pods/{pod}
-	// GET /nodes/{node}/pods
-	if o.NodeName != "" {
-		if o.PodName != "" {
-			podSelector = fmt.Sprintf(`pod="%s", node="%s"`, o.PodName, o.NodeName)
-		} else {
-			podSelector = fmt.Sprintf(`pod=~"%s", node="%s"`, o.ResourceFilter, o.NodeName)
-		}
-	}
+	} else {
+		var namespaces, pods []string
+		if o.NamespacedResourcesFilter != "" {
+			for _, np := range strings.Split(o.NamespacedResourcesFilter, "|") {
+				if nparr := strings.SplitN(np, "/", 2); len(nparr) > 1 {
+					namespaces = append(namespaces, nparr[0])
+					pods = append(pods, nparr[1])
+				} else {
+					pods = append(pods, np)
+				}
+			}
+		}
+
+		// For monitoring pods on the specific node
+		// GET /nodes/{node}/pods/{pod}
+		// GET /nodes/{node}/pods
+		if o.NodeName != "" {
+			if o.PodName != "" {
+				if nparr := strings.SplitN(o.PodName, "/", 2); len(nparr) > 1 {
+					podSelector = fmt.Sprintf(`namespace="%s",pod="%s", node="%s"`, nparr[0], nparr[1], o.NodeName)
+				} else {
+					podSelector = fmt.Sprintf(`pod="%s", node="%s"`, o.PodName, o.NodeName)
+				}
+			} else {
+				var ps []string
+				ps = append(ps, fmt.Sprintf(`node="%s"`, o.NodeName))
+				if o.ResourceFilter != "" {
+					ps = append(ps, fmt.Sprintf(`pod=~"%s"`, o.ResourceFilter))
+				}
+				if len(namespaces) > 0 {
+					ps = append(ps, fmt.Sprintf(`namespace=~"%s"`, strings.Join(namespaces, "|")))
+				}
+				if len(pods) > 0 {
+					ps = append(ps, fmt.Sprintf(`pod=~"%s"`, strings.Join(pods, "|")))
+				}
+				podSelector = strings.Join(ps, ",")
+			}
+		} else {
+			// For monitoring pods in the whole cluster
+			// Get /pods
+			var ps []string
+			if len(namespaces) > 0 {
+				ps = append(ps, fmt.Sprintf(`namespace=~"%s"`, strings.Join(namespaces, "|")))
+			}
+			if len(pods) > 0 {
+				ps = append(ps, fmt.Sprintf(`pod=~"%s"`, strings.Join(pods, "|")))
+			}
+			if len(ps) > 0 {
+				podSelector = strings.Join(ps, ",")
+			}
+		}
+	}
 
 	return strings.NewReplacer("$1", workloadSelector, "$2", podSelector).Replace(tmpl)
 }
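For the node-scoped case, the rewritten branch accumulates label matchers in ps and joins them. A request like GET /nodes/node1/pods (node name hypothetical) with the filter logging/elasticsearch-0|ks/redis produces

	node="node1",namespace=~"logging|ks",pod=~"elasticsearch-0|redis"

The node label pins the node; genMetricFilter then discards any returned series whose exact namespace/pod pair was not requested, which is what tells apart two same-named pods running on that node.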


@@ -138,8 +138,8 @@ func TestMakeExpr(t *testing.T) {
 		{
 			name: "pod_net_bytes_transmitted",
 			opts: monitoring.QueryOptions{
-				Level:          monitoring.LevelPod,
-				ResourceFilter: "elasticsearch-0",
+				Level:                     monitoring.LevelPod,
+				NamespacedResourcesFilter: "logging/elasticsearch-0|ks/redis",
 			},
 		},
 		{


@@ -31,7 +31,7 @@ var PromQLs = map[string]string{
"pod_cpu_usage": `round(sum by (namespace, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", pod!="", image!=""}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{owner_kind="ReplicaSet", owner_name=~"^elasticsearch-[^-]{1,10}$"} * on (namespace, pod) group_left(node) kube_pod_info{pod=~"elasticsearch-0", namespace="default"}, 0.001)`,
"pod_memory_usage": `sum by (namespace, pod) (container_memory_usage_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{} * on (namespace, pod) group_left(node) kube_pod_info{pod="elasticsearch-12345", namespace="default"}`,
"pod_memory_usage_wo_cache": `sum by (namespace, pod) (container_memory_working_set_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{} * on (namespace, pod) group_left(node) kube_pod_info{pod="elasticsearch-12345", node="i-2dazc1d6"}`,
"pod_net_bytes_transmitted": `sum by (namespace, pod) (irate(container_network_transmit_bytes_total{pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{} * on (namespace, pod) group_left(node) kube_pod_info{pod=~"elasticsearch-0"}`,
"pod_net_bytes_transmitted": `sum by (namespace, pod) (irate(container_network_transmit_bytes_total{pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{} * on (namespace, pod) group_left(node) kube_pod_info{namespace=~"logging|ks",pod=~"elasticsearch-0|redis"}`,
"container_cpu_usage": `round(sum by (namespace, pod, container) (irate(container_cpu_usage_seconds_total{job="kubelet", container!="POD", container!="", image!="", pod="elasticsearch-12345", namespace="default", container="syscall"}[5m])), 0.001)`,
"container_memory_usage": `sum by (namespace, pod, container) (container_memory_usage_bytes{job="kubelet", container!="POD", container!="", image!="", pod="elasticsearch-12345", namespace="default", container=~"syscall"})`,
"pvc_inodes_available": `max by (namespace, persistentvolumeclaim) (kubelet_volume_stats_inodes_free) * on (namespace, persistentvolumeclaim) group_left (storageclass) kube_persistentvolumeclaim_info{namespace="default", persistentvolumeclaim="db-123"}`,