diff --git a/pkg/api/alerting/v2alpha1/types.go b/pkg/api/alerting/v2alpha1/types.go index 5e20fa17d..f3fbbfaf0 100644 --- a/pkg/api/alerting/v2alpha1/types.go +++ b/pkg/api/alerting/v2alpha1/types.go @@ -75,7 +75,7 @@ func (r *PostableAlertingRule) Validate() error { errs = append(errs, errors.New("name can not be empty")) } else { if !ruleNameMatcher.MatchString(r.Name) { - errs = append(errs, errors.New("name is invalid which not matches ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")) + errs = append(errs, errors.New("rule name must match regular expression ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")) } } @@ -471,11 +471,12 @@ type ErrorType string type Result string type BulkResponse struct { - Errors bool `json:"errors" description:"If true, one or more of the operations in the bulk request did not complete successfully"` + Errors bool `json:"errors" description:"If true, one or more operations in the bulk request don't complete successfully"` Items []*BulkItemResponse `json:"items" description:"It contains the result of each operation in the bulk request"` } -func (br *BulkResponse) Derive() *BulkResponse { +// Neaten neatens the internal items and sets the errors +func (br *BulkResponse) Neaten() *BulkResponse { var ( items []*BulkItemResponse itemMap = make(map[string]*BulkItemResponse) diff --git a/pkg/kapis/alerting/v2alpha1/register.go b/pkg/kapis/alerting/v2alpha1/register.go index 6b2d4131a..3960c5901 100644 --- a/pkg/kapis/alerting/v2alpha1/register.go +++ b/pkg/kapis/alerting/v2alpha1/register.go @@ -109,7 +109,7 @@ func AddToContainer(container *restful.Container, informers informers.InformerFa Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}). Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) - ws.Route(ws.POST("/bulk_rules"). + ws.Route(ws.POST("/bulkrules"). To(handler.handleCreateOrUpdateCustomAlertingRules). Doc("create or update cluster-level custom alerting rules in bulk"). Reads([]alertingv2alpha1.PostableAlertingRule{}). 
@@ -179,7 +179,7 @@ func AddToContainer(container *restful.Container, informers informers.InformerFa Returns(http.StatusOK, ksapi.StatusOK, ""). Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) - ws.Route(ws.POST("/namespaces/{namespace}/bulk_rules"). + ws.Route(ws.POST("/namespaces/{namespace}/bulkrules"). To(handler.handleCreateOrUpdateCustomAlertingRules). Doc("create or update custom alerting rules in bulk in the specified namespace"). Reads([]alertingv2alpha1.PostableAlertingRule{}). diff --git a/pkg/models/alerting/alerting.go b/pkg/models/alerting/alerting.go index c8b21a145..a7dd8949e 100644 --- a/pkg/models/alerting/alerting.go +++ b/pkg/models/alerting/alerting.go @@ -620,7 +620,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp } } - // check the rules + // check all the rules var ( br = &v2alpha1.BulkResponse{} nameSet = make(map[string]struct{}) @@ -665,10 +665,13 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp } } if len(nameSet) == len(invalids) { - return br.Derive(), nil + return br.Neaten(), nil } - // Confirm whether the rules are added or updated + // Confirm whether the rules should be added or updated. For each rule that is committed, + // it will be added if the rule does not exist, or updated otherwise. + // If there are rules with the same name in the existing rules to update, the first will be + // updated but the others will be deleted var ( addRules []*rules.RuleWithGroup updRules []*rules.ResourceRuleItem @@ -699,7 +702,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp } } - // do the actions + // add rules if len(addRules) > 0 { resps, err := ruler.AddAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, addRules...) 
if err == nil { @@ -710,6 +713,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp } } } + // update existing rules if len(updRules) > 0 { resps, err := ruler.UpdateAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, updRules...) if err == nil { @@ -720,6 +724,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp } } } + // delete possible duplicate rules if len(delRules) > 0 { _, err := ruler.DeleteAlertingRules(ctx, ruleNamespace, delRules...) if err != nil { @@ -728,7 +733,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp } } } - return br.Derive(), nil + return br.Neaten(), nil } func (o *operator) DeleteCustomAlertingRules(ctx context.Context, namespace string, @@ -792,7 +797,7 @@ func (o *operator) DeleteCustomAlertingRules(ctx context.Context, namespace stri } br.Items = append(br.Items, respItems...) - return br.Derive(), nil + return br.Neaten(), nil } // getPrometheusRuler gets the cluster-in prometheus diff --git a/pkg/models/alerting/rules/ruler.go b/pkg/models/alerting/rules/ruler.go index bbbf7740c..27622019b 100644 --- a/pkg/models/alerting/rules/ruler.go +++ b/pkg/models/alerting/rules/ruler.go @@ -98,7 +98,7 @@ func (r *ruleResource) deleteAlertingRules(rules ...*RuleWithGroup) (bool, error return commit, nil } -// updateAlertingRule updates the rules. +// updateAlertingRules updates the rules. // If there are rules to be updated, return true to indicate the resource should be updated. func (r *ruleResource) updateAlertingRules(rules ...*RuleWithGroup) (bool, error) { var ( @@ -147,15 +147,18 @@ func (r *ruleResource) updateAlertingRules(rules ...*RuleWithGroup) (bool, error return commit, nil } +// addAlertingRules adds the rules. +// If there are rules to be added, return true to indicate the resource should be updated. 
func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) { var ( commit bool spec = r.Spec.DeepCopy() groupMax = -1 - cursor int - rulesNoGroup []promresourcesv1.Rule - rulesWithGroup = make(map[string][]promresourcesv1.Rule) + cursor int // indicates which rule to start adding for the rules with no groups + + rulesNoGroup []promresourcesv1.Rule // rules that do not specify group names + rulesWithGroup = make(map[string][]promresourcesv1.Rule) // rules that have specific group names ) for i, rule := range rules { @@ -170,11 +173,13 @@ func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) { spec = new(promresourcesv1.PrometheusRuleSpec) } + // For the rules that have specific group names, add them to the matched groups. + // For the rules that do not specify group names, add them to the automatically generated groups until the limit is reached. for i, g := range spec.Groups { var ( gName = g.Name - doneNoGroup = cursor >= len(rulesNoGroup) - doneWithGroup = len(rulesWithGroup) == 0 + doneNoGroup = cursor >= len(rulesNoGroup) // whether all rules without groups have been added + doneWithGroup = len(rulesWithGroup) == 0 // whether all rules with groups have been added ) if doneNoGroup && doneWithGroup { @@ -201,26 +206,35 @@ func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) { if size := len(g.Rules); size < customRuleGroupSize { num := customRuleGroupSize - size - var limit int - if limit = cursor + num; limit > len(rulesNoGroup) { - limit = len(rulesNoGroup) + var stop int + if stop = cursor + num; stop > len(rulesNoGroup) { + stop = len(rulesNoGroup) } - spec.Groups[i].Rules = append(spec.Groups[i].Rules, rulesNoGroup[cursor:limit]...) - cursor = limit + spec.Groups[i].Rules = append(spec.Groups[i].Rules, rulesNoGroup[cursor:stop]...) + cursor = stop commit = true } } } + // If no groups are available, new groups will be created to place the remaining rules. 
+ for gName := range rulesWithGroup { + rules := rulesWithGroup[gName] + if len(rules) == 0 { + continue + } + spec.Groups = append(spec.Groups, promresourcesv1.RuleGroup{Name: gName, Rules: rules}) + commit = true + } for groupMax++; cursor < len(rules); groupMax++ { g := promresourcesv1.RuleGroup{Name: fmt.Sprintf("%s%d", customRuleGroupDefaultPrefix, groupMax)} - var limit int - if limit = cursor + customRuleGroupSize; limit > len(rulesNoGroup) { - limit = len(rulesNoGroup) + var stop int + if stop = cursor + customRuleGroupSize; stop > len(rulesNoGroup) { + stop = len(rulesNoGroup) } - g.Rules = append(g.Rules, rulesNoGroup[cursor:limit]...) + g.Rules = append(g.Rules, rulesNoGroup[cursor:stop]...) spec.Groups = append(spec.Groups, g) - cursor = limit + cursor = stop commit = true } @@ -313,18 +327,18 @@ func (r *PrometheusRuler) ListRuleResources(ruleNamespace *corev1.Namespace, ext func (r *PrometheusRuler) AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string, rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) { - return nil, errors.New("not supported to add rules for prometheus") + return nil, errors.New("Adding Prometheus rules not supported") } func (r *PrometheusRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string, ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) { - return nil, errors.New("not supported to update rules for prometheus") + return nil, errors.New("Updating Prometheus rules not supported") } func (r *PrometheusRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) { - return nil, errors.New("not supported to delete rules for prometheus") + return nil, errors.New("Deleting Prometheus rules not supported.") } type 
ThanosRuler struct { @@ -438,15 +452,17 @@ func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev } var ( - err error - num = len(rules) - cursor - limit = len(rules) - rs []*RuleWithGroup + err error + num = len(rules) - cursor + stop = len(rules) + rs []*RuleWithGroup ) + // First try to add all the remaining rules to this resource, + // and if the size limit is exceeded, retry with half of them for i := 1; i <= 2; i++ { - limit = cursor + num/i - rs = rules[cursor:limit] + stop = cursor + num/i + rs = rules[cursor:stop] err = r.doRuleResourceOperation(ctx, pr.Namespace, pr.Name, func(pr *promresourcesv1.PrometheusRule) error { resource := ruleResource(*pr) @@ -474,22 +490,24 @@ func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev for _, rule := range rs { respItems = append(respItems, resp(rule, err)) } - cursor = limit + cursor = stop } } - // create new rule resources and add rest rules into them when all existing rule resources are full. + // Create new rule resources and add the remaining rules to them + // when all existing rule resources are full. for cursor < len(rules) { var ( - err error - num = len(rules) - cursor - limit = len(rules) - rs []*RuleWithGroup + err error + num = len(rules) - cursor + stop = len(rules) + rs []*RuleWithGroup ) - + // If adding the rules to the new resource exceeds the size limit, + // reduce the batch to 1/2, 1/3, ... of the remaining rules until it fits. 
for i := 1; ; i++ { - limit = cursor + num/i - rs = rules[cursor:limit] + stop = cursor + num/i + rs = rules[cursor:stop] pr := &promresourcesv1.PrometheusRule{ ObjectMeta: metav1.ObjectMeta{ @@ -514,7 +532,7 @@ func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev for _, rule := range rs { respItems = append(respItems, resp(rule, err)) } - cursor = limit + cursor = stop } return respItems, nil @@ -527,16 +545,22 @@ func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *co var ( itemsMap = make(map[string][]*ResourceRuleItem) respItems = make([]*v2alpha1.BulkItemResponse, 0, len(ruleItems)) + // Rules updated successfully. The key is the rule name. successes = make(map[string]struct{}) - moveMap = make(map[string][]*ResourceRuleItem) - delMap = make(map[string][]*ResourceRuleItem) // duplicate rules that need to deleted in the same resource - + // Rules to be moved to other resources. The key is the name of the resource that held them. + moveMap = make(map[string][]*ResourceRuleItem) + // Duplicate rules to be deleted. + delMap = make(map[string][]*ResourceRuleItem) ) for i, item := range ruleItems { itemsMap[item.ResourceName] = append(itemsMap[item.ResourceName], ruleItems[i]) } + // Update the rules in the resources where the rules reside. + // If duplicate rules are found, the first will be updated and the others will be deleted. + // If updating the rules in their original resource would exceed the size limit, + // they will be moved to other resources and then updated. 
for name, items := range itemsMap { var ( nrules []*RuleWithGroup @@ -574,7 +598,7 @@ func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *co successes[item.Alert] = struct{}{} respItems = append(respItems, v2alpha1.NewBulkItemSuccessResponse(item.Alert, v2alpha1.ResultUpdated)) } - case err == errOutOfConfigMapSize: + case err == errOutOfConfigMapSize: // Cannot update the rules in the original resource moveMap[name] = append(moveMap[name], nitems...) case resourceNotFound(err): for _, item := range items { @@ -591,6 +615,9 @@ func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *co } } + // The move here is not a plain move, because the moved rules must also be updated. + // So the actual operations are to first add the new rules to other resources + // and then delete the old rules from the old resources. for name, items := range moveMap { var ( nrules = make([]*RuleWithGroup, 0, len(items)) @@ -638,6 +665,7 @@ func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *co for i := range dRespItems { resp := dRespItems[i] if resp.Status == v2alpha1.StatusSuccess { + // The delete operation here is part of an update, so report the result as v2alpha1.ResultUpdated resp.Result = v2alpha1.ResultUpdated } respItems = append(respItems, resp) @@ -693,6 +721,7 @@ func (r *ThanosRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *co func (r *ThanosRuler) doRuleResourceOperation(ctx context.Context, namespace, name string, operation func(pr *promresourcesv1.PrometheusRule) error) error { + // The lock here is used to lock the specific resource in order to prevent frequent conflicts key := namespace + "/" + name return retry.RetryOnConflict(retry.DefaultRetry, func() error { ruleResourceLocker.Lock(key)