From 364941d5d2dc63a15bf549c9b948bd0dc31c6776 Mon Sep 17 00:00:00 2001 From: wang_wenhu <976400757@qq.com> Date: Thu, 5 Aug 2021 13:58:46 +0800 Subject: [PATCH] remove workload auto-restart function when pvc expanded Signed-off-by: wang_wenhu <976400757@qq.com> --- cmd/controller-manager/app/controllers.go | 11 - .../storage/expansion/expansion_controller.go | 507 ------------------ .../expansion/expansion_controller_test.go | 289 ---------- .../storage/expansion/expansion_util.go | 67 --- 4 files changed, 874 deletions(-) delete mode 100644 pkg/controller/storage/expansion/expansion_controller.go delete mode 100644 pkg/controller/storage/expansion/expansion_controller_test.go delete mode 100644 pkg/controller/storage/expansion/expansion_util.go diff --git a/cmd/controller-manager/app/controllers.go b/cmd/controller-manager/app/controllers.go index 4b25ecc8b..0f3cc8190 100644 --- a/cmd/controller-manager/app/controllers.go +++ b/cmd/controller-manager/app/controllers.go @@ -46,7 +46,6 @@ import ( "kubesphere.io/kubesphere/pkg/controller/s2ibinary" "kubesphere.io/kubesphere/pkg/controller/s2irun" "kubesphere.io/kubesphere/pkg/controller/storage/capability" - "kubesphere.io/kubesphere/pkg/controller/storage/expansion" "kubesphere.io/kubesphere/pkg/controller/user" "kubesphere.io/kubesphere/pkg/controller/virtualservice" "kubesphere.io/kubesphere/pkg/informers" @@ -143,15 +142,6 @@ func addControllers( informerFactory.SnapshotSharedInformerFactory().Snapshot().V1beta1().VolumeSnapshotClasses(), ) - volumeExpansionController := expansion.NewVolumeExpansionController( - client.Kubernetes(), - kubernetesInformer.Core().V1().PersistentVolumeClaims(), - kubernetesInformer.Storage().V1().StorageClasses(), - kubernetesInformer.Core().V1().Pods(), - kubernetesInformer.Apps().V1().Deployments(), - kubernetesInformer.Apps().V1().ReplicaSets(), - kubernetesInformer.Apps().V1().StatefulSets()) - var fedUserCache, fedGlobalRoleBindingCache, fedGlobalRoleCache cache.Store var fedUserCacheController, fedGlobalRoleBindingCacheController, fedGlobalRoleCacheController cache.Controller @@ -265,7 +255,6 @@ func addControllers( "s2ibinary-controller": s2iBinaryController, "s2irun-controller": s2iRunController, "storagecapability-controller": storageCapabilityController, - "volumeexpansion-controller": volumeExpansionController, "user-controller": userController, "loginrecord-controller": loginRecordController, "cluster-controller": clusterController, diff --git a/pkg/controller/storage/expansion/expansion_controller.go b/pkg/controller/storage/expansion/expansion_controller.go deleted file mode 100644 index 0db3c05df..000000000 --- a/pkg/controller/storage/expansion/expansion_controller.go +++ /dev/null @@ -1,507 +0,0 @@ -/* - - Copyright 2019 The KubeSphere Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -*/ - -package expansion - -import ( - "context" - "errors" - "fmt" - "time" - - appsv1 "k8s.io/api/apps/v1" - autoscalingv1 "k8s.io/api/autoscaling/v1" - corev1 "k8s.io/api/core/v1" - storagev1 "k8s.io/api/storage/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - appsv1informers "k8s.io/client-go/informers/apps/v1" - corev1informers "k8s.io/client-go/informers/core/v1" - storagev1informer "k8s.io/client-go/informers/storage/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - listerappsv1 "k8s.io/client-go/listers/apps/v1" - listercorev1 "k8s.io/client-go/listers/core/v1" - listerstoragev1 "k8s.io/client-go/listers/storage/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/retry" - "k8s.io/client-go/util/workqueue" - "k8s.io/klog" -) - -const controllerAgentName = "expansion-controller" - -var supportedProvisioner = []string{"disk.csi.qingcloud.com", "csi-qingcloud"} - -var retryTime = wait.Backoff{ - Duration: 1 * time.Second, - Factor: 2, - Steps: 12, -} - -type VolumeExpansionController struct { - kubeclientset kubernetes.Interface - pvcLister listercorev1.PersistentVolumeClaimLister - pvcSynced cache.InformerSynced - classLister listerstoragev1.StorageClassLister - classSynced cache.InformerSynced - podLister listercorev1.PodLister - podSynced cache.InformerSynced - deployLister listerappsv1.DeploymentLister - deploySynced cache.InformerSynced - rsLister listerappsv1.ReplicaSetLister - rsSynced cache.InformerSynced - stsLister listerappsv1.StatefulSetLister - stsSynced cache.InformerSynced - workqueue workqueue.RateLimitingInterface - recorder record.EventRecorder -} - -// NewController returns a new volume expansion controller -func NewVolumeExpansionController( - kubeclientset kubernetes.Interface, - pvcInformer corev1informers.PersistentVolumeClaimInformer, - classInformer storagev1informer.StorageClassInformer, - podInformer corev1informers.PodInformer, - deployInformer appsv1informers.DeploymentInformer, - rsInformer appsv1informers.ReplicaSetInformer, - stsInformer appsv1informers.StatefulSetInformer) *VolumeExpansionController { - klog.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - controller := &VolumeExpansionController{ - kubeclientset: kubeclientset, - pvcLister: pvcInformer.Lister(), - pvcSynced: pvcInformer.Informer().HasSynced, - classLister: classInformer.Lister(), - classSynced: classInformer.Informer().HasSynced, - podLister: podInformer.Lister(), - podSynced: podInformer.Informer().HasSynced, - deployLister: deployInformer.Lister(), - deploySynced: deployInformer.Informer().HasSynced, - rsLister: rsInformer.Lister(), - rsSynced: rsInformer.Informer().HasSynced, - stsLister: stsInformer.Lister(), - stsSynced: stsInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "expansion"), - recorder: recorder, - } - klog.V(2).Info("Setting up event handlers") - pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueuePVC, - UpdateFunc: func(old, new interface{}) { - oldPVC, ok := old.(*corev1.PersistentVolumeClaim) - if !ok { - return - } - oldSize := oldPVC.Spec.Resources.Requests[corev1.ResourceStorage] - newPVC, ok := new.(*corev1.PersistentVolumeClaim) - if !ok { - return - } - newSize := newPVC.Spec.Resources.Requests[corev1.ResourceStorage] - if newSize.Cmp(oldSize) > 0 && newSize.Cmp(newPVC.Status.Capacity[corev1.ResourceStorage]) > 0 { - controller.handleObject(new) - } - }, - DeleteFunc: controller.enqueuePVC, - }) - return controller -} - -func (c *VolumeExpansionController) Start(stopCh <-chan struct{}) error { - return c.Run(5, stopCh) -} - -func (c *VolumeExpansionController) Run(threadiness int, stopCh <-chan struct{}) error { - defer utilruntime.HandleCrash() - defer c.workqueue.ShutDown() - klog.V(2).Info("Starting expand volume controller") - klog.V(2).Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.pvcSynced, c.classSynced, c.podSynced, c.deploySynced, c.rsSynced, - c.stsSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - klog.V(2).Info("Starting workers") - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) - } - - klog.V(2).Info("Started workers") - <-stopCh - klog.V(2).Info("Shutting down workers") - - return nil -} - -func (c *VolumeExpansionController) runWorker() { - for c.processNextWorkItem() { - } -} - -func (c *VolumeExpansionController) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - err := func(obj interface{}) error { - defer c.workqueue.Done(obj) - var key string - var ok bool - - if key, ok = obj.(string); !ok { - c.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - if err := c.syncHandler(key); err != nil { - c.workqueue.AddRateLimited(key) - return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) - } - c.workqueue.Forget(obj) - klog.V(2).Infof("Successfully synced '%s'", key) - return nil - }(obj) - - if err != nil { - utilruntime.HandleError(err) - return true - } - - return true -} - -// syncHandler will re-attach PVC on workloads. -// Step 1. Find the workload (deployment or statefulset) mounting PVC. -// If more than one workloads mounts the same PVC, the controller will not -// do anything. -// Step 2. Verify workload types. -// Step 3. Scale down workload. -// Step 4. Retry to check PVC status. -// Step 5. Scale up workload. -func (c *VolumeExpansionController) syncHandler(key string) error { - klog.V(5).Infof("syncHandler: handle %s", key) - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) - return nil - } - - // Get PVC source - pvc, err := c.pvcLister.PersistentVolumeClaims(namespace).Get(name) - if err != nil { - // The PVC resource may no longer exist, in which case we stop processing. - if apierrors.IsNotFound(err) { - klog.V(4).Infof("PVC '%s' in work queue no longer exists", key) - utilruntime.HandleError(fmt.Errorf("PVC '%s' in work queue no longer exists", key)) - return nil - } - return err - } - // find workload - workload, err := c.findWorkload(name, namespace) - if err != nil { - return err - } - if workload == nil { - klog.V(4).Infof("Cannot find any Pods mounting PVC %s", key) - return nil - } - klog.V(5).Infof("Find workload %T pvc name %s", workload, pvc.GetName()) - // handle supported workload - switch workload.(type) { - case *appsv1.StatefulSet: - sts := workload.(*appsv1.StatefulSet) - klog.V(5).Infof("Find StatefulSet %s", sts.GetName()) - case *appsv1.Deployment: - deploy := workload.(*appsv1.Deployment) - klog.V(5).Infof("Find Deployment %s", deploy.GetName()) - default: - klog.Errorf("Unsupported workload type %T", workload) - return nil - } - // Scale workload to 0 - if err = c.scaleDown(workload, pvc.GetNamespace()); err != nil { - klog.V(2).Infof("scale down PVC %s mounted workloads failed %s", key, err.Error()) - return err - } - klog.V(2).Infof("Scale down PVC %s mounted workloads succeed", key) - // Wait to scale up - err = retry.RetryOnConflict(retryTime, func() error { - klog.V(4).Info("waiting for PVC filesystem expansion") - if !c.isWaitingScaleUp(name, namespace) { - return apierrors.NewConflict(schema.GroupResource{Resource: "PersistentVolumeClaim"}, key, - errors.New("waiting for scaling down and expanding disk")) - } - return nil - }) - klog.V(5).Info("after waiting") - if err != nil { - klog.Errorf("Waiting timeout, error: %s", err.Error()) - } - - // Scale up - if err = c.scaleUp(workload, namespace); err != nil { - klog.V(2).Infof("Scale up PVC %s mounted workloads failed %s", key, err.Error()) - return err - } - klog.V(2).Infof("Scale up PVC %s mounted workloads succeed", key) - return nil -} - -// handleObject will take any resource implementing metav1.Object and attempt -// to find the PVC resource that 'owns' it. It does this by looking at the -// StorageClass of PVC whether supporting provisioner and allowing volume -// expansion. -// In KS 2.1, the controller only supports disk.csi.qingcloud.com and -// csi-qingcloud as storageclass provisioner. -func (c *VolumeExpansionController) handleObject(obj interface{}) { - var object metav1.Object - var ok bool - if object, ok = obj.(metav1.Object); !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) - return - } - object, ok = tombstone.Obj.(metav1.Object) - if !ok { - utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) - return - } - klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) - } - klog.V(4).Infof("Processing object: %s", object.GetName()) - - pvc := obj.(*corev1.PersistentVolumeClaim) - // Check storage class - // In KS 2.1, we only support disk.csi.qingcloud.com as storageclass provisioner. - class := c.getStorageClass(pvc) - klog.V(4).Infof("Get PVC %s SC was %s", pvc.String(), class.String()) - if class == nil { - return - } - if *class.AllowVolumeExpansion == false { - return - } - for _, p := range supportedProvisioner { - if class.Provisioner == p { - klog.V(5).Infof("enqueue PVC %s", claimToClaimKey(pvc)) - c.enqueuePVC(obj) - return - } - } -} - -func (c *VolumeExpansionController) getStorageClass(pvc *corev1.PersistentVolumeClaim) *storagev1.StorageClass { - if pvc == nil { - return nil - } - claimClass := getPersistentVolumeClaimClass(pvc) - if claimClass == "" { - klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", - claimToClaimKey(pvc)) - return nil - } - class, err := c.classLister.Get(claimClass) - if err != nil { - klog.V(4).Infof("failed to expand PVC: %s with error: %v", claimToClaimKey(pvc), err) - return nil - } - return class -} - -// enqueuePVC takes a PVC resource and converts it into a namespace/name -// string which is then put onto the work queue. This method should *not* be -// passed resources of any type other than PVC. -func (c *VolumeExpansionController) enqueuePVC(obj interface{}) { - pvc, ok := obj.(*corev1.PersistentVolumeClaim) - if !ok { - return - } - size := pvc.Spec.Resources.Requests[corev1.ResourceStorage] - statusSize := pvc.Status.Capacity[corev1.ResourceStorage] - - if pvc.Status.Phase == corev1.ClaimBound && size.Cmp(statusSize) > 0 { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pvc) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err)) - return - } - c.workqueue.Add(key) - } -} - -// findWorkload returns the pointer of Pod, StatefulSet or Deployment mounting the PVC. -func (c *VolumeExpansionController) findWorkload(pvc, namespace string) (workloadPtr interface{}, err error) { - podList, err := c.podLister.Pods(namespace).List(labels.Everything()) - klog.V(4).Infof("podlist len %d", len(podList)) - if err != nil { - return nil, err - } - // Which pod mounting PVC - podPtrListMountPVC := getPodMountPVC(podList, pvc) - klog.V(4).Infof("Get %d pods mounting PVC", len(podPtrListMountPVC)) - // In KS 2.1, automatic re-attach PVC only support PVC mounted on a single Pod. - if len(podPtrListMountPVC) != 1 { - return nil, nil - } - // If pod managed by Deployment, StatefulSet, it returns Deployment or StatefulSet. - // If not, it returns Pod. - klog.V(4).Info("Find pod parent") - ownerRef, err := c.findPodParent(podPtrListMountPVC[0]) - if err != nil { - return nil, err - } - if ownerRef == nil { - // a single pod - return podPtrListMountPVC[0], nil - } else { - klog.V(4).Infof("OwnerRef kind %s", ownerRef.Kind) - switch ownerRef.Kind { - case "StatefulSet": - return c.stsLister.StatefulSets(namespace).Get(ownerRef.Name) - case "Deployment": - return c.deployLister.Deployments(namespace).Get(ownerRef.Name) - default: - return nil, nil - } - } - -} - -// If the Pod don't controlled by any controller, return nil. -func (c *VolumeExpansionController) findPodParent(pod *corev1.Pod) (*metav1.OwnerReference, error) { - if pod == nil { - return nil, nil - } - if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil { - switch ownerRef.Kind { - case "ReplicaSet": - // get deploy - rs, err := c.rsLister.ReplicaSets(pod.GetNamespace()).Get(ownerRef.Name) - if err != nil { - return nil, err - } - if rsOwnerRef := metav1.GetControllerOf(rs); rsOwnerRef != nil { - return rsOwnerRef, nil - } else { - return ownerRef, nil - } - case "StatefulSet": - return ownerRef, nil - default: - return &metav1.OwnerReference{}, nil - } - } - return nil, nil -} - -func (c *VolumeExpansionController) scaleDown(workload interface{}, namespace string) error { - switch workload.(type) { - case *appsv1.Deployment: - deploy := workload.(*appsv1.Deployment) - scale := &autoscalingv1.Scale{ - ObjectMeta: metav1.ObjectMeta{ - Name: deploy.GetName(), - Namespace: deploy.GetNamespace(), - }, - Spec: autoscalingv1.ScaleSpec{ - Replicas: 0, - }, - } - _, err := c.kubeclientset.AppsV1().Deployments(namespace).UpdateScale(context.Background(), deploy.GetName(), scale, metav1.UpdateOptions{}) - return err - case *appsv1.StatefulSet: - sts := workload.(*appsv1.StatefulSet) - scale := &autoscalingv1.Scale{ - ObjectMeta: metav1.ObjectMeta{ - Name: sts.GetName(), - Namespace: sts.GetNamespace(), - }, - Spec: autoscalingv1.ScaleSpec{ - Replicas: 0, - }, - } - _, err := c.kubeclientset.AppsV1().StatefulSets(namespace).UpdateScale(context.Background(), sts.GetName(), scale, metav1.UpdateOptions{}) - return err - default: - return fmt.Errorf("unsupported type %T", workload) - } -} - -func (c *VolumeExpansionController) scaleUp(workload interface{}, namespace string) error { - switch workload.(type) { - case *appsv1.Deployment: - deploy := workload.(*appsv1.Deployment) - scale := &autoscalingv1.Scale{ - ObjectMeta: metav1.ObjectMeta{ - Name: deploy.GetName(), - Namespace: deploy.GetNamespace(), - }, - Spec: autoscalingv1.ScaleSpec{ - Replicas: *deploy.Spec.Replicas, - }, - } - _, err := c.kubeclientset.AppsV1().Deployments(namespace).UpdateScale(context.Background(), deploy.GetName(), scale, metav1.UpdateOptions{}) - return err - case *appsv1.StatefulSet: - sts := workload.(*appsv1.StatefulSet) - scale := &autoscalingv1.Scale{ - ObjectMeta: metav1.ObjectMeta{ - Name: sts.GetName(), - Namespace: sts.GetNamespace(), - }, - Spec: autoscalingv1.ScaleSpec{ - Replicas: *sts.Spec.Replicas, - }, - } - _, err := c.kubeclientset.AppsV1().StatefulSets(namespace).UpdateScale(context.Background(), sts.GetName(), scale, metav1.UpdateOptions{}) - return err - default: - return fmt.Errorf("unsupported type %T", workload) - } -} - -// isWaitingScaleUp tries to check whether PVC is waiting for restart Pod. -func (c *VolumeExpansionController) isWaitingScaleUp(name, namespace string) bool { - pvc, err := c.pvcLister.PersistentVolumeClaims(namespace).Get(name) - if err != nil { - klog.Errorf("Get PVC error") - } - if pvc == nil { - return false - } - for _, condition := range pvc.Status.Conditions { - if condition.Type == corev1.PersistentVolumeClaimFileSystemResizePending { - return true - } - } - return false -} diff --git a/pkg/controller/storage/expansion/expansion_controller_test.go b/pkg/controller/storage/expansion/expansion_controller_test.go deleted file mode 100644 index 0831db284..000000000 --- a/pkg/controller/storage/expansion/expansion_controller_test.go +++ /dev/null @@ -1,289 +0,0 @@ -/* - - Copyright 2019 The KubeSphere Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -*/ - -package expansion - -import ( - "encoding/json" - "flag" - "fmt" - "os" - "testing" - "time" - - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - storagev1 "k8s.io/api/storage/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/fake" - clientgotesting "k8s.io/client-go/testing" -) - -func TestMain(m *testing.M) { - flag.Set("alsologtostderr", "true") - flag.Set("log_dir", "/tmp") - flag.Set("v", "3") - flag.Parse() - ret := m.Run() - os.Exit(ret) -} - -func TestSyncHandler(t *testing.T) { - retryTime = wait.Backoff{ - Duration: 1 * time.Second, - Factor: 2, - Steps: 2, - } - tests := []struct { - name string - pvc *v1.PersistentVolumeClaim - pod *v1.Pod - deploy *appsv1.Deployment - rs *appsv1.ReplicaSet - sts *appsv1.StatefulSet - sc *storagev1.StorageClass - pvcKey string - hasError bool - }{ - { - name: "mount pvc on deploy", - pvc: getFakePersistentVolumeClaim("fake-pvc", "vol-12345", "fake-sc", types.UID(fmt.Sprintf("%d", 123))), - sc: getFakeStorageClass("fake-sc", "fake.sc.com"), - deploy: getFakeDeployment("fake-deploy", "234", 1, - getFakePersistentVolumeClaim("fake-pvc", "vol-12345", "fake-sc", types.UID(fmt.Sprintf("%d", 123)))), - pvcKey: "default/fake-pvc", - hasError: false, - }, - { - name: "unmounted pvc", - pvc: getFakePersistentVolumeClaim("fake-pvc", "vol-12345", "fake-sc", types.UID(fmt.Sprintf("%d", 123))), - sc: getFakeStorageClass("fake-sc", "fake.sc.com"), - pvcKey: "default/fake-pvc", - hasError: true, - }, - } - for _, tc := range tests { - test := tc - fakeKubeClient := &fake.Clientset{} - fakeWatch := watch.NewFake() - fakeKubeClient.AddWatchReactor("*", clientgotesting.DefaultWatchReactor(fakeWatch, nil)) - informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, 0) - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - storageClassInformer := informerFactory.Storage().V1().StorageClasses() - podInformer := informerFactory.Core().V1().Pods() - deployInformer := informerFactory.Apps().V1().Deployments() - rsInformer := informerFactory.Apps().V1().ReplicaSets() - stsInformer := informerFactory.Apps().V1().StatefulSets() - pvc := tc.pvc - - if tc.pvc != nil { - informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(pvc) - } - if tc.sc != nil { - informerFactory.Storage().V1().StorageClasses().Informer().GetIndexer().Add(tc.sc) - } - if tc.deploy != nil { - informerFactory.Apps().V1().Deployments().Informer().GetIndexer().Add(tc.deploy) - tc.rs = generateReplicaSetFromDeployment(tc.deploy) - } - if tc.rs != nil { - informerFactory.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(tc.rs) - tc.pod = generatePodFromReplicaSet(tc.rs) - } - if tc.sts != nil { - informerFactory.Apps().V1().StatefulSets().Informer().GetIndexer().Add(tc.sts) - } - if tc.pod != nil { - informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(tc.pod) - } - expc := NewVolumeExpansionController(fakeKubeClient, pvcInformer, storageClassInformer, podInformer, deployInformer, - rsInformer, stsInformer) - fakeKubeClient.AddReactor("patch", "persistentvolumeclaims", func(action clientgotesting.Action) (bool, runtime.Object, - error) { - if action.GetSubresource() == "status" { - patchActionaction, _ := action.(clientgotesting.PatchAction) - pvc, err := applyPVCPatch(pvc, patchActionaction.GetPatch()) - if err != nil { - return false, nil, err - } - return true, pvc, nil - } - return true, pvc, nil - }) - err := expc.syncHandler(test.pvcKey) - if err != nil && !test.hasError { - t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err) - } - } -} - -func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.PersistentVolumeClaim, error) { - pvcData, err := json.Marshal(originalPVC) - if err != nil { - return nil, fmt.Errorf("failed to marshal pvc with %v", err) - } - updated, err := strategicpatch.StrategicMergePatch(pvcData, patch, v1.PersistentVolumeClaim{}) - if err != nil { - return nil, fmt.Errorf("failed to apply patch on pvc %v", err) - } - updatedPVC := &v1.PersistentVolumeClaim{} - if err := json.Unmarshal(updated, updatedPVC); err != nil { - return nil, fmt.Errorf("failed to unmarshal updated pvc : %v", err) - } - return updatedPVC, nil -} - -func getFakePersistentVolumeClaim(pvcName, volumeName, scName string, uid types.UID) *v1.PersistentVolumeClaim { - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default", UID: uid}, - Spec: v1.PersistentVolumeClaimSpec{}, - } - if volumeName != "" { - pvc.Spec.VolumeName = volumeName - } - - if scName != "" { - pvc.Spec.StorageClassName = &scName - } - return pvc -} - -func getFakeStorageClass(scName, pluginName string) *storagev1.StorageClass { - return &storagev1.StorageClass{ - ObjectMeta: metav1.ObjectMeta{Name: scName}, - Provisioner: pluginName, - } -} - -func getFakePod(podName string, ownerRef *metav1.OwnerReference, uid types.UID) *v1.Pod { - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: podName, - Namespace: "default", - UID: uid, - OwnerReferences: []metav1.OwnerReference{*ownerRef}, - }, - Spec: v1.PodSpec{}, - } - return pod -} - -func getFakeReplicaSet(rsName string, ownerRef *metav1.OwnerReference, uid types.UID, replicas int) *appsv1.ReplicaSet { - rs := &appsv1.ReplicaSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: rsName, - Namespace: "default", - UID: uid, - OwnerReferences: []metav1.OwnerReference{*ownerRef}, - }, - Spec: appsv1.ReplicaSetSpec{}, - } - return rs -} - -func getFakeDeployment(deployName string, uid types.UID, replicas int32, mountPVC *v1.PersistentVolumeClaim) *appsv1. - Deployment { - return &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: deployName, - Namespace: "default", - }, - Spec: appsv1.DeploymentSpec{ - Replicas: &replicas, - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: "test", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: mountPVC.GetName(), - }, - }, - }, - }, - }, - }, - }, - } -} - -func getFakeStatefulSet(stsName string, uid types.UID, replicas int32, mountPVC *v1.PersistentVolumeClaim) *appsv1. - StatefulSet { - return &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: stsName, - Namespace: "default", - }, - Spec: appsv1.StatefulSetSpec{ - Replicas: &replicas, - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: "test", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: mountPVC.GetName(), - }, - }, - }, - }, - }, - }, - }, - } -} - -// generatePodFromRS creates a pod, with the input ReplicaSet's selector and its template -func generatePodFromReplicaSet(rs *appsv1.ReplicaSet) *v1.Pod { - trueVar := true - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: rs.Name + "-pod", - Namespace: rs.Namespace, - OwnerReferences: []metav1.OwnerReference{ - {UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}, - }, - }, - Spec: rs.Spec.Template.Spec, - } -} - -func generateReplicaSetFromDeployment(deploy *appsv1.Deployment) *appsv1.ReplicaSet { - trueVar := true - return &appsv1.ReplicaSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: deploy.Name + "-rs", - Namespace: deploy.Namespace, - OwnerReferences: []metav1.OwnerReference{ - {UID: deploy.UID, APIVersion: "v1beta1", Kind: "Deployment", Name: deploy.Name, Controller: &trueVar}, - }, - }, - Spec: appsv1.ReplicaSetSpec{ - Replicas: deploy.Spec.Replicas, - Template: deploy.Spec.Template, - }, - } -} diff --git a/pkg/controller/storage/expansion/expansion_util.go b/pkg/controller/storage/expansion/expansion_util.go deleted file mode 100644 index 19de01318..000000000 --- a/pkg/controller/storage/expansion/expansion_util.go +++ /dev/null @@ -1,67 +0,0 @@ -/* - - Copyright 2019 The KubeSphere Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -*/ - -package expansion - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/klog" -) - -func getPodMountPVC(pods []*corev1.Pod, pvc string) []*corev1.Pod { - var res []*corev1.Pod - for _, pod := range pods { - klog.V(4).Infof("check pod %s is mount pvc %s", pod.Name, pvc) - curPod := pod - if isMounted(pod, pvc) { - res = append(res, curPod) - } - } - return res -} - -func isMounted(pod *corev1.Pod, pvc string) bool { - for _, vol := range pod.Spec.Volumes { - if vol.PersistentVolumeClaim != nil && vol.PersistentVolumeClaim.ClaimName == pvc { - return true - } - } - return false -} - -// claimToClaimKey return namespace/name string for pvc -func claimToClaimKey(claim *corev1.PersistentVolumeClaim) string { - return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name) -} - -// GetPersistentVolumeClaimClass returns StorageClassName. If no storage class was -// requested, it returns "". -func getPersistentVolumeClaimClass(claim *corev1.PersistentVolumeClaim) string { - // Use beta annotation first - if class, found := claim.Annotations[corev1.BetaStorageClassAnnotation]; found { - return class - } - - if claim.Spec.StorageClassName != nil { - return *claim.Spec.StorageClassName - } - - return "" -}