Files
kubesphere/pkg/simple/client/monitoring/prometheus/prometheus.go
Guangzhe Huang 148a804726 refactor monitoring (#1751)
Signed-off-by: huanggze <loganhuang@yunify.com>
2020-03-07 12:34:52 +08:00

179 lines
4.0 KiB
Go

package prometheus
import (
"fmt"
"github.com/json-iterator/go"
"io/ioutil"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
"net/http"
"net/url"
"regexp"
"sync"
"time"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
// prometheus implements monitoring interface backed by Prometheus
type prometheus struct {
options *Options
client *http.Client
}
func NewPrometheus(options *Options) monitoring.Interface {
return &prometheus{
options: options,
client: &http.Client{Timeout: 10 * time.Second},
}
}
// TODO(huanggze): reserve for custom monitoring
func (p *prometheus) GetMetrics(stmts []string, time time.Time) ([]monitoring.Metric, error) {
panic("implement me")
}
// TODO(huanggze): reserve for custom monitoring
func (p *prometheus) GetMetricsOverTime(stmts []string, start, end time.Time, step time.Duration) ([]monitoring.Metric, error) {
panic("implement me")
}
func (p *prometheus) GetNamedMetrics(ts time.Time, o monitoring.QueryOption) ([]monitoring.Metric, error) {
metrics := make([]monitoring.Metric, 0)
var mtx sync.Mutex // guard metrics
var wg sync.WaitGroup
opts := monitoring.NewQueryOptions()
o.Apply(opts)
errCh := make(chan error)
for _, metric := range opts.NamedMetrics {
matched, _ := regexp.MatchString(opts.MetricFilter, metric)
if matched {
exp := makeExpression(metric, *opts)
wg.Add(1)
go func(metric, exp string) {
res, err := p.query(exp, ts)
if err != nil {
select {
case errCh <- err: // Record error once
default:
}
} else {
res.MetricName = metric // Add metric name
mtx.Lock()
metrics = append(metrics, res)
mtx.Unlock()
}
wg.Done()
}(metric, exp)
}
}
wg.Wait()
select {
case err := <-errCh:
return nil, err
default:
return metrics, nil
}
}
func (p *prometheus) GetNamedMetricsOverTime(start, end time.Time, step time.Duration, o monitoring.QueryOption) ([]monitoring.Metric, error) {
metrics := make([]monitoring.Metric, 0)
var mtx sync.Mutex // guard metrics
var wg sync.WaitGroup
opts := monitoring.NewQueryOptions()
o.Apply(opts)
errCh := make(chan error)
for _, metric := range opts.NamedMetrics {
matched, _ := regexp.MatchString(opts.MetricFilter, metric)
if matched {
exp := makeExpression(metric, *opts)
wg.Add(1)
go func(metric, exp string) {
res, err := p.rangeQuery(exp, start, end, step)
if err != nil {
select {
case errCh <- err: // Record error once
default:
}
} else {
res.MetricName = metric // Add metric name
mtx.Lock()
metrics = append(metrics, res)
mtx.Unlock()
}
wg.Done()
}(metric, exp)
}
}
wg.Wait()
select {
case err := <-errCh:
return nil, err
default:
return metrics, nil
}
}
func (p prometheus) query(exp string, ts time.Time) (monitoring.Metric, error) {
params := &url.Values{}
params.Set("time", ts.Format(time.RFC3339))
params.Set("query", exp)
u := fmt.Sprintf("%s/api/v1/query?%s", p.options.Endpoint, params.Encode())
var m monitoring.Metric
response, err := p.client.Get(u)
if err != nil {
return monitoring.Metric{}, err
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return monitoring.Metric{}, err
}
defer response.Body.Close()
err = json.Unmarshal(body, m)
if err != nil {
return monitoring.Metric{}, err
}
return m, nil
}
func (p prometheus) rangeQuery(exp string, start, end time.Time, step time.Duration) (monitoring.Metric, error) {
params := &url.Values{}
params.Set("start", start.Format(time.RFC3339))
params.Set("end", end.Format(time.RFC3339))
params.Set("step", step.String())
params.Set("query", exp)
u := fmt.Sprintf("%s/api/v1/query?%s", p.options.Endpoint, params.Encode())
var m monitoring.Metric
response, err := p.client.Get(u)
if err != nil {
return monitoring.Metric{}, err
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return monitoring.Metric{}, err
}
defer response.Body.Close()
err = json.Unmarshal(body, m)
if err != nil {
return monitoring.Metric{}, err
}
return m, nil
}