diff --git a/pkg/api/alerting/v2alpha1/types.go b/pkg/api/alerting/v2alpha1/types.go index fbd37ff88..5e20fa17d 100644 --- a/pkg/api/alerting/v2alpha1/types.go +++ b/pkg/api/alerting/v2alpha1/types.go @@ -18,6 +18,7 @@ package v2alpha1 import ( "context" + "regexp" "sort" "strconv" "strings" @@ -47,6 +48,8 @@ var ( templateTestData = template.AlertTemplateData(map[string]string{}, map[string]string{}, 0) templateTestTextPrefix = "{{$labels := .Labels}}{{$externalLabels := .ExternalLabels}}{{$value := .Value}}" + + ruleNameMatcher = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?$`) ) type RuleLevel string @@ -70,6 +73,10 @@ func (r *PostableAlertingRule) Validate() error { if r.Name == "" { errs = append(errs, errors.New("name can not be empty")) + } else { + if !ruleNameMatcher.MatchString(r.Name) { + errs = append(errs, errors.New("name is invalid which not matches ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")) + } } if r.Query == "" { @@ -442,3 +449,79 @@ func parseLabelFilters(req *restful.Request) (map[string]string, map[string]stri } return labelEqualFilters, labelContainFilters } + +const ( + ErrBadData ErrorType = "bad_data" + ErrDuplicateName ErrorType = "duplicate_name" + ErrNotFound ErrorType = "not_found" + ErrServer ErrorType = "server_error" + + StatusSuccess Status = "success" + StatusError Status = "error" + + ResultCreated Result = "created" + ResultUpdated Result = "updated" + ResultDeleted Result = "deleted" +) + +type Status string + +type ErrorType string + +type Result string + +type BulkResponse struct { + Errors bool `json:"errors" description:"If true, one or more of the operations in the bulk request did not complete successfully"` + Items []*BulkItemResponse `json:"items" description:"It contains the result of each operation in the bulk request"` +} + +func (br *BulkResponse) Derive() *BulkResponse { + var ( + items []*BulkItemResponse + itemMap = make(map[string]*BulkItemResponse) + ) + for i, item := range br.Items { + if item.Status == StatusError { + br.Errors = true + } + pitem, ok := itemMap[item.RuleName] + if !ok || (pitem.Status == StatusSuccess || item.Status == StatusError) { + itemMap[item.RuleName] = br.Items[i] + } + } + for k := range itemMap { + item := itemMap[k] + if item.Error != nil { + item.ErrorStr = item.Error.Error() + } + items = append(items, itemMap[k]) + } + br.Items = items + return br +} + +type BulkItemResponse struct { + RuleName string `json:"ruleName,omitempty"` + Status Status `json:"status,omitempty" description:"It may be success or error"` + Result Result `json:"result,omitempty" description:"It may be created, updated or deleted, and only for successful operations"` + ErrorType ErrorType `json:"errorType,omitempty" description:"It may be bad_data, duplicate_name, not_found or server_error, and only for failed operations"` + Error error `json:"-"` + ErrorStr string `json:"error,omitempty" description:"It is only returned for failed operations"` +} + +func NewBulkItemSuccessResponse(ruleName string, result Result) *BulkItemResponse { + return &BulkItemResponse{ + RuleName: ruleName, + Status: StatusSuccess, + Result: result, + } +} + +func NewBulkItemErrorServerResponse(ruleName string, err error) *BulkItemResponse { + return &BulkItemResponse{ + RuleName: ruleName, + Status: StatusError, + ErrorType: ErrServer, + Error: err, + } +} diff --git a/pkg/kapis/alerting/v2alpha1/handler.go b/pkg/kapis/alerting/v2alpha1/handler.go index 94df298c9..ab95c1d90 100644 --- a/pkg/kapis/alerting/v2alpha1/handler.go +++ b/pkg/kapis/alerting/v2alpha1/handler.go @@ -283,3 +283,45 @@ func (h *handler) handleListBuiltinRuleAlerts(req *restful.Request, resp *restfu resp.WriteEntity(alerts) } + +func (h *handler) handleCreateOrUpdateCustomAlertingRules(req *restful.Request, resp *restful.Response) { + namespace := req.PathParameter("namespace") + + var rules []*v2alpha1.PostableAlertingRule + if err := req.ReadEntity(&rules); err != nil { + klog.Error(err) + ksapi.HandleBadRequest(resp, nil, err) + return + } + + bulkResp, err := h.operator.CreateOrUpdateCustomAlertingRules(req.Request.Context(), namespace, rules) + if err != nil { + klog.Error(err) + switch { + case err == v2alpha1.ErrThanosRulerNotEnabled: + ksapi.HandleBadRequest(resp, nil, err) + default: + ksapi.HandleInternalError(resp, nil, err) + } + return + } + resp.WriteEntity(bulkResp) +} + +func (h *handler) handleDeleteCustomAlertingRules(req *restful.Request, resp *restful.Response) { + namespace := req.PathParameter("namespace") + names := req.QueryParameters("name") + + bulkResp, err := h.operator.DeleteCustomAlertingRules(req.Request.Context(), namespace, names) + if err != nil { + klog.Error(err) + switch { + case err == v2alpha1.ErrThanosRulerNotEnabled: + ksapi.HandleBadRequest(resp, nil, err) + default: + ksapi.HandleInternalError(resp, nil, err) + } + return + } + resp.WriteEntity(bulkResp) +} diff --git a/pkg/kapis/alerting/v2alpha1/register.go b/pkg/kapis/alerting/v2alpha1/register.go index 95ddba912..6b2d4131a 100644 --- a/pkg/kapis/alerting/v2alpha1/register.go +++ b/pkg/kapis/alerting/v2alpha1/register.go @@ -102,6 +102,20 @@ func AddToContainer(container *restful.Container, informers informers.InformerFa Returns(http.StatusOK, ksapi.StatusOK, nil). Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) + ws.Route(ws.DELETE("/rules"). + To(handler.handleDeleteCustomAlertingRules). + Doc("delete multiple cluster-level custom alerting rules"). + Param(ws.QueryParameter("name", "rule name").CollectionFormat(restful.CollectionFormatMulti).AllowMultiple(true)). + Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) + + ws.Route(ws.POST("/bulk_rules"). + To(handler.handleCreateOrUpdateCustomAlertingRules). + Doc("create or update cluster-level custom alerting rules in bulk"). + Reads([]alertingv2alpha1.PostableAlertingRule{}). + Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) + ws.Route(ws.PUT("/rules/{rule_name}"). To(handler.handleUpdateCustomAlertingRule). Doc("update the cluster-level custom alerting rule with the specified name"). @@ -151,6 +165,13 @@ func AddToContainer(container *restful.Container, informers informers.InformerFa Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.AlertList{}). Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) + ws.Route(ws.DELETE("/namespaces/{namespace}/rules"). + To(handler.handleDeleteCustomAlertingRules). + Doc("delete multiple custom alerting rules in the specified namespace"). + Param(ws.QueryParameter("name", "rule name").CollectionFormat(restful.CollectionFormatMulti).AllowMultiple(true)). + Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) + ws.Route(ws.POST("/namespaces/{namespace}/rules"). To(handler.handleCreateCustomAlertingRule). Doc("create a custom alerting rule in the specified namespace"). @@ -158,6 +179,13 @@ func AddToContainer(container *restful.Container, informers informers.InformerFa Returns(http.StatusOK, ksapi.StatusOK, ""). Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) + ws.Route(ws.POST("/namespaces/{namespace}/bulk_rules"). + To(handler.handleCreateOrUpdateCustomAlertingRules). + Doc("create or update custom alerting rules in bulk in the specified namespace"). + Reads([]alertingv2alpha1.PostableAlertingRule{}). + Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag})) + ws.Route(ws.PUT("/namespaces/{namespace}/rules/{rule_name}"). To(handler.handleUpdateCustomAlertingRule). Doc("update the custom alerting rule with the specified name in the specified namespace"). diff --git a/pkg/models/alerting/alerting.go b/pkg/models/alerting/alerting.go index 949cc24c0..c8b21a145 100644 --- a/pkg/models/alerting/alerting.go +++ b/pkg/models/alerting/alerting.go @@ -27,11 +27,6 @@ const ( customRuleResourceLabelKeyLevel = "custom-alerting-rule-level" ) -var ( - maxSecretSize = corev1.MaxSecretSize - maxConfigMapDataSize = int(float64(maxSecretSize) * 0.45) -) - // Operator contains all operations to alerting rules. The operations may involve manipulations of prometheusrule // custom resources where the rules are persisted, and querying the rules state from prometheus endpoint and // thanos ruler endpoint. @@ -57,6 +52,11 @@ type Operator interface { // DeleteCustomAlertingRule deletes the custom alerting rule with the given name. DeleteCustomAlertingRule(ctx context.Context, namespace, ruleName string) error + // CreateOrUpdateCustomAlertingRules creates or updates custom alerting rules in bulk. + CreateOrUpdateCustomAlertingRules(ctx context.Context, namespace string, rs []*v2alpha1.PostableAlertingRule) (*v2alpha1.BulkResponse, error) + // DeleteCustomAlertingRules deletes a batch of custom alerting rules. + DeleteCustomAlertingRules(ctx context.Context, namespace string, ruleNames []string) (*v2alpha1.BulkResponse, error) + // ListBuiltinAlertingRules lists the builtin(non-custom) alerting rules ListBuiltinAlertingRules(ctx context.Context, queryParams *v2alpha1.AlertingRuleQueryParams) (*v2alpha1.GettableAlertingRuleList, error) @@ -241,40 +241,6 @@ func (o *operator) ListBuiltinRuleAlerts(ctx context.Context, ruleId string) (*v }, nil } -func (o *operator) ListClusterAlertingRules(ctx context.Context, customFlag string, - queryParams *v2alpha1.AlertingRuleQueryParams) (*v2alpha1.GettableAlertingRuleList, error) { - - namespace := rulerNamespace - ruleNamespace, err := o.namespaceInformer.Lister().Get(namespace) - if err != nil { - return nil, err - } - - alertingRules, err := o.listCustomAlertingRules(ctx, ruleNamespace, v2alpha1.RuleLevelCluster) - if err != nil { - return nil, err - } - - return pageAlertingRules(alertingRules, queryParams), nil -} - -func (o *operator) ListClusterRulesAlerts(ctx context.Context, - queryParams *v2alpha1.AlertQueryParams) (*v2alpha1.AlertList, error) { - - namespace := rulerNamespace - ruleNamespace, err := o.namespaceInformer.Lister().Get(namespace) - if err != nil { - return nil, err - } - - alertingRules, err := o.listCustomAlertingRules(ctx, ruleNamespace, v2alpha1.RuleLevelCluster) - if err != nil { - return nil, err - } - - return pageAlerts(alertingRules, queryParams), nil -} - func (o *operator) listCustomAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, level v2alpha1.RuleLevel) ([]*v2alpha1.GettableAlertingRule, error) { @@ -292,6 +258,10 @@ func (o *operator) listCustomAlertingRules(ctx context.Context, ruleNamespace *c return nil, err } + if len(resourceRulesMap) == 0 { + return nil, nil + } + ruleGroups, err := o.ruleClient.ThanosRules(ctx) if err != nil { return nil, err @@ -473,8 +443,20 @@ func (o *operator) CreateCustomAlertingRule(ctx context.Context, namespace strin setRuleUpdateTime(rule, time.Now()) - return ruler.AddAlertingRule(ctx, ruleNamespace, extraRuleResourceSelector, - ruleResourceLabels, &rules.ResourceRuleItem{Rule: parseToPrometheusRule(rule)}) + respItems, err := ruler.AddAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, + ruleResourceLabels, &rules.RuleWithGroup{Rule: *parseToPrometheusRule(rule)}) + if err != nil { + return err + } + for _, item := range respItems { + if item.Status == v2alpha1.StatusError { + if item.ErrorType == v2alpha1.ErrNotFound { + return v2alpha1.ErrAlertingRuleNotFound + } + return item.Error + } + } + return nil } func (o *operator) UpdateCustomAlertingRule(ctx context.Context, namespace, name string, @@ -526,8 +508,21 @@ func (o *operator) UpdateCustomAlertingRule(ctx context.Context, namespace, name setRuleUpdateTime(rule, time.Now()) - return ruler.UpdateAlertingRule(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, - &rules.ResourceRuleItem{Group: resourceRule.Group, Rule: parseToPrometheusRule(rule)}) + respItems, err := ruler.UpdateAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, + &rules.ResourceRuleItem{ResourceName: resourceRule.ResourceName, + RuleWithGroup: rules.RuleWithGroup{Group: resourceRule.Group, Rule: *parseToPrometheusRule(rule)}}) + if err != nil { + return err + } + for _, item := range respItems { + if item.Status == v2alpha1.StatusError { + if item.ErrorType == v2alpha1.ErrNotFound { + return v2alpha1.ErrAlertingRuleNotFound + } + return item.Error + } + } + return nil } func (o *operator) DeleteCustomAlertingRule(ctx context.Context, namespace, name string) error { @@ -555,16 +550,249 @@ func (o *operator) DeleteCustomAlertingRule(ctx context.Context, namespace, name } extraRuleResourceSelector := labels.SelectorFromSet(labels.Set{customRuleResourceLabelKeyLevel: string(level)}) - resourceRule, err := o.resourceRuleCache.GetRule(ruler, ruleNamespace, extraRuleResourceSelector, name) + resourceRules, err := o.resourceRuleCache.GetRuleByIdOrName(ruler, ruleNamespace, extraRuleResourceSelector, name) if err != nil { return err } - if resourceRule == nil { + if len(resourceRules) == 0 { return v2alpha1.ErrAlertingRuleNotFound } - return ruler.DeleteAlertingRule(ctx, ruleNamespace, extraRuleResourceSelector, - &rules.ResourceRuleItem{Group: resourceRule.Group, Rule: resourceRule.Rule}) + respItems, err := ruler.DeleteAlertingRules(ctx, ruleNamespace, resourceRules...) + if err != nil { + return err + } + for _, item := range respItems { + if item.Status == v2alpha1.StatusError { + if item.ErrorType == v2alpha1.ErrNotFound { + return v2alpha1.ErrAlertingRuleNotFound + } + return item.Error + } + } + return nil +} + +func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namespace string, + rs []*v2alpha1.PostableAlertingRule) (*v2alpha1.BulkResponse, error) { + + if l := len(rs); l == 0 { + return &v2alpha1.BulkResponse{}, nil + } + + ruler, err := o.getThanosRuler() + if err != nil { + return nil, err + } + if ruler == nil { + return nil, v2alpha1.ErrThanosRulerNotEnabled + } + + var ( + level v2alpha1.RuleLevel + ruleResourceLabels = make(map[string]string) + ) + for k, v := range o.thanosRuleResourceLabels { + ruleResourceLabels[k] = v + } + if namespace == "" { + namespace = rulerNamespace + level = v2alpha1.RuleLevelCluster + } else { + level = v2alpha1.RuleLevelNamespace + } + ruleResourceLabels[customRuleResourceLabelKeyLevel] = string(level) + ruleNamespace, err := o.namespaceInformer.Lister().Get(namespace) + if err != nil { + return nil, err + } + + extraRuleResourceSelector := labels.SelectorFromSet(labels.Set{customRuleResourceLabelKeyLevel: string(level)}) + + resourceRulesMap, err := o.resourceRuleCache.ListRules(ruler, ruleNamespace, extraRuleResourceSelector) + if err != nil { + return nil, err + } + exists := make(map[string][]*rules.ResourceRuleItem) + for _, c := range resourceRulesMap { + for n, items := range c.NameRules { + exists[n] = append(exists[n], items...) + } + } + + // check the rules + var ( + br = &v2alpha1.BulkResponse{} + nameSet = make(map[string]struct{}) + invalids = make(map[string]struct{}) + ) + for i := range rs { + var ( + r = rs[i] + name = r.Name + ) + + if _, ok := nameSet[name]; ok { + br.Items = append(br.Items, &v2alpha1.BulkItemResponse{ + RuleName: name, + Status: v2alpha1.StatusError, + ErrorType: v2alpha1.ErrDuplicateName, + Error: errors.Errorf("There is more than one rule named %s in the bulk request", name), + }) + invalids[name] = struct{}{} + continue + } else { + nameSet[name] = struct{}{} + } + if err := r.Validate(); err != nil { + br.Items = append(br.Items, &v2alpha1.BulkItemResponse{ + RuleName: name, + Status: v2alpha1.StatusError, + ErrorType: v2alpha1.ErrBadData, + Error: err, + }) + invalids[name] = struct{}{} + continue + } + if level == v2alpha1.RuleLevelNamespace { + expr, err := rules.InjectExprNamespaceLabel(r.Query, namespace) + if err != nil { + br.Items = append(br.Items, v2alpha1.NewBulkItemErrorServerResponse(name, err)) + invalids[name] = struct{}{} + continue + } + r.Query = expr + } + } + if len(nameSet) == len(invalids) { + return br.Derive(), nil + } + + // Confirm whether the rules are added or updated + var ( + addRules []*rules.RuleWithGroup + updRules []*rules.ResourceRuleItem + delRules []*rules.ResourceRuleItem // duplicate rules that need to deleted in other resources + + updateTime = time.Now() + ) + for i := range rs { + r := rs[i] + if _, ok := invalids[r.Name]; ok { + continue + } + setRuleUpdateTime(r, updateTime) + if items, ok := exists[r.Name]; ok && len(items) > 0 { + item := items[0] + updRules = append(updRules, &rules.ResourceRuleItem{ + ResourceName: item.ResourceName, + RuleWithGroup: rules.RuleWithGroup{Group: item.Group, Rule: *parseToPrometheusRule(r)}}) + if len(items) > 1 { + for j := 1; j < len(items); j++ { + if items[j].ResourceName != item.ResourceName { + delRules = append(delRules, items[j]) + } + } + } + } else { + addRules = append(addRules, &rules.RuleWithGroup{Rule: *parseToPrometheusRule(r)}) + } + } + + // do the actions + if len(addRules) > 0 { + resps, err := ruler.AddAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, addRules...) + if err == nil { + br.Items = append(br.Items, resps...) + } else { + for _, rule := range addRules { + br.Items = append(br.Items, v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err)) + } + } + } + if len(updRules) > 0 { + resps, err := ruler.UpdateAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, updRules...) + if err == nil { + br.Items = append(br.Items, resps...) + } else { + for _, rule := range updRules { + br.Items = append(br.Items, v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err)) + } + } + } + if len(delRules) > 0 { + _, err := ruler.DeleteAlertingRules(ctx, ruleNamespace, delRules...) + if err != nil { + for _, rule := range delRules { + br.Items = append(br.Items, v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err)) + } + } + } + return br.Derive(), nil +} + +func (o *operator) DeleteCustomAlertingRules(ctx context.Context, namespace string, + names []string) (*v2alpha1.BulkResponse, error) { + + if l := len(names); l == 0 { + return &v2alpha1.BulkResponse{}, nil + } + + ruler, err := o.getThanosRuler() + if err != nil { + return nil, err + } + if ruler == nil { + return nil, v2alpha1.ErrThanosRulerNotEnabled + } + + var ( + level v2alpha1.RuleLevel + ) + if namespace == "" { + namespace = rulerNamespace + level = v2alpha1.RuleLevelCluster + } else { + level = v2alpha1.RuleLevelNamespace + } + ruleNamespace, err := o.namespaceInformer.Lister().Get(namespace) + if err != nil { + return nil, err + } + + extraRuleResourceSelector := labels.SelectorFromSet(labels.Set{customRuleResourceLabelKeyLevel: string(level)}) + resourceRulesMap, err := o.resourceRuleCache.ListRules(ruler, ruleNamespace, extraRuleResourceSelector) + if err != nil { + return nil, err + } + exists := make(map[string][]*rules.ResourceRuleItem) + for _, c := range resourceRulesMap { + for n, items := range c.NameRules { + exists[n] = append(exists[n], items...) + } + } + + br := &v2alpha1.BulkResponse{} + var ruleItems []*rules.ResourceRuleItem + for _, n := range names { + if items, ok := exists[n]; ok { + ruleItems = append(ruleItems, items...) + } else { + br.Items = append(br.Items, &v2alpha1.BulkItemResponse{ + RuleName: n, + Status: v2alpha1.StatusError, + ErrorType: v2alpha1.ErrNotFound, + }) + } + } + + respItems, err := ruler.DeleteAlertingRules(ctx, ruleNamespace, ruleItems...) + if err != nil { + return nil, err + } + br.Items = append(br.Items, respItems...) + + return br.Derive(), nil } // getPrometheusRuler gets the cluster-in prometheus @@ -604,6 +832,9 @@ func (o *operator) getThanosRuler() (rules.Ruler, error) { } func parseToPrometheusRule(rule *v2alpha1.PostableAlertingRule) *promresourcesv1.Rule { + if _, ok := rule.Labels[rules.LabelKeyAlertType]; !ok { + rule.Labels[rules.LabelKeyAlertType] = rules.LabelValueAlertType + } return &promresourcesv1.Rule{ Alert: rule.Name, Expr: intstr.FromString(rule.Query), diff --git a/pkg/models/alerting/rules/cache.go b/pkg/models/alerting/rules/cache.go index ed94a5217..5284e4d07 100644 --- a/pkg/models/alerting/rules/cache.go +++ b/pkg/models/alerting/rules/cache.go @@ -107,6 +107,24 @@ func (c *RuleCache) getResourceRuleCaches(ruler Ruler, ruleNamespace *corev1.Nam func (c *RuleCache) GetRule(ruler Ruler, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector, idOrName string) (*ResourceRuleItem, error) { + rules, err := c.GetRuleByIdOrName(ruler, ruleNamespace, extraRuleResourceSelector, idOrName) + if err != nil { + return nil, err + } + if l := len(rules); l == 0 { + return nil, nil + } else if l > 1 { + // guarantees the stability of the get operations. + sort.Slice(rules, func(i, j int) bool { + return v2alpha1.AlertingRuleIdCompare(rules[i].Id, rules[j].Id) + }) + } + return rules[0], nil +} + +func (c *RuleCache) GetRuleByIdOrName(ruler Ruler, ruleNamespace *corev1.Namespace, + extraRuleResourceSelector labels.Selector, idOrName string) ([]*ResourceRuleItem, error) { + caches, err := c.getResourceRuleCaches(ruler, ruleNamespace, extraRuleResourceSelector) if err != nil { return nil, err @@ -121,9 +139,11 @@ func (c *RuleCache) GetRule(ruler Ruler, ruleNamespace *corev1.Namespace, for rn, rc := range caches { if rule, ok := rc.IdRules[idOrName]; ok { rules = append(rules, &ResourceRuleItem{ - Group: rule.Group, - Id: rule.Id, - Rule: rule.Rule.DeepCopy(), + RuleWithGroup: RuleWithGroup{ + Group: rule.Group, + Id: rule.Id, + Rule: *rule.Rule.DeepCopy(), + }, ResourceName: rn, }) } @@ -133,9 +153,11 @@ func (c *RuleCache) GetRule(ruler Ruler, ruleNamespace *corev1.Namespace, if nrules, ok := rc.NameRules[idOrName]; ok { for _, nrule := range nrules { rules = append(rules, &ResourceRuleItem{ - Group: nrule.Group, - Id: nrule.Id, - Rule: nrule.Rule.DeepCopy(), + RuleWithGroup: RuleWithGroup{ + Group: nrule.Group, + Id: nrule.Id, + Rule: *nrule.Rule.DeepCopy(), + }, ResourceName: rn, }) } @@ -145,15 +167,7 @@ func (c *RuleCache) GetRule(ruler Ruler, ruleNamespace *corev1.Namespace, return nil, errors.New("unsupported ruler type") } - if l := len(rules); l == 0 { - return nil, nil - } else if l > 1 { - // guarantees the stability of the get operations. - sort.Slice(rules, func(i, j int) bool { - return v2alpha1.AlertingRuleIdCompare(rules[i].Id, rules[j].Id) - }) - } - return rules[0], nil + return rules, err } func (c *RuleCache) ListRules(ruler Ruler, ruleNamespace *corev1.Namespace, @@ -178,9 +192,11 @@ func (c *RuleCache) ListRules(ruler Ruler, ruleNamespace *corev1.Namespace, for _, rule := range rules { rrs.GroupSet[rule.Group] = struct{}{} rr := &ResourceRuleItem{ - Group: rule.Group, - Id: rule.Id, - Rule: rule.Rule.DeepCopy(), + RuleWithGroup: RuleWithGroup{ + Group: rule.Group, + Id: rule.Id, + Rule: *rule.Rule.DeepCopy(), + }, ResourceName: rn, } rrs.IdRules[rr.Id] = rr diff --git a/pkg/models/alerting/rules/rule.go b/pkg/models/alerting/rules/rule.go index 6524187e8..2faade7fe 100644 --- a/pkg/models/alerting/rules/rule.go +++ b/pkg/models/alerting/rules/rule.go @@ -13,9 +13,7 @@ type ResourceRuleCollection struct { type ResourceRuleItem struct { ResourceName string - Group string - Id string - Rule *promresourcesv1.Rule + RuleWithGroup } type ResourceRule struct { @@ -29,3 +27,9 @@ type ResourceRuleChunk struct { Custom bool ResourceRulesMap map[string]*ResourceRuleCollection } + +type RuleWithGroup struct { + Group string + Id string + promresourcesv1.Rule +} diff --git a/pkg/models/alerting/rules/ruler.go b/pkg/models/alerting/rules/ruler.go index ed954266d..bbbf7740c 100644 --- a/pkg/models/alerting/rules/ruler.go +++ b/pkg/models/alerting/rules/ruler.go @@ -34,6 +34,8 @@ var ( maxConfigMapDataSize = int(float64(maxSecretSize) * 0.45) errOutOfConfigMapSize = errors.New("out of config map size") + + ruleResourceLocker locker.Locker ) type Ruler interface { @@ -44,35 +46,44 @@ type Ruler interface { ListRuleResources(ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector) ( []*promresourcesv1.PrometheusRule, error) - AddAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector, - ruleResourceLabels map[string]string, ruleItem *ResourceRuleItem) error - UpdateAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector, - ruleResourceLabels map[string]string, ruleItem *ResourceRuleItem) error - DeleteAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector, - ruleItem *ResourceRuleItem) error + AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector, + ruleResourceLabels map[string]string, rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) + UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector, + ruleResourceLabels map[string]string, ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) + DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, + ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) } type ruleResource promresourcesv1.PrometheusRule -// deleteAlertingRule deletes the rule. -// If the rule is deleted, return true to indicate the resource should be updated. -func (r *ruleResource) deleteAlertingRule(ruleItem *ResourceRuleItem) (bool, error) { +// deleteAlertingRules deletes the rules. +// If there are rules to be deleted, return true to indicate the resource should be updated. +func (r *ruleResource) deleteAlertingRules(rules ...*RuleWithGroup) (bool, error) { var ( - nGroups []promresourcesv1.RuleGroup - ok bool + gs []promresourcesv1.RuleGroup + dels = make(map[string]struct{}) + commit bool ) + for _, rule := range rules { + if rule != nil { + dels[rule.Alert] = struct{}{} + } + } + for _, g := range r.Spec.Groups { var rules []promresourcesv1.Rule for _, gr := range g.Rules { - if gr.Alert != "" && gr.Alert == ruleItem.Rule.Alert { - ok = true - continue + if gr.Alert != "" { + if _, ok := dels[gr.Alert]; ok { + commit = true + continue + } } rules = append(rules, gr) } if len(rules) > 0 { - nGroups = append(nGroups, promresourcesv1.RuleGroup{ + gs = append(gs, promresourcesv1.RuleGroup{ Name: g.Name, Interval: g.Interval, PartialResponseStrategy: g.PartialResponseStrategy, @@ -81,131 +92,149 @@ func (r *ruleResource) deleteAlertingRule(ruleItem *ResourceRuleItem) (bool, err } } - if ok { - r.Spec.Groups = nGroups + if commit { + r.Spec.Groups = gs } - return ok, nil + return commit, nil } -// updateAlertingRule updates the rule with the given group. -// If the rule is updated, return true to indicate the resource should be updated. -func (r *ruleResource) updateAlertingRule(ruleItem *ResourceRuleItem) (bool, error) { +// updateAlertingRule updates the rules. +// If there are rules to be updated, return true to indicate the resource should be updated. +func (r *ruleResource) updateAlertingRules(rules ...*RuleWithGroup) (bool, error) { var ( - ok bool - pr = (promresourcesv1.PrometheusRule)(*r) - npr = pr.DeepCopy() - groupMap = make(map[string]*promresourcesv1.RuleGroup) + commit bool + spec = r.Spec.DeepCopy() + ruleMap = make(map[string]*RuleWithGroup) ) - for _, g := range npr.Spec.Groups { - var rules []promresourcesv1.Rule - for i, gr := range g.Rules { - if gr.Alert != "" && gr.Alert == ruleItem.Rule.Alert { - ok = true + if spec == nil { + return false, nil + } + + for i, rule := range rules { + if rule != nil { + ruleMap[rule.Alert] = rules[i] + } + } + + for i, g := range spec.Groups { + for j, r := range g.Rules { + if r.Alert == "" { continue } - rules = append(rules, g.Rules[i]) - } - if len(rules) > 0 { - groupMap[g.Name] = &promresourcesv1.RuleGroup{ - Name: g.Name, - Interval: g.Interval, - PartialResponseStrategy: g.PartialResponseStrategy, - Rules: rules, + if b, ok := ruleMap[r.Alert]; ok { + if b == nil { + spec.Groups[i].Rules = append(g.Rules[:j], g.Rules[j+1:]...) + } else { + spec.Groups[i].Rules[j] = b.Rule + ruleMap[r.Alert] = nil // clear to mark it updated + } + commit = true } } } - if ok { - if g, exist := groupMap[ruleItem.Group]; exist { - g.Rules = append(g.Rules, *ruleItem.Rule) - } else { - groupMap[ruleItem.Group] = &promresourcesv1.RuleGroup{ - Name: ruleItem.Group, - Rules: []promresourcesv1.Rule{*ruleItem.Rule}, - } - } - - var groups []promresourcesv1.RuleGroup - for _, g := range groupMap { - groups = append(groups, *g) - } - - npr.Spec.Groups = groups - content, err := yaml.Marshal(npr.Spec) + if commit { + content, err := yaml.Marshal(spec) if err != nil { return false, errors.Wrap(err, "failed to unmarshal content") } - - if len(string(content)) < maxConfigMapDataSize { // check size limit - r.Spec.Groups = groups - return true, nil + if len(string(content)) > maxConfigMapDataSize { // check size limit + return false, errOutOfConfigMapSize } - return false, errOutOfConfigMapSize + r.Spec = *spec } - return false, nil + return commit, nil } -func (r *ruleResource) addAlertingRule(ruleItem *ResourceRuleItem) (bool, error) { +func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) { var ( - err error - pr = (promresourcesv1.PrometheusRule)(*r) - npr = pr.DeepCopy() - ok bool + commit bool + spec = r.Spec.DeepCopy() + groupMax = -1 + cursor int + + rulesNoGroup []promresourcesv1.Rule + rulesWithGroup = make(map[string][]promresourcesv1.Rule) ) - if strings.TrimSpace(ruleItem.Group) == "" { - var tg string - var suffix = -1 - for i := 0; i < len(npr.Spec.Groups); i++ { - g := npr.Spec.Groups[i] - if strings.HasPrefix(g.Name, customRuleGroupDefaultPrefix) { - suf, err := strconv.Atoi(strings.TrimPrefix(g.Name, customRuleGroupDefaultPrefix)) - if err != nil { - continue - } - if suf > suffix { - suffix = suf - } - if suffix >= 0 && len(g.Rules) < customRuleGroupSize { - tg = g.Name - break - } - } - } - if tg == "" { - ruleItem.Group = fmt.Sprintf("%s%d", customRuleGroupDefaultPrefix, suffix+1) + for i, rule := range rules { + if len(strings.TrimSpace(rule.Group)) == 0 { + rulesNoGroup = append(rulesNoGroup, rules[i].Rule) } else { - ruleItem.Group = tg + rulesWithGroup[rule.Group] = append(rulesWithGroup[rule.Group], rules[i].Rule) } - } - for i := 0; i < len(npr.Spec.Groups); i++ { - if npr.Spec.Groups[i].Name == ruleItem.Group { - npr.Spec.Groups[i].Rules = append(npr.Spec.Groups[i].Rules, *ruleItem.Rule) - ok = true + if spec == nil { + spec = new(promresourcesv1.PrometheusRuleSpec) + } + + for i, g := range spec.Groups { + var ( + gName = g.Name + doneNoGroup = cursor >= len(rulesNoGroup) + doneWithGroup = len(rulesWithGroup) == 0 + ) + + if doneNoGroup && doneWithGroup { break } - } - if !ok { // add a group when there is no group with the specified group name - npr.Spec.Groups = append(npr.Spec.Groups, promresourcesv1.RuleGroup{ - Name: ruleItem.Group, - Rules: []promresourcesv1.Rule{*ruleItem.Rule}, - }) + + if !doneWithGroup { + if _, ok := rulesWithGroup[gName]; ok { + spec.Groups[i].Rules = append(spec.Groups[i].Rules, rulesWithGroup[gName]...) + delete(rulesWithGroup, gName) + commit = true + } + } + + g = spec.Groups[i] + if !doneNoGroup && strings.HasPrefix(gName, customRuleGroupDefaultPrefix) { + suf, err := strconv.Atoi(strings.TrimPrefix(gName, customRuleGroupDefaultPrefix)) + if err != nil { + continue + } + if suf > groupMax { + groupMax = suf + } + + if size := len(g.Rules); size < customRuleGroupSize { + num := customRuleGroupSize - size + var limit int + if limit = cursor + num; limit > len(rulesNoGroup) { + limit = len(rulesNoGroup) + } + spec.Groups[i].Rules = append(spec.Groups[i].Rules, rulesNoGroup[cursor:limit]...) + cursor = limit + commit = true + } + } } - content, err := yaml.Marshal(npr.Spec) - if err != nil { - return false, errors.Wrap(err, "failed to unmarshal content") + for groupMax++; cursor < len(rules); groupMax++ { + g := promresourcesv1.RuleGroup{Name: fmt.Sprintf("%s%d", customRuleGroupDefaultPrefix, groupMax)} + var limit int + if limit = cursor + customRuleGroupSize; limit > len(rulesNoGroup) { + limit = len(rulesNoGroup) + } + g.Rules = append(g.Rules, rulesNoGroup[cursor:limit]...) + spec.Groups = append(spec.Groups, g) + cursor = limit + commit = true } - if len(string(content)) < maxConfigMapDataSize { // check size limit - r.Spec.Groups = npr.Spec.Groups - return true, nil - } else { - return false, errOutOfConfigMapSize + if commit { + content, err := yaml.Marshal(spec) + if err != nil { + return false, errors.Wrap(err, "failed to unmarshal content") + } + if len(string(content)) > maxConfigMapDataSize { // check size limit + return false, errOutOfConfigMapSize + } + r.Spec = *spec } + return commit, nil } func (r *ruleResource) commit(ctx context.Context, prometheusResourceClient promresourcesclient.Interface) error { @@ -213,11 +242,11 @@ func (r *ruleResource) commit(ctx context.Context, prometheusResourceClient prom if len(pr.Spec.Groups) == 0 { return prometheusResourceClient.MonitoringV1().PrometheusRules(r.Namespace).Delete(ctx, r.Name, metav1.DeleteOptions{}) } - newPr, err := prometheusResourceClient.MonitoringV1().PrometheusRules(r.Namespace).Update(ctx, &pr, metav1.UpdateOptions{}) + npr, err := prometheusResourceClient.MonitoringV1().PrometheusRules(r.Namespace).Update(ctx, &pr, metav1.UpdateOptions{}) if err != nil { return err } - newPr.DeepCopyInto(&pr) + npr.DeepCopyInto(&pr) return nil } @@ -281,27 +310,27 @@ func (r *PrometheusRuler) ListRuleResources(ruleNamespace *corev1.Namespace, ext return r.informer.Lister().PrometheusRules(ruleNamespace.Name).List(rSelector) } -func (r *PrometheusRuler) AddAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, - extraRuleResourceSelector labels.Selector, - ruleResourceLabels map[string]string, ruleItem *ResourceRuleItem) error { - return errors.New("not supported to add rules for prometheus") +func (r *PrometheusRuler) AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, + extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string, + rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) { + return nil, errors.New("not supported to add rules for prometheus") } -func (r *PrometheusRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, - extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string, ruleItem *ResourceRuleItem) error { - return errors.New("not supported to update rules for prometheus") +func (r *PrometheusRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, + extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string, + ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) { + return nil, errors.New("not supported to update rules for prometheus") } -func (r *PrometheusRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, - extraRuleResourceSelector labels.Selector, ruleItem *ResourceRuleItem) error { - return errors.New("not supported to update rules for prometheus") +func (r *PrometheusRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, + ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) { + return nil, errors.New("not supported to delete rules for prometheus") } type ThanosRuler struct { resource *promresourcesv1.ThanosRuler informer prominformersv1.PrometheusRuleInformer client promresourcesclient.Interface - locker locker.Locker } func NewThanosRuler(resource *promresourcesv1.ThanosRuler, informer prominformersv1.PrometheusRuleInformer, @@ -366,96 +395,62 @@ func (r *ThanosRuler) ListRuleResources(ruleNamespace *corev1.Namespace, extraRu return r.informer.Lister().PrometheusRules(ruleNamespace.Name).List(rSelector) } -func (r *ThanosRuler) AddAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, - extraRuleResourceSelector labels.Selector, - ruleResourceLabels map[string]string, ruleItem *ResourceRuleItem) error { +func (r *ThanosRuler) AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, + extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string, + rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) { + + return r.addAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, nil, ruleResourceLabels, rules...) +} + +func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, + extraRuleResourceSelector labels.Selector, excludePrometheusRules map[string]struct{}, + ruleResourceLabels map[string]string, rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) { prometheusRules, err := r.ListRuleResources(ruleNamespace, extraRuleResourceSelector) if err != nil { - return err + return nil, err } - - return r.addAlertingRule(ctx, ruleNamespace, prometheusRules, nil, ruleResourceLabels, ruleItem) -} - -func (r *ThanosRuler) addAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, - prometheusRules []*promresourcesv1.PrometheusRule, excludePrometheusRules map[string]*promresourcesv1.PrometheusRule, - ruleResourceLabels map[string]string, ruleItem *ResourceRuleItem) error { - + // sort by the left space to speed up the hit rate sort.Slice(prometheusRules, func(i, j int) bool { return len(fmt.Sprint(prometheusRules[i])) <= len(fmt.Sprint(prometheusRules[j])) }) - for _, prometheusRule := range prometheusRules { - if len(excludePrometheusRules) > 0 { - if _, ok := excludePrometheusRules[prometheusRule.Name]; ok { - continue - } - } - if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error { - resource := ruleResource(*pr) - if ok, err := resource.addAlertingRule(ruleItem); err != nil { - return err - } else if ok { - if err = resource.commit(ctx, r.client); err != nil { - return err - } - } - return nil - }); err != nil { - if err == errOutOfConfigMapSize { - break - } else if resourceNotFound(err) { - continue - } - return err - } - return nil - } - // create a new rule resource and add rule into it when all existing rule resources are full. - group := ruleItem.Group - if group == "" { - group = fmt.Sprintf("%s%d", customRuleGroupDefaultPrefix, 0) - } - newPromRule := promresourcesv1.PrometheusRule{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: ruleNamespace.Name, - GenerateName: customAlertingRuleResourcePrefix, - Labels: ruleResourceLabels, - }, - Spec: promresourcesv1.PrometheusRuleSpec{ - Groups: []promresourcesv1.RuleGroup{{ - Name: group, - Rules: []promresourcesv1.Rule{*ruleItem.Rule}, - }}, - }, - } - if _, err := r.client.MonitoringV1(). - PrometheusRules(ruleNamespace.Name).Create(ctx, &newPromRule, metav1.CreateOptions{}); err != nil { - return errors.Wrapf(err, "error creating a prometheusrule resource %s/%s", - newPromRule.Namespace, newPromRule.Name) - } - return nil -} - -func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, - extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string, ruleItem *ResourceRuleItem) error { - - prometheusRules, err := r.ListRuleResources(ruleNamespace, extraRuleResourceSelector) - if err != nil { - return err - } - var ( - found bool - success bool - prsToDelRule = make(map[string]*promresourcesv1.PrometheusRule) + respItems = make([]*v2alpha1.BulkItemResponse, 0, len(rules)) + cursor int ) - for i, prometheusRule := range prometheusRules { - if success { // If the update has been successful, delete the possible same rule in other resources - if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error { + + resp := func(rule *RuleWithGroup, err error) *v2alpha1.BulkItemResponse { + if err != nil { + return v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err) + } + return v2alpha1.NewBulkItemSuccessResponse(rule.Alert, v2alpha1.ResultCreated) + } + + for _, pr := range prometheusRules { + if cursor >= len(rules) { + break + } + if len(excludePrometheusRules) > 0 { + if _, ok := excludePrometheusRules[pr.Name]; ok { + continue + } + } + + var ( + err error + num = len(rules) - cursor + limit = len(rules) + rs []*RuleWithGroup + ) + + for i := 1; i <= 2; i++ { + limit = cursor + num/i + rs = rules[cursor:limit] + + err = r.doRuleResourceOperation(ctx, pr.Namespace, pr.Name, func(pr *promresourcesv1.PrometheusRule) error { resource := ruleResource(*pr) - if ok, err := resource.deleteAlertingRule(ruleItem); err != nil { + if ok, err := resource.addAlertingRules(rs...); err != nil { return err } else if ok { if err = resource.commit(ctx, r.client); err != nil { @@ -463,52 +458,223 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor } } return nil - }); err != nil && !resourceNotFound(err) { + }) + if err == errOutOfConfigMapSize && num > 1 { + continue + } + break + } + + switch { + case err == errOutOfConfigMapSize: + break + case resourceNotFound(err): + continue + default: + for _, rule := range rs { + respItems = append(respItems, resp(rule, err)) + } + cursor = limit + } + } + + // create new rule resources and add rest rules into them when all existing rule resources are full. + for cursor < len(rules) { + var ( + err error + num = len(rules) - cursor + limit = len(rules) + rs []*RuleWithGroup + ) + + for i := 1; ; i++ { + limit = cursor + num/i + rs = rules[cursor:limit] + + pr := &promresourcesv1.PrometheusRule{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ruleNamespace.Name, + GenerateName: customAlertingRuleResourcePrefix, + Labels: ruleResourceLabels, + }, + } + resource := ruleResource(*pr) + var ok bool + ok, err = resource.addAlertingRules(rs...) + if err == errOutOfConfigMapSize { + continue + } + if ok { + pr.Spec = resource.Spec + _, err = r.client.MonitoringV1().PrometheusRules(ruleNamespace.Name).Create(ctx, pr, metav1.CreateOptions{}) + } + break + } + + for _, rule := range rs { + respItems = append(respItems, resp(rule, err)) + } + cursor = limit + } + + return respItems, nil +} + +func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, + extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string, + ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) { + + var ( + itemsMap = make(map[string][]*ResourceRuleItem) + respItems = make([]*v2alpha1.BulkItemResponse, 0, len(ruleItems)) + successes = make(map[string]struct{}) + moveMap = make(map[string][]*ResourceRuleItem) + delMap = make(map[string][]*ResourceRuleItem) // duplicate rules that need to deleted in the same resource + + ) + + for i, item := range ruleItems { + itemsMap[item.ResourceName] = append(itemsMap[item.ResourceName], ruleItems[i]) + } + + for name, items := range itemsMap { + var ( + nrules []*RuleWithGroup + nitems []*ResourceRuleItem + ) + + for i := range items { + item := items[i] + if _, ok := successes[item.Alert]; ok { + delMap[name] = append(delMap[name], item) + continue + } + nrules = append(nrules, &item.RuleWithGroup) + nitems = append(nitems, item) + } + if len(nrules) == 0 { + continue + } + + err := r.doRuleResourceOperation(ctx, ruleNamespace.Name, name, func(pr *promresourcesv1.PrometheusRule) error { + resource := ruleResource(*pr) + if ok, err := resource.updateAlertingRules(nrules...); err != nil { return err + } else if ok { + if err = resource.commit(ctx, r.client); err != nil { + return err + } + } + return nil + }) + + switch { + case err == nil: + for _, item := range items { + successes[item.Alert] = struct{}{} + respItems = append(respItems, v2alpha1.NewBulkItemSuccessResponse(item.Alert, v2alpha1.ResultUpdated)) + } + case err == errOutOfConfigMapSize: + moveMap[name] = append(moveMap[name], nitems...) + case resourceNotFound(err): + for _, item := range items { + respItems = append(respItems, &v2alpha1.BulkItemResponse{ + RuleName: item.Alert, + Status: v2alpha1.StatusError, + ErrorType: v2alpha1.ErrNotFound, + }) + } + default: + for _, item := range items { + respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err)) + } + } + } + + for name, items := range moveMap { + var ( + nrules = make([]*RuleWithGroup, 0, len(items)) + nitems = make(map[string]*ResourceRuleItem, len(items)) + ) + for i := range items { + item := items[i] + nrules = append(nrules, &item.RuleWithGroup) + nitems[item.Alert] = item + } + if len(nrules) == 0 { + continue + } + + aRespItems, err := r.addAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, + map[string]struct{}{name: {}}, ruleResourceLabels, nrules...) + if err != nil { + for _, item := range items { + respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err)) } continue } - if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error { - resource := ruleResource(*pr) - if ok, err := resource.updateAlertingRule(ruleItem); err != nil { - return err - } else if ok { - if err = resource.commit(ctx, r.client); err != nil { - return err + for i := range aRespItems { + resp := aRespItems[i] + switch resp.Status { + case v2alpha1.StatusSuccess: + if item, ok := nitems[resp.RuleName]; ok { + delMap[name] = append(delMap[name], item) } + default: + respItems = append(respItems, resp) } - return nil - }); err != nil { - if resourceNotFound(err) { - continue - } else if err == errOutOfConfigMapSize { - // updating the rule in the resource may oversize the size limit, - // so delete it and then add the new rule to a new resource. - prsToDelRule[prometheusRule.Name] = prometheusRules[i] - found = true - continue - } - return err } - found = true - success = true } - if !found { - return v2alpha1.ErrAlertingRuleNotFound - } - - if !success { - err := r.addAlertingRule(ctx, ruleNamespace, prometheusRules, prsToDelRule, ruleResourceLabels, &ResourceRuleItem{Rule: ruleItem.Rule}) + for _, items := range delMap { + dRespItems, err := r.DeleteAlertingRules(ctx, ruleNamespace, items...) if err != nil { - return err + for _, item := range items { + respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err)) + } + continue + } + for i := range dRespItems { + resp := dRespItems[i] + if resp.Status == v2alpha1.StatusSuccess { + resp.Result = v2alpha1.ResultUpdated + } + respItems = append(respItems, resp) } } - for _, pr := range prsToDelRule { - if err := r.doRuleResourceOperation(ctx, pr, func(pr *promresourcesv1.PrometheusRule) error { + + return respItems, nil +} + +func (r *ThanosRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, + ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) { + + var ( + itemsMap = make(map[string][]*ResourceRuleItem) + respItems = make([]*v2alpha1.BulkItemResponse, 0, len(ruleItems)) + ) + + for i, ruleItem := range ruleItems { + itemsMap[ruleItem.ResourceName] = append(itemsMap[ruleItem.ResourceName], ruleItems[i]) + } + + resp := func(item *ResourceRuleItem, err error) *v2alpha1.BulkItemResponse { + if err != nil { + return v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err) + } + return v2alpha1.NewBulkItemSuccessResponse(item.Alert, v2alpha1.ResultDeleted) + } + + for name, items := range itemsMap { + var rules []*RuleWithGroup + for i := range items { + rules = append(rules, &items[i].RuleWithGroup) + } + + err := r.doRuleResourceOperation(ctx, ruleNamespace.Name, name, func(pr *promresourcesv1.PrometheusRule) error { resource := ruleResource(*pr) - if ok, err := resource.deleteAlertingRule(ruleItem); err != nil { + if ok, err := resource.deleteAlertingRules(rules...); err != nil { return err } else if ok { if err = resource.commit(ctx, r.client); err != nil { @@ -516,52 +682,22 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor } } return nil - }); err != nil && !resourceNotFound(err) { - return err + }) + for _, item := range items { + respItems = append(respItems, resp(item, err)) } } - return nil + + return respItems, nil } -func (r *ThanosRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, - extraRuleResourceSelector labels.Selector, ruleItem *ResourceRuleItem) error { - prometheusRules, err := r.ListRuleResources(ruleNamespace, extraRuleResourceSelector) - if err != nil { - return err - } - var success bool - for _, prometheusRule := range prometheusRules { - if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error { - resource := ruleResource(*pr) - if ok, err := resource.deleteAlertingRule(ruleItem); err != nil { - return err - } else if ok { - if err = resource.commit(ctx, r.client); err != nil { - return err - } - } - return nil - }); err != nil { - if resourceNotFound(err) { - continue - } - return err - } - success = true - } - if !success { - return v2alpha1.ErrAlertingRuleNotFound - } - return nil -} - -func (r *ThanosRuler) doRuleResourceOperation(ctx context.Context, pr *promresourcesv1.PrometheusRule, +func (r *ThanosRuler) doRuleResourceOperation(ctx context.Context, namespace, name string, operation func(pr *promresourcesv1.PrometheusRule) error) error { - key := pr.Namespace + "/" + pr.Name + key := namespace + "/" + name return retry.RetryOnConflict(retry.DefaultRetry, func() error { - r.locker.Lock(key) - defer r.locker.Unlock(key) - pr, err := r.client.MonitoringV1().PrometheusRules(pr.Namespace).Get(ctx, pr.Name, metav1.GetOptions{}) + ruleResourceLocker.Lock(key) + defer ruleResourceLocker.Unlock(key) + pr, err := r.client.MonitoringV1().PrometheusRules(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return err } diff --git a/pkg/models/alerting/rules/utils.go b/pkg/models/alerting/rules/utils.go index 5ee59db8b..d4995bae0 100644 --- a/pkg/models/alerting/rules/utils.go +++ b/pkg/models/alerting/rules/utils.go @@ -28,6 +28,9 @@ const ( LabelKeyThanosRulerReplica = "thanos_ruler_replica" LabelKeyPrometheusReplica = "prometheus_replica" + + LabelKeyAlertType = "alerttype" + LabelValueAlertType = "metric" ) func FormatExpr(expr string) (string, error) { @@ -208,7 +211,7 @@ func GetAlertingRulesStatus(ruleNamespace string, ruleChunk *ResourceRuleChunk, func GetAlertingRuleStatus(ruleNamespace string, rule *ResourceRule, epRuleGroups []*alerting.RuleGroup, extLabels func() map[string]string) (*v2alpha1.GettableAlertingRule, error) { - if rule == nil || rule.Rule == nil { + if rule == nil || rule.Alert == "" { return nil, nil } @@ -257,7 +260,7 @@ func GetAlertingRuleStatus(ruleNamespace string, rule *ResourceRule, epRuleGroup func getAlertingRuleStatus(resRule *ResourceRuleItem, epRule *alerting.AlertingRule, custom bool, level v2alpha1.RuleLevel) *v2alpha1.GettableAlertingRule { - if resRule == nil || resRule.Rule == nil { + if resRule == nil || resRule.Alert == "" { return nil } diff --git a/pkg/models/alerting/rules/utils_test.go b/pkg/models/alerting/rules/utils_test.go index 8a1887170..6d0643eda 100644 --- a/pkg/models/alerting/rules/utils_test.go +++ b/pkg/models/alerting/rules/utils_test.go @@ -1,7 +1,6 @@ package rules import ( - "kubesphere.io/kubesphere/pkg/simple/client/alerting" "testing" "github.com/google/go-cmp/cmp" @@ -9,6 +8,7 @@ import ( "github.com/prometheus/prometheus/rules" "k8s.io/apimachinery/pkg/util/intstr" "kubesphere.io/kubesphere/pkg/api/alerting/v2alpha1" + "kubesphere.io/kubesphere/pkg/simple/client/alerting" ) func TestGetAlertingRulesStatus(t *testing.T) { @@ -31,15 +31,17 @@ func TestGetAlertingRulesStatus(t *testing.T) { NameRules: map[string][]*ResourceRuleItem{ "ca7f09e76954e67c": []*ResourceRuleItem{{ ResourceName: "custom-alerting-rule-jqbgn", - Group: "alerting.custom.defaults", - Id: "ca7f09e76954e67c", - Rule: &promresourcesv1.Rule{ - Alert: "TestCPUUsageHigh", - Expr: intstr.FromString(`namespace:workload_cpu_usage:sum{namespace="test"} > 1`), - For: "1m", - Annotations: map[string]string{ - "alias": "The alias is here", - "description": "The description is here", + RuleWithGroup: RuleWithGroup{ + Group: "alerting.custom.defaults", + Id: "ca7f09e76954e67c", + Rule: promresourcesv1.Rule{ + Alert: "TestCPUUsageHigh", + Expr: intstr.FromString(`namespace:workload_cpu_usage:sum{namespace="test"} > 1`), + For: "1m", + Annotations: map[string]string{ + "alias": "The alias is here", + "description": "The description is here", + }, }, }, }}, diff --git a/tools/cmd/doc-gen/main.go b/tools/cmd/doc-gen/main.go index edb8f21eb..b9f407e23 100644 --- a/tools/cmd/doc-gen/main.go +++ b/tools/cmd/doc-gen/main.go @@ -22,7 +22,6 @@ import ( "flag" "fmt" "io/ioutil" - "kubesphere.io/kubesphere/pkg/version" "log" "github.com/emicklei/go-restful" @@ -32,6 +31,7 @@ import ( "github.com/go-openapi/strfmt" "github.com/go-openapi/validate" "github.com/pkg/errors" + promfake "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/fake" urlruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/apiserver/runtime" @@ -53,10 +53,12 @@ import ( tenantv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/tenant/v1alpha2" terminalv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/terminal/v1alpha2" "kubesphere.io/kubesphere/pkg/models/iam/group" + "kubesphere.io/kubesphere/pkg/simple/client/alerting" fakedevops "kubesphere.io/kubesphere/pkg/simple/client/devops/fake" "kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/openpitrix" fakes3 "kubesphere.io/kubesphere/pkg/simple/client/s3/fake" + "kubesphere.io/kubesphere/pkg/version" ) var output string @@ -129,7 +131,9 @@ func generateSwaggerJson() []byte { urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil)) urlruntime.Must(metricsv1alpha2.AddToContainer(container)) urlruntime.Must(networkv1alpha2.AddToContainer(container, "")) - urlruntime.Must(alertingv2alpha1.AddToContainer(container, informerFactory, nil, nil, nil)) + alertingOptions := &alerting.Options{} + alertingClient, _ := alerting.NewRuleClient(alertingOptions) + urlruntime.Must(alertingv2alpha1.AddToContainer(container, informerFactory, promfake.NewSimpleClientset(), alertingClient, alertingOptions)) config := restfulspec.Config{ WebServices: container.RegisteredWebServices(),