monitor: add tests

Signed-off-by: huanggze <loganhuang@yunify.com>
This commit is contained in:
zryfish
2020-04-01 17:41:50 +08:00
committed by huanggze
parent 17013d3519
commit 372a52e70e
63 changed files with 5405 additions and 4462 deletions

View File

@@ -1,178 +1,153 @@
package prometheus
import (
"fmt"
"github.com/json-iterator/go"
"io/ioutil"
"context"
"github.com/prometheus/client_golang/api"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
"net/http"
"net/url"
"regexp"
"sync"
"time"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
// prometheus implements monitoring interface backed by Prometheus
type prometheus struct {
options *Options
client *http.Client
client apiv1.API
}
func NewPrometheus(options *Options) monitoring.Interface {
return &prometheus{
options: options,
client: &http.Client{Timeout: 10 * time.Second},
func NewPrometheus(options *Options) (monitoring.Interface, error) {
cfg := api.Config{
Address: options.Endpoint,
}
client, err := api.NewClient(cfg)
return prometheus{client: apiv1.NewAPI(client)}, err
}
// TODO(huanggze): reserve for custom monitoring
func (p *prometheus) GetMetrics(stmts []string, time time.Time) ([]monitoring.Metric, error) {
func (p prometheus) GetMetrics(stmts []string, time time.Time) []monitoring.Metric {
panic("implement me")
}
// TODO(huanggze): reserve for custom monitoring
func (p *prometheus) GetMetricsOverTime(stmts []string, start, end time.Time, step time.Duration) ([]monitoring.Metric, error) {
func (p prometheus) GetMetricsOverTime(stmts []string, start, end time.Time, step time.Duration) []monitoring.Metric {
panic("implement me")
}
func (p *prometheus) GetNamedMetrics(ts time.Time, o monitoring.QueryOption) ([]monitoring.Metric, error) {
metrics := make([]monitoring.Metric, 0)
var mtx sync.Mutex // guard metrics
func (p prometheus) GetNamedMetrics(metrics []string, ts time.Time, o monitoring.QueryOption) []monitoring.Metric {
var res []monitoring.Metric
var mtx sync.Mutex
var wg sync.WaitGroup
opts := monitoring.NewQueryOptions()
o.Apply(opts)
errCh := make(chan error)
for _, metric := range opts.NamedMetrics {
matched, _ := regexp.MatchString(opts.MetricFilter, metric)
if matched {
exp := makeExpression(metric, *opts)
wg.Add(1)
go func(metric, exp string) {
res, err := p.query(exp, ts)
if err != nil {
select {
case errCh <- err: // Record error once
default:
}
} else {
res.MetricName = metric // Add metric name
mtx.Lock()
metrics = append(metrics, res)
mtx.Unlock()
}
wg.Done()
}(metric, exp)
}
for _, metric := range metrics {
wg.Add(1)
go func(metric string) {
parsedResp := monitoring.Metric{MetricName: metric}
value, err := p.client.Query(context.Background(), makeExpr(metric, *opts), ts)
if err != nil {
parsedResp.Error = err.(*apiv1.Error).Msg
} else {
parsedResp.MetricData = parseQueryResp(value)
}
mtx.Lock()
res = append(res, parsedResp)
mtx.Unlock()
wg.Done()
}(metric)
}
wg.Wait()
select {
case err := <-errCh:
return nil, err
default:
return metrics, nil
}
return res
}
func (p *prometheus) GetNamedMetricsOverTime(start, end time.Time, step time.Duration, o monitoring.QueryOption) ([]monitoring.Metric, error) {
metrics := make([]monitoring.Metric, 0)
var mtx sync.Mutex // guard metrics
func (p prometheus) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, o monitoring.QueryOption) []monitoring.Metric {
var res []monitoring.Metric
var mtx sync.Mutex
var wg sync.WaitGroup
opts := monitoring.NewQueryOptions()
o.Apply(opts)
errCh := make(chan error)
for _, metric := range opts.NamedMetrics {
matched, _ := regexp.MatchString(opts.MetricFilter, metric)
if matched {
exp := makeExpression(metric, *opts)
wg.Add(1)
go func(metric, exp string) {
res, err := p.rangeQuery(exp, start, end, step)
if err != nil {
select {
case errCh <- err: // Record error once
default:
}
} else {
res.MetricName = metric // Add metric name
mtx.Lock()
metrics = append(metrics, res)
mtx.Unlock()
}
wg.Done()
}(metric, exp)
}
timeRange := apiv1.Range{
Start: start,
End: end,
Step: step,
}
for _, metric := range metrics {
wg.Add(1)
go func(metric string) {
parsedResp := monitoring.Metric{MetricName: metric}
value, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange)
if err != nil {
parsedResp.Error = err.(*apiv1.Error).Msg
} else {
parsedResp.MetricData = parseQueryRangeResp(value)
}
mtx.Lock()
res = append(res, parsedResp)
mtx.Unlock()
wg.Done()
}(metric)
}
wg.Wait()
select {
case err := <-errCh:
return nil, err
default:
return metrics, nil
}
return res
}
func (p prometheus) query(exp string, ts time.Time) (monitoring.Metric, error) {
params := &url.Values{}
params.Set("time", ts.Format(time.RFC3339))
params.Set("query", exp)
func parseQueryRangeResp(value model.Value) monitoring.MetricData {
res := monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix}
u := fmt.Sprintf("%s/api/v1/query?%s", p.options.Endpoint, params.Encode())
data, _ := value.(model.Matrix)
var m monitoring.Metric
response, err := p.client.Get(u)
if err != nil {
return monitoring.Metric{}, err
for _, v := range data {
mv := monitoring.MetricValue{
Metadata: make(map[string]string),
}
for k, v := range v.Metric {
mv.Metadata[string(k)] = string(v)
}
for _, k := range v.Values {
mv.Series = append(mv.Series, monitoring.Point{float64(k.Timestamp) / 1000, float64(k.Value)})
}
res.MetricValues = append(res.MetricValues, mv)
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return monitoring.Metric{}, err
}
defer response.Body.Close()
err = json.Unmarshal(body, m)
if err != nil {
return monitoring.Metric{}, err
}
return m, nil
return res
}
func (p prometheus) rangeQuery(exp string, start, end time.Time, step time.Duration) (monitoring.Metric, error) {
params := &url.Values{}
params.Set("start", start.Format(time.RFC3339))
params.Set("end", end.Format(time.RFC3339))
params.Set("step", step.String())
params.Set("query", exp)
func parseQueryResp(value model.Value) monitoring.MetricData {
res := monitoring.MetricData{MetricType: monitoring.MetricTypeVector}
u := fmt.Sprintf("%s/api/v1/query?%s", p.options.Endpoint, params.Encode())
data, _ := value.(model.Vector)
var m monitoring.Metric
response, err := p.client.Get(u)
if err != nil {
return monitoring.Metric{}, err
for _, v := range data {
mv := monitoring.MetricValue{
Metadata: make(map[string]string),
}
for k, v := range v.Metric {
mv.Metadata[string(k)] = string(v)
}
mv.Sample = monitoring.Point{float64(v.Timestamp) / 1000, float64(v.Value)}
res.MetricValues = append(res.MetricValues, mv)
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return monitoring.Metric{}, err
}
defer response.Body.Close()
err = json.Unmarshal(body, m)
if err != nil {
return monitoring.Metric{}, err
}
return m, nil
return res
}