/* * Copyright 2024 the KubeSphere Authors. * Please refer to the LICENSE file in the root directory of the project. * https://github.com/kubesphere/kubesphere/blob/master/LICENSE */ package quota import ( "context" "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" utilwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage" "k8s.io/klog/v2" "k8s.io/utils/lru" quotav1alpha2 "kubesphere.io/api/quota/v1alpha2" "sigs.k8s.io/controller-runtime/pkg/client" utilquota "kubesphere.io/kubesphere/kube/pkg/quota/v1" ) // Following code copied from github.com/openshift/apiserver-library-go/pkg/admission/quota/clusterresourcequota type accessor struct { client client.Client // updatedResourceQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to // back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions // for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts updatedResourceQuotas *lru.Cache } // newQuotaAccessor creates an object that conforms to the QuotaAccessor interface to be used to retrieve quota objects. func newQuotaAccessor(client client.Client) *accessor { updatedCache := lru.New(100) return &accessor{ client: client, updatedResourceQuotas: updatedCache, } } // UpdateQuotaStatus the newQuota coming in will be incremented from the original. The difference between the original // and the new is the amount to add to the namespace total, but the total status is the used value itself func (a *accessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error { // skipping namespaced resource quota if newQuota.APIVersion != quotav1alpha2.SchemeGroupVersion.String() { klog.V(6).Infof("skipping namespaced resource quota %v %v", newQuota.Namespace, newQuota.Name) return nil } ctx := context.TODO() resourceQuota := "av1alpha2.ResourceQuota{} err := a.client.Get(ctx, types.NamespacedName{Name: newQuota.Name}, resourceQuota) if err != nil { klog.Errorf("failed to fetch resource quota: %s, %v", newQuota.Name, err) return err } resourceQuota = a.checkCache(resourceQuota) // re-assign objectmeta // make a copy updatedQuota := resourceQuota.DeepCopy() updatedQuota.ObjectMeta = newQuota.ObjectMeta updatedQuota.Namespace = "" // determine change in usage usageDiff := utilquota.Subtract(newQuota.Status.Used, updatedQuota.Status.Total.Used) // update aggregate usage updatedQuota.Status.Total.Used = newQuota.Status.Used // update per namespace totals oldNamespaceTotals, _ := getResourceQuotasStatusByNamespace(updatedQuota.Status.Namespaces, newQuota.Namespace) namespaceTotalCopy := oldNamespaceTotals.DeepCopy() newNamespaceTotals := *namespaceTotalCopy newNamespaceTotals.Used = utilquota.Add(oldNamespaceTotals.Used, usageDiff) insertResourceQuotasStatus(&updatedQuota.Status.Namespaces, quotav1alpha2.ResourceQuotaStatusByNamespace{ Namespace: newQuota.Namespace, ResourceQuotaStatus: newNamespaceTotals, }) klog.V(6).Infof("update resource quota: %+v", updatedQuota) err = a.client.Status().Update(ctx, updatedQuota) if err != nil { klog.Errorf("failed to update resource quota: %v", err) return err } a.updatedResourceQuotas.Add(resourceQuota.Name, updatedQuota) return nil } var storageVersioner = storage.APIObjectVersioner{} // checkCache compares the passed quota against the value in the look-aside cache and returns the newer // if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions // being monotonically increasing integers func (a *accessor) checkCache(resourceQuota *quotav1alpha2.ResourceQuota) *quotav1alpha2.ResourceQuota { uncastCachedQuota, ok := a.updatedResourceQuotas.Get(resourceQuota.Name) if !ok { return resourceQuota } cachedQuota := uncastCachedQuota.(*quotav1alpha2.ResourceQuota) if storageVersioner.CompareResourceVersion(resourceQuota, cachedQuota) >= 0 { a.updatedResourceQuotas.Remove(resourceQuota.Name) return resourceQuota } return cachedQuota } func (a *accessor) GetQuotas(namespaceName string) ([]corev1.ResourceQuota, error) { resourceQuotaNames, err := a.waitForReadyResourceQuotaNames(namespaceName) if err != nil { klog.Errorf("failed to fetch resource quota names: %v, %v", namespaceName, err) return nil, err } var result []corev1.ResourceQuota for _, resourceQuotaName := range resourceQuotaNames { resourceQuota := "av1alpha2.ResourceQuota{} err = a.client.Get(context.TODO(), types.NamespacedName{Name: resourceQuotaName}, resourceQuota) if err != nil { klog.Errorf("failed to fetch resource quota %s: %v", resourceQuotaName, err) return result, err } resourceQuota = a.checkCache(resourceQuota) // now convert to a ResourceQuota convertedQuota := corev1.ResourceQuota{} convertedQuota.APIVersion = quotav1alpha2.SchemeGroupVersion.String() convertedQuota.ObjectMeta = resourceQuota.ObjectMeta convertedQuota.Namespace = namespaceName convertedQuota.Spec = resourceQuota.Spec.Quota convertedQuota.Status = resourceQuota.Status.Total result = append(result, convertedQuota) } // avoid conflicts with namespaced resource quota namespacedResourceQuotas, err := a.waitForReadyNamespacedResourceQuotas(namespaceName) if err != nil { klog.Errorf("failed to fetch namespaced resource quotas: %v, %v", namespaceName, err) return nil, err } for _, resourceQuota := range namespacedResourceQuotas { resourceQuota.APIVersion = corev1.SchemeGroupVersion.String() result = append(result, resourceQuota) } return result, nil } func (a *accessor) waitForReadyResourceQuotaNames(namespaceName string) ([]string, error) { var resourceQuotaNames []string // wait for a valid mapping cache. The overall response can be delayed for up to 10 seconds. err := utilwait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 8*time.Second, true, func(ctx context.Context) (done bool, err error) { resourceQuotaNames, err = resourceQuotaNamesFor(ctx, a.client, namespaceName) // if we can't find the namespace yet, just wait for the cache to update. Requests to non-existent namespaces // may hang, but those people are doing something wrong and namespace lifecycle should reject them. if apierrors.IsNotFound(err) { return false, nil } if err != nil { return false, err } return true, nil }) return resourceQuotaNames, err } func (a *accessor) waitForReadyNamespacedResourceQuotas(namespaceName string) ([]corev1.ResourceQuota, error) { var resourceQuotas []corev1.ResourceQuota // wait for a valid mapping cache. The overall response can be delayed for up to 10 seconds. err := utilwait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 8*time.Second, true, func(ctx context.Context) (done bool, err error) { resourceQuotaList := &corev1.ResourceQuotaList{} err = a.client.List(ctx, resourceQuotaList, &client.ListOptions{Namespace: namespaceName}) if err != nil { return false, err } resourceQuotas = resourceQuotaList.Items return true, nil }) return resourceQuotas, err }