optimize alerting rule concurrency
Signed-off-by: junotx <junotx@126.com>
This commit is contained in:
@@ -3,16 +3,20 @@ package rules
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
|
||||
"github.com/docker/docker/pkg/locker"
|
||||
"github.com/ghodss/yaml"
|
||||
"github.com/pkg/errors"
|
||||
promresourcesv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
|
||||
prominformersv1 "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions/monitoring/v1"
|
||||
promresourcesclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"kubesphere.io/kubesphere/pkg/api/alerting/v2alpha1"
|
||||
)
|
||||
|
||||
@@ -268,6 +272,7 @@ type ThanosRuler struct {
|
||||
resource *promresourcesv1.ThanosRuler
|
||||
informer prominformersv1.PrometheusRuleInformer
|
||||
client promresourcesclient.Interface
|
||||
locker locker.Locker
|
||||
}
|
||||
|
||||
func NewThanosRuler(resource *promresourcesv1.ThanosRuler, informer prominformersv1.PrometheusRuleInformer,
|
||||
@@ -345,7 +350,7 @@ func (r *ThanosRuler) AddAlertingRule(ctx context.Context, ruleNamespace *corev1
|
||||
}
|
||||
|
||||
func (r *ThanosRuler) addAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
|
||||
prometheusRules []*promresourcesv1.PrometheusRule, excludeRuleResources map[string]*ruleResource,
|
||||
prometheusRules []*promresourcesv1.PrometheusRule, excludePrometheusRules map[string]*promresourcesv1.PrometheusRule,
|
||||
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error {
|
||||
|
||||
sort.Slice(prometheusRules, func(i, j int) bool {
|
||||
@@ -353,23 +358,30 @@ func (r *ThanosRuler) addAlertingRule(ctx context.Context, ruleNamespace *corev1
|
||||
})
|
||||
|
||||
for _, prometheusRule := range prometheusRules {
|
||||
if len(excludeRuleResources) > 0 {
|
||||
if _, ok := excludeRuleResources[prometheusRule.Name]; ok {
|
||||
if len(excludePrometheusRules) > 0 {
|
||||
if _, ok := excludePrometheusRules[prometheusRule.Name]; ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
resource := ruleResource(*prometheusRule)
|
||||
if ok, err := resource.addAlertingRule(group, rule); err != nil {
|
||||
if err == errOutOfConfigMapSize {
|
||||
break
|
||||
}
|
||||
return err
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
if err := r.doRuleResourceOperation(prometheusRule, func(newerPr *promresourcesv1.PrometheusRule) error {
|
||||
resource := ruleResource(*newerPr)
|
||||
if ok, err := resource.addAlertingRule(group, rule); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
if err == errOutOfConfigMapSize {
|
||||
break
|
||||
} else if resourceNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// create a new rule resource and add rule into it when all existing rule resources are full.
|
||||
newPromRule := promresourcesv1.PrometheusRule{
|
||||
@@ -403,38 +415,52 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor
|
||||
}
|
||||
|
||||
var (
|
||||
found bool
|
||||
success bool
|
||||
resourcesToDelRule = make(map[string]*ruleResource)
|
||||
found bool
|
||||
success bool
|
||||
prsToDelRule = make(map[string]*promresourcesv1.PrometheusRule)
|
||||
)
|
||||
for _, prometheusRule := range prometheusRules {
|
||||
resource := ruleResource(*prometheusRule)
|
||||
for i, prometheusRule := range prometheusRules {
|
||||
if success { // If the update has been successful, delete the possible same rule in other resources
|
||||
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
|
||||
if err := r.doRuleResourceOperation(prometheusRule, func(newerPr *promresourcesv1.PrometheusRule) error {
|
||||
resource := ruleResource(*newerPr)
|
||||
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil && !resourceNotFound(err) {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.doRuleResourceOperation(prometheusRule, func(newerPr *promresourcesv1.PrometheusRule) error {
|
||||
resource := ruleResource(*newerPr)
|
||||
if ok, err := resource.updateAlertingRule(group, rule); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
if ok, err := resource.updateAlertingRule(group, rule); err != nil {
|
||||
if err == errOutOfConfigMapSize {
|
||||
// updating the rule in the resource will oversize the size limit,
|
||||
return nil
|
||||
}); err != nil {
|
||||
if resourceNotFound(err) {
|
||||
continue
|
||||
} else if err == errOutOfConfigMapSize {
|
||||
// updating the rule in the resource may oversize the size limit,
|
||||
// so delete it and then add the new rule to a new resource.
|
||||
resourcesToDelRule[resource.Name] = &resource
|
||||
prsToDelRule[prometheusRule.Name] = prometheusRules[i]
|
||||
found = true
|
||||
} else {
|
||||
return err
|
||||
continue
|
||||
}
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
return err
|
||||
}
|
||||
found = true
|
||||
success = true
|
||||
return err
|
||||
}
|
||||
found = true
|
||||
success = true
|
||||
}
|
||||
|
||||
if !found {
|
||||
@@ -442,18 +468,24 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor
|
||||
}
|
||||
|
||||
if !success {
|
||||
err := r.addAlertingRule(ctx, ruleNamespace, prometheusRules, resourcesToDelRule, group, rule, ruleResourceLabels)
|
||||
err := r.addAlertingRule(ctx, ruleNamespace, prometheusRules, prsToDelRule, group, rule, ruleResourceLabels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, resource := range resourcesToDelRule {
|
||||
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
for _, pr := range prsToDelRule {
|
||||
if err := r.doRuleResourceOperation(pr, func(newerPr *promresourcesv1.PrometheusRule) error {
|
||||
resource := ruleResource(*newerPr)
|
||||
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil && !resourceNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -467,15 +499,23 @@ func (r *ThanosRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *cor
|
||||
}
|
||||
var success bool
|
||||
for _, prometheusRule := range prometheusRules {
|
||||
resource := ruleResource(*prometheusRule)
|
||||
if ok, err := resource.deleteAlertingRule(name); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
if err := r.doRuleResourceOperation(prometheusRule, func(newerPr *promresourcesv1.PrometheusRule) error {
|
||||
resource := ruleResource(*newerPr)
|
||||
if ok, err := resource.deleteAlertingRule(name); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
if err = resource.commit(ctx, r.client); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
success = true
|
||||
return nil
|
||||
}); err != nil {
|
||||
if resourceNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
success = true
|
||||
}
|
||||
if !success {
|
||||
return v2alpha1.ErrAlertingRuleNotFound
|
||||
@@ -483,6 +523,20 @@ func (r *ThanosRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *cor
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ThanosRuler) doRuleResourceOperation(pr *promresourcesv1.PrometheusRule,
|
||||
operation func(newerPr *promresourcesv1.PrometheusRule) error) error {
|
||||
key := pr.Namespace + "/" + pr.Name
|
||||
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
r.locker.Lock(key)
|
||||
defer r.locker.Unlock(key)
|
||||
pr, err := r.informer.Lister().PrometheusRules(pr.Namespace).Get(pr.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return operation(pr)
|
||||
})
|
||||
}
|
||||
|
||||
func ruleNamespaceSelected(r Ruler, ruleNamespace *corev1.Namespace) (bool, error) {
|
||||
rnSelector, err := r.RuleResourceNamespaceSelector()
|
||||
if err != nil {
|
||||
@@ -499,3 +553,13 @@ func ruleNamespaceSelected(r Ruler, ruleNamespace *corev1.Namespace) (bool, erro
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func resourceNotFound(err error) bool {
|
||||
switch e := err.(type) {
|
||||
case *apierrors.StatusError:
|
||||
if e.Status().Code == http.StatusNotFound {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user