custom alerting use the same API group and flagset to alerting

Signed-off-by: junotx <junotx@126.com>
This commit is contained in:
junotx
2021-01-12 16:43:13 +08:00
parent 6f9d306368
commit 514fec7eb4
21 changed files with 261 additions and 294 deletions

View File

@@ -16,8 +16,21 @@ limitations under the License.
package alerting
import (
"fmt"
"strings"
"github.com/spf13/pflag"
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
)
type Options struct {
Endpoint string `json:"endpoint" yaml:"endpoint"`
// The following options are for the alerting with v2alpha1 version or higher versions
PrometheusEndpoint string `json:"prometheusEndpoint" yaml:"prometheusEndpoint"`
ThanosRulerEndpoint string `json:"thanosRulerEndpoint" yaml:"thanosRulerEndpoint"`
ThanosRuleResourceLabels string `json:"thanosRuleResourceLabels" yaml:"thanosRuleResourceLabels"`
}
func NewAlertingOptions() *Options {
@@ -26,13 +39,37 @@ func NewAlertingOptions() *Options {
}
}
func (s *Options) ApplyTo(options *Options) {
if options == nil {
options = s
return
func (o *Options) ApplyTo(options *Options) {
reflectutils.Override(options, o)
}
func (o *Options) Validate() []error {
errs := []error{}
if len(o.ThanosRuleResourceLabels) > 0 {
lblStrings := strings.Split(o.ThanosRuleResourceLabels, ",")
for _, lblString := range lblStrings {
if len(lblString) > 0 {
lbl := strings.Split(lblString, "=")
if len(lbl) != 2 {
errs = append(errs, fmt.Errorf("invalid alerting-thanos-rule-resource-labels arg: %s", o.ThanosRuleResourceLabels))
break
}
}
}
}
if s.Endpoint != "" {
options.Endpoint = s.Endpoint
}
return errs
}
func (o *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
fs.StringVar(&o.Endpoint, "alerting-server-endpoint", c.Endpoint,
"alerting server endpoint for alerting v1.")
fs.StringVar(&o.PrometheusEndpoint, "alerting-prometheus-endpoint", c.PrometheusEndpoint,
"Prometheus service endpoint from which built-in alerting rules are fetched(alerting v2alpha1 or higher required)")
fs.StringVar(&o.ThanosRulerEndpoint, "alerting-thanos-ruler-endpoint", c.ThanosRulerEndpoint,
"Thanos ruler service endpoint from which custom alerting rules are fetched(alerting v2alpha1 or higher required)")
fs.StringVar(&o.ThanosRuleResourceLabels, "alerting-thanos-rule-resource-labels", c.ThanosRuleResourceLabels,
"Labels used by Thanos Ruler to select PrometheusRule custom resources. eg: thanosruler=thanos-ruler,role=custom-alerting-rules (alerting v2alpha1 or higher required)")
}

View File

@@ -0,0 +1,173 @@
package alerting
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"net/http"
)
import "github.com/prometheus/client_golang/api"
const (
apiPrefix = "/api/v1"
epRules = apiPrefix + "/rules"
statusAPIError = 422
statusSuccess status = "success"
statusError status = "error"
ErrBadData ErrorType = "bad_data"
ErrTimeout ErrorType = "timeout"
ErrCanceled ErrorType = "canceled"
ErrExec ErrorType = "execution"
ErrBadResponse ErrorType = "bad_response"
ErrServer ErrorType = "server_error"
ErrClient ErrorType = "client_error"
)
type status string
type ErrorType string
type Error struct {
Type ErrorType
Msg string
Detail string
}
func (e *Error) Error() string {
return fmt.Sprintf("%s: %s", e.Type, e.Msg)
}
type response struct {
Status status `json:"status"`
Data json.RawMessage `json:"data,omitempty"`
ErrorType ErrorType `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
Warnings []string `json:"warnings,omitempty"`
}
type RuleClient interface {
PrometheusRules(ctx context.Context) ([]*RuleGroup, error)
ThanosRules(ctx context.Context) ([]*RuleGroup, error)
}
type ruleClient struct {
prometheus api.Client
thanosruler api.Client
}
func (c *ruleClient) PrometheusRules(ctx context.Context) ([]*RuleGroup, error) {
if c.prometheus != nil {
return c.rules(c.prometheus, ctx)
}
return nil, nil
}
func (c *ruleClient) ThanosRules(ctx context.Context) ([]*RuleGroup, error) {
if c.thanosruler != nil {
return c.rules(c.thanosruler, ctx)
}
return nil, nil
}
func (c *ruleClient) rules(client api.Client, ctx context.Context) ([]*RuleGroup, error) {
u := client.URL(epRules, nil)
q := u.Query()
q.Add("type", "alert")
u.RawQuery = q.Encode()
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "error creating request: ")
}
_, body, _, err := c.do(client, ctx, req)
if err != nil {
return nil, errors.Wrap(err, "error doing request: ")
}
var result struct {
Groups []*RuleGroup
}
err = json.Unmarshal(body, &result)
if err != nil {
return nil, errors.Wrap(err, "")
}
return result.Groups, nil
}
func (c *ruleClient) do(client api.Client, ctx context.Context, req *http.Request) (*http.Response, []byte, []string, error) {
resp, body, e := client.Do(ctx, req)
if e != nil {
return resp, body, nil, e
}
code := resp.StatusCode
if code/100 != 2 && !apiError(code) {
errorType, errorMsg := errorTypeAndMsgFor(resp)
return resp, body, nil, &Error{
Type: errorType,
Msg: errorMsg,
Detail: string(body),
}
}
var result response
if http.StatusNoContent != code {
if jsonErr := json.Unmarshal(body, &result); jsonErr != nil {
return resp, body, nil, &Error{
Type: ErrBadResponse,
Msg: jsonErr.Error(),
}
}
}
var err error
if apiError(code) && result.Status == "success" {
err = &Error{
Type: ErrBadResponse,
Msg: "inconsistent body for response code",
}
}
if result.Status == "error" {
err = &Error{
Type: result.ErrorType,
Msg: result.Error,
}
}
return resp, []byte(result.Data), result.Warnings, err
}
func errorTypeAndMsgFor(resp *http.Response) (ErrorType, string) {
switch resp.StatusCode / 100 {
case 4:
return ErrClient, fmt.Sprintf("client error: %d", resp.StatusCode)
case 5:
return ErrServer, fmt.Sprintf("server error: %d", resp.StatusCode)
}
return ErrBadResponse, fmt.Sprintf("bad response code %d", resp.StatusCode)
}
func apiError(code int) bool {
// These are the codes that rule server sends when it returns an error.
return code == statusAPIError || code == http.StatusBadRequest ||
code == http.StatusServiceUnavailable || code == http.StatusInternalServerError
}
func NewRuleClient(options *Options) (RuleClient, error) {
var (
c ruleClient
e error
)
if options.PrometheusEndpoint != "" {
c.prometheus, e = api.NewClient(api.Config{Address: options.PrometheusEndpoint})
}
if options.ThanosRulerEndpoint != "" {
c.thanosruler, e = api.NewClient(api.Config{Address: options.ThanosRulerEndpoint})
}
return &c, e
}

View File

@@ -0,0 +1,100 @@
package alerting
import (
"context"
"net/http"
"net/http/httptest"
"testing"
)
func TestListRules(t *testing.T) {
var tests = []struct {
description string
fakeCode int
fakeResp string
expectError bool
}{{
description: "list alerting rules from prometheus endpoint",
expectError: false,
fakeCode: 200,
fakeResp: `
{
"status": "success",
"data": {
"groups": [
{
"name": "kubernetes-resources",
"file": "/etc/prometheus/rules/prometheus-k8s-rulefiles-0/kubesphere-monitoring-system-prometheus-k8s-rules.yaml",
"rules": [
{
"state": "firing",
"name": "KubeCPUOvercommit",
"query": "sum(namespace:kube_pod_container_resource_requests_cpu_cores:sum) / sum(kube_node_status_allocatable_cpu_cores) > (count(kube_node_status_allocatable_cpu_cores) - 1) / count(kube_node_status_allocatable_cpu_cores)",
"duration": 300,
"labels": {
"severity": "warning"
},
"annotations": {
"message": "Cluster has overcommitted CPU resource requests for Pods and cannot tolerate node failure.",
"runbook_url": "https://github.com/kubernetes-monitoring/kubernetes-mixin/tree/master/runbook.md#alert-name-kubecpuovercommit"
},
"alerts": [
{
"labels": {
"alertname": "KubeCPUOvercommit",
"severity": "warning"
},
"annotations": {
"message": "Cluster has overcommitted CPU resource requests for Pods and cannot tolerate node failure.",
"runbook_url": "https://github.com/ kubernetes-monitoring/kubernetes-mixin/tree/master/runbook.md#alert-name-kubecpuovercommit"
},
"state": "firing",
"activeAt": "2020-09-22T06:18:47.55260138Z",
"value": "4.405e-01"
}
],
"health": "ok",
"evaluationTime": 0.000894038,
"lastEvaluation": "2020-09-22T08:57:17.566233983Z",
"type": "alerting"
}
]
}
]
}
}
`,
}}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
mock := MockService(epRules, test.fakeCode, test.fakeResp)
defer mock.Close()
c, e := NewRuleClient(&Options{PrometheusEndpoint: mock.URL})
if e != nil {
t.Fatal(e)
}
rgs, e := c.PrometheusRules(context.TODO())
if test.expectError {
} else {
if e != nil {
t.Fatal(e)
} else if len(rgs) == 1 && len(rgs[0].Rules) == 1 {
} else {
t.Fatalf("expect %d group and %d rule but got %d group and %d rule", 1, 1, len(rgs), len(rgs[0].Rules))
}
}
})
}
}
func MockService(pattern string, fakeCode int, fakeResp string) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(fakeCode)
res.Write([]byte(fakeResp))
})
return httptest.NewServer(mux)
}

View File

@@ -0,0 +1,40 @@
package alerting
import (
"time"
)
type RuleGroup struct {
Name string `json:"name"`
File string `json:"file"`
Rules []*AlertingRule `json:"rules"`
Interval float64 `json:"interval"`
EvaluationTime float64 `json:"evaluationTime"`
LastEvaluation *time.Time `json:"lastEvaluation"`
}
type AlertingRule struct {
// State can be "pending", "firing", "inactive".
State string `json:"state"`
Name string `json:"name"`
Query string `json:"query"`
Duration float64 `json:"duration"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Alerts []*Alert `json:"alerts"`
// Health can be "ok", "err", "unknown".
Health string `json:"health"`
LastError string `json:"lastError,omitempty"`
EvaluationTime float64 `json:"evaluationTime"`
LastEvaluation *time.Time `json:"lastEvaluation"`
// Type of an alertingRule is always "alerting".
Type string `json:"type"`
}
type Alert struct {
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
State string `json:"state"`
ActiveAt *time.Time `json:"activeAt,omitempty"`
Value string `json:"value"`
}