add some comments to alerting bulk
Signed-off-by: junotx <junotx@126.com>
This commit is contained in:
@@ -620,7 +620,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp
|
||||
}
|
||||
}
|
||||
|
||||
// check the rules
|
||||
// check all the rules
|
||||
var (
|
||||
br = &v2alpha1.BulkResponse{}
|
||||
nameSet = make(map[string]struct{})
|
||||
@@ -665,10 +665,13 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp
|
||||
}
|
||||
}
|
||||
if len(nameSet) == len(invalids) {
|
||||
return br.Derive(), nil
|
||||
return br.Neaten(), nil
|
||||
}
|
||||
|
||||
// Confirm whether the rules are added or updated
|
||||
// Confirm whether the rules should be added or updated. For each rule that is committed,
|
||||
// it will be added if the rule does not exist, or updated otherwise.
|
||||
// If there are rules with the same name in the existing rules to update, the first will be
|
||||
// updated and the others will be deleted.
|
||||
var (
|
||||
addRules []*rules.RuleWithGroup
|
||||
updRules []*rules.ResourceRuleItem
|
||||
@@ -699,7 +702,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp
|
||||
}
|
||||
}
|
||||
|
||||
// do the actions
|
||||
// add rules
|
||||
if len(addRules) > 0 {
|
||||
resps, err := ruler.AddAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, addRules...)
|
||||
if err == nil {
|
||||
@@ -710,6 +713,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp
|
||||
}
|
||||
}
|
||||
}
|
||||
// update existing rules
|
||||
if len(updRules) > 0 {
|
||||
resps, err := ruler.UpdateAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, ruleResourceLabels, updRules...)
|
||||
if err == nil {
|
||||
@@ -720,6 +724,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp
|
||||
}
|
||||
}
|
||||
}
|
||||
// delete possible duplicate rules
|
||||
if len(delRules) > 0 {
|
||||
_, err := ruler.DeleteAlertingRules(ctx, ruleNamespace, delRules...)
|
||||
if err != nil {
|
||||
@@ -728,7 +733,7 @@ func (o *operator) CreateOrUpdateCustomAlertingRules(ctx context.Context, namesp
|
||||
}
|
||||
}
|
||||
}
|
||||
return br.Derive(), nil
|
||||
return br.Neaten(), nil
|
||||
}
|
||||
|
||||
func (o *operator) DeleteCustomAlertingRules(ctx context.Context, namespace string,
|
||||
@@ -792,7 +797,7 @@ func (o *operator) DeleteCustomAlertingRules(ctx context.Context, namespace stri
|
||||
}
|
||||
br.Items = append(br.Items, respItems...)
|
||||
|
||||
return br.Derive(), nil
|
||||
return br.Neaten(), nil
|
||||
}
|
||||
|
||||
// getPrometheusRuler gets the in-cluster prometheus
|
||||
|
||||
@@ -98,7 +98,7 @@ func (r *ruleResource) deleteAlertingRules(rules ...*RuleWithGroup) (bool, error
|
||||
return commit, nil
|
||||
}
|
||||
|
||||
// updateAlertingRule updates the rules.
|
||||
// updateAlertingRules updates the rules.
|
||||
// If there are rules to be updated, return true to indicate the resource should be updated.
|
||||
func (r *ruleResource) updateAlertingRules(rules ...*RuleWithGroup) (bool, error) {
|
||||
var (
|
||||
@@ -147,15 +147,18 @@ func (r *ruleResource) updateAlertingRules(rules ...*RuleWithGroup) (bool, error
|
||||
return commit, nil
|
||||
}
|
||||
|
||||
// addAlertingRules adds the rules.
|
||||
// If there are rules to be added, return true to indicate the resource should be updated.
|
||||
func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) {
|
||||
var (
|
||||
commit bool
|
||||
spec = r.Spec.DeepCopy()
|
||||
groupMax = -1
|
||||
cursor int
|
||||
|
||||
rulesNoGroup []promresourcesv1.Rule
|
||||
rulesWithGroup = make(map[string][]promresourcesv1.Rule)
|
||||
cursor int // indicates which rule to start adding for the rules with no groups
|
||||
|
||||
rulesNoGroup []promresourcesv1.Rule // rules that do not specify group names
|
||||
rulesWithGroup = make(map[string][]promresourcesv1.Rule) // rules that have specific group names
|
||||
)
|
||||
|
||||
for i, rule := range rules {
|
||||
@@ -170,11 +173,13 @@ func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) {
|
||||
spec = new(promresourcesv1.PrometheusRuleSpec)
|
||||
}
|
||||
|
||||
// For the rules that have specific group names, add them to the matched groups.
|
||||
// For the rules that do not specify group names, add them to the automatically generated groups until the limit is reached.
|
||||
for i, g := range spec.Groups {
|
||||
var (
|
||||
gName = g.Name
|
||||
doneNoGroup = cursor >= len(rulesNoGroup)
|
||||
doneWithGroup = len(rulesWithGroup) == 0
|
||||
doneNoGroup = cursor >= len(rulesNoGroup) // whether all rules without groups have been added
|
||||
doneWithGroup = len(rulesWithGroup) == 0 // whether all rules with groups have been added
|
||||
)
|
||||
|
||||
if doneNoGroup && doneWithGroup {
|
||||
@@ -201,26 +206,35 @@ func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) {
|
||||
|
||||
if size := len(g.Rules); size < customRuleGroupSize {
|
||||
num := customRuleGroupSize - size
|
||||
var limit int
|
||||
if limit = cursor + num; limit > len(rulesNoGroup) {
|
||||
limit = len(rulesNoGroup)
|
||||
var stop int
|
||||
if stop = cursor + num; stop > len(rulesNoGroup) {
|
||||
stop = len(rulesNoGroup)
|
||||
}
|
||||
spec.Groups[i].Rules = append(spec.Groups[i].Rules, rulesNoGroup[cursor:limit]...)
|
||||
cursor = limit
|
||||
spec.Groups[i].Rules = append(spec.Groups[i].Rules, rulesNoGroup[cursor:stop]...)
|
||||
cursor = stop
|
||||
commit = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no groups are available, new groups will be created to place the remaining rules.
|
||||
for gName := range rulesWithGroup {
|
||||
rules := rulesWithGroup[gName]
|
||||
if len(rules) == 0 {
|
||||
continue
|
||||
}
|
||||
spec.Groups = append(spec.Groups, promresourcesv1.RuleGroup{Name: gName, Rules: rules})
|
||||
commit = true
|
||||
}
|
||||
for groupMax++; cursor < len(rules); groupMax++ {
|
||||
g := promresourcesv1.RuleGroup{Name: fmt.Sprintf("%s%d", customRuleGroupDefaultPrefix, groupMax)}
|
||||
var limit int
|
||||
if limit = cursor + customRuleGroupSize; limit > len(rulesNoGroup) {
|
||||
limit = len(rulesNoGroup)
|
||||
var stop int
|
||||
if stop = cursor + customRuleGroupSize; stop > len(rulesNoGroup) {
|
||||
stop = len(rulesNoGroup)
|
||||
}
|
||||
g.Rules = append(g.Rules, rulesNoGroup[cursor:limit]...)
|
||||
g.Rules = append(g.Rules, rulesNoGroup[cursor:stop]...)
|
||||
spec.Groups = append(spec.Groups, g)
|
||||
cursor = limit
|
||||
cursor = stop
|
||||
commit = true
|
||||
}
|
||||
|
||||
@@ -313,18 +327,18 @@ func (r *PrometheusRuler) ListRuleResources(ruleNamespace *corev1.Namespace, ext
|
||||
func (r *PrometheusRuler) AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
|
||||
extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
|
||||
rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) {
|
||||
return nil, errors.New("not supported to add rules for prometheus")
|
||||
return nil, errors.New("Adding Prometheus rules not supported")
|
||||
}
|
||||
|
||||
func (r *PrometheusRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
|
||||
extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
|
||||
ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
|
||||
return nil, errors.New("not supported to update rules for prometheus")
|
||||
return nil, errors.New("Updating Prometheus rules not supported")
|
||||
}
|
||||
|
||||
func (r *PrometheusRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
|
||||
ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
|
||||
return nil, errors.New("not supported to delete rules for prometheus")
|
||||
return nil, errors.New("Deleting Prometheus rules not supported.")
|
||||
}
|
||||
|
||||
type ThanosRuler struct {
|
||||
@@ -438,15 +452,17 @@ func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev
|
||||
}
|
||||
|
||||
var (
|
||||
err error
|
||||
num = len(rules) - cursor
|
||||
limit = len(rules)
|
||||
rs []*RuleWithGroup
|
||||
err error
|
||||
num = len(rules) - cursor
|
||||
stop = len(rules)
|
||||
rs []*RuleWithGroup
|
||||
)
|
||||
|
||||
// First add all the rules to this resource,
|
||||
// and if the limit is exceeded, add only half of them.
|
||||
for i := 1; i <= 2; i++ {
|
||||
limit = cursor + num/i
|
||||
rs = rules[cursor:limit]
|
||||
stop = cursor + num/i
|
||||
rs = rules[cursor:stop]
|
||||
|
||||
err = r.doRuleResourceOperation(ctx, pr.Namespace, pr.Name, func(pr *promresourcesv1.PrometheusRule) error {
|
||||
resource := ruleResource(*pr)
|
||||
@@ -474,22 +490,24 @@ func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev
|
||||
for _, rule := range rs {
|
||||
respItems = append(respItems, resp(rule, err))
|
||||
}
|
||||
cursor = limit
|
||||
cursor = stop
|
||||
}
|
||||
}
|
||||
|
||||
// create new rule resources and add rest rules into them when all existing rule resources are full.
|
||||
// create new rule resources and add the remaining rules to them
|
||||
// when all existing rule resources are full.
|
||||
for cursor < len(rules) {
|
||||
var (
|
||||
err error
|
||||
num = len(rules) - cursor
|
||||
limit = len(rules)
|
||||
rs []*RuleWithGroup
|
||||
err error
|
||||
num = len(rules) - cursor
|
||||
stop = len(rules)
|
||||
rs []*RuleWithGroup
|
||||
)
|
||||
|
||||
// If adding the rules to the new resource exceeds the limit,
|
||||
// reduce the amount to 1/2, 1/3... of the remaining rules until they fit.
|
||||
for i := 1; ; i++ {
|
||||
limit = cursor + num/i
|
||||
rs = rules[cursor:limit]
|
||||
stop = cursor + num/i
|
||||
rs = rules[cursor:stop]
|
||||
|
||||
pr := &promresourcesv1.PrometheusRule{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@@ -514,7 +532,7 @@ func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev
|
||||
for _, rule := range rs {
|
||||
respItems = append(respItems, resp(rule, err))
|
||||
}
|
||||
cursor = limit
|
||||
cursor = stop
|
||||
}
|
||||
|
||||
return respItems, nil
|
||||
@@ -527,16 +545,22 @@ func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *co
|
||||
var (
|
||||
itemsMap = make(map[string][]*ResourceRuleItem)
|
||||
respItems = make([]*v2alpha1.BulkItemResponse, 0, len(ruleItems))
|
||||
// rules updated successfully. The key is the rule name.
|
||||
successes = make(map[string]struct{})
|
||||
moveMap = make(map[string][]*ResourceRuleItem)
|
||||
delMap = make(map[string][]*ResourceRuleItem) // duplicate rules that need to deleted in the same resource
|
||||
|
||||
// rules to be moved to other resources. The key is the resource name in which the rules were.
|
||||
moveMap = make(map[string][]*ResourceRuleItem)
|
||||
// duplicate rules to be deleted
|
||||
delMap = make(map[string][]*ResourceRuleItem)
|
||||
)
|
||||
|
||||
for i, item := range ruleItems {
|
||||
itemsMap[item.ResourceName] = append(itemsMap[item.ResourceName], ruleItems[i])
|
||||
}
|
||||
|
||||
// Update the rules in the resources where the rules reside.
|
||||
// If duplicate rules are found, the first will be updated and the others will be deleted.
|
||||
// If updating the rules in their original resources would exceed the size limit,
|
||||
// they will be moved to other resources and then be updated.
|
||||
for name, items := range itemsMap {
|
||||
var (
|
||||
nrules []*RuleWithGroup
|
||||
@@ -574,7 +598,7 @@ func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *co
|
||||
successes[item.Alert] = struct{}{}
|
||||
respItems = append(respItems, v2alpha1.NewBulkItemSuccessResponse(item.Alert, v2alpha1.ResultUpdated))
|
||||
}
|
||||
case err == errOutOfConfigMapSize:
|
||||
case err == errOutOfConfigMapSize: // Cannot update the rules in the original resource
|
||||
moveMap[name] = append(moveMap[name], nitems...)
|
||||
case resourceNotFound(err):
|
||||
for _, item := range items {
|
||||
@@ -591,6 +615,9 @@ func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *co
|
||||
}
|
||||
}
|
||||
|
||||
// The move here is not a pure move, because the moved rules must also be updated.
|
||||
// So the actual operation is to first add the new rules to other resources
|
||||
// and then delete the old rules from the old resources.
|
||||
for name, items := range moveMap {
|
||||
var (
|
||||
nrules = make([]*RuleWithGroup, 0, len(items))
|
||||
@@ -638,6 +665,7 @@ func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *co
|
||||
for i := range dRespItems {
|
||||
resp := dRespItems[i]
|
||||
if resp.Status == v2alpha1.StatusSuccess {
|
||||
// The delete operation here is for updating, so update the result to v2alpha1.ResultUpdated
|
||||
resp.Result = v2alpha1.ResultUpdated
|
||||
}
|
||||
respItems = append(respItems, resp)
|
||||
@@ -693,6 +721,7 @@ func (r *ThanosRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *co
|
||||
|
||||
func (r *ThanosRuler) doRuleResourceOperation(ctx context.Context, namespace, name string,
|
||||
operation func(pr *promresourcesv1.PrometheusRule) error) error {
|
||||
// The lock here is taken per resource in order to prevent frequent conflicts
|
||||
key := namespace + "/" + name
|
||||
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
ruleResourceLocker.Lock(key)
|
||||
|
||||
Reference in New Issue
Block a user