Merge pull request #3338 from junotx/ca

custom alerting tuning
This commit is contained in:
KubeSphere CI Bot
2021-02-18 10:40:26 +08:00
committed by GitHub
5 changed files with 262 additions and 90 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"sort"
"strings"
"time"
"github.com/pkg/errors"
promresourcesv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
@@ -48,7 +49,7 @@ type Operator interface {
// GetCustomAlertingRule gets the custom alerting rule with the given name.
GetCustomAlertingRule(ctx context.Context, namespace, ruleName string) (*v2alpha1.GettableAlertingRule, error)
// ListCustomRuleAlerts lists the alerts of the custom alerting rule with the given name.
ListCustomRuleAlerts(ctx context.Context, namespace, ruleName string) ([]*v2alpha1.Alert, error)
ListCustomRuleAlerts(ctx context.Context, namespace, ruleName string) (*v2alpha1.AlertList, error)
// CreateCustomAlertingRule creates a custom alerting rule.
CreateCustomAlertingRule(ctx context.Context, namespace string, rule *v2alpha1.PostableAlertingRule) error
// UpdateCustomAlertingRule updates the custom alerting rule with the given name.
@@ -65,7 +66,7 @@ type Operator interface {
// GetBuiltinAlertingRule gets the builtin(non-custom) alerting rule with the given id
GetBuiltinAlertingRule(ctx context.Context, ruleId string) (*v2alpha1.GettableAlertingRule, error)
// ListBuiltinRuleAlerts lists the alerts of the builtin(non-custom) alerting rule with the given id
ListBuiltinRuleAlerts(ctx context.Context, ruleId string) ([]*v2alpha1.Alert, error)
ListBuiltinRuleAlerts(ctx context.Context, ruleId string) (*v2alpha1.AlertList, error)
}
func NewOperator(informers informers.InformerFactory,
@@ -184,7 +185,7 @@ func (o *operator) GetCustomAlertingRule(ctx context.Context, namespace, ruleNam
}
func (o *operator) ListCustomRuleAlerts(ctx context.Context, namespace, ruleName string) (
[]*v2alpha1.Alert, error) {
*v2alpha1.AlertList, error) {
rule, err := o.GetCustomAlertingRule(ctx, namespace, ruleName)
if err != nil {
@@ -193,7 +194,10 @@ func (o *operator) ListCustomRuleAlerts(ctx context.Context, namespace, ruleName
if rule == nil {
return nil, v2alpha1.ErrAlertingRuleNotFound
}
return rule.Alerts, nil
return &v2alpha1.AlertList{
Total: len(rule.Alerts),
Items: rule.Alerts,
}, nil
}
func (o *operator) ListBuiltinAlertingRules(ctx context.Context,
@@ -223,7 +227,7 @@ func (o *operator) GetBuiltinAlertingRule(ctx context.Context, ruleId string) (
return o.getBuiltinAlertingRule(ctx, ruleId)
}
func (o *operator) ListBuiltinRuleAlerts(ctx context.Context, ruleId string) ([]*v2alpha1.Alert, error) {
func (o *operator) ListBuiltinRuleAlerts(ctx context.Context, ruleId string) (*v2alpha1.AlertList, error) {
rule, err := o.getBuiltinAlertingRule(ctx, ruleId)
if err != nil {
return nil, err
@@ -231,7 +235,10 @@ func (o *operator) ListBuiltinRuleAlerts(ctx context.Context, ruleId string) ([]
if rule == nil {
return nil, v2alpha1.ErrAlertingRuleNotFound
}
return rule.Alerts, nil
return &v2alpha1.AlertList{
Total: len(rule.Alerts),
Items: rule.Alerts,
}, nil
}
func (o *operator) ListClusterAlertingRules(ctx context.Context, customFlag string,
@@ -464,6 +471,8 @@ func (o *operator) CreateCustomAlertingRule(ctx context.Context, namespace strin
return v2alpha1.ErrAlertingRuleAlreadyExists
}
setRuleUpdateTime(rule, time.Now())
return ruler.AddAlertingRule(ctx, ruleNamespace, extraRuleResourceSelector,
customRuleGroupDefault, parseToPrometheusRule(rule), ruleResourceLabels)
}
@@ -515,6 +524,8 @@ func (o *operator) UpdateCustomAlertingRule(ctx context.Context, namespace, name
return v2alpha1.ErrAlertingRuleNotFound
}
setRuleUpdateTime(rule, time.Now())
return ruler.UpdateAlertingRule(ctx, ruleNamespace, extraRuleResourceSelector,
resourceRule.Group, parseToPrometheusRule(rule), ruleResourceLabels)
}
@@ -627,3 +638,13 @@ func pageAlerts(alertingRules []*v2alpha1.GettableAlertingRule,
Items: queryParams.Sub(alerts),
}
}
func setRuleUpdateTime(rule *v2alpha1.PostableAlertingRule, t time.Time) {
if rule.Annotations == nil {
rule.Annotations = make(map[string]string)
}
if t.IsZero() {
t = time.Now()
}
rule.Annotations[v2alpha1.AnnotationKeyRuleUpdateTime] = t.UTC().Format(time.RFC3339)
}

View File

@@ -3,16 +3,20 @@ package rules
import (
"context"
"fmt"
"net/http"
"sort"
"github.com/docker/docker/pkg/locker"
"github.com/ghodss/yaml"
"github.com/pkg/errors"
promresourcesv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
prominformersv1 "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions/monitoring/v1"
promresourcesclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/retry"
"kubesphere.io/kubesphere/pkg/api/alerting/v2alpha1"
)
@@ -268,6 +272,7 @@ type ThanosRuler struct {
resource *promresourcesv1.ThanosRuler
informer prominformersv1.PrometheusRuleInformer
client promresourcesclient.Interface
locker locker.Locker
}
func NewThanosRuler(resource *promresourcesv1.ThanosRuler, informer prominformersv1.PrometheusRuleInformer,
@@ -345,7 +350,7 @@ func (r *ThanosRuler) AddAlertingRule(ctx context.Context, ruleNamespace *corev1
}
func (r *ThanosRuler) addAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
prometheusRules []*promresourcesv1.PrometheusRule, excludeRuleResources map[string]*ruleResource,
prometheusRules []*promresourcesv1.PrometheusRule, excludePrometheusRules map[string]*promresourcesv1.PrometheusRule,
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error {
sort.Slice(prometheusRules, func(i, j int) bool {
@@ -353,23 +358,30 @@ func (r *ThanosRuler) addAlertingRule(ctx context.Context, ruleNamespace *corev1
})
for _, prometheusRule := range prometheusRules {
if len(excludeRuleResources) > 0 {
if _, ok := excludeRuleResources[prometheusRule.Name]; ok {
if len(excludePrometheusRules) > 0 {
if _, ok := excludePrometheusRules[prometheusRule.Name]; ok {
continue
}
}
resource := ruleResource(*prometheusRule)
if ok, err := resource.addAlertingRule(group, rule); err != nil {
if err == errOutOfConfigMapSize {
break
}
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.addAlertingRule(group, rule); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
}
return nil
}); err != nil {
if err == errOutOfConfigMapSize {
break
} else if resourceNotFound(err) {
continue
}
return err
}
return nil
}
// create a new rule resource and add rule into it when all existing rule resources are full.
newPromRule := promresourcesv1.PrometheusRule{
@@ -403,38 +415,52 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor
}
var (
found bool
success bool
resourcesToDelRule = make(map[string]*ruleResource)
found bool
success bool
prsToDelRule = make(map[string]*promresourcesv1.PrometheusRule)
)
for _, prometheusRule := range prometheusRules {
resource := ruleResource(*prometheusRule)
for i, prometheusRule := range prometheusRules {
if success { // If the update has been successful, delete the possible same rule in other resources
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
}
return nil
}); err != nil && !resourceNotFound(err) {
return err
}
continue
}
if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.updateAlertingRule(group, rule); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
}
continue
}
if ok, err := resource.updateAlertingRule(group, rule); err != nil {
if err == errOutOfConfigMapSize {
// updating the rule in the resource will oversize the size limit,
return nil
}); err != nil {
if resourceNotFound(err) {
continue
} else if err == errOutOfConfigMapSize {
// updating the rule in the resource may oversize the size limit,
// so delete it and then add the new rule to a new resource.
resourcesToDelRule[resource.Name] = &resource
prsToDelRule[prometheusRule.Name] = prometheusRules[i]
found = true
} else {
return err
continue
}
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
found = true
success = true
return err
}
found = true
success = true
}
if !found {
@@ -442,18 +468,24 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor
}
if !success {
err := r.addAlertingRule(ctx, ruleNamespace, prometheusRules, resourcesToDelRule, group, rule, ruleResourceLabels)
err := r.addAlertingRule(ctx, ruleNamespace, prometheusRules, prsToDelRule, group, rule, ruleResourceLabels)
if err != nil {
return err
}
}
for _, resource := range resourcesToDelRule {
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
for _, pr := range prsToDelRule {
if err := r.doRuleResourceOperation(ctx, pr, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
}
return nil
}); err != nil && !resourceNotFound(err) {
return err
}
}
return nil
@@ -467,15 +499,23 @@ func (r *ThanosRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *cor
}
var success bool
for _, prometheusRule := range prometheusRules {
resource := ruleResource(*prometheusRule)
if ok, err := resource.deleteAlertingRule(name); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.deleteAlertingRule(name); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
}
success = true
return nil
}); err != nil {
if resourceNotFound(err) {
continue
}
return err
}
success = true
}
if !success {
return v2alpha1.ErrAlertingRuleNotFound
@@ -483,6 +523,20 @@ func (r *ThanosRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *cor
return nil
}
func (r *ThanosRuler) doRuleResourceOperation(ctx context.Context, pr *promresourcesv1.PrometheusRule,
operation func(pr *promresourcesv1.PrometheusRule) error) error {
key := pr.Namespace + "/" + pr.Name
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
r.locker.Lock(key)
defer r.locker.Unlock(key)
pr, err := r.client.MonitoringV1().PrometheusRules(pr.Namespace).Get(ctx, pr.Name, metav1.GetOptions{})
if err != nil {
return err
}
return operation(pr)
})
}
func ruleNamespaceSelected(r Ruler, ruleNamespace *corev1.Namespace) (bool, error) {
rnSelector, err := r.RuleResourceNamespaceSelector()
if err != nil {
@@ -499,3 +553,13 @@ func ruleNamespaceSelected(r Ruler, ruleNamespace *corev1.Namespace) (bool, erro
}
return true, nil
}
func resourceNotFound(err error) bool {
switch e := err.(type) {
case *apierrors.StatusError:
if e.Status().Code == http.StatusNotFound {
return true
}
}
return false
}

View File

@@ -1,7 +1,6 @@
package rules
import (
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
"path/filepath"
"sort"
"strings"
@@ -16,6 +15,7 @@ import (
"github.com/prometheus/prometheus/rules"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api/alerting/v2alpha1"
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
)
const (
@@ -25,6 +25,9 @@ const (
LabelKeyInternalRuleName = "__rule_name__"
LabelKeyInternalRuleQuery = "__rule_query__"
LabelKeyInternalRuleDuration = "__rule_duration__"
LabelKeyThanosRulerReplica = "thanos_ruler_replica"
LabelKeyPrometheusReplica = "prometheus_replica"
)
func FormatExpr(expr string) (string, error) {
@@ -100,16 +103,21 @@ func GenEndpointRuleId(group string, epRule *alerting.AlertingRule,
}
duration := parseDurationSeconds(epRule.Duration)
var labelsMap map[string]string
if externalLabels == nil {
labelsMap = epRule.Labels
} else {
labelsMap = make(map[string]string)
extLabels := externalLabels()
for key, value := range epRule.Labels {
if v, ok := extLabels[key]; !(ok && value == v) {
labelsMap[key] = value
}
var extLabels map[string]string
if externalLabels != nil {
extLabels = externalLabels()
}
labelsMap := make(map[string]string)
for key, value := range epRule.Labels {
if key == LabelKeyPrometheusReplica || key == LabelKeyThanosRulerReplica {
continue
}
if extLabels == nil {
labelsMap[key] = value
continue
}
if v, ok := extLabels[key]; !(ok && value == v) {
labelsMap[key] = value
}
}
@@ -148,11 +156,11 @@ func GetAlertingRulesStatus(ruleNamespace string, ruleChunk *ResourceRuleChunk,
continue
}
for _, epRule := range group.Rules {
for i, epRule := range group.Rules {
if eid, err := GenEndpointRuleId(group.Name, epRule, extLabels); err != nil {
return nil, errors.Wrap(err, ErrGenRuleId)
} else {
idEpRules[eid] = epRule
idEpRules[eid] = group.Rules[i]
nameIds[epRule.Name] = append(nameIds[epRule.Name], eid)
}
}