Merge pull request #2007 from huanggze/dev-custom

feat: custom monitoring
This commit is contained in:
KubeSphere CI Bot
2020-04-20 20:04:22 +08:00
committed by GitHub
181 changed files with 37758 additions and 357 deletions

View File

@@ -77,6 +77,7 @@ const (
WorkloadMetricsTag = "Workload Metrics"
WorkspaceMetricsTag = "Workspace Metrics"
ComponentMetricsTag = "Component Metrics"
CustomMetricsTag = "Custom Metrics"
LogQueryTag = "Log Query"
TerminalTag = "Terminal"
)

View File

@@ -192,3 +192,36 @@ func (h handler) handleNamedMetricsQuery(resp *restful.Response, q queryOptions)
}
resp.WriteAsJson(res)
}
// handleMetadataQuery writes, as JSON, the metric metadata that the monitoring
// operator returns for the namespace named in the request path.
func (h handler) handleMetadataQuery(req *restful.Request, resp *restful.Response) {
	namespace := req.PathParameter("namespace")
	resp.WriteAsJson(h.mo.GetMetadata(namespace))
}
// handleAdhocQuery evaluates the expression supplied in the request within the
// requested namespace and writes the resulting metric as JSON.
//
// A range query is issued when the parsed options describe a time range,
// otherwise an instant query is issued. Invalid options or a failing query
// produce a bad-request response; an ErrNoHit from option parsing produces an
// empty metric instead.
func (h handler) handleAdhocQuery(req *restful.Request, resp *restful.Response) {
	params := parseRequestParams(req)

	opt, err := h.makeQueryOptions(params, 0)
	if err != nil {
		if err.Error() != ErrNoHit {
			api.HandleBadRequest(resp, nil, err)
			return
		}
		// Nothing matched: answer with a zero-valued metric rather than an error.
		resp.WriteAsJson(monitoring.Metric{})
		return
	}

	var res monitoring.Metric
	if opt.isRangeQuery() {
		res, err = h.mo.GetMetricOverTime(params.expression, params.namespaceName, opt.start, opt.end, opt.step)
	} else {
		res, err = h.mo.GetMetric(params.expression, params.namespaceName, opt.time)
	}
	if err != nil {
		api.HandleBadRequest(resp, nil, err)
		return
	}

	resp.WriteAsJson(res)
}

View File

@@ -49,6 +49,7 @@ type reqParams struct {
pvcName string
storageClassName string
componentType string
expression string
}
type queryOptions struct {
@@ -99,6 +100,7 @@ func parseRequestParams(req *restful.Request) reqParams {
r.pvcName = req.PathParameter("pvc")
r.storageClassName = req.PathParameter("storageclass")
r.componentType = req.PathParameter("component")
r.expression = req.QueryParameter("expr")
return r
}

View File

@@ -400,6 +400,29 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monito
Returns(http.StatusOK, RespOK, model.Metrics{})).
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}/targets/metadata").
To(h.handleMetadataQuery).
Doc("Get metadata of metrics for the specific namespace.").
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.CustomMetricsTag}).
Writes(model.Metadata{}).
Returns(http.StatusOK, RespOK, model.Metadata{})).
Produces(restful.MIME_JSON)
// Ad-hoc query route. Every time parameter is optional: callers send either
// `time` (instant query) or `start`/`end`/`step` (range query), never both,
// so `start` must not be declared as required.
ws.Route(ws.GET("/namespaces/{namespace}/targets/query").
	To(h.handleAdhocQuery).
	Doc("Make an ad-hoc query in the specific namespace.").
	Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
	Param(ws.QueryParameter("expr", "The expression to be evaluated.").DataType("string").Required(false)).
	Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)).
	Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)).
	Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)).
	Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)).
	Metadata(restfulspec.KeyOpenAPITags, []string{constants.CustomMetricsTag}).
	Writes(monitoring.Metric{}).
	Returns(http.StatusOK, RespOK, monitoring.Metric{})).
	Produces(restful.MIME_JSON)
c.Add(ws)
return nil
}

View File

@@ -0,0 +1,99 @@
package prometheus
import (
"fmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/metric"
"kubesphere.io/kubesphere/pkg/models/monitoring/expressions"
)
// init registers labelReplace with the expressions registry under the name
// "prometheus" when this package is loaded.
func init() {
expressions.Register("prometheus", labelReplace)
}
// labelReplace parses input as a PromQL expression and rewrites it so that
// every vector and matrix selector matches only namespace ns.
//
// It returns the rewritten expression in string form. An error is returned
// when input cannot be parsed or when the expression contains a node type
// that SetRecursive does not handle.
func labelReplace(input, ns string) (string, error) {
	root, err := promql.ParseExpr(input)
	if err != nil {
		return "", err
	}

	// The walk result must be checked: if it fails, some selectors may not
	// have been rewritten and the expression must not be handed back.
	if err := SetRecursive(root, ns); err != nil {
		return "", err
	}

	return root.String(), nil
}
// SetRecursive walks the PromQL syntax tree rooted at node and forces every
// vector and matrix selector it reaches to match only the given namespace.
// Number and string literals are left untouched. The walk stops and returns
// an error as soon as it meets a node type it does not know.
//
// Inspired by https://github.com/openshift/prom-label-proxy
func SetRecursive(node promql.Node, namespace string) (err error) {
	// Direct children that still need to be visited. Selectors and literals
	// are leaves and contribute none.
	var children []promql.Node

	switch n := node.(type) {
	case *promql.VectorSelector:
		n.LabelMatchers = enforceLabelMatchers(n.LabelMatchers, namespace)
	case *promql.MatrixSelector:
		n.LabelMatchers = enforceLabelMatchers(n.LabelMatchers, namespace)
	case *promql.NumberLiteral, *promql.StringLiteral:
		// nothing to do
	case promql.Expressions:
		for _, e := range n {
			children = append(children, e)
		}
	case *promql.BinaryExpr:
		// Left-hand side first, then right-hand side.
		children = append(children, n.LHS, n.RHS)
	case *promql.Call:
		children = append(children, n.Args)
	case *promql.EvalStmt:
		children = append(children, n.Expr)
	case *promql.AggregateExpr:
		children = append(children, n.Expr)
	case *promql.ParenExpr:
		children = append(children, n.Expr)
	case *promql.UnaryExpr:
		children = append(children, n.Expr)
	default:
		return fmt.Errorf("promql.Walk: unhandled node type %T", node)
	}

	for _, child := range children {
		if walkErr := SetRecursive(child, namespace); walkErr != nil {
			return walkErr
		}
	}
	return nil
}
// enforceLabelMatchers returns matchers with an equality matcher on the
// "namespace" label set to the given namespace. The first existing matcher
// named "namespace" is overwritten in place; when there is none, the matcher
// is appended.
func enforceLabelMatchers(matchers metric.LabelMatchers, namespace string) metric.LabelMatchers {
	enforced := &metric.LabelMatcher{
		Name:  "namespace",
		Type:  metric.Equal,
		Value: model.LabelValue(namespace),
	}

	for idx := range matchers {
		if matchers[idx].Name == "namespace" {
			matchers[idx] = enforced
			return matchers
		}
	}

	return append(matchers, enforced)
}

View File

@@ -0,0 +1,51 @@
package prometheus
import (
"fmt"
"github.com/google/go-cmp/cmp"
"testing"
)
// TestLabelReplace checks that labelReplace pins every selector of an
// expression to the "default" namespace and that unparsable input is rejected.
func TestLabelReplace(t *testing.T) {
	tests := []struct {
		expr        string
		expected    string
		expectedErr bool
	}{
		{
			expr:        "up",
			expected:    `up{namespace="default"}`,
			expectedErr: false,
		},
		{
			expr:        `up{namespace="random"}`,
			expected:    `up{namespace="default"}`,
			expectedErr: false,
		},
		{
			expr:        `up{namespace="random"} + up{job="test"}`,
			expected:    `up{namespace="default"} + up{job="test",namespace="default"}`,
			expectedErr: false,
		},
		{
			expr:        `@@@@`,
			expectedErr: true,
		},
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
			result, err := labelReplace(tt.expr, "default")
			if err != nil {
				if !tt.expectedErr {
					t.Fatal(err)
				}
				return
			}
			// A case that expects an error must not pass silently when
			// labelReplace succeeds.
			if tt.expectedErr {
				t.Fatalf("expected an error for expression %q, got result %q", tt.expr, result)
			}

			if diff := cmp.Diff(result, tt.expected); diff != "" {
				t.Fatalf("%T differ (-got, +want): %s", tt.expected, diff)
			}
		})
	}
}

View File

@@ -0,0 +1,9 @@
package expressions
// labelReplaceFn takes an expression and a namespace and returns a rewritten
// expression or an error. Implementations are expected to restrict the
// expression to the given namespace.
type labelReplaceFn func(expr, ns string) (string, error)

// ReplaceNamespaceFns holds the registered rewrite functions, keyed by the
// name they were registered under.
var ReplaceNamespaceFns = map[string]labelReplaceFn{}

// Register stores fn in ReplaceNamespaceFns under name, replacing any function
// previously stored under the same name.
func Register(name string, fn labelReplaceFn) {
	ReplaceNamespaceFns[name] = fn
}

View File

@@ -19,15 +19,17 @@
package monitoring
import (
	"fmt"
	"time"

	"kubesphere.io/kubesphere/pkg/models/monitoring/expressions"
	"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
type MonitoringOperator interface {
GetMetrics(stmts []string, time time.Time) Metrics
GetMetricsOverTime(stmts []string, start, end time.Time, step time.Duration) Metrics
GetMetric(expr, namespace string, time time.Time) (monitoring.Metric, error)
GetMetricOverTime(expr, namespace string, start, end time.Time, step time.Duration) (monitoring.Metric, error)
GetNamedMetrics(metrics []string, time time.Time, opt monitoring.QueryOption) Metrics
GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics
GetMetadata(namespace string) Metadata
}
type monitoringOperator struct {
@@ -38,14 +40,28 @@ func NewMonitoringOperator(client monitoring.Interface) MonitoringOperator {
return &monitoringOperator{client}
}
// TODO(huanggze): reserve for custom monitoring
func (mo monitoringOperator) GetMetrics(stmts []string, time time.Time) Metrics {
panic("implement me")
// GetMetric restricts expr to namespace and evaluates it at the given time
// through the underlying monitoring client.
//
// An error is returned when no namespace rewrite function is registered for
// the datasource or when the rewrite of expr fails.
func (mo monitoringOperator) GetMetric(expr, namespace string, time time.Time) (monitoring.Metric, error) {
	// Different monitoring backend implementations have different ways to enforce namespace isolation.
	// Each implementation should register itself to `ReplaceNamespaceFns` during init().
	// We hard code "prometheus" here because we only support this datasource so far.
	// In the future, maybe the value should be returned from a method like `mo.c.GetMonitoringServiceName()`.
	const datasource = "prometheus"

	// A missing registration would yield a nil function; calling it would
	// panic, so report it as an error instead.
	replaceNamespace := expressions.ReplaceNamespaceFns[datasource]
	if replaceNamespace == nil {
		return monitoring.Metric{}, fmt.Errorf("no namespace replace function registered for datasource %q", datasource)
	}

	expr, err := replaceNamespace(expr, namespace)
	if err != nil {
		return monitoring.Metric{}, err
	}
	return mo.c.GetMetric(expr, time), nil
}
// TODO(huanggze): reserve for custom monitoring
func (mo monitoringOperator) GetMetricsOverTime(stmts []string, start, end time.Time, step time.Duration) Metrics {
panic("implement me")
// GetMetricOverTime restricts expr to namespace and evaluates it over the
// range [start, end] at the given step through the underlying monitoring
// client.
//
// An error is returned when no namespace rewrite function is registered for
// the datasource or when the rewrite of expr fails.
func (mo monitoringOperator) GetMetricOverTime(expr, namespace string, start, end time.Time, step time.Duration) (monitoring.Metric, error) {
	// Different monitoring backend implementations have different ways to enforce namespace isolation.
	// Each implementation should register itself to `ReplaceNamespaceFns` during init().
	// We hard code "prometheus" here because we only support this datasource so far.
	// In the future, maybe the value should be returned from a method like `mo.c.GetMonitoringServiceName()`.
	const datasource = "prometheus"

	// A missing registration would yield a nil function; calling it would
	// panic, so report it as an error instead.
	replaceNamespace := expressions.ReplaceNamespaceFns[datasource]
	if replaceNamespace == nil {
		return monitoring.Metric{}, fmt.Errorf("no namespace replace function registered for datasource %q", datasource)
	}

	expr, err := replaceNamespace(expr, namespace)
	if err != nil {
		return monitoring.Metric{}, err
	}
	return mo.c.GetMetricOverTime(expr, start, end, step), nil
}
func (mo monitoringOperator) GetNamedMetrics(metrics []string, time time.Time, opt monitoring.QueryOption) Metrics {
@@ -57,3 +73,8 @@ func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, en
ress := mo.c.GetNamedMetricsOverTime(metrics, start, end, step, opt)
return Metrics{Results: ress}
}
// GetMetadata fetches the metric metadata for namespace from the underlying
// monitoring client and wraps it in a Metadata response object.
func (mo monitoringOperator) GetMetadata(namespace string) Metadata {
	return Metadata{Data: mo.c.GetMetadata(namespace)}
}

View File

@@ -8,3 +8,7 @@ type Metrics struct {
TotalPages int `json:"total_page,omitempty" description:"total number of pages"`
TotalItems int `json:"total_item,omitempty" description:"page size"`
}
// Metadata wraps a list of metric metadata entries; the list is serialized
// under the "data" JSON key.
type Metadata struct {
Data []monitoring.Metadata `json:"data" description:"actual array of results"`
}

View File

@@ -3,8 +3,9 @@ package monitoring
import "time"
type Interface interface {
GetMetrics(exprs []string, time time.Time) []Metric
GetMetricsOverTime(exprs []string, start, end time.Time, step time.Duration) []Metric
GetMetric(expr string, time time.Time) Metric
GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric
GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric
GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric
GetMetadata(namespace string) []Metadata
}

View File

@@ -2,6 +2,7 @@ package prometheus
import (
"context"
"fmt"
"github.com/prometheus/client_golang/api"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
@@ -24,14 +25,35 @@ func NewPrometheus(options *Options) (monitoring.Interface, error) {
return prometheus{client: apiv1.NewAPI(client)}, err
}
// TODO(huanggze): reserve for custom monitoring
func (p prometheus) GetMetrics(stmts []string, time time.Time) []monitoring.Metric {
panic("implement me")
// GetMetric sends expr to Prometheus as an instant query evaluated at ts.
// On failure the error text is stored in the returned metric's Error field;
// otherwise the parsed query result is stored in its MetricData.
func (p prometheus) GetMetric(expr string, ts time.Time) monitoring.Metric {
	var res monitoring.Metric

	value, err := p.client.Query(context.Background(), expr, ts)
	if err != nil {
		res.Error = err.Error()
		return res
	}

	res.MetricData = parseQueryResp(value)
	return res
}
// TODO(huanggze): reserve for custom monitoring
func (p prometheus) GetMetricsOverTime(stmts []string, start, end time.Time, step time.Duration) []monitoring.Metric {
panic("implement me")
// GetMetricOverTime sends expr to Prometheus as a range query covering start
// to end at the given step. On failure the error text is stored in the
// returned metric's Error field; otherwise the parsed range result is stored
// in its MetricData.
func (p prometheus) GetMetricOverTime(expr string, start, end time.Time, step time.Duration) monitoring.Metric {
	var res monitoring.Metric

	value, err := p.client.QueryRange(context.Background(), expr, apiv1.Range{
		Start: start,
		End:   end,
		Step:  step,
	})
	if err != nil {
		res.Error = err.Error()
		return res
	}

	res.MetricData = parseQueryRangeResp(value)
	return res
}
func (p prometheus) GetNamedMetrics(metrics []string, ts time.Time, o monitoring.QueryOption) []monitoring.Metric {
@@ -49,7 +71,7 @@ func (p prometheus) GetNamedMetrics(metrics []string, ts time.Time, o monitoring
value, err := p.client.Query(context.Background(), makeExpr(metric, *opts), ts)
if err != nil {
parsedResp.Error = err.(*apiv1.Error).Msg
parsedResp.Error = err.Error()
} else {
parsedResp.MetricData = parseQueryResp(value)
}
@@ -88,7 +110,7 @@ func (p prometheus) GetNamedMetricsOverTime(metrics []string, start, end time.Ti
value, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange)
if err != nil {
parsedResp.Error = err.(*apiv1.Error).Msg
parsedResp.Error = err.Error()
} else {
parsedResp.MetricData = parseQueryRangeResp(value)
}
@@ -106,6 +128,26 @@ func (p prometheus) GetNamedMetricsOverTime(metrics []string, start, end time.Ti
return res
}
// GetMetadata returns the metadata (metric name, type and help text) of the
// metrics exposed by targets labelled with the given namespace.
//
// A nil slice is returned when the Prometheus request fails or yields no
// items.
func (p prometheus) GetMetadata(namespace string) []monitoring.Metadata {
	var meta []monitoring.Metadata

	// Filter metrics available to members of this namespace.
	// %q quotes and escapes the value so that quotes or backslashes in
	// namespace cannot alter the structure of the selector.
	matchTarget := fmt.Sprintf("{namespace=%q}", namespace)
	items, err := p.client.TargetsMetadata(context.Background(), matchTarget, "", "")
	if err != nil {
		return meta
	}

	for _, item := range items {
		meta = append(meta, monitoring.Metadata{
			Metric: item.Metric,
			Type:   string(item.Type),
			Help:   item.Help,
		})
	}
	return meta
}
func parseQueryRangeResp(value model.Value) monitoring.MetricData {
res := monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix}

View File

@@ -24,7 +24,8 @@ func TestGetNamedMetrics(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expected, err := jsonFromFile(tt.expected)
expected := make([]monitoring.Metric, 0)
err := jsonFromFile(tt.expected, &expected)
if err != nil {
t.Fatal(err)
}
@@ -53,7 +54,8 @@ func TestGetNamedMetricsOverTime(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expected, err := jsonFromFile(tt.expected)
expected := make([]monitoring.Metric, 0)
err := jsonFromFile(tt.expected, &expected)
if err != nil {
t.Fatal(err)
}
@@ -70,6 +72,44 @@ func TestGetNamedMetricsOverTime(t *testing.T) {
}
}
// TestGetMetadata serves canned Prometheus target-metadata responses from a
// mock server and compares the result of GetMetadata with the expected JSON
// fixture.
func TestGetMetadata(t *testing.T) {
	tests := []struct {
		fakeResp string
		expected string
	}{
		{
			fakeResp: "metadata-prom.json",
			expected: "metadata-res.json",
		},
		{
			fakeResp: "metadata-notfound-prom.json",
			expected: "metadata-notfound-res.json",
		},
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
			expected := make([]monitoring.Metadata, 0)
			err := jsonFromFile(tt.expected, &expected)
			if err != nil {
				t.Fatal(err)
			}
			// GetMetadata is compared against nil, not an empty slice, when
			// the fixture holds no entries.
			if len(expected) == 0 {
				expected = nil
			}

			srv := mockPrometheusService("/api/v1/targets/metadata", tt.fakeResp)
			defer srv.Close()

			// Fail early on a constructor error instead of calling the client anyway.
			client, err := NewPrometheus(&Options{Endpoint: srv.URL})
			if err != nil {
				t.Fatal(err)
			}

			result := client.GetMetadata("default")
			if diff := cmp.Diff(result, expected); diff != "" {
				t.Fatalf("%T differ (-got, +want): %s", expected, diff)
			}
		})
	}
}
func mockPrometheusService(pattern, fakeResp string) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) {
@@ -79,17 +119,15 @@ func mockPrometheusService(pattern, fakeResp string) *httptest.Server {
return httptest.NewServer(mux)
}
func jsonFromFile(expectedFile string) ([]monitoring.Metric, error) {
expectedJson := []monitoring.Metric{}
func jsonFromFile(expectedFile string, expectedJsonPtr interface{}) error {
json, err := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", expectedFile))
if err != nil {
return expectedJson, err
return err
}
err = jsoniter.Unmarshal(json, &expectedJson)
err = jsoniter.Unmarshal(json, expectedJsonPtr)
if err != nil {
return expectedJson, err
return err
}
return expectedJson, nil
return nil
}

View File

@@ -0,0 +1,5 @@
{
"status":"error",
"errorType":"not_found",
"error":"specified metadata not found"
}

View File

@@ -0,0 +1 @@
[]

View File

@@ -0,0 +1,25 @@
{
"status": "success",
"data": [
{
"target": {
"instance": "127.0.0.1:9090",
"job": "prometheus"
},
"metric": "prometheus_treecache_zookeeper_failures_total",
"type": "counter",
"help": "The total number of ZooKeeper failures.",
"unit": ""
},
{
"target": {
"instance": "127.0.0.1:9090",
"job": "prometheus"
},
"metric": "prometheus_tsdb_reloads_total",
"type": "counter",
"help": "Number of times the database reloaded block data from disk.",
"unit": ""
}
]
}

View File

@@ -0,0 +1,12 @@
[
{
"metric": "prometheus_treecache_zookeeper_failures_total",
"type": "counter",
"help": "The total number of ZooKeeper failures."
},
{
"metric": "prometheus_tsdb_reloads_total",
"type": "counter",
"help": "Number of times the database reloaded block data from disk."
}
]

View File

@@ -1,6 +1,6 @@
[
{
"metric_name": "cluster_cpu_utilisation",
"error": "inconsistent body for response code"
"error": "bad_response: inconsistent body for response code"
}
]

View File

@@ -5,6 +5,12 @@ const (
MetricTypeVector = "vector"
)
// Metadata describes a single metric: its name, its type and its help text.
// Empty fields are omitted from the JSON encoding.
type Metadata struct {
Metric string `json:"metric,omitempty" description:"metric name"`
Type string `json:"type,omitempty" description:"metric type"`
Help string `json:"help,omitempty" description:"metric description"`
}
type Metric struct {
MetricName string `json:"metric_name,omitempty" description:"metric name, eg. scheduler_up_sum"`
MetricData `json:"data,omitempty" description:"actual metric result"`