Merge pull request #3409 from junotx/ca

add alerting rule bulk api
This commit is contained in:
KubeSphere CI Bot
2021-03-06 19:46:29 +08:00
committed by GitHub
10 changed files with 948 additions and 331 deletions

View File

@@ -18,6 +18,7 @@ package v2alpha1
import (
"context"
"regexp"
"sort"
"strconv"
"strings"
@@ -47,6 +48,8 @@ var (
templateTestData = template.AlertTemplateData(map[string]string{}, map[string]string{}, 0)
templateTestTextPrefix = "{{$labels := .Labels}}{{$externalLabels := .ExternalLabels}}{{$value := .Value}}"
ruleNameMatcher = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?$`)
)
type RuleLevel string
@@ -70,6 +73,10 @@ func (r *PostableAlertingRule) Validate() error {
if r.Name == "" {
errs = append(errs, errors.New("name can not be empty"))
} else {
if !ruleNameMatcher.MatchString(r.Name) {
errs = append(errs, errors.New("rule name must match regular expression ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$"))
}
}
if r.Query == "" {
@@ -442,3 +449,80 @@ func parseLabelFilters(req *restful.Request) (map[string]string, map[string]stri
}
return labelEqualFilters, labelContainFilters
}
const (
ErrBadData ErrorType = "bad_data"
ErrDuplicateName ErrorType = "duplicate_name"
ErrNotFound ErrorType = "not_found"
ErrServer ErrorType = "server_error"
StatusSuccess Status = "success"
StatusError Status = "error"
ResultCreated Result = "created"
ResultUpdated Result = "updated"
ResultDeleted Result = "deleted"
)
type Status string
type ErrorType string
type Result string
type BulkResponse struct {
Errors bool `json:"errors" description:"If true, one or more operations in the bulk request don't complete successfully"`
Items []*BulkItemResponse `json:"items" description:"It contains the result of each operation in the bulk request"`
}
// MakeBulkResponse tidies the internal items and sets the errors
func (br *BulkResponse) MakeBulkResponse() *BulkResponse {
var (
items []*BulkItemResponse
itemMap = make(map[string]*BulkItemResponse)
)
for i, item := range br.Items {
if item.Status == StatusError {
br.Errors = true
}
pitem, ok := itemMap[item.RuleName]
if !ok || (pitem.Status == StatusSuccess || item.Status == StatusError) {
itemMap[item.RuleName] = br.Items[i]
}
}
for k := range itemMap {
item := itemMap[k]
if item.Error != nil {
item.ErrorStr = item.Error.Error()
}
items = append(items, itemMap[k])
}
br.Items = items
return br
}
type BulkItemResponse struct {
RuleName string `json:"ruleName,omitempty"`
Status Status `json:"status,omitempty" description:"It may be success or error"`
Result Result `json:"result,omitempty" description:"It may be created, updated or deleted, and only for successful operations"`
ErrorType ErrorType `json:"errorType,omitempty" description:"It may be bad_data, duplicate_name, not_found or server_error, and only for failed operations"`
Error error `json:"-"`
ErrorStr string `json:"error,omitempty" description:"It is only returned for failed operations"`
}
func NewBulkItemSuccessResponse(ruleName string, result Result) *BulkItemResponse {
return &BulkItemResponse{
RuleName: ruleName,
Status: StatusSuccess,
Result: result,
}
}
func NewBulkItemErrorServerResponse(ruleName string, err error) *BulkItemResponse {
return &BulkItemResponse{
RuleName: ruleName,
Status: StatusError,
ErrorType: ErrServer,
Error: err,
}
}

View File

@@ -283,3 +283,45 @@ func (h *handler) handleListBuiltinRuleAlerts(req *restful.Request, resp *restfu
resp.WriteEntity(alerts)
}
func (h *handler) handleCreateOrUpdateCustomAlertingRules(req *restful.Request, resp *restful.Response) {
namespace := req.PathParameter("namespace")
var rules []*v2alpha1.PostableAlertingRule
if err := req.ReadEntity(&rules); err != nil {
klog.Error(err)
ksapi.HandleBadRequest(resp, nil, err)
return
}
bulkResp, err := h.operator.CreateOrUpdateCustomAlertingRules(req.Request.Context(), namespace, rules)
if err != nil {
klog.Error(err)
switch {
case err == v2alpha1.ErrThanosRulerNotEnabled:
ksapi.HandleBadRequest(resp, nil, err)
default:
ksapi.HandleInternalError(resp, nil, err)
}
return
}
resp.WriteEntity(bulkResp)
}
func (h *handler) handleDeleteCustomAlertingRules(req *restful.Request, resp *restful.Response) {
namespace := req.PathParameter("namespace")
names := req.QueryParameters("name")
bulkResp, err := h.operator.DeleteCustomAlertingRules(req.Request.Context(), namespace, names)
if err != nil {
klog.Error(err)
switch {
case err == v2alpha1.ErrThanosRulerNotEnabled:
ksapi.HandleBadRequest(resp, nil, err)
default:
ksapi.HandleInternalError(resp, nil, err)
}
return
}
resp.WriteEntity(bulkResp)
}

View File

@@ -102,6 +102,20 @@ func AddToContainer(container *restful.Container, informers informers.InformerFa
Returns(http.StatusOK, ksapi.StatusOK, nil).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag}))
ws.Route(ws.DELETE("/rules").
To(handler.handleDeleteCustomAlertingRules).
Doc("delete multiple cluster-level custom alerting rules").
Param(ws.QueryParameter("name", "rule name").CollectionFormat(restful.CollectionFormatMulti).AllowMultiple(true)).
Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag}))
ws.Route(ws.POST("/bulkrules").
To(handler.handleCreateOrUpdateCustomAlertingRules).
Doc("create or update cluster-level custom alerting rules in bulk").
Reads([]alertingv2alpha1.PostableAlertingRule{}).
Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag}))
ws.Route(ws.PUT("/rules/{rule_name}").
To(handler.handleUpdateCustomAlertingRule).
Doc("update the cluster-level custom alerting rule with the specified name").
@@ -151,6 +165,13 @@ func AddToContainer(container *restful.Container, informers informers.InformerFa
Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.AlertList{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag}))
ws.Route(ws.DELETE("/namespaces/{namespace}/rules").
To(handler.handleDeleteCustomAlertingRules).
Doc("delete multiple custom alerting rules in the specified namespace").
Param(ws.QueryParameter("name", "rule name").CollectionFormat(restful.CollectionFormatMulti).AllowMultiple(true)).
Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag}))
ws.Route(ws.POST("/namespaces/{namespace}/rules").
To(handler.handleCreateCustomAlertingRule).
Doc("create a custom alerting rule in the specified namespace").
@@ -158,6 +179,13 @@ func AddToContainer(container *restful.Container, informers informers.InformerFa
Returns(http.StatusOK, ksapi.StatusOK, "").
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag}))
ws.Route(ws.POST("/namespaces/{namespace}/bulkrules").
To(handler.handleCreateOrUpdateCustomAlertingRules).
Doc("create or update custom alerting rules in bulk in the specified namespace").
Reads([]alertingv2alpha1.PostableAlertingRule{}).
Returns(http.StatusOK, ksapi.StatusOK, alertingv2alpha1.BulkResponse{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AlertingTag}))
ws.Route(ws.PUT("/namespaces/{namespace}/rules/{rule_name}").
To(handler.handleUpdateCustomAlertingRule).
Doc("update the custom alerting rule with the specified name in the specified namespace").

View File

@@ -22,14 +22,9 @@ import (
)
const (
rulerNamespace = constants.KubeSphereMonitoringNamespace
customRuleGroupDefault = "alerting.custom.defaults"
customRuleResourceLabelKeyLevel = "custom-alerting-rule-level"
)
rulerNamespace = constants.KubeSphereMonitoringNamespace
var (
maxSecretSize = corev1.MaxSecretSize
maxConfigMapDataSize = int(float64(maxSecretSize) * 0.45)
customRuleResourceLabelKeyLevel = "custom-alerting-rule-level"
)
// Operator contains all operations to alerting rules. The operations may involve manipulations of prometheusrule
@@ -57,6 +52,11 @@ type Operator interface {
// DeleteCustomAlertingRule deletes the custom alerting rule with the given name.
DeleteCustomAlertingRule(ctx context.Context, namespace, ruleName string) error
// CreateOrUpdateCustomAlertingRules creates or updates custom alerting rules in bulk.
CreateOrUpdateCustomAlertingRules(ctx context.Context, namespace string, rs []*v2alpha1.PostableAlertingRule) (*v2alpha1.BulkResponse, error)
// DeleteCustomAlertingRules deletes a batch of custom alerting rules.
DeleteCustomAlertingRules(ctx context.Context, namespace string, ruleNames []string) (*v2alpha1.BulkResponse, error)
// ListBuiltinAlertingRules lists the builtin(non-custom) alerting rules
ListBuiltinAlertingRules(ctx context.Context,
queryParams *v2alpha1.AlertingRuleQueryParams) (*v2alpha1.GettableAlertingRuleList, error)
@@ -241,40 +241,6 @@ func (o *operator) ListBuiltinRuleAlerts(ctx context.Context, ruleId string) (*v
}, nil
}
func (o *operator) ListClusterAlertingRules(ctx context.Context, customFlag string,
queryParams *v2alpha1.AlertingRuleQueryParams) (*v2alpha1.GettableAlertingRuleList, error) {
namespace := rulerNamespace
ruleNamespace, err := o.namespaceInformer.Lister().Get(namespace)
if err != nil {
return nil, err
}
alertingRules, err := o.listCustomAlertingRules(ctx, ruleNamespace, v2alpha1.RuleLevelCluster)
if err != nil {
return nil, err
}
return pageAlertingRules(alertingRules, queryParams), nil
}
func (o *operator) ListClusterRulesAlerts(ctx context.Context,
queryParams *v2alpha1.AlertQueryParams) (*v2alpha1.AlertList, error) {
namespace := rulerNamespace
ruleNamespace, err := o.namespaceInformer.Lister().Get(namespace)
if err != nil {
return nil, err
}
alertingRules, err := o.listCustomAlertingRules(ctx, ruleNamespace, v2alpha1.RuleLevelCluster)
if err != nil {
return nil, err
}
return pageAlerts(alertingRules, queryParams), nil
}
func (o *operator) listCustomAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
level v2alpha1.RuleLevel) ([]*v2alpha1.GettableAlertingRule, error) {
@@ -292,6 +258,10 @@ func (o *operator) listCustomAlertingRules(ctx context.Context, ruleNamespace *c
return nil, err
}
if len(resourceRulesMap) == 0 {
return nil, nil
}
ruleGroups, err := o.ruleClient.ThanosRules(ctx)
if err != nil {
return nil, err
@@ -473,8 +443,20 @@ func (o *operator) CreateCustomAlertingRule(ctx context.Context, namespace strin
setRuleUpdateTime(rule, time.Now())
return ruler.AddAlertingRule(ctx, ruleNamespace, extraRuleResourceSelector,
customRuleGroupDefault, parseToPrometheusRule(rule), ruleResourceLabels)
respItems, err := ruler.AddAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector,
ruleResourceLabels, &rules.RuleWithGroup{Rule: *parseToPrometheusRule(rule)})
if err != nil {
return err
}
for _, item := range respItems {
if item.Status == v2alpha1.StatusError {
if item.ErrorType == v2alpha1.ErrNotFound {
return v2alpha1.ErrAlertingRuleNotFound
}
return item.Error
}
}
return nil
}
func (o *operator) UpdateCustomAlertingRule(ctx context.Context, namespace, name string,
@@ -526,8 +508,21 @@ func (o *operator) UpdateCustomAlertingRule(ctx context.Context, namespace, name
setRuleUpdateTime(rule, time.Now())
return ruler.UpdateAlertingRule(ctx, ruleNamespace, extraRuleResourceSelector,
resourceRule.Group, parseToPrometheusRule(rule), ruleResourceLabels)
respItems, err := ruler.UpdateAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels,
&rules.ResourceRuleItem{ResourceName: resourceRule.ResourceName,
RuleWithGroup: rules.RuleWithGroup{Group: resourceRule.Group, Rule: *parseToPrometheusRule(rule)}})
if err != nil {
return err
}
for _, item := range respItems {
if item.Status == v2alpha1.StatusError {
if item.ErrorType == v2alpha1.ErrNotFound {
return v2alpha1.ErrAlertingRuleNotFound
}
return item.Error
}
}
return nil
}
func (o *operator) DeleteCustomAlertingRule(ctx context.Context, namespace, name string) error {
@@ -555,15 +550,254 @@ func (o *operator) DeleteCustomAlertingRule(ctx context.Context, namespace, name
}
extraRuleResourceSelector := labels.SelectorFromSet(labels.Set{customRuleResourceLabelKeyLevel: string(level)})
resourceRule, err := o.resourceRuleCache.GetRule(ruler, ruleNamespace, extraRuleResourceSelector, name)
resourceRules, err := o.resourceRuleCache.GetRuleByIdOrName(ruler, ruleNamespace, extraRuleResourceSelector, name)
if err != nil {
return err
}
if resourceRule == nil {
if len(resourceRules) == 0 {
return v2alpha1.ErrAlertingRuleNotFound
}
return ruler.DeleteAlertingRule(ctx, ruleNamespace, extraRuleResourceSelector, resourceRule.Group, name)
respItems, err := ruler.DeleteAlertingRules(ctx, ruleNamespace, resourceRules...)
if err != nil {
return err
}
for _, item := range respItems {
if item.Status == v2alpha1.StatusError {
if item.ErrorType == v2alpha1.ErrNotFound {
return v2alpha1.ErrAlertingRuleNotFound
}
return item.Error
}
}
return nil
}
func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namespace string,
rs []*v2alpha1.PostableAlertingRule) (*v2alpha1.BulkResponse, error) {
if l := len(rs); l == 0 {
return &v2alpha1.BulkResponse{}, nil
}
ruler, err := o.getThanosRuler()
if err != nil {
return nil, err
}
if ruler == nil {
return nil, v2alpha1.ErrThanosRulerNotEnabled
}
var (
level v2alpha1.RuleLevel
ruleResourceLabels = make(map[string]string)
)
for k, v := range o.thanosRuleResourceLabels {
ruleResourceLabels[k] = v
}
if namespace == "" {
namespace = rulerNamespace
level = v2alpha1.RuleLevelCluster
} else {
level = v2alpha1.RuleLevelNamespace
}
ruleResourceLabels[customRuleResourceLabelKeyLevel] = string(level)
ruleNamespace, err := o.namespaceInformer.Lister().Get(namespace)
if err != nil {
return nil, err
}
extraRuleResourceSelector := labels.SelectorFromSet(labels.Set{customRuleResourceLabelKeyLevel: string(level)})
resourceRulesMap, err := o.resourceRuleCache.ListRules(ruler, ruleNamespace, extraRuleResourceSelector)
if err != nil {
return nil, err
}
exists := make(map[string][]*rules.ResourceRuleItem)
for _, c := range resourceRulesMap {
for n, items := range c.NameRules {
exists[n] = append(exists[n], items...)
}
}
// check all the rules
var (
br = &v2alpha1.BulkResponse{}
nameSet = make(map[string]struct{})
invalids = make(map[string]struct{})
)
for i := range rs {
var (
r = rs[i]
name = r.Name
)
if _, ok := nameSet[name]; ok {
br.Items = append(br.Items, &v2alpha1.BulkItemResponse{
RuleName: name,
Status: v2alpha1.StatusError,
ErrorType: v2alpha1.ErrDuplicateName,
Error: errors.Errorf("There is more than one rule named %s in the bulk request", name),
})
invalids[name] = struct{}{}
continue
} else {
nameSet[name] = struct{}{}
}
if err := r.Validate(); err != nil {
br.Items = append(br.Items, &v2alpha1.BulkItemResponse{
RuleName: name,
Status: v2alpha1.StatusError,
ErrorType: v2alpha1.ErrBadData,
Error: err,
})
invalids[name] = struct{}{}
continue
}
if level == v2alpha1.RuleLevelNamespace {
expr, err := rules.InjectExprNamespaceLabel(r.Query, namespace)
if err != nil {
br.Items = append(br.Items, v2alpha1.NewBulkItemErrorServerResponse(name, err))
invalids[name] = struct{}{}
continue
}
r.Query = expr
}
}
if len(nameSet) == len(invalids) {
return br.MakeBulkResponse(), nil
}
// Confirm whether the rules should be added or updated. For each rule that is committed,
// it will be added if the rule does not exist, or updated otherwise.
// If there are rules with the same name in the existing rules to update, the first will be
// updated but the others will be deleted
var (
addRules []*rules.RuleWithGroup
updRules []*rules.ResourceRuleItem
delRules []*rules.ResourceRuleItem // duplicate rules that need to deleted in other resources
updateTime = time.Now()
)
for i := range rs {
r := rs[i]
if _, ok := invalids[r.Name]; ok {
continue
}
setRuleUpdateTime(r, updateTime)
if items, ok := exists[r.Name]; ok && len(items) > 0 {
item := items[0]
updRules = append(updRules, &rules.ResourceRuleItem{
ResourceName: item.ResourceName,
RuleWithGroup: rules.RuleWithGroup{Group: item.Group, Rule: *parseToPrometheusRule(r)}})
if len(items) > 1 {
for j := 1; j < len(items); j++ {
if items[j].ResourceName != item.ResourceName {
delRules = append(delRules, items[j])
}
}
}
} else {
addRules = append(addRules, &rules.RuleWithGroup{Rule: *parseToPrometheusRule(r)})
}
}
// add rules
if len(addRules) > 0 {
resps, err := ruler.AddAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, addRules...)
if err == nil {
br.Items = append(br.Items, resps...)
} else {
for _, rule := range addRules {
br.Items = append(br.Items, v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err))
}
}
}
// update existing rules
if len(updRules) > 0 {
resps, err := ruler.UpdateAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, updRules...)
if err == nil {
br.Items = append(br.Items, resps...)
} else {
for _, rule := range updRules {
br.Items = append(br.Items, v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err))
}
}
}
// delete possible duplicate rules
if len(delRules) > 0 {
_, err := ruler.DeleteAlertingRules(ctx, ruleNamespace, delRules...)
if err != nil {
for _, rule := range delRules {
br.Items = append(br.Items, v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err))
}
}
}
return br.MakeBulkResponse(), nil
}
func (o *operator) DeleteCustomAlertingRules(ctx context.Context, namespace string,
names []string) (*v2alpha1.BulkResponse, error) {
if l := len(names); l == 0 {
return &v2alpha1.BulkResponse{}, nil
}
ruler, err := o.getThanosRuler()
if err != nil {
return nil, err
}
if ruler == nil {
return nil, v2alpha1.ErrThanosRulerNotEnabled
}
var (
level v2alpha1.RuleLevel
)
if namespace == "" {
namespace = rulerNamespace
level = v2alpha1.RuleLevelCluster
} else {
level = v2alpha1.RuleLevelNamespace
}
ruleNamespace, err := o.namespaceInformer.Lister().Get(namespace)
if err != nil {
return nil, err
}
extraRuleResourceSelector := labels.SelectorFromSet(labels.Set{customRuleResourceLabelKeyLevel: string(level)})
resourceRulesMap, err := o.resourceRuleCache.ListRules(ruler, ruleNamespace, extraRuleResourceSelector)
if err != nil {
return nil, err
}
exists := make(map[string][]*rules.ResourceRuleItem)
for _, c := range resourceRulesMap {
for n, items := range c.NameRules {
exists[n] = append(exists[n], items...)
}
}
br := &v2alpha1.BulkResponse{}
var ruleItems []*rules.ResourceRuleItem
for _, n := range names {
if items, ok := exists[n]; ok {
ruleItems = append(ruleItems, items...)
} else {
br.Items = append(br.Items, &v2alpha1.BulkItemResponse{
RuleName: n,
Status: v2alpha1.StatusError,
ErrorType: v2alpha1.ErrNotFound,
})
}
}
respItems, err := ruler.DeleteAlertingRules(ctx, ruleNamespace, ruleItems...)
if err != nil {
return nil, err
}
br.Items = append(br.Items, respItems...)
return br.MakeBulkResponse(), nil
}
// getPrometheusRuler gets the cluster-in prometheus
@@ -603,6 +837,9 @@ func (o *operator) getThanosRuler() (rules.Ruler, error) {
}
func parseToPrometheusRule(rule *v2alpha1.PostableAlertingRule) *promresourcesv1.Rule {
if _, ok := rule.Labels[rules.LabelKeyAlertType]; !ok {
rule.Labels[rules.LabelKeyAlertType] = rules.LabelValueAlertType
}
return &promresourcesv1.Rule{
Alert: rule.Name,
Expr: intstr.FromString(rule.Query),

View File

@@ -107,6 +107,24 @@ func (c *RuleCache) getResourceRuleCaches(ruler Ruler, ruleNamespace *corev1.Nam
func (c *RuleCache) GetRule(ruler Ruler, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector, idOrName string) (*ResourceRuleItem, error) {
rules, err := c.GetRuleByIdOrName(ruler, ruleNamespace, extraRuleResourceSelector, idOrName)
if err != nil {
return nil, err
}
if l := len(rules); l == 0 {
return nil, nil
} else if l > 1 {
// guarantees the stability of the get operations.
sort.Slice(rules, func(i, j int) bool {
return v2alpha1.AlertingRuleIdCompare(rules[i].Id, rules[j].Id)
})
}
return rules[0], nil
}
func (c *RuleCache) GetRuleByIdOrName(ruler Ruler, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector, idOrName string) ([]*ResourceRuleItem, error) {
caches, err := c.getResourceRuleCaches(ruler, ruleNamespace, extraRuleResourceSelector)
if err != nil {
return nil, err
@@ -121,9 +139,11 @@ func (c *RuleCache) GetRule(ruler Ruler, ruleNamespace *corev1.Namespace,
for rn, rc := range caches {
if rule, ok := rc.IdRules[idOrName]; ok {
rules = append(rules, &ResourceRuleItem{
Group: rule.Group,
Id: rule.Id,
Rule: rule.Rule.DeepCopy(),
RuleWithGroup: RuleWithGroup{
Group: rule.Group,
Id: rule.Id,
Rule: *rule.Rule.DeepCopy(),
},
ResourceName: rn,
})
}
@@ -133,9 +153,11 @@ func (c *RuleCache) GetRule(ruler Ruler, ruleNamespace *corev1.Namespace,
if nrules, ok := rc.NameRules[idOrName]; ok {
for _, nrule := range nrules {
rules = append(rules, &ResourceRuleItem{
Group: nrule.Group,
Id: nrule.Id,
Rule: nrule.Rule.DeepCopy(),
RuleWithGroup: RuleWithGroup{
Group: nrule.Group,
Id: nrule.Id,
Rule: *nrule.Rule.DeepCopy(),
},
ResourceName: rn,
})
}
@@ -145,15 +167,7 @@ func (c *RuleCache) GetRule(ruler Ruler, ruleNamespace *corev1.Namespace,
return nil, errors.New("unsupported ruler type")
}
if l := len(rules); l == 0 {
return nil, nil
} else if l > 1 {
// guarantees the stability of the get operations.
sort.Slice(rules, func(i, j int) bool {
return v2alpha1.AlertingRuleIdCompare(rules[i].Id, rules[j].Id)
})
}
return rules[0], nil
return rules, err
}
func (c *RuleCache) ListRules(ruler Ruler, ruleNamespace *corev1.Namespace,
@@ -178,9 +192,11 @@ func (c *RuleCache) ListRules(ruler Ruler, ruleNamespace *corev1.Namespace,
for _, rule := range rules {
rrs.GroupSet[rule.Group] = struct{}{}
rr := &ResourceRuleItem{
Group: rule.Group,
Id: rule.Id,
Rule: rule.Rule.DeepCopy(),
RuleWithGroup: RuleWithGroup{
Group: rule.Group,
Id: rule.Id,
Rule: *rule.Rule.DeepCopy(),
},
ResourceName: rn,
}
rrs.IdRules[rr.Id] = rr

View File

@@ -13,9 +13,7 @@ type ResourceRuleCollection struct {
type ResourceRuleItem struct {
ResourceName string
Group string
Id string
Rule *promresourcesv1.Rule
RuleWithGroup
}
type ResourceRule struct {
@@ -29,3 +27,9 @@ type ResourceRuleChunk struct {
Custom bool
ResourceRulesMap map[string]*ResourceRuleCollection
}
type RuleWithGroup struct {
Group string
Id string
promresourcesv1.Rule
}

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"github.com/docker/docker/pkg/locker"
"github.com/ghodss/yaml"
@@ -22,6 +24,9 @@ import (
const (
customAlertingRuleResourcePrefix = "custom-alerting-rule-"
customRuleGroupDefaultPrefix = "alerting.custom.defaults."
customRuleGroupSize = 20
)
var (
@@ -29,6 +34,8 @@ var (
maxConfigMapDataSize = int(float64(maxSecretSize) * 0.45)
errOutOfConfigMapSize = errors.New("out of config map size")
ruleResourceLocker locker.Locker
)
type Ruler interface {
@@ -39,36 +46,44 @@ type Ruler interface {
ListRuleResources(ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector) (
[]*promresourcesv1.PrometheusRule, error)
AddAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector,
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error
UpdateAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector,
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error
DeleteAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector,
group string, name string) error
AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector,
ruleResourceLabels map[string]string, rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error)
UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector,
ruleResourceLabels map[string]string, ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error)
DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error)
}
type ruleResource promresourcesv1.PrometheusRule
// deleteAlertingRule deletes the rules with the given name.
// If the rule is deleted, return true to indicate the resource should be updated.
func (r *ruleResource) deleteAlertingRule(name string) (bool, error) {
// deleteAlertingRules deletes the rules.
// If there are rules to be deleted, return true to indicate the resource should be updated.
func (r *ruleResource) deleteAlertingRules(rules ...*RuleWithGroup) (bool, error) {
var (
nGroups []promresourcesv1.RuleGroup
ok bool
gs []promresourcesv1.RuleGroup
dels = make(map[string]struct{})
commit bool
)
for _, rule := range rules {
if rule != nil {
dels[rule.Alert] = struct{}{}
}
}
for _, g := range r.Spec.Groups {
var rules []promresourcesv1.Rule
for _, gr := range g.Rules {
if gr.Alert != "" && gr.Alert == name {
ok = true
continue
if gr.Alert != "" {
if _, ok := dels[gr.Alert]; ok {
commit = true
continue
}
}
rules = append(rules, gr)
}
if len(rules) > 0 {
nGroups = append(nGroups, promresourcesv1.RuleGroup{
gs = append(gs, promresourcesv1.RuleGroup{
Name: g.Name,
Interval: g.Interval,
PartialResponseStrategy: g.PartialResponseStrategy,
@@ -77,104 +92,163 @@ func (r *ruleResource) deleteAlertingRule(name string) (bool, error) {
}
}
if ok {
r.Spec.Groups = nGroups
if commit {
r.Spec.Groups = gs
}
return ok, nil
return commit, nil
}
// updateAlertingRule updates the rule with the given group.
// If the rule is updated, return true to indicate the resource should be updated.
func (r *ruleResource) updateAlertingRule(groupName string, rule *promresourcesv1.Rule) (bool, error) {
// updateAlertingRules updates the rules.
// If there are rules to be updated, return true to indicate the resource should be updated.
func (r *ruleResource) updateAlertingRules(rules ...*RuleWithGroup) (bool, error) {
var (
ok bool
pr = (promresourcesv1.PrometheusRule)(*r)
npr = pr.DeepCopy()
groupMap = make(map[string]*promresourcesv1.RuleGroup)
commit bool
spec = r.Spec.DeepCopy()
ruleMap = make(map[string]*RuleWithGroup)
)
for _, g := range npr.Spec.Groups {
var rules []promresourcesv1.Rule
for i, gr := range g.Rules {
if gr.Alert != "" && gr.Alert == rule.Alert {
ok = true
if spec == nil {
return false, nil
}
for i, rule := range rules {
if rule != nil {
ruleMap[rule.Alert] = rules[i]
}
}
for i, g := range spec.Groups {
for j, r := range g.Rules {
if r.Alert == "" {
continue
}
rules = append(rules, g.Rules[i])
}
if len(rules) > 0 {
groupMap[g.Name] = &promresourcesv1.RuleGroup{
Name: g.Name,
Interval: g.Interval,
PartialResponseStrategy: g.PartialResponseStrategy,
Rules: rules,
if b, ok := ruleMap[r.Alert]; ok {
if b == nil {
spec.Groups[i].Rules = append(g.Rules[:j], g.Rules[j+1:]...)
} else {
spec.Groups[i].Rules[j] = b.Rule
ruleMap[r.Alert] = nil // clear to mark it updated
}
commit = true
}
}
}
if ok {
if g, exist := groupMap[groupName]; exist {
g.Rules = append(g.Rules, *rule)
} else {
groupMap[groupName] = &promresourcesv1.RuleGroup{
Name: groupName,
Rules: []promresourcesv1.Rule{*rule},
}
}
var groups []promresourcesv1.RuleGroup
for _, g := range groupMap {
groups = append(groups, *g)
}
npr.Spec.Groups = groups
content, err := yaml.Marshal(npr.Spec)
if commit {
content, err := yaml.Marshal(spec)
if err != nil {
return false, errors.Wrap(err, "failed to unmarshal content")
}
if len(string(content)) < maxConfigMapDataSize { // check size limit
r.Spec.Groups = groups
return true, nil
if len(string(content)) > maxConfigMapDataSize { // check size limit
return false, errOutOfConfigMapSize
}
return false, errOutOfConfigMapSize
r.Spec = *spec
}
return false, nil
return commit, nil
}
func (r *ruleResource) addAlertingRule(group string, rule *promresourcesv1.Rule) (bool, error) {
// addAlertingRules adds the rules.
// If there are rules to be added, return true to indicate the resource should be updated.
func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) {
var (
err error
pr = (promresourcesv1.PrometheusRule)(*r)
npr = pr.DeepCopy()
ok bool
commit bool
spec = r.Spec.DeepCopy()
groupMax = -1
cursor int // indicates which rule to start adding for the rules with no groups
unGroupedRules []promresourcesv1.Rule // rules that do not specify group names
groupedRules = make(map[string][]promresourcesv1.Rule) // rules that have specific group names
)
for i := 0; i < len(npr.Spec.Groups); i++ {
if npr.Spec.Groups[i].Name == group {
npr.Spec.Groups[i].Rules = append(npr.Spec.Groups[i].Rules, *rule)
ok = true
break
for i, rule := range rules {
if len(strings.TrimSpace(rule.Group)) == 0 {
unGroupedRules = append(unGroupedRules, rules[i].Rule)
} else {
groupedRules[rule.Group] = append(groupedRules[rule.Group], rules[i].Rule)
}
}
if !ok { // add a group when there is no group with the specified group name
npr.Spec.Groups = append(npr.Spec.Groups, promresourcesv1.RuleGroup{
Name: group,
Rules: []promresourcesv1.Rule{*rule},
})
if spec == nil {
spec = new(promresourcesv1.PrometheusRuleSpec)
}
content, err := yaml.Marshal(npr.Spec)
if err != nil {
return false, errors.Wrap(err, "failed to unmarshal content")
// For the rules that have specific group names, add them to the matched groups.
// For the rules that do not specify group names, add them to the automatically generated groups until the limit is reached.
for i, g := range spec.Groups {
var (
gName = g.Name
unGroupedRulesDrained = cursor >= len(unGroupedRules) // whether all rules without groups have been added
groupedRulesDrained = len(groupedRules) == 0 // whether all rules with groups have been added
)
if unGroupedRulesDrained && groupedRulesDrained {
break
}
if !groupedRulesDrained {
if _, ok := groupedRules[gName]; ok {
spec.Groups[i].Rules = append(spec.Groups[i].Rules, groupedRules[gName]...)
delete(groupedRules, gName)
commit = true
}
}
g = spec.Groups[i]
if !unGroupedRulesDrained && strings.HasPrefix(gName, customRuleGroupDefaultPrefix) {
suf, err := strconv.Atoi(strings.TrimPrefix(gName, customRuleGroupDefaultPrefix))
if err != nil {
continue
}
if suf > groupMax {
groupMax = suf
}
if size := len(g.Rules); size < customRuleGroupSize {
num := customRuleGroupSize - size
var stop int
if stop = cursor + num; stop > len(unGroupedRules) {
stop = len(unGroupedRules)
}
spec.Groups[i].Rules = append(spec.Groups[i].Rules, unGroupedRules[cursor:stop]...)
cursor = stop
commit = true
}
}
}
if len(string(content)) < maxConfigMapDataSize { // check size limit
r.Spec.Groups = npr.Spec.Groups
return true, nil
} else {
return false, errOutOfConfigMapSize
// If no groups are available, new groups will be created to place the remaining rules.
for gName := range groupedRules {
rules := groupedRules[gName]
if len(rules) == 0 {
continue
}
spec.Groups = append(spec.Groups, promresourcesv1.RuleGroup{Name: gName, Rules: rules})
commit = true
}
for groupMax++; cursor < len(rules); groupMax++ {
g := promresourcesv1.RuleGroup{Name: fmt.Sprintf("%s%d", customRuleGroupDefaultPrefix, groupMax)}
var stop int
if stop = cursor + customRuleGroupSize; stop > len(unGroupedRules) {
stop = len(unGroupedRules)
}
g.Rules = append(g.Rules, unGroupedRules[cursor:stop]...)
spec.Groups = append(spec.Groups, g)
cursor = stop
commit = true
}
if commit {
content, err := yaml.Marshal(spec)
if err != nil {
return false, errors.Wrap(err, "failed to unmarshal content")
}
if len(string(content)) > maxConfigMapDataSize { // check size limit
return false, errOutOfConfigMapSize
}
r.Spec = *spec
}
return commit, nil
}
func (r *ruleResource) commit(ctx context.Context, prometheusResourceClient promresourcesclient.Interface) error {
@@ -182,11 +256,11 @@ func (r *ruleResource) commit(ctx context.Context, prometheusResourceClient prom
if len(pr.Spec.Groups) == 0 {
return prometheusResourceClient.MonitoringV1().PrometheusRules(r.Namespace).Delete(ctx, r.Name, metav1.DeleteOptions{})
}
newPr, err := prometheusResourceClient.MonitoringV1().PrometheusRules(r.Namespace).Update(ctx, &pr, metav1.UpdateOptions{})
npr, err := prometheusResourceClient.MonitoringV1().PrometheusRules(r.Namespace).Update(ctx, &pr, metav1.UpdateOptions{})
if err != nil {
return err
}
newPr.DeepCopyInto(&pr)
npr.DeepCopyInto(&pr)
return nil
}
@@ -250,29 +324,27 @@ func (r *PrometheusRuler) ListRuleResources(ruleNamespace *corev1.Namespace, ext
return r.informer.Lister().PrometheusRules(ruleNamespace.Name).List(rSelector)
}
func (r *PrometheusRuler) AddAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector,
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error {
return errors.New("not supported to add rules for prometheus")
func (r *PrometheusRuler) AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) {
return nil, errors.New("Adding Prometheus rules not supported")
}
func (r *PrometheusRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector,
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error {
return errors.New("not supported to update rules for prometheus")
func (r *PrometheusRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
return nil, errors.New("Updating Prometheus rules not supported")
}
func (r *PrometheusRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector,
group string, name string) error {
return errors.New("not supported to update rules for prometheus")
func (r *PrometheusRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
return nil, errors.New("Deleting Prometheus rules not supported.")
}
type ThanosRuler struct {
resource *promresourcesv1.ThanosRuler
informer prominformersv1.PrometheusRuleInformer
client promresourcesclient.Interface
locker locker.Locker
}
func NewThanosRuler(resource *promresourcesv1.ThanosRuler, informer prominformersv1.PrometheusRuleInformer,
@@ -337,93 +409,64 @@ func (r *ThanosRuler) ListRuleResources(ruleNamespace *corev1.Namespace, extraRu
return r.informer.Lister().PrometheusRules(ruleNamespace.Name).List(rSelector)
}
func (r *ThanosRuler) AddAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector,
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error {
func (r *ThanosRuler) AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) {
return r.addAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, nil, ruleResourceLabels, rules...)
}
func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector, excludePrometheusRules map[string]struct{},
ruleResourceLabels map[string]string, rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) {
prometheusRules, err := r.ListRuleResources(ruleNamespace, extraRuleResourceSelector)
if err != nil {
return err
return nil, err
}
return r.addAlertingRule(ctx, ruleNamespace, prometheusRules, nil, group, rule, ruleResourceLabels)
}
func (r *ThanosRuler) addAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
prometheusRules []*promresourcesv1.PrometheusRule, excludePrometheusRules map[string]*promresourcesv1.PrometheusRule,
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error {
// sort by the left space to speed up the hit rate
sort.Slice(prometheusRules, func(i, j int) bool {
return len(fmt.Sprint(prometheusRules[i])) <= len(fmt.Sprint(prometheusRules[j]))
})
for _, prometheusRule := range prometheusRules {
if len(excludePrometheusRules) > 0 {
if _, ok := excludePrometheusRules[prometheusRule.Name]; ok {
continue
}
}
if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.addAlertingRule(group, rule); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
}
return nil
}); err != nil {
if err == errOutOfConfigMapSize {
break
} else if resourceNotFound(err) {
continue
}
return err
}
return nil
}
// create a new rule resource and add rule into it when all existing rule resources are full.
newPromRule := promresourcesv1.PrometheusRule{
ObjectMeta: metav1.ObjectMeta{
Namespace: ruleNamespace.Name,
GenerateName: customAlertingRuleResourcePrefix,
Labels: ruleResourceLabels,
},
Spec: promresourcesv1.PrometheusRuleSpec{
Groups: []promresourcesv1.RuleGroup{{
Name: group,
Rules: []promresourcesv1.Rule{*rule},
}},
},
}
if _, err := r.client.MonitoringV1().
PrometheusRules(ruleNamespace.Name).Create(ctx, &newPromRule, metav1.CreateOptions{}); err != nil {
return errors.Wrapf(err, "error creating a prometheusrule resource %s/%s",
newPromRule.Namespace, newPromRule.Name)
}
return nil
}
func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector,
group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error {
prometheusRules, err := r.ListRuleResources(ruleNamespace, extraRuleResourceSelector)
if err != nil {
return err
}
var (
found bool
success bool
prsToDelRule = make(map[string]*promresourcesv1.PrometheusRule)
respItems = make([]*v2alpha1.BulkItemResponse, 0, len(rules))
cursor int
)
for i, prometheusRule := range prometheusRules {
if success { // If the update has been successful, delete the possible same rule in other resources
if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error {
resp := func(rule *RuleWithGroup, err error) *v2alpha1.BulkItemResponse {
if err != nil {
return v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err)
}
return v2alpha1.NewBulkItemSuccessResponse(rule.Alert, v2alpha1.ResultCreated)
}
for _, pr := range prometheusRules {
if cursor >= len(rules) {
break
}
if len(excludePrometheusRules) > 0 {
if _, ok := excludePrometheusRules[pr.Name]; ok {
continue
}
}
var (
err error
num = len(rules) - cursor
stop = len(rules)
rs []*RuleWithGroup
)
// First add all the rules to this resource,
// and if the limit is exceeded, add half
for i := 1; i <= 2; i++ {
stop = cursor + num/i
rs = rules[cursor:stop]
err = r.doRuleResourceOperation(ctx, pr.Namespace, pr.Name, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
if ok, err := resource.addAlertingRules(rs...); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
@@ -431,52 +474,235 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor
}
}
return nil
}); err != nil && !resourceNotFound(err) {
})
if err == errOutOfConfigMapSize && num > 1 {
continue
}
break
}
switch {
case err == errOutOfConfigMapSize:
break
case resourceNotFound(err):
continue
default:
for _, rule := range rs {
respItems = append(respItems, resp(rule, err))
}
cursor = stop
}
}
// create new rule resources and add rest rules into them
// when all existing rule resources are full.
for cursor < len(rules) {
var (
err error
num = len(rules) - cursor
stop = len(rules)
rs []*RuleWithGroup
)
// If adding the rules to the new resource exceeds the limit,
// reduce the amount to 1/2, 1/3... of rest rules until the new resource can accommodate.
for i := 1; ; i++ {
stop = cursor + num/i
rs = rules[cursor:stop]
pr := &promresourcesv1.PrometheusRule{
ObjectMeta: metav1.ObjectMeta{
Namespace: ruleNamespace.Name,
GenerateName: customAlertingRuleResourcePrefix,
Labels: ruleResourceLabels,
},
}
resource := ruleResource(*pr)
var ok bool
ok, err = resource.addAlertingRules(rs...)
if err == errOutOfConfigMapSize {
continue
}
if ok {
pr.Spec = resource.Spec
_, err = r.client.MonitoringV1().PrometheusRules(ruleNamespace.Name).Create(ctx, pr, metav1.CreateOptions{})
}
break
}
for _, rule := range rs {
respItems = append(respItems, resp(rule, err))
}
cursor = stop
}
return respItems, nil
}
func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
var (
itemsMap = make(map[string][]*ResourceRuleItem)
respItems = make([]*v2alpha1.BulkItemResponse, 0, len(ruleItems))
// rules updated successfully. The key is the rule name.
rulesUpdated = make(map[string]struct{})
// rules to be moved to other resources. The key is the resource name in which the rules reside.
rulesToMove = make(map[string][]*ResourceRuleItem)
// duplicate rules to be deleted
rulesToDelete = make(map[string][]*ResourceRuleItem)
)
for i, item := range ruleItems {
itemsMap[item.ResourceName] = append(itemsMap[item.ResourceName], ruleItems[i])
}
// Update the rules in the resources where the rules reside.
// If duplicate rules are found, the first will be updated and the others will be deleted.
// if updating the rules in the original resources causes exceeding size limit,
// they will be moved to other resources and then be updated.
for name, items := range itemsMap {
var (
nrules []*RuleWithGroup
nitems []*ResourceRuleItem
)
for i := range items {
item := items[i]
if _, ok := rulesUpdated[item.Alert]; ok {
rulesToDelete[name] = append(rulesToDelete[name], item)
continue
}
nrules = append(nrules, &item.RuleWithGroup)
nitems = append(nitems, item)
}
if len(nrules) == 0 {
continue
}
err := r.doRuleResourceOperation(ctx, ruleNamespace.Name, name, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.updateAlertingRules(nrules...); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
}
return nil
})
switch {
case err == nil:
for _, item := range items {
rulesUpdated[item.Alert] = struct{}{}
respItems = append(respItems, v2alpha1.NewBulkItemSuccessResponse(item.Alert, v2alpha1.ResultUpdated))
}
case err == errOutOfConfigMapSize: // Cannot update the rules in the original resource
rulesToMove[name] = append(rulesToMove[name], nitems...)
case resourceNotFound(err):
for _, item := range items {
respItems = append(respItems, &v2alpha1.BulkItemResponse{
RuleName: item.Alert,
Status: v2alpha1.StatusError,
ErrorType: v2alpha1.ErrNotFound,
})
}
default:
for _, item := range items {
respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err))
}
}
}
// The move here is not really move, because the move also requires an update.
// What really happens is that the new rules will be added in other resources first,
// and then the old rules will be deleted from the original resources.
for name, items := range rulesToMove {
var (
nrules = make([]*RuleWithGroup, 0, len(items))
nitems = make(map[string]*ResourceRuleItem, len(items))
)
for i := range items {
item := items[i]
nrules = append(nrules, &item.RuleWithGroup)
nitems[item.Alert] = item
}
if len(nrules) == 0 {
continue
}
aRespItems, err := r.addAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector,
map[string]struct{}{name: {}}, ruleResourceLabels, nrules...)
if err != nil {
for _, item := range items {
respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err))
}
continue
}
if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.updateAlertingRule(group, rule); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
for i := range aRespItems {
resp := aRespItems[i]
switch resp.Status {
case v2alpha1.StatusSuccess:
if item, ok := nitems[resp.RuleName]; ok {
rulesToDelete[name] = append(rulesToDelete[name], item)
}
default:
respItems = append(respItems, resp)
}
return nil
}); err != nil {
if resourceNotFound(err) {
continue
} else if err == errOutOfConfigMapSize {
// updating the rule in the resource may oversize the size limit,
// so delete it and then add the new rule to a new resource.
prsToDelRule[prometheusRule.Name] = prometheusRules[i]
found = true
continue
}
return err
}
found = true
success = true
}
if !found {
return v2alpha1.ErrAlertingRuleNotFound
}
if !success {
err := r.addAlertingRule(ctx, ruleNamespace, prometheusRules, prsToDelRule, group, rule, ruleResourceLabels)
for _, items := range rulesToDelete {
dRespItems, err := r.DeleteAlertingRules(ctx, ruleNamespace, items...)
if err != nil {
return err
for _, item := range items {
respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err))
}
continue
}
for i := range dRespItems {
resp := dRespItems[i]
if resp.Status == v2alpha1.StatusSuccess {
// The delete operation here is for updating, so update the result to v2alpha1.ResultUpdated
resp.Result = v2alpha1.ResultUpdated
}
respItems = append(respItems, resp)
}
}
for _, pr := range prsToDelRule {
if err := r.doRuleResourceOperation(ctx, pr, func(pr *promresourcesv1.PrometheusRule) error {
return respItems, nil
}
func (r *ThanosRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
var (
itemsMap = make(map[string][]*ResourceRuleItem)
respItems = make([]*v2alpha1.BulkItemResponse, 0, len(ruleItems))
)
for i, ruleItem := range ruleItems {
itemsMap[ruleItem.ResourceName] = append(itemsMap[ruleItem.ResourceName], ruleItems[i])
}
resp := func(item *ResourceRuleItem, err error) *v2alpha1.BulkItemResponse {
if err != nil {
return v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err)
}
return v2alpha1.NewBulkItemSuccessResponse(item.Alert, v2alpha1.ResultDeleted)
}
for name, items := range itemsMap {
var rules []*RuleWithGroup
for i := range items {
rules = append(rules, &items[i].RuleWithGroup)
}
err := r.doRuleResourceOperation(ctx, ruleNamespace.Name, name, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil {
if ok, err := resource.deleteAlertingRules(rules...); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
@@ -484,52 +710,23 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor
}
}
return nil
}); err != nil && !resourceNotFound(err) {
return err
})
for _, item := range items {
respItems = append(respItems, resp(item, err))
}
}
return nil
return respItems, nil
}
func (r *ThanosRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace,
extraRuleResourceSelector labels.Selector, group string, name string) error {
prometheusRules, err := r.ListRuleResources(ruleNamespace, extraRuleResourceSelector)
if err != nil {
return err
}
var success bool
for _, prometheusRule := range prometheusRules {
if err := r.doRuleResourceOperation(ctx, prometheusRule, func(pr *promresourcesv1.PrometheusRule) error {
resource := ruleResource(*pr)
if ok, err := resource.deleteAlertingRule(name); err != nil {
return err
} else if ok {
if err = resource.commit(ctx, r.client); err != nil {
return err
}
}
return nil
}); err != nil {
if resourceNotFound(err) {
continue
}
return err
}
success = true
}
if !success {
return v2alpha1.ErrAlertingRuleNotFound
}
return nil
}
func (r *ThanosRuler) doRuleResourceOperation(ctx context.Context, pr *promresourcesv1.PrometheusRule,
func (r *ThanosRuler) doRuleResourceOperation(ctx context.Context, namespace, name string,
operation func(pr *promresourcesv1.PrometheusRule) error) error {
key := pr.Namespace + "/" + pr.Name
// Lock here is used to lock specific resource in order to prevent frequent conflicts
key := namespace + "/" + name
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
r.locker.Lock(key)
defer r.locker.Unlock(key)
pr, err := r.client.MonitoringV1().PrometheusRules(pr.Namespace).Get(ctx, pr.Name, metav1.GetOptions{})
ruleResourceLocker.Lock(key)
defer ruleResourceLocker.Unlock(key)
pr, err := r.client.MonitoringV1().PrometheusRules(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}

View File

@@ -28,6 +28,9 @@ const (
LabelKeyThanosRulerReplica = "thanos_ruler_replica"
LabelKeyPrometheusReplica = "prometheus_replica"
LabelKeyAlertType = "alerttype"
LabelValueAlertType = "metric"
)
func FormatExpr(expr string) (string, error) {
@@ -208,7 +211,7 @@ func GetAlertingRulesStatus(ruleNamespace string, ruleChunk *ResourceRuleChunk,
func GetAlertingRuleStatus(ruleNamespace string, rule *ResourceRule, epRuleGroups []*alerting.RuleGroup,
extLabels func() map[string]string) (*v2alpha1.GettableAlertingRule, error) {
if rule == nil || rule.Rule == nil {
if rule == nil || rule.Alert == "" {
return nil, nil
}
@@ -257,7 +260,7 @@ func GetAlertingRuleStatus(ruleNamespace string, rule *ResourceRule, epRuleGroup
func getAlertingRuleStatus(resRule *ResourceRuleItem, epRule *alerting.AlertingRule,
custom bool, level v2alpha1.RuleLevel) *v2alpha1.GettableAlertingRule {
if resRule == nil || resRule.Rule == nil {
if resRule == nil || resRule.Alert == "" {
return nil
}

View File

@@ -1,7 +1,6 @@
package rules
import (
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
"testing"
"github.com/google/go-cmp/cmp"
@@ -9,6 +8,7 @@ import (
"github.com/prometheus/prometheus/rules"
"k8s.io/apimachinery/pkg/util/intstr"
"kubesphere.io/kubesphere/pkg/api/alerting/v2alpha1"
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
)
func TestGetAlertingRulesStatus(t *testing.T) {
@@ -31,15 +31,17 @@ func TestGetAlertingRulesStatus(t *testing.T) {
NameRules: map[string][]*ResourceRuleItem{
"ca7f09e76954e67c": []*ResourceRuleItem{{
ResourceName: "custom-alerting-rule-jqbgn",
Group: "alerting.custom.defaults",
Id: "ca7f09e76954e67c",
Rule: &promresourcesv1.Rule{
Alert: "TestCPUUsageHigh",
Expr: intstr.FromString(`namespace:workload_cpu_usage:sum{namespace="test"} > 1`),
For: "1m",
Annotations: map[string]string{
"alias": "The alias is here",
"description": "The description is here",
RuleWithGroup: RuleWithGroup{
Group: "alerting.custom.defaults",
Id: "ca7f09e76954e67c",
Rule: promresourcesv1.Rule{
Alert: "TestCPUUsageHigh",
Expr: intstr.FromString(`namespace:workload_cpu_usage:sum{namespace="test"} > 1`),
For: "1m",
Annotations: map[string]string{
"alias": "The alias is here",
"description": "The description is here",
},
},
},
}},

View File

@@ -22,7 +22,6 @@ import (
"flag"
"fmt"
"io/ioutil"
"kubesphere.io/kubesphere/pkg/version"
"log"
"github.com/emicklei/go-restful"
@@ -32,6 +31,7 @@ import (
"github.com/go-openapi/strfmt"
"github.com/go-openapi/validate"
"github.com/pkg/errors"
promfake "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/fake"
urlruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apiserver/runtime"
@@ -53,10 +53,12 @@ import (
tenantv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/tenant/v1alpha2"
terminalv1alpha2 "kubesphere.io/kubesphere/pkg/kapis/terminal/v1alpha2"
"kubesphere.io/kubesphere/pkg/models/iam/group"
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
fakedevops "kubesphere.io/kubesphere/pkg/simple/client/devops/fake"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
fakes3 "kubesphere.io/kubesphere/pkg/simple/client/s3/fake"
"kubesphere.io/kubesphere/pkg/version"
)
var output string
@@ -129,7 +131,9 @@ func generateSwaggerJson() []byte {
urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil))
urlruntime.Must(metricsv1alpha2.AddToContainer(container))
urlruntime.Must(networkv1alpha2.AddToContainer(container, ""))
urlruntime.Must(alertingv2alpha1.AddToContainer(container, informerFactory, nil, nil, nil))
alertingOptions := &alerting.Options{}
alertingClient, _ := alerting.NewRuleClient(alertingOptions)
urlruntime.Must(alertingv2alpha1.AddToContainer(container, informerFactory, promfake.NewSimpleClientset(), alertingClient, alertingOptions))
config := restfulspec.Config{
WebServices: container.RegisteredWebServices(),