From eadf8cc4c865af3af78076cf049fa0bb4b4f9089 Mon Sep 17 00:00:00 2001 From: dkven Date: Wed, 9 Jun 2021 00:10:47 +0800 Subject: [PATCH] remove storage capability auto detection Signed-off-by: dkven --- cmd/controller-manager/app/controllers.go | 1 - .../disk-csi-qingcloud-com.yaml | 20 ++ .../neonsan-csi-qingstor-com.yaml | 20 ++ .../templates/ks-controller-manager.yaml | 6 - go.mod | 1 - .../capability/capability_controller.go | 132 +++++------- .../capability/capability_controller_test.go | 179 ++++++++++------- .../storage/capability/csi_capability.go | 168 ---------------- .../storage/capability/csi_capability_test.go | 190 ------------------ 9 files changed, 191 insertions(+), 526 deletions(-) create mode 100644 config/default/provisonercapability/disk-csi-qingcloud-com.yaml create mode 100644 config/default/provisonercapability/neonsan-csi-qingstor-com.yaml delete mode 100644 pkg/controller/storage/capability/csi_capability.go delete mode 100644 pkg/controller/storage/capability/csi_capability_test.go diff --git a/cmd/controller-manager/app/controllers.go b/cmd/controller-manager/app/controllers.go index ce0ee1925..4b25ecc8b 100644 --- a/cmd/controller-manager/app/controllers.go +++ b/cmd/controller-manager/app/controllers.go @@ -141,7 +141,6 @@ func addControllers( capability.SnapshotSupported(client.Kubernetes().Discovery()), client.Snapshot().SnapshotV1beta1().VolumeSnapshotClasses(), informerFactory.SnapshotSharedInformerFactory().Snapshot().V1beta1().VolumeSnapshotClasses(), - kubernetesInformer.Storage().V1beta1().CSIDrivers(), ) volumeExpansionController := expansion.NewVolumeExpansionController( diff --git a/config/default/provisonercapability/disk-csi-qingcloud-com.yaml b/config/default/provisonercapability/disk-csi-qingcloud-com.yaml new file mode 100644 index 000000000..b9d3130a6 --- /dev/null +++ b/config/default/provisonercapability/disk-csi-qingcloud-com.yaml @@ -0,0 +1,20 @@ +apiVersion: storage.kubesphere.io/v1alpha1 +kind: ProvisionerCapability +metadata: + name: disk-csi-qingcloud-com +spec: + pluginInfo: + name: disk.csi.qingcloud.com + version: "" + features: + topology: true + snapshot: + create: true + list: false + volume: + attach: true + clone: true + create: true + expandMode: OFFLINE + list: false + stats: true diff --git a/config/default/provisonercapability/neonsan-csi-qingstor-com.yaml b/config/default/provisonercapability/neonsan-csi-qingstor-com.yaml new file mode 100644 index 000000000..75c429364 --- /dev/null +++ b/config/default/provisonercapability/neonsan-csi-qingstor-com.yaml @@ -0,0 +1,20 @@ +apiVersion: storage.kubesphere.io/v1alpha1 +kind: ProvisionerCapability +metadata: + name: neonsan-csi-qingstor-com +spec: + pluginInfo: + name: neonsan.csi.qingstor.com + version: "" + features: + topology: false + snapshot: + create: true + list: false + volume: + attach: true + clone: true + create: true + expandMode: OFFLINE + list: false + stats: true diff --git a/config/ks-core/templates/ks-controller-manager.yaml b/config/ks-core/templates/ks-controller-manager.yaml index 4d351873c..06c31edd5 100644 --- a/config/ks-core/templates/ks-controller-manager.yaml +++ b/config/ks-core/templates/ks-controller-manager.yaml @@ -50,8 +50,6 @@ spec: name: kubesphere-config - mountPath: /tmp/k8s-webhook-server/serving-certs name: webhook-secret - - mountPath: /var/lib/kubelet/plugins/ - name: kubelet-plugin - mountPath: /etc/localtime name: host-time dnsPolicy: ClusterFirst @@ -68,10 +66,6 @@ spec: secret: defaultMode: 420 secretName: ks-controller-manager-webhook-cert - - name: kubelet-plugin - hostPath: - path: /var/lib/kubelet/plugins/ - type: DirectoryOrCreate - hostPath: path: /etc/localtime type: "" diff --git a/go.mod b/go.mod index c1e4816fb..0764d8937 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 github.com/aws/aws-sdk-go v1.33.12 github.com/beevik/etree v1.1.0 - github.com/container-storage-interface/spec v1.2.0 github.com/containernetworking/cni v0.8.0 github.com/coreos/go-oidc v2.1.0+incompatible github.com/davecgh/go-spew v1.1.1 diff --git a/pkg/controller/storage/capability/capability_controller.go b/pkg/controller/storage/capability/capability_controller.go index 345d68488..7fce92afb 100644 --- a/pkg/controller/storage/capability/capability_controller.go +++ b/pkg/controller/storage/capability/capability_controller.go @@ -35,43 +35,37 @@ import ( snapinformers "github.com/kubernetes-csi/external-snapshotter/client/v3/informers/externalversions/volumesnapshot/v1beta1" snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v3/listers/volumesnapshot/v1beta1" storagev1 "k8s.io/api/storage/v1" - storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" storageinformersv1 "k8s.io/client-go/informers/storage/v1" - storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1" "k8s.io/client-go/kubernetes/scheme" storageclient "k8s.io/client-go/kubernetes/typed/storage/v1" storagelistersv1 "k8s.io/client-go/listers/storage/v1" - storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - capability "kubesphere.io/api/storage/v1alpha1" + ksstorage "kubesphere.io/api/storage/v1alpha1" crdscheme "kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme" - capabilityclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/storage/v1alpha1" - capabilityinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1" - capabilitylisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1" + ksstorageclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/storage/v1alpha1" + ksstorageinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1" + ksstoragelisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1" ) const ( minSnapshotSupportedVersion = "v1.17.0" - csiAddressFormat = "/var/lib/kubelet/plugins/%s/csi.sock" annotationSupportSnapshot = "storageclass.kubesphere.io/support-snapshot" ) -type csiAddressGetter func(storageClassProvisioner string) string - type StorageCapabilityController struct { - storageClassCapabilityClient capabilityclient.StorageClassCapabilityInterface - storageCapabilityLister capabilitylisters.StorageClassCapabilityLister + storageClassCapabilityClient ksstorageclient.StorageClassCapabilityInterface + storageClassCapabilityLister ksstoragelisters.StorageClassCapabilityLister storageClassCapabilitySynced cache.InformerSynced - provisionerCapabilityLister capabilitylisters.ProvisionerCapabilityLister + provisionerCapabilityLister ksstoragelisters.ProvisionerCapabilityLister provisionerCapabilitySynced cache.InformerSynced storageClassClient storageclient.StorageClassInterface @@ -83,42 +77,33 @@ type StorageCapabilityController struct { snapshotClassLister snapshotlisters.VolumeSnapshotClassLister snapshotClassSynced cache.InformerSynced - csiDriverLister storagelistersv1beta1.CSIDriverLister - csiDriverSynced cache.InformerSynced - - csiAddressGetter csiAddressGetter - workQueue workqueue.RateLimitingInterface } -// This controller is responsible to watch StorageClass, SnapshotClass. +// This controller is responsible to watch StorageClass/ProvisionerCapability. // And then update StorageClassCapability CRD resource object to the newest status. func NewController( - capabilityClient capabilityclient.StorageClassCapabilityInterface, - capabilityInformer capabilityinformers.Interface, + storageClassCapabilityClient ksstorageclient.StorageClassCapabilityInterface, + ksStorageInformer ksstorageinformers.Interface, storageClassClient storageclient.StorageClassInterface, storageClassInformer storageinformersv1.StorageClassInformer, snapshotSupported bool, snapshotClassClient snapshotclient.VolumeSnapshotClassInterface, snapshotClassInformer snapinformers.VolumeSnapshotClassInformer, - csiDriverInformer storageinformersv1beta1.CSIDriverInformer, ) *StorageCapabilityController { utilruntime.Must(crdscheme.AddToScheme(scheme.Scheme)) controller := &StorageCapabilityController{ - storageClassCapabilityClient: capabilityClient, - storageCapabilityLister: capabilityInformer.StorageClassCapabilities().Lister(), - storageClassCapabilitySynced: capabilityInformer.StorageClassCapabilities().Informer().HasSynced, - provisionerCapabilityLister: capabilityInformer.ProvisionerCapabilities().Lister(), - provisionerCapabilitySynced: capabilityInformer.ProvisionerCapabilities().Informer().HasSynced, + storageClassCapabilityClient: storageClassCapabilityClient, + storageClassCapabilityLister: ksStorageInformer.StorageClassCapabilities().Lister(), + storageClassCapabilitySynced: ksStorageInformer.StorageClassCapabilities().Informer().HasSynced, + provisionerCapabilityLister: ksStorageInformer.ProvisionerCapabilities().Lister(), + provisionerCapabilitySynced: ksStorageInformer.ProvisionerCapabilities().Informer().HasSynced, storageClassClient: storageClassClient, storageClassLister: storageClassInformer.Lister(), storageClassSynced: storageClassInformer.Informer().HasSynced, snapshotSupported: snapshotSupported, - csiDriverLister: csiDriverInformer.Lister(), - csiDriverSynced: csiDriverInformer.Informer().HasSynced, - csiAddressGetter: csiAddress, workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StorageClasses"), } @@ -141,12 +126,24 @@ func NewController( DeleteFunc: controller.enqueueStorageClass, }) - csiDriverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.handlerCSIDriver, + // ProvisionerCapability acts as a value source of its relevant StorageClassCapabilities + // so when a PC is created/updated, the corresponding SCCs should be created(if not exists)/updated + // we achive this by simply enqueueing the StorageClasses of the same provisioner + // but don't overdo by cascade deleting the SCCs when a PC is deleted + // since the role of PCs is more like a template rather than owner to SCCs + + // This is a backward compatible fix to remove the useless auto detection of SCCs + // in the future, we will only keep ProvisionerCapability and remove the StorageClassCapability CRD entirely + ksStorageInformer.ProvisionerCapabilities().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleProvisionerCapability, UpdateFunc: func(oldObj, newObj interface{}) { - return + newPC := newObj.(*ksstorage.ProvisionerCapability) + oldPC := oldObj.(*ksstorage.ProvisionerCapability) + if newPC.ResourceVersion == oldPC.ResourceVersion { + return + } + controller.handleProvisionerCapability(newObj) }, - DeleteFunc: controller.handlerCSIDriver, }) return controller @@ -166,7 +163,6 @@ func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{}) c.storageClassCapabilitySynced, c.provisionerCapabilitySynced, c.storageClassSynced, - c.csiDriverSynced, } if c.snapshotAllowed() { @@ -186,16 +182,16 @@ func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{}) return nil } -func (c *StorageCapabilityController) handlerCSIDriver(obj interface{}) { - csiDriver := obj.(*storagev1beta1.CSIDriver) +func (c *StorageCapabilityController) handleProvisionerCapability(obj interface{}) { + provisionerCapability := obj.(*ksstorage.ProvisionerCapability) storageClasses, err := c.storageClassLister.List(labels.Everything()) if err != nil { - klog.Error("list StorageClass error when handler csiDriver", err) + klog.Error("list StorageClass error when handle provisionerCapability", err) return } for _, storageClass := range storageClasses { - if storageClass.Provisioner == csiDriver.Name { - klog.V(4).Infof("enqueue StorageClass %s when handling csiDriver", storageClass.Name) + if getProvisionerCapabilityName(storageClass.Provisioner) == provisionerCapability.Name { + klog.V(4).Infof("enqueue StorageClass %s while handling provisionerCapability", storageClass.Name) c.enqueueStorageClass(storageClass) } } @@ -278,14 +274,16 @@ func (c *StorageCapabilityController) syncHandler(key string) error { if err != nil { return err } - // No capability because csi-plugin not installed + // The corresponding ProvisionerCapability Object does not exist if capabilitySpec == nil { - klog.Infof("StorageClass %s has no capability", name) + klog.Infof("Can't get StorageClass %s's capability", name) err = c.updateStorageClassSnapshotSupported(storageClass, false) if err != nil { return err } - return c.deleteStorageCapability(name) + // Don't delete the already created SCC + // as it might be created manually by user + return nil } klog.Infof("StorageClass %s has capability %v", name, capabilitySpec) @@ -316,11 +314,11 @@ func (c *StorageCapabilityController) syncHandler(key string) error { } // Handle StorageClassCapability with the same name of StorageClass - storageClassCapabilityExist, err := c.storageCapabilityLister.Get(storageClass.Name) + storageClassCapabilityExist, err := c.storageClassCapabilityLister.Get(storageClass.Name) if err != nil { if errors.IsNotFound(err) { // If StorageClassCapability doesn't exist, create it - storageClassCapabilityCreate := &capability.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}} + storageClassCapabilityCreate := &ksstorage.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}} storageClassCapabilityCreate.Spec = *capabilitySpec klog.Info("Create StorageClassCapability: ", storageClassCapabilityCreate) _, err = c.storageClassCapabilityClient.Create(context.Background(), storageClassCapabilityCreate, metav1.CreateOptions{}) @@ -356,7 +354,7 @@ func (c *StorageCapabilityController) updateStorageClassSnapshotSupported(storag } func (c *StorageCapabilityController) deleteStorageCapability(name string) error { - _, err := c.storageCapabilityLister.Get(name) + _, err := c.storageClassCapabilityLister.Get(name) if err != nil { if errors.IsNotFound(err) { return nil @@ -382,7 +380,7 @@ func (c *StorageCapabilityController) deleteSnapshotClass(name string) error { return c.snapshotClassClient.Delete(context.Background(), name, metav1.DeleteOptions{}) } -func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner string) (*capability.StorageClassCapabilitySpec, error) { +func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner string) (*ksstorage.StorageClassCapabilitySpec, error) { provisionerCapability, err := c.provisionerCapabilityLister.Get(getProvisionerCapabilityName(provisioner)) if err != nil { if errors.IsNotFound(err) { @@ -391,13 +389,13 @@ func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner stri return nil, err } klog.V(4).Infof("get provisioner capability:%s %s", provisioner, provisionerCapability.Name) - capabilitySpec := &capability.StorageClassCapabilitySpec{ + capabilitySpec := &ksstorage.StorageClassCapabilitySpec{ Features: provisionerCapability.Spec.Features, } return capabilitySpec, nil } -func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.StorageClass) (*capability.StorageClassCapabilitySpec, error) { +func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.StorageClass) (*ksstorage.StorageClassCapabilitySpec, error) { // get from provisioner capability first klog.V(4).Info("get cap ", storageClass.Provisioner) capabilitySpec, err := c.capabilityFromProvisioner(storageClass.Provisioner) @@ -405,24 +403,10 @@ func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1. return nil, err } - // csi of storage capability - if capabilitySpec == nil { - isCsi, err := c.isCSIStorage(storageClass.Provisioner) - if err != nil { - return nil, err - } - if isCsi { - capabilitySpec, err = csiCapability(c.csiAddressGetter(storageClass.Provisioner)) - if err != nil { - return nil, err - } - } - } - if capabilitySpec != nil { capabilitySpec.Provisioner = storageClass.Provisioner if storageClass.AllowVolumeExpansion == nil || !*storageClass.AllowVolumeExpansion { - capabilitySpec.Features.Volume.Expand = capability.ExpandModeUnknown + capabilitySpec.Features.Volume.Expand = ksstorage.ExpandModeUnknown } if !c.snapshotSupported { capabilitySpec.Features.Snapshot.Create = false @@ -432,22 +416,6 @@ func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1. return capabilitySpec, nil } -func (c *StorageCapabilityController) isCSIStorage(provisioner string) (bool, error) { - _, err := c.csiDriverLister.Get(provisioner) - if err != nil { - if errors.IsNotFound(err) { - return false, nil - } - return false, err - } - return true, nil -} - -// this is used for test of CSIDriver on windows -func (c *StorageCapabilityController) setCSIAddressGetter(getter csiAddressGetter) { - c.csiAddressGetter = getter -} - func (c *StorageCapabilityController) snapshotAllowed() bool { return c.snapshotSupported && c.snapshotClassClient != nil && c.snapshotClassLister != nil && c.snapshotClassSynced != nil } @@ -465,10 +433,6 @@ func SnapshotSupported(discoveryInterface discovery.DiscoveryInterface) bool { return ver.AtLeast(minVer) } -func csiAddress(provisioner string) string { - return fmt.Sprintf(csiAddressFormat, provisioner) -} - func getProvisionerCapabilityName(provisioner string) string { return strings.NewReplacer(".", "-", "/", "-").Replace(provisioner) } diff --git a/pkg/controller/storage/capability/capability_controller_test.go b/pkg/controller/storage/capability/capability_controller_test.go index 7026e6515..233739e87 100644 --- a/pkg/controller/storage/capability/capability_controller_test.go +++ b/pkg/controller/storage/capability/capability_controller_test.go @@ -19,11 +19,8 @@ package capability import ( - "math/rand" - "github.com/google/go-cmp/cmp" - //"github.com/google/go-cmp/cmp" "reflect" "testing" "time" @@ -32,7 +29,6 @@ import ( snapfake "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned/fake" snapinformers "github.com/kubernetes-csi/external-snapshotter/client/v3/informers/externalversions" storagev1 "k8s.io/api/storage/v1" - storagev1beta1 "k8s.io/api/storage/v1beta1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -60,7 +56,7 @@ type fixture struct { snapshotClassClient *snapfake.Clientset ksClient *ksfake.Clientset // Objects from here preload into NewSimpleFake. - storageObjects []runtime.Object // include StorageClass and CSIDriver + storageObjects []runtime.Object // include StorageClass snapshotClassObjects []runtime.Object capabilityObjects []runtime.Object // include StorageClassCapability and ProvisionerCapability // Objects to put in the store. @@ -68,19 +64,14 @@ type fixture struct { snapshotClassLister []*snapbeta1.VolumeSnapshotClass storageClassCapabilityLister []*ksv1alpha1.StorageClassCapability provisionerCapabilityLister []*ksv1alpha1.ProvisionerCapability - csiDriverLister []*storagev1beta1.CSIDriver // Actions expected to happen on the client. actions []core.Action - // CSI server - runCSIServer bool - fakeCSIServer *fakeCSIServer } -func newFixture(t *testing.T, snapshotSupported bool, runCSIServer bool) *fixture { +func newFixture(t *testing.T, snapshotSupported bool) *fixture { return &fixture{ t: t, snapshotSupported: snapshotSupported, - runCSIServer: runCSIServer, } } @@ -105,21 +96,11 @@ func (f *fixture) newController() (*StorageCapabilityController, f.snapshotSupported, f.snapshotClassClient.SnapshotV1beta1().VolumeSnapshotClasses(), snapshotInformers.Snapshot().V1beta1().VolumeSnapshotClasses(), - k8sInformers.Storage().V1beta1().CSIDrivers(), ) - if f.runCSIServer { - port := 30000 + rand.Intn(100) - fakeCSIServer, address := newTestCSIServer(port) - f.fakeCSIServer = fakeCSIServer - c.setCSIAddressGetter(func(storageClassProvisioner string) string { return address }) - } for _, storageClass := range f.storageClassLister { _ = k8sInformers.Storage().V1().StorageClasses().Informer().GetIndexer().Add(storageClass) } - for _, csiDriver := range f.csiDriverLister { - _ = k8sInformers.Storage().V1beta1().CSIDrivers().Informer().GetIndexer().Add(csiDriver) - } for _, snapshotClass := range f.snapshotClassLister { _ = snapshotInformers.Snapshot().V1beta1().VolumeSnapshotClasses().Informer().GetIndexer().Add(snapshotClass) } @@ -136,11 +117,6 @@ func (f *fixture) newController() (*StorageCapabilityController, func (f *fixture) runController(scName string, startInformers bool, expectError bool) { c, k8sI, crdI, snapI := f.newController() - if f.runCSIServer { - f.fakeCSIServer.run() - defer f.fakeCSIServer.stop() - } - if startInformers { stopCh := make(chan struct{}) defer close(stopCh) @@ -277,6 +253,26 @@ func newStorageClass(name string, provisioner string) *storagev1.StorageClass { } } +func newStorageClassCapabilitySpec() *ksv1alpha1.StorageClassCapabilitySpec { + return &ksv1alpha1.StorageClassCapabilitySpec{ + Features: ksv1alpha1.CapabilityFeatures{ + Topology: false, + Volume: ksv1alpha1.VolumeFeature{ + Create: true, + Attach: false, + List: false, + Clone: true, + Stats: true, + Expand: ksv1alpha1.ExpandModeOffline, + }, + Snapshot: ksv1alpha1.SnapshotFeature{ + Create: true, + List: false, + }, + }, + } +} + func newStorageClassCapability(storageClass *storagev1.StorageClass) *ksv1alpha1.StorageClassCapability { storageClassCapability := &ksv1alpha1.StorageClassCapability{} storageClassCapability.Name = storageClass.Name @@ -290,17 +286,9 @@ func newProvisionerCapability(storageClass *storagev1.StorageClass) *ksv1alpha1. provisionerCapability.Name = getProvisionerCapabilityName(storageClass.Provisioner) provisionerCapability.Spec.PluginInfo.Name = storageClass.Provisioner provisionerCapability.Spec.Features = newStorageClassCapabilitySpec().Features - // ProvisionerCapability snapshot is always false - provisionerCapability.Spec.Features.Snapshot.Create = false return provisionerCapability } -func newCSIDriver(storageClass *storagev1.StorageClass) *storagev1beta1.CSIDriver { - csiDriver := &storagev1beta1.CSIDriver{} - csiDriver.Name = storageClass.Provisioner - return csiDriver -} - func newSnapshotClass(storageClass *storagev1.StorageClass) *snapbeta1.VolumeSnapshotClass { return &snapbeta1.VolumeSnapshotClass{ ObjectMeta: v1.ObjectMeta{ @@ -321,18 +309,19 @@ func getKey(sc *storagev1.StorageClass, t *testing.T) string { } func TestCreateStorageClass(t *testing.T) { - fixture := newFixture(t, true, true) + fixture := newFixture(t, true) storageClass := newStorageClass("csi-example", "csi.example.com") storageClassUpdate := storageClass.DeepCopy() storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "true"} + provisionerCapability := newProvisionerCapability(storageClass) snapshotClass := newSnapshotClass(storageClass) storageClassCapability := newStorageClassCapability(storageClass) - csiDriver := newCSIDriver(storageClass) // Objects exist - fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver) + fixture.storageObjects = append(fixture.storageObjects, storageClass) fixture.storageClassLister = append(fixture.storageClassLister, storageClass) - fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver) + fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability) + fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability) // Action expected fixture.expectCreateSnapshotClassAction(snapshotClass) @@ -343,28 +332,69 @@ func TestCreateStorageClass(t *testing.T) { fixture.run(getKey(storageClass, t)) } +func TestCreateStorageClassWithoutProvisionerCapability(t *testing.T) { + fixture := newFixture(t, true) + storageClass := newStorageClass("csi-example", "csi.example.com") + + // Objects exist + fixture.storageObjects = append(fixture.storageObjects, storageClass) + fixture.storageClassLister = append(fixture.storageClassLister, storageClass) + + storageClassUpdate := storageClass.DeepCopy() + storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"} + fixture.expectUpdateStorageClassAction(storageClassUpdate) + + // Run test + fixture.run(getKey(storageClass, t)) +} + func TestUpdateStorageClass(t *testing.T) { storageClass := newStorageClass("csi-example", "csi.example.com") storageClass.Annotations = map[string]string{annotationSupportSnapshot: "true"} snapshotClass := newSnapshotClass(storageClass) storageClassCapabilityUpdate := newStorageClassCapability(storageClass) storageClassCapability := newStorageClassCapability(storageClass) + provisionerCapability := newProvisionerCapability(storageClass) //old and new should have deference storageClassCapability.Spec.Features.Volume.Create = !storageClassCapability.Spec.Features.Volume.Create - csiDriver := newCSIDriver(storageClass) - fixture := newFixture(t, true, true) + fixture := newFixture(t, true) // Object exist - fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver) + fixture.storageObjects = append(fixture.storageObjects, storageClass) + fixture.storageClassLister = append(fixture.storageClassLister, storageClass) + fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass) + fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass) + fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability, provisionerCapability) + fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability) + fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability) + + // Action expected + fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate) + + // Run test + fixture.run(getKey(storageClass, t)) +} + +func TestUpdateStorageClassWithoutProvisionerCapability(t *testing.T) { + storageClass := newStorageClass("csi-example", "csi.example.com") + storageClass.Annotations = map[string]string{annotationSupportSnapshot: "true"} + storageClassUpdate := storageClass.DeepCopy() + storageClassUpdate.Annotations[annotationSupportSnapshot] = "false" + snapshotClass := newSnapshotClass(storageClass) + storageClassCapability := newStorageClassCapability(storageClass) + //old and new should have deference + storageClassCapability.Spec.Features.Volume.Create = !storageClassCapability.Spec.Features.Volume.Create + + fixture := newFixture(t, true) + // Object exist + fixture.storageObjects = append(fixture.storageObjects, storageClass) fixture.storageClassLister = append(fixture.storageClassLister, storageClass) - fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver) fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass) fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass) fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability) fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability) - // Action expected - fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate) + fixture.expectUpdateStorageClassAction(storageClassUpdate) // Run test fixture.run(getKey(storageClass, t)) @@ -375,12 +405,9 @@ func TestDeleteStorageClass(t *testing.T) { snapshotClass := newSnapshotClass(storageClass) storageClassCapability := newStorageClassCapability(storageClass) - csiDriver := newCSIDriver(storageClass) - - fixture := newFixture(t, true, true) + fixture := newFixture(t, true) // Object exist - fixture.storageObjects = append(fixture.storageObjects, csiDriver) - fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver) + fixture.storageObjects = append(fixture.storageObjects, storageClass) fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass) fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass) fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability) @@ -395,7 +422,8 @@ func TestDeleteStorageClass(t *testing.T) { } func TestCreateStorageClassNotSupportSnapshot(t *testing.T) { - fixture := newFixture(t, false, true) + // K8S version < 1.17.0 + fixture := newFixture(t, false) storageClass := newStorageClass("csi-example", "csi.example.com") storageClassUpdate := storageClass.DeepCopy() storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"} @@ -403,32 +431,31 @@ func TestCreateStorageClassNotSupportSnapshot(t *testing.T) { storageClassCapability.Spec.Features.Snapshot.Create = false storageClassCapability.Spec.Features.Snapshot.List = false provisionerCapability := newProvisionerCapability(storageClass) - csiDriver := newCSIDriver(storageClass) - - // Objects exist - fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver) - fixture.storageClassLister = append(fixture.storageClassLister, storageClass) - fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver) - fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability) - fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability) - - // Action expected - fixture.expectUpdateStorageClassAction(storageClassUpdate) - fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability) - - // Run test - fixture.run(getKey(storageClass, t)) -} - -func TestCreateStorageClassInTree(t *testing.T) { - // InTree Storage has no snapshot capability - fixture := newFixture(t, true, true) - storageClass := newStorageClass("csi-example", "csi.example.com") - storageClassUpdate := storageClass.DeepCopy() - storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"} - storageClassCapability := newStorageClassCapability(storageClass) - storageClassCapability.Spec.Features.Snapshot.Create = false - provisionerCapability := newProvisionerCapability(storageClass) + + // Objects exist + fixture.storageObjects = append(fixture.storageObjects, storageClass) + fixture.storageClassLister = append(fixture.storageClassLister, storageClass) + fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability) + fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability) + + // Action expected + fixture.expectUpdateStorageClassAction(storageClassUpdate) + fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability) + + // Run test + fixture.run(getKey(storageClass, t)) +} + +func TestCreateStorageClassNotHaveSnapshotCap(t *testing.T) { + // Storage has no snapshot capability + fixture := newFixture(t, true) + storageClass := newStorageClass("csi-example", "csi.example.com") + storageClassUpdate := storageClass.DeepCopy() + storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"} + storageClassCapability := newStorageClassCapability(storageClass) + storageClassCapability.Spec.Features.Snapshot.Create = false + provisionerCapability := newProvisionerCapability(storageClass) + provisionerCapability.Spec.Features.Snapshot.Create = false // Objects exist fixture.storageObjects = append(fixture.storageObjects, storageClass) diff --git a/pkg/controller/storage/capability/csi_capability.go b/pkg/controller/storage/capability/csi_capability.go deleted file mode 100644 index 65daeb0c1..000000000 --- a/pkg/controller/storage/capability/csi_capability.go +++ /dev/null @@ -1,168 +0,0 @@ -/* - - Copyright 2020 The KubeSphere Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -*/ -package capability - -import ( - "context" - "errors" - "net" - "net/url" - "time" - - "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/keepalive" - - "kubesphere.io/api/storage/v1alpha1" -) - -const ( - dialDuration = time.Second * 5 - requestDuration = time.Second * 10 -) - -func csiCapability(csiAddress string) (*v1alpha1.StorageClassCapabilitySpec, error) { - csiConn, err := connect(csiAddress) - if err != nil { - return nil, err - } - defer func() { _ = csiConn.Close() }() - - ctx, cancel := context.WithTimeout(context.Background(), requestDuration) - defer cancel() - - spec := &v1alpha1.StorageClassCapabilitySpec{} - err = addPluginCapabilities(ctx, csiConn, spec) - if err != nil { - return nil, err - } - err = addControllerCapabilities(ctx, csiConn, spec) - if err != nil { - return nil, err - } - err = addNodeCapabilities(ctx, csiConn, spec) - if err != nil { - return nil, err - } - return spec, nil - -} - -func addPluginCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error { - identityClient := csi.NewIdentityClient(conn) - pluginCapabilitiesResponse, err := identityClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{}) - if err != nil { - return err - } - - for _, capability := range pluginCapabilitiesResponse.GetCapabilities() { - if capability == nil { - continue - } - if capability.GetService().GetType() == csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS { - spec.Features.Topology = true - } - volumeExpansion := capability.GetVolumeExpansion() - if volumeExpansion != nil { - switch volumeExpansion.GetType() { - case csi.PluginCapability_VolumeExpansion_ONLINE: - spec.Features.Volume.Expand = v1alpha1.ExpandModeOnline - case csi.PluginCapability_VolumeExpansion_OFFLINE: - spec.Features.Volume.Expand = v1alpha1.ExpandModeOffline - } - } - } - return nil -} - -func addControllerCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error { - controllerClient := csi.NewControllerClient(conn) - controllerCapabilitiesResponse, err := controllerClient.ControllerGetCapabilities(ctx, &csi.ControllerGetCapabilitiesRequest{}) - if err != nil { - return err - } - for _, capability := range controllerCapabilitiesResponse.GetCapabilities() { - switch capability.GetRpc().GetType() { - case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME: - spec.Features.Volume.Create = true - case csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME: - spec.Features.Volume.Attach = true - case csi.ControllerServiceCapability_RPC_LIST_VOLUMES: - spec.Features.Volume.List = true - case csi.ControllerServiceCapability_RPC_CLONE_VOLUME: - spec.Features.Volume.Clone = true - case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT: - spec.Features.Snapshot.Create = true - case csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS: - spec.Features.Snapshot.List = true - } - } - return nil -} - -func addNodeCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error { - nodeClient := csi.NewNodeClient(conn) - controllerCapabilitiesResponse, err := nodeClient.NodeGetCapabilities(ctx, &csi.NodeGetCapabilitiesRequest{}) - if err != nil { - return err - } - for _, capability := range controllerCapabilitiesResponse.GetCapabilities() { - switch capability.GetRpc().GetType() { - case csi.NodeServiceCapability_RPC_GET_VOLUME_STATS: - spec.Features.Volume.Stats = true - } - } - return nil -} - -// Connect address by GRPC -func connect(address string) (*grpc.ClientConn, error) { - dialOptions := []grpc.DialOption{ - grpc.WithInsecure(), - } - u, err := url.Parse(address) - if err == nil && (!u.IsAbs() || u.Scheme == "unix") { - dialOptions = append(dialOptions, - grpc.WithDialer( - func(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("unix", u.Path, timeout) - })) - } - // This is necessary when connecting via TCP and does not hurt - // when using Unix domain sockets. It ensures that gRPC detects a dead connection - // in a timely manner. - dialOptions = append(dialOptions, - grpc.WithKeepaliveParams(keepalive.ClientParameters{PermitWithoutStream: true})) - - conn, err := grpc.Dial(address, dialOptions...) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(context.Background(), dialDuration) - defer cancel() - for { - if !conn.WaitForStateChange(ctx, conn.GetState()) { - return conn, errors.New("connection timed out") - } - if conn.GetState() == connectivity.Ready { - return conn, nil - } - } -} diff --git a/pkg/controller/storage/capability/csi_capability_test.go b/pkg/controller/storage/capability/csi_capability_test.go deleted file mode 100644 index ab795e191..000000000 --- a/pkg/controller/storage/capability/csi_capability_test.go +++ /dev/null @@ -1,190 +0,0 @@ -/* - - Copyright 2020 The KubeSphere Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -*/ -package capability - -import ( - "context" - "fmt" - "net" - "os" - "path/filepath" - "runtime" - "strings" - "testing" - - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc" - "k8s.io/apimachinery/pkg/util/rand" - "k8s.io/klog" - - "kubesphere.io/api/storage/v1alpha1" -) - -var DefaultControllerRPCType = []csi.ControllerServiceCapability_RPC_Type{ - csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, - csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, - csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, - csi.ControllerServiceCapability_RPC_CLONE_VOLUME, -} - -var DefaultNodeRPCType = []csi.NodeServiceCapability_RPC_Type{ - csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, - csi.NodeServiceCapability_RPC_EXPAND_VOLUME, - csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, -} - -var DefaultPluginCapability = []*csi.PluginCapability{ - { - Type: &csi.PluginCapability_Service_{ - Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, - }, - }, - }, - { - Type: &csi.PluginCapability_VolumeExpansion_{ - VolumeExpansion: &csi.PluginCapability_VolumeExpansion{ - Type: csi.PluginCapability_VolumeExpansion_OFFLINE, - }, - }, - }, -} - -type fakeCSIServer struct { - csi.UnimplementedIdentityServer - csi.UnimplementedControllerServer - csi.UnimplementedNodeServer - network string - address string - server *grpc.Server -} - -func newTestCSIServer(port int) (csiServer *fakeCSIServer, address string) { - if runtime.GOOS == "windows" { - address = fmt.Sprintf("localhost:%d", +port) - csiServer = newFakeCSIServer("tcp", address) - } else { - address = filepath.Join(os.TempDir(), "csi.sock"+rand.String(4)) - csiServer = newFakeCSIServer("unix", address) - address = "unix://" + address - } - return csiServer, address -} - -func newFakeCSIServer(network, address string) *fakeCSIServer { - return &fakeCSIServer{ - network: network, - address: address, - } -} - -func (f *fakeCSIServer) run() { - listener, err := net.Listen(f.network, f.address) - if err != nil { - klog.Error("fake CSI server listen failed, ", err) - return - } - server := grpc.NewServer() - csi.RegisterIdentityServer(server, f) - csi.RegisterControllerServer(server, f) - csi.RegisterNodeServer(server, f) - go func() { - err = server.Serve(listener) - if err != nil && !strings.Contains(err.Error(), "stopped") { - klog.Error("fake CSI server serve failed, ", err) - } - }() - f.server = server -} - -func (f *fakeCSIServer) stop() { - if f.server != nil { - f.server.Stop() - } -} - -func (*fakeCSIServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { - return &csi.GetPluginCapabilitiesResponse{Capabilities: DefaultPluginCapability}, nil -} - -func (*fakeCSIServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { - var capabilities []*csi.ControllerServiceCapability - for _, rpcType := range DefaultControllerRPCType { - capability := &csi.ControllerServiceCapability{ - Type: &csi.ControllerServiceCapability_Rpc{ - Rpc: &csi.ControllerServiceCapability_RPC{ - Type: rpcType, - }, - }, - } - capabilities = append(capabilities, capability) - } - return &csi.ControllerGetCapabilitiesResponse{Capabilities: capabilities}, nil -} - -func (*fakeCSIServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { - var capabilities []*csi.NodeServiceCapability - for _, rpcType := range DefaultNodeRPCType { - capability := &csi.NodeServiceCapability{ - Type: &csi.NodeServiceCapability_Rpc{ - Rpc: &csi.NodeServiceCapability_RPC{ - Type: rpcType, - }, - }, - } - capabilities = append(capabilities, capability) - } - return &csi.NodeGetCapabilitiesResponse{Capabilities: capabilities}, nil -} - -func Test_CSICapability(t *testing.T) { - fakeCSIServer, address := newTestCSIServer(30087) - fakeCSIServer.run() - defer fakeCSIServer.stop() - - specGot, err := csiCapability(address) - if err != nil { - t.Error(err) - } - - specExpected := newStorageClassCapabilitySpec() - if diff := cmp.Diff(specGot, specExpected); diff != "" { - t.Errorf("%T differ (-got, +want): %s", specExpected, diff) - } -} - -func newStorageClassCapabilitySpec() *v1alpha1.StorageClassCapabilitySpec { - return &v1alpha1.StorageClassCapabilitySpec{ - Features: v1alpha1.CapabilityFeatures{ - Topology: false, - Volume: v1alpha1.VolumeFeature{ - Create: true, - Attach: false, - List: false, - Clone: true, - Stats: true, - Expand: v1alpha1.ExpandModeOffline, - }, - Snapshot: v1alpha1.SnapshotFeature{ - Create: true, - List: false, - }, - }, - } -}