support workspace resource quota
Signed-off-by: hongming <talonwan@yunify.com>
This commit is contained in:
206
pkg/controller/quota/accessor.go
Normal file
206
pkg/controller/quota/accessor.go
Normal file
@@ -0,0 +1,206 @@
|
||||
/*
|
||||
|
||||
Copyright 2021 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
*/
|
||||
|
||||
package quota
|
||||
|
||||
import (
|
||||
"context"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/klog"
|
||||
quotav1alpha2 "kubesphere.io/kubesphere/pkg/apis/quota/v1alpha2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"time"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||
etcd "k8s.io/apiserver/pkg/storage/etcd3"
|
||||
utilquota "kubesphere.io/kubesphere/kube/pkg/quota/v1"
|
||||
)
|
||||
|
||||
// Following code copied from github.com/openshift/apiserver-library-go/pkg/admission/quota/clusterresourcequota
|
||||
|
||||
type accessor struct {
|
||||
client client.Client
|
||||
|
||||
// updatedResourceQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
|
||||
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
|
||||
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
|
||||
updatedResourceQuotas *lru.Cache
|
||||
}
|
||||
|
||||
// newQuotaAccessor creates an object that conforms to the QuotaAccessor interface to be used to retrieve quota objects.
|
||||
func newQuotaAccessor(client client.Client) *accessor {
|
||||
updatedCache, err := lru.New(100)
|
||||
if err != nil {
|
||||
// this should never happen
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &accessor{
|
||||
client: client,
|
||||
updatedResourceQuotas: updatedCache,
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateQuotaStatus the newQuota coming in will be incremented from the original. The difference between the original
|
||||
// and the new is the amount to add to the namespace total, but the total status is the used value itself
|
||||
func (a *accessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error {
|
||||
// skipping namespaced resource quota
|
||||
if newQuota.APIVersion != quotav1alpha2.SchemeGroupVersion.String() {
|
||||
klog.V(6).Infof("skipping namespaced resource quota %v %v", newQuota.Namespace, newQuota.Name)
|
||||
return nil
|
||||
}
|
||||
ctx := context.TODO()
|
||||
resourceQuota := "av1alpha2.ResourceQuota{}
|
||||
err := a.client.Get(ctx, types.NamespacedName{Name: newQuota.Name}, resourceQuota)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to fetch resource quota: %s, %v", newQuota.Name, err)
|
||||
return err
|
||||
}
|
||||
resourceQuota = a.checkCache(resourceQuota)
|
||||
|
||||
// re-assign objectmeta
|
||||
// make a copy
|
||||
updatedQuota := resourceQuota.DeepCopy()
|
||||
updatedQuota.ObjectMeta = newQuota.ObjectMeta
|
||||
updatedQuota.Namespace = ""
|
||||
|
||||
// determine change in usage
|
||||
usageDiff := utilquota.Subtract(newQuota.Status.Used, updatedQuota.Status.Total.Used)
|
||||
|
||||
// update aggregate usage
|
||||
updatedQuota.Status.Total.Used = newQuota.Status.Used
|
||||
|
||||
// update per namespace totals
|
||||
oldNamespaceTotals, _ := getResourceQuotasStatusByNamespace(updatedQuota.Status.Namespaces, newQuota.Namespace)
|
||||
namespaceTotalCopy := oldNamespaceTotals.DeepCopy()
|
||||
newNamespaceTotals := *namespaceTotalCopy
|
||||
newNamespaceTotals.Used = utilquota.Add(oldNamespaceTotals.Used, usageDiff)
|
||||
insertResourceQuotasStatus(&updatedQuota.Status.Namespaces, quotav1alpha2.ResourceQuotaStatusByNamespace{
|
||||
Namespace: newQuota.Namespace,
|
||||
ResourceQuotaStatus: newNamespaceTotals,
|
||||
})
|
||||
|
||||
klog.V(6).Infof("update resource quota: %+v", updatedQuota)
|
||||
err = a.client.Status().Update(ctx, updatedQuota, &client.UpdateOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to update resource quota: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
a.updatedResourceQuotas.Add(resourceQuota.Name, updatedQuota)
|
||||
return nil
|
||||
}
|
||||
|
||||
var etcdVersioner = etcd.APIObjectVersioner{}
|
||||
|
||||
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
|
||||
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
|
||||
// being monotonically increasing integers
|
||||
func (a *accessor) checkCache(resourceQuota *quotav1alpha2.ResourceQuota) *quotav1alpha2.ResourceQuota {
|
||||
uncastCachedQuota, ok := a.updatedResourceQuotas.Get(resourceQuota.Name)
|
||||
if !ok {
|
||||
return resourceQuota
|
||||
}
|
||||
cachedQuota := uncastCachedQuota.(*quotav1alpha2.ResourceQuota)
|
||||
|
||||
if etcdVersioner.CompareResourceVersion(resourceQuota, cachedQuota) >= 0 {
|
||||
a.updatedResourceQuotas.Remove(resourceQuota.Name)
|
||||
return resourceQuota
|
||||
}
|
||||
return cachedQuota
|
||||
}
|
||||
|
||||
func (a *accessor) GetQuotas(namespaceName string) ([]corev1.ResourceQuota, error) {
|
||||
resourceQuotaNames, err := a.waitForReadyResourceQuotaNames(namespaceName)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to fetch resource quota names: %v, %v", namespaceName, err)
|
||||
return nil, err
|
||||
}
|
||||
var result []corev1.ResourceQuota
|
||||
for _, resourceQuotaName := range resourceQuotaNames {
|
||||
resourceQuota := "av1alpha2.ResourceQuota{}
|
||||
err = a.client.Get(context.TODO(), types.NamespacedName{Name: resourceQuotaName}, resourceQuota)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to fetch resource quota %s: %v", resourceQuotaName, err)
|
||||
return result, err
|
||||
}
|
||||
resourceQuota = a.checkCache(resourceQuota)
|
||||
|
||||
// now convert to a ResourceQuota
|
||||
convertedQuota := corev1.ResourceQuota{}
|
||||
convertedQuota.APIVersion = quotav1alpha2.SchemeGroupVersion.String()
|
||||
convertedQuota.ObjectMeta = resourceQuota.ObjectMeta
|
||||
convertedQuota.Namespace = namespaceName
|
||||
convertedQuota.Spec = resourceQuota.Spec.Quota
|
||||
convertedQuota.Status = resourceQuota.Status.Total
|
||||
result = append(result, convertedQuota)
|
||||
}
|
||||
|
||||
// avoid conflicts with namespaced resource quota
|
||||
namespacedResourceQuotas, err := a.waitForReadyNamespacedResourceQuotas(namespaceName)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to fetch namespaced resource quotas: %v, %v", namespaceName, err)
|
||||
return nil, err
|
||||
}
|
||||
for _, resourceQuota := range namespacedResourceQuotas {
|
||||
resourceQuota.APIVersion = corev1.SchemeGroupVersion.String()
|
||||
result = append(result, resourceQuota)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (a *accessor) waitForReadyResourceQuotaNames(namespaceName string) ([]string, error) {
|
||||
ctx := context.TODO()
|
||||
var resourceQuotaNames []string
|
||||
var err error
|
||||
// wait for a valid mapping cache. The overall response can be delayed for up to 10 seconds.
|
||||
err = utilwait.PollImmediate(100*time.Millisecond, 8*time.Second, func() (done bool, err error) {
|
||||
resourceQuotaNames, err = resourceQuotaNamesFor(ctx, a.client, namespaceName)
|
||||
// if we can't find the namespace yet, just wait for the cache to update. Requests to non-existent namespaces
|
||||
// may hang, but those people are doing something wrong and namespace lifecycle should reject them.
|
||||
if apierrors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
return resourceQuotaNames, err
|
||||
}
|
||||
|
||||
func (a *accessor) waitForReadyNamespacedResourceQuotas(namespaceName string) ([]corev1.ResourceQuota, error) {
|
||||
ctx := context.TODO()
|
||||
var resourceQuotas []corev1.ResourceQuota
|
||||
var err error
|
||||
// wait for a valid mapping cache. The overall response can be delayed for up to 10 seconds.
|
||||
err = utilwait.PollImmediate(100*time.Millisecond, 8*time.Second, func() (done bool, err error) {
|
||||
resourceQuotaList := &corev1.ResourceQuotaList{}
|
||||
err = a.client.List(ctx, resourceQuotaList, &client.ListOptions{Namespace: namespaceName})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
resourceQuotas = resourceQuotaList.Items
|
||||
return true, nil
|
||||
})
|
||||
return resourceQuotas, err
|
||||
}
|
||||
59
pkg/controller/quota/lockfactory.go
Normal file
59
pkg/controller/quota/lockfactory.go
Normal file
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
|
||||
Copyright 2021 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
*/
|
||||
|
||||
package quota
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Following code copied from github.com/openshift/apiserver-library-go/pkg/admission/quota/clusterresourcequota
|
||||
type LockFactory interface {
|
||||
GetLock(string) sync.Locker
|
||||
}
|
||||
|
||||
type DefaultLockFactory struct {
|
||||
lock sync.RWMutex
|
||||
|
||||
locks map[string]sync.Locker
|
||||
}
|
||||
|
||||
func NewDefaultLockFactory() *DefaultLockFactory {
|
||||
return &DefaultLockFactory{locks: map[string]sync.Locker{}}
|
||||
}
|
||||
|
||||
func (f *DefaultLockFactory) GetLock(key string) sync.Locker {
|
||||
lock, exists := f.getExistingLock(key)
|
||||
if exists {
|
||||
return lock
|
||||
}
|
||||
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
lock = &sync.Mutex{}
|
||||
f.locks[key] = lock
|
||||
return lock
|
||||
}
|
||||
|
||||
func (f *DefaultLockFactory) getExistingLock(key string) (sync.Locker, bool) {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
|
||||
lock, exists := f.locks[key]
|
||||
return lock, exists
|
||||
}
|
||||
299
pkg/controller/quota/resourcequota_controller.go
Normal file
299
pkg/controller/quota/resourcequota_controller.go
Normal file
@@ -0,0 +1,299 @@
|
||||
/*
|
||||
|
||||
Copyright 2021 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
*/
|
||||
|
||||
package quota
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/go-logr/logr"
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog"
|
||||
evaluatorcore "kubesphere.io/kubesphere/kube/pkg/quota/v1/evaluator/core"
|
||||
"kubesphere.io/kubesphere/kube/pkg/quota/v1/generic"
|
||||
"kubesphere.io/kubesphere/kube/pkg/quota/v1/install"
|
||||
quotav1alpha2 "kubesphere.io/kubesphere/pkg/apis/quota/v1alpha2"
|
||||
tenantv1alpha1 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/constants"
|
||||
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
|
||||
"math"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
"time"
|
||||
|
||||
k8sinformers "k8s.io/client-go/informers"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
||||
quotav1 "kubesphere.io/kubesphere/kube/pkg/quota/v1"
|
||||
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
)
|
||||
|
||||
const (
|
||||
ControllerName = "resourcequota-controller"
|
||||
DefaultResyncPeriod = 5 * time.Minute
|
||||
DefaultMaxConcurrentReconciles = 8
|
||||
)
|
||||
|
||||
// Reconciler reconciles a Workspace object
|
||||
type Reconciler struct {
|
||||
client.Client
|
||||
logger logr.Logger
|
||||
recorder record.EventRecorder
|
||||
maxConcurrentReconciles int
|
||||
// Knows how to calculate usage
|
||||
registry quotav1.Registry
|
||||
// Controls full recalculation of quota usage
|
||||
resyncPeriod time.Duration
|
||||
scheme *runtime.Scheme
|
||||
}
|
||||
|
||||
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles int, resyncPeriod time.Duration, informerFactory k8sinformers.SharedInformerFactory) error {
|
||||
r.logger = ctrl.Log.WithName("controllers").WithName(ControllerName)
|
||||
r.recorder = mgr.GetEventRecorderFor(ControllerName)
|
||||
r.scheme = mgr.GetScheme()
|
||||
r.registry = generic.NewRegistry(install.NewQuotaConfigurationForControllers(generic.ListerFuncForResourceFunc(informerFactory.ForResource)).Evaluators())
|
||||
if r.Client == nil {
|
||||
r.Client = mgr.GetClient()
|
||||
}
|
||||
if maxConcurrentReconciles > 0 {
|
||||
r.maxConcurrentReconciles = maxConcurrentReconciles
|
||||
} else {
|
||||
r.maxConcurrentReconciles = DefaultMaxConcurrentReconciles
|
||||
}
|
||||
r.resyncPeriod = time.Duration(math.Max(float64(resyncPeriod), float64(DefaultResyncPeriod)))
|
||||
c, err := ctrl.NewControllerManagedBy(mgr).
|
||||
Named(ControllerName).
|
||||
WithOptions(controller.Options{
|
||||
MaxConcurrentReconciles: r.maxConcurrentReconciles,
|
||||
}).
|
||||
For("av1alpha2.ResourceQuota{}).
|
||||
WithEventFilter(predicate.GenerationChangedPredicate{
|
||||
Funcs: predicate.Funcs{
|
||||
UpdateFunc: func(e event.UpdateEvent) bool {
|
||||
oldQuota := e.ObjectOld.(*quotav1alpha2.ResourceQuota)
|
||||
newQuota := e.ObjectNew.(*quotav1alpha2.ResourceQuota)
|
||||
return !equality.Semantic.DeepEqual(oldQuota.Spec, newQuota.Spec)
|
||||
},
|
||||
},
|
||||
}).
|
||||
Build(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resources := []runtime.Object{
|
||||
&corev1.Pod{},
|
||||
&corev1.Service{},
|
||||
&corev1.PersistentVolumeClaim{},
|
||||
}
|
||||
realClock := clock.RealClock{}
|
||||
for _, resource := range resources {
|
||||
err := c.Watch(
|
||||
&source.Kind{Type: resource},
|
||||
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.mapper)},
|
||||
predicate.Funcs{
|
||||
GenericFunc: func(e event.GenericEvent) bool {
|
||||
return false
|
||||
},
|
||||
CreateFunc: func(e event.CreateEvent) bool {
|
||||
return false
|
||||
},
|
||||
UpdateFunc: func(e event.UpdateEvent) bool {
|
||||
notifyChange := false
|
||||
// we only want to queue the updates we care about though as too much noise will overwhelm queue.
|
||||
switch e.MetaOld.(type) {
|
||||
case *corev1.Pod:
|
||||
oldPod := e.ObjectOld.(*corev1.Pod)
|
||||
newPod := e.ObjectNew.(*corev1.Pod)
|
||||
notifyChange = evaluatorcore.QuotaV1Pod(oldPod, realClock) && !evaluatorcore.QuotaV1Pod(newPod, realClock)
|
||||
case *corev1.Service:
|
||||
oldService := e.ObjectOld.(*corev1.Service)
|
||||
newService := e.ObjectNew.(*corev1.Service)
|
||||
notifyChange = evaluatorcore.GetQuotaServiceType(oldService) != evaluatorcore.GetQuotaServiceType(newService)
|
||||
case *corev1.PersistentVolumeClaim:
|
||||
notifyChange = true
|
||||
}
|
||||
return notifyChange
|
||||
},
|
||||
DeleteFunc: func(e event.DeleteEvent) bool {
|
||||
return true
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reconciler) mapper(h handler.MapObject) []reconcile.Request {
|
||||
// check if the quota controller can evaluate this kind, if not, ignore it altogether...
|
||||
var result []reconcile.Request
|
||||
evaluators := r.registry.List()
|
||||
ctx := context.TODO()
|
||||
resourceQuotaNames, err := resourceQuotaNamesFor(ctx, r.Client, h.Meta.GetNamespace())
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get resource quota names for: %v %T %v, err: %v", h.Meta.GetNamespace(), h.Object, h.Meta.GetName(), err)
|
||||
return result
|
||||
}
|
||||
// only queue those quotas that are tracking a resource associated with this kind.
|
||||
for _, resourceQuotaName := range resourceQuotaNames {
|
||||
resourceQuota := "av1alpha2.ResourceQuota{}
|
||||
if err := r.Get(ctx, types.NamespacedName{Name: resourceQuotaName}, resourceQuota); err != nil {
|
||||
klog.Errorf("failed to get resource quota: %v, err: %v", resourceQuotaName, err)
|
||||
return result
|
||||
}
|
||||
resourceQuotaResources := quotav1.ResourceNames(resourceQuota.Status.Total.Hard)
|
||||
for _, evaluator := range evaluators {
|
||||
matchedResources := evaluator.MatchingResources(resourceQuotaResources)
|
||||
if len(matchedResources) > 0 {
|
||||
result = append(result, reconcile.Request{NamespacedName: types.NamespacedName{Name: resourceQuotaName}})
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
klog.V(6).Infof("resource quota reconcile after resource change: %v %T %v, %+v", h.Meta.GetNamespace(), h.Object, h.Meta.GetName(), result)
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
logger := r.logger.WithValues("resourcequota", req.NamespacedName)
|
||||
rootCtx := context.TODO()
|
||||
resourceQuota := "av1alpha2.ResourceQuota{}
|
||||
if err := r.Get(rootCtx, req.NamespacedName, resourceQuota); err != nil {
|
||||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||||
}
|
||||
|
||||
if err := r.bindWorkspace(resourceQuota); err != nil {
|
||||
logger.Error(err, "failed to set owner reference")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
if err := r.syncQuotaForNamespaces(resourceQuota); err != nil {
|
||||
logger.Error(err, "failed to sync quota")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
r.recorder.Event(resourceQuota, corev1.EventTypeNormal, "Synced", "Synced successfully")
|
||||
return ctrl.Result{RequeueAfter: r.resyncPeriod}, nil
|
||||
}
|
||||
|
||||
func (r *Reconciler) bindWorkspace(resourceQuota *quotav1alpha2.ResourceQuota) error {
|
||||
workspaceName := resourceQuota.Labels[constants.WorkspaceLabelKey]
|
||||
if workspaceName == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
workspace := &tenantv1alpha1.Workspace{}
|
||||
err := r.Get(context.TODO(), types.NamespacedName{Name: workspaceName}, workspace)
|
||||
if err != nil {
|
||||
return client.IgnoreNotFound(err)
|
||||
}
|
||||
|
||||
if !metav1.IsControlledBy(resourceQuota, workspace) {
|
||||
resourceQuota.OwnerReferences = nil
|
||||
if err := controllerutil.SetControllerReference(workspace, resourceQuota, r.scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.Update(context.TODO(), resourceQuota)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reconciler) syncQuotaForNamespaces(originalQuota *quotav1alpha2.ResourceQuota) error {
|
||||
quota := originalQuota.DeepCopy()
|
||||
ctx := context.TODO()
|
||||
// get the list of namespaces that match this cluster quota
|
||||
matchingNamespaceList := corev1.NamespaceList{}
|
||||
if err := r.List(ctx, &matchingNamespaceList, &client.ListOptions{LabelSelector: labels.SelectorFromSet(quota.Spec.LabelSelector)}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
matchingNamespaceNames := make([]string, 0)
|
||||
for _, namespace := range matchingNamespaceList.Items {
|
||||
matchingNamespaceNames = append(matchingNamespaceNames, namespace.Name)
|
||||
}
|
||||
|
||||
for _, namespace := range matchingNamespaceList.Items {
|
||||
namespaceName := namespace.Name
|
||||
namespaceTotals, _ := getResourceQuotasStatusByNamespace(quota.Status.Namespaces, namespaceName)
|
||||
|
||||
actualUsage, err := quotaUsageCalculationFunc(namespaceName, quota.Spec.Quota.Scopes, quota.Spec.Quota.Hard, r.registry, quota.Spec.Quota.ScopeSelector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
recalculatedStatus := corev1.ResourceQuotaStatus{
|
||||
Used: actualUsage,
|
||||
Hard: quota.Spec.Quota.Hard,
|
||||
}
|
||||
|
||||
// subtract old usage, add new usage
|
||||
quota.Status.Total.Used = quotav1.Subtract(quota.Status.Total.Used, namespaceTotals.Used)
|
||||
quota.Status.Total.Used = quotav1.Add(quota.Status.Total.Used, recalculatedStatus.Used)
|
||||
insertResourceQuotasStatus("a.Status.Namespaces, quotav1alpha2.ResourceQuotaStatusByNamespace{
|
||||
Namespace: namespaceName,
|
||||
ResourceQuotaStatus: recalculatedStatus,
|
||||
})
|
||||
}
|
||||
|
||||
// Remove any namespaces from quota.status that no longer match.
|
||||
statusCopy := quota.Status.Namespaces.DeepCopy()
|
||||
for _, namespaceTotals := range statusCopy {
|
||||
namespaceName := namespaceTotals.Namespace
|
||||
if !sliceutil.HasString(matchingNamespaceNames, namespaceName) {
|
||||
quota.Status.Total.Used = quotav1.Subtract(quota.Status.Total.Used, namespaceTotals.Used)
|
||||
removeResourceQuotasStatusByNamespace("a.Status.Namespaces, namespaceName)
|
||||
}
|
||||
}
|
||||
|
||||
quota.Status.Total.Hard = quota.Spec.Quota.Hard
|
||||
|
||||
// if there's no change, no update, return early. NewAggregate returns nil on empty input
|
||||
if equality.Semantic.DeepEqual(quota, originalQuota) {
|
||||
return nil
|
||||
}
|
||||
|
||||
klog.V(6).Infof("update resource quota: %+v", quota)
|
||||
if err := r.Status().Update(ctx, quota, &client.UpdateOptions{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// quotaUsageCalculationFunc is a function to calculate quota usage. It is only configurable for easy unit testing
|
||||
// NEVER CHANGE THIS OUTSIDE A TEST
|
||||
var quotaUsageCalculationFunc = quotav1.CalculateUsage
|
||||
191
pkg/controller/quota/resourcequota_webhook.go
Normal file
191
pkg/controller/quota/resourcequota_webhook.go
Normal file
@@ -0,0 +1,191 @@
|
||||
/*
|
||||
|
||||
Copyright 2021 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
*/
|
||||
|
||||
package quota
|
||||
|
||||
import (
|
||||
"context"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||
admissionapi "k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/kube/pkg/quota/v1"
|
||||
"kubesphere.io/kubesphere/kube/pkg/quota/v1/generic"
|
||||
"kubesphere.io/kubesphere/kube/pkg/quota/v1/install"
|
||||
"kubesphere.io/kubesphere/kube/plugin/pkg/admission/resourcequota"
|
||||
resourcequotaapi "kubesphere.io/kubesphere/kube/plugin/pkg/admission/resourcequota/apis/resourcequota"
|
||||
"net/http"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
numEvaluatorThreads = 10
|
||||
)
|
||||
|
||||
type ResourceQuotaAdmission struct {
|
||||
client client.Client
|
||||
|
||||
decoder *webhook.AdmissionDecoder
|
||||
|
||||
lockFactory LockFactory
|
||||
|
||||
// these are used to create the evaluator
|
||||
registry quota.Registry
|
||||
|
||||
init sync.Once
|
||||
evaluator resourcequota.Evaluator
|
||||
}
|
||||
|
||||
func NewResourceQuotaAdmission(client client.Client, scheme *runtime.Scheme) (webhook.AdmissionHandler, error) {
|
||||
decoder, err := admission.NewDecoder(scheme)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ResourceQuotaAdmission{
|
||||
client: client,
|
||||
lockFactory: NewDefaultLockFactory(),
|
||||
decoder: decoder,
|
||||
registry: generic.NewRegistry(install.NewQuotaConfigurationForAdmission().Evaluators()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *ResourceQuotaAdmission) Handle(ctx context.Context, req webhook.AdmissionRequest) webhook.AdmissionResponse {
|
||||
// ignore all operations that correspond to sub-resource actions
|
||||
if len(req.RequestSubResource) != 0 {
|
||||
return webhook.Allowed("")
|
||||
}
|
||||
// ignore cluster level resources
|
||||
if len(req.Namespace) == 0 {
|
||||
return webhook.Allowed("")
|
||||
}
|
||||
|
||||
r.init.Do(func() {
|
||||
resourceQuotaAccessor := newQuotaAccessor(r.client)
|
||||
r.evaluator = resourcequota.NewQuotaEvaluator(resourceQuotaAccessor, install.DefaultIgnoredResources(), r.registry, r.lockAquisition, &resourcequotaapi.Configuration{}, numEvaluatorThreads, utilwait.NeverStop)
|
||||
})
|
||||
|
||||
attributesRecord, err := convertToAdmissionAttributes(req)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return webhook.Errored(http.StatusBadRequest, err)
|
||||
}
|
||||
|
||||
if err := r.evaluator.Evaluate(attributesRecord); err != nil {
|
||||
if errors.IsForbidden(err) {
|
||||
klog.Info(err)
|
||||
return webhook.Denied(err.Error())
|
||||
}
|
||||
klog.Error(err)
|
||||
return webhook.Errored(http.StatusInternalServerError, err)
|
||||
}
|
||||
|
||||
return webhook.Allowed("")
|
||||
}
|
||||
|
||||
type ByName []corev1.ResourceQuota
|
||||
|
||||
func (v ByName) Len() int { return len(v) }
|
||||
func (v ByName) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
|
||||
func (v ByName) Less(i, j int) bool { return v[i].Name < v[j].Name }
|
||||
|
||||
func (r *ResourceQuotaAdmission) lockAquisition(quotas []corev1.ResourceQuota) func() {
|
||||
var locks []sync.Locker
|
||||
|
||||
// acquire the locks in alphabetical order because I'm too lazy to think of something clever
|
||||
sort.Sort(ByName(quotas))
|
||||
for _, quota := range quotas {
|
||||
lock := r.lockFactory.GetLock(string(quota.UID))
|
||||
lock.Lock()
|
||||
locks = append(locks, lock)
|
||||
}
|
||||
|
||||
return func() {
|
||||
for i := len(locks) - 1; i >= 0; i-- {
|
||||
locks[i].Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func convertToAdmissionAttributes(req admission.Request) (admissionapi.Attributes, error) {
|
||||
var err error
|
||||
var object runtime.Object
|
||||
if len(req.Object.Raw) > 0 {
|
||||
object, _, err = scheme.Codecs.UniversalDeserializer().Decode(req.Object.Raw, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var oldObject runtime.Object
|
||||
if len(req.OldObject.Raw) > 0 {
|
||||
oldObject, _, err = scheme.Codecs.UniversalDeserializer().Decode(req.OldObject.Raw, nil, nil)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var operationOptions runtime.Object
|
||||
if len(req.Options.Raw) > 0 {
|
||||
operationOptions, _, err = scheme.Codecs.UniversalDeserializer().Decode(req.Options.Raw, nil, nil)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
extras := map[string][]string{}
|
||||
for k, v := range req.UserInfo.Extra {
|
||||
extras[k] = v
|
||||
}
|
||||
|
||||
attributesRecord := admissionapi.NewAttributesRecord(object,
|
||||
oldObject,
|
||||
schema.GroupVersionKind{
|
||||
Group: req.RequestKind.Group,
|
||||
Version: req.RequestKind.Version,
|
||||
Kind: req.RequestKind.Kind,
|
||||
},
|
||||
req.Namespace,
|
||||
req.Name,
|
||||
schema.GroupVersionResource{
|
||||
Group: req.RequestResource.Group,
|
||||
Version: req.RequestResource.Version,
|
||||
Resource: req.RequestResource.Resource,
|
||||
},
|
||||
req.SubResource,
|
||||
admissionapi.Operation(req.Operation),
|
||||
operationOptions,
|
||||
*req.DryRun,
|
||||
&user.DefaultInfo{
|
||||
Name: req.UserInfo.Username,
|
||||
UID: req.UserInfo.UID,
|
||||
Groups: req.UserInfo.Groups,
|
||||
Extra: extras,
|
||||
})
|
||||
return attributesRecord, nil
|
||||
}
|
||||
92
pkg/controller/quota/util.go
Normal file
92
pkg/controller/quota/util.go
Normal file
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
|
||||
Copyright 2021 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
*/
|
||||
|
||||
package quota
|
||||
|
||||
import (
|
||||
"context"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
quotav1alpha2 "kubesphere.io/kubesphere/pkg/apis/quota/v1alpha2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
// Following code copied from github.com/openshift/library-go/pkg/quota/quotautil
|
||||
func getResourceQuotasStatusByNamespace(namespaceStatuses quotav1alpha2.ResourceQuotasStatusByNamespace, namespace string) (corev1.ResourceQuotaStatus, bool) {
|
||||
for i := range namespaceStatuses {
|
||||
curr := namespaceStatuses[i]
|
||||
if curr.Namespace == namespace {
|
||||
return curr.ResourceQuotaStatus, true
|
||||
}
|
||||
}
|
||||
return corev1.ResourceQuotaStatus{}, false
|
||||
}
|
||||
|
||||
func removeResourceQuotasStatusByNamespace(namespaceStatuses *quotav1alpha2.ResourceQuotasStatusByNamespace, namespace string) {
|
||||
newNamespaceStatuses := quotav1alpha2.ResourceQuotasStatusByNamespace{}
|
||||
for i := range *namespaceStatuses {
|
||||
curr := (*namespaceStatuses)[i]
|
||||
if curr.Namespace == namespace {
|
||||
continue
|
||||
}
|
||||
newNamespaceStatuses = append(newNamespaceStatuses, curr)
|
||||
}
|
||||
*namespaceStatuses = newNamespaceStatuses
|
||||
}
|
||||
|
||||
func insertResourceQuotasStatus(namespaceStatuses *quotav1alpha2.ResourceQuotasStatusByNamespace, newStatus quotav1alpha2.ResourceQuotaStatusByNamespace) {
|
||||
newNamespaceStatuses := quotav1alpha2.ResourceQuotasStatusByNamespace{}
|
||||
found := false
|
||||
for i := range *namespaceStatuses {
|
||||
curr := (*namespaceStatuses)[i]
|
||||
if curr.Namespace == newStatus.Namespace {
|
||||
// do this so that we don't change serialization order
|
||||
newNamespaceStatuses = append(newNamespaceStatuses, newStatus)
|
||||
found = true
|
||||
continue
|
||||
}
|
||||
newNamespaceStatuses = append(newNamespaceStatuses, curr)
|
||||
}
|
||||
if !found {
|
||||
newNamespaceStatuses = append(newNamespaceStatuses, newStatus)
|
||||
}
|
||||
*namespaceStatuses = newNamespaceStatuses
|
||||
}
|
||||
|
||||
func resourceQuotaNamesFor(ctx context.Context, client client.Client, namespaceName string) ([]string, error) {
|
||||
namespace := &corev1.Namespace{}
|
||||
var resourceQuotaNames []string
|
||||
if err := client.Get(ctx, types.NamespacedName{Name: namespaceName}, namespace); err != nil {
|
||||
return resourceQuotaNames, err
|
||||
}
|
||||
if len(namespace.Labels) == 0 {
|
||||
return resourceQuotaNames, nil
|
||||
}
|
||||
resourceQuotaList := "av1alpha2.ResourceQuotaList{}
|
||||
if err := client.List(ctx, resourceQuotaList); err != nil {
|
||||
return resourceQuotaNames, err
|
||||
}
|
||||
for _, resourceQuota := range resourceQuotaList.Items {
|
||||
if len(resourceQuota.Spec.LabelSelector) > 0 &&
|
||||
labels.SelectorFromSet(resourceQuota.Spec.LabelSelector).Matches(labels.Set(namespace.Labels)) {
|
||||
resourceQuotaNames = append(resourceQuotaNames, resourceQuota.Name)
|
||||
}
|
||||
}
|
||||
return resourceQuotaNames, nil
|
||||
}
|
||||
Reference in New Issue
Block a user