remove storage capability auto detection
Signed-off-by: dkven <dkvvven@gmail.com>
This commit is contained in:
@@ -140,7 +140,6 @@ func addControllers(
|
||||
capability.SnapshotSupported(client.Kubernetes().Discovery()),
|
||||
client.Snapshot().SnapshotV1beta1().VolumeSnapshotClasses(),
|
||||
informerFactory.SnapshotSharedInformerFactory().Snapshot().V1beta1().VolumeSnapshotClasses(),
|
||||
kubernetesInformer.Storage().V1beta1().CSIDrivers(),
|
||||
)
|
||||
|
||||
volumeExpansionController := expansion.NewVolumeExpansionController(
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
apiVersion: storage.kubesphere.io/v1alpha1
|
||||
kind: ProvisionerCapability
|
||||
metadata:
|
||||
name: disk-csi-qingcloud-com
|
||||
spec:
|
||||
pluginInfo:
|
||||
name: disk.csi.qingcloud.com
|
||||
version: ""
|
||||
features:
|
||||
topology: true
|
||||
snapshot:
|
||||
create: true
|
||||
list: false
|
||||
volume:
|
||||
attach: true
|
||||
clone: true
|
||||
create: true
|
||||
expandMode: OFFLINE
|
||||
list: false
|
||||
stats: true
|
||||
@@ -0,0 +1,20 @@
|
||||
apiVersion: storage.kubesphere.io/v1alpha1
|
||||
kind: ProvisionerCapability
|
||||
metadata:
|
||||
name: neonsan-csi-qingstor-com
|
||||
spec:
|
||||
pluginInfo:
|
||||
name: neonsan.csi.qingstor.com
|
||||
version: ""
|
||||
features:
|
||||
topology: false
|
||||
snapshot:
|
||||
create: true
|
||||
list: false
|
||||
volume:
|
||||
attach: true
|
||||
clone: true
|
||||
create: true
|
||||
expandMode: OFFLINE
|
||||
list: false
|
||||
stats: true
|
||||
1
go.mod
1
go.mod
@@ -15,7 +15,6 @@ require (
|
||||
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535
|
||||
github.com/aws/aws-sdk-go v1.33.12
|
||||
github.com/beevik/etree v1.1.0
|
||||
github.com/container-storage-interface/spec v1.2.0
|
||||
github.com/containernetworking/cni v0.8.0
|
||||
github.com/coreos/go-oidc v2.1.0+incompatible
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
|
||||
@@ -35,42 +35,36 @@ import (
|
||||
snapinformers "github.com/kubernetes-csi/external-snapshotter/client/v3/informers/externalversions/volumesnapshot/v1beta1"
|
||||
snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v3/listers/volumesnapshot/v1beta1"
|
||||
storagev1 "k8s.io/api/storage/v1"
|
||||
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
|
||||
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
storageclient "k8s.io/client-go/kubernetes/typed/storage/v1"
|
||||
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
|
||||
storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog"
|
||||
|
||||
capability "kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
|
||||
ksstorage "kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
|
||||
crdscheme "kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
|
||||
capabilityclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/storage/v1alpha1"
|
||||
capabilityinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1"
|
||||
capabilitylisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1"
|
||||
ksstorageclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/storage/v1alpha1"
|
||||
ksstorageinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1"
|
||||
ksstoragelisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1"
|
||||
)
|
||||
|
||||
const (
|
||||
minSnapshotSupportedVersion = "v1.17.0"
|
||||
csiAddressFormat = "/var/lib/kubelet/plugins/%s/csi.sock"
|
||||
annotationSupportSnapshot = "storageclass.kubesphere.io/support-snapshot"
|
||||
)
|
||||
|
||||
type csiAddressGetter func(storageClassProvisioner string) string
|
||||
|
||||
type StorageCapabilityController struct {
|
||||
storageClassCapabilityClient capabilityclient.StorageClassCapabilityInterface
|
||||
storageCapabilityLister capabilitylisters.StorageClassCapabilityLister
|
||||
storageClassCapabilityClient ksstorageclient.StorageClassCapabilityInterface
|
||||
storageClassCapabilityLister ksstoragelisters.StorageClassCapabilityLister
|
||||
storageClassCapabilitySynced cache.InformerSynced
|
||||
|
||||
provisionerCapabilityLister capabilitylisters.ProvisionerCapabilityLister
|
||||
provisionerCapabilityLister ksstoragelisters.ProvisionerCapabilityLister
|
||||
provisionerCapabilitySynced cache.InformerSynced
|
||||
|
||||
storageClassClient storageclient.StorageClassInterface
|
||||
@@ -82,42 +76,33 @@ type StorageCapabilityController struct {
|
||||
snapshotClassLister snapshotlisters.VolumeSnapshotClassLister
|
||||
snapshotClassSynced cache.InformerSynced
|
||||
|
||||
csiDriverLister storagelistersv1beta1.CSIDriverLister
|
||||
csiDriverSynced cache.InformerSynced
|
||||
|
||||
csiAddressGetter csiAddressGetter
|
||||
|
||||
workQueue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
// This controller is responsible to watch StorageClass, SnapshotClass.
|
||||
// This controller is responsible to watch StorageClass/ProvisionerCapability.
|
||||
// And then update StorageClassCapability CRD resource object to the newest status.
|
||||
func NewController(
|
||||
capabilityClient capabilityclient.StorageClassCapabilityInterface,
|
||||
capabilityInformer capabilityinformers.Interface,
|
||||
storageClassCapabilityClient ksstorageclient.StorageClassCapabilityInterface,
|
||||
ksStorageInformer ksstorageinformers.Interface,
|
||||
storageClassClient storageclient.StorageClassInterface,
|
||||
storageClassInformer storageinformersv1.StorageClassInformer,
|
||||
snapshotSupported bool,
|
||||
snapshotClassClient snapshotclient.VolumeSnapshotClassInterface,
|
||||
snapshotClassInformer snapinformers.VolumeSnapshotClassInformer,
|
||||
csiDriverInformer storageinformersv1beta1.CSIDriverInformer,
|
||||
) *StorageCapabilityController {
|
||||
|
||||
utilruntime.Must(crdscheme.AddToScheme(scheme.Scheme))
|
||||
|
||||
controller := &StorageCapabilityController{
|
||||
storageClassCapabilityClient: capabilityClient,
|
||||
storageCapabilityLister: capabilityInformer.StorageClassCapabilities().Lister(),
|
||||
storageClassCapabilitySynced: capabilityInformer.StorageClassCapabilities().Informer().HasSynced,
|
||||
provisionerCapabilityLister: capabilityInformer.ProvisionerCapabilities().Lister(),
|
||||
provisionerCapabilitySynced: capabilityInformer.ProvisionerCapabilities().Informer().HasSynced,
|
||||
storageClassCapabilityClient: storageClassCapabilityClient,
|
||||
storageClassCapabilityLister: ksStorageInformer.StorageClassCapabilities().Lister(),
|
||||
storageClassCapabilitySynced: ksStorageInformer.StorageClassCapabilities().Informer().HasSynced,
|
||||
provisionerCapabilityLister: ksStorageInformer.ProvisionerCapabilities().Lister(),
|
||||
provisionerCapabilitySynced: ksStorageInformer.ProvisionerCapabilities().Informer().HasSynced,
|
||||
storageClassClient: storageClassClient,
|
||||
storageClassLister: storageClassInformer.Lister(),
|
||||
storageClassSynced: storageClassInformer.Informer().HasSynced,
|
||||
snapshotSupported: snapshotSupported,
|
||||
csiDriverLister: csiDriverInformer.Lister(),
|
||||
csiDriverSynced: csiDriverInformer.Informer().HasSynced,
|
||||
csiAddressGetter: csiAddress,
|
||||
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StorageClasses"),
|
||||
}
|
||||
|
||||
@@ -140,12 +125,24 @@ func NewController(
|
||||
DeleteFunc: controller.enqueueStorageClass,
|
||||
})
|
||||
|
||||
csiDriverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: controller.handlerCSIDriver,
|
||||
// ProvisionerCapability acts as a value source of its relevant StorageClassCapabilities
|
||||
// so when a PC is created/updated, the corresponding SCCs should be created(if not exists)/updated
|
||||
// we achive this by simply enqueueing the StorageClasses of the same provisioner
|
||||
// but don't overdo by cascade deleting the SCCs when a PC is deleted
|
||||
// since the role of PCs is more like a template rather than owner to SCCs
|
||||
|
||||
// This is a backward compatible fix to remove the useless auto detection of SCCs
|
||||
// in the future, we will only keep ProvisionerCapability and remove the StorageClassCapability CRD entirely
|
||||
ksStorageInformer.ProvisionerCapabilities().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: controller.handleProvisionerCapability,
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
return
|
||||
newPC := newObj.(*ksstorage.ProvisionerCapability)
|
||||
oldPC := oldObj.(*ksstorage.ProvisionerCapability)
|
||||
if newPC.ResourceVersion == oldPC.ResourceVersion {
|
||||
return
|
||||
}
|
||||
controller.handleProvisionerCapability(newObj)
|
||||
},
|
||||
DeleteFunc: controller.handlerCSIDriver,
|
||||
})
|
||||
|
||||
return controller
|
||||
@@ -165,7 +162,6 @@ func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{})
|
||||
c.storageClassCapabilitySynced,
|
||||
c.provisionerCapabilitySynced,
|
||||
c.storageClassSynced,
|
||||
c.csiDriverSynced,
|
||||
}
|
||||
|
||||
if c.snapshotAllowed() {
|
||||
@@ -185,16 +181,16 @@ func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *StorageCapabilityController) handlerCSIDriver(obj interface{}) {
|
||||
csiDriver := obj.(*storagev1beta1.CSIDriver)
|
||||
func (c *StorageCapabilityController) handleProvisionerCapability(obj interface{}) {
|
||||
provisionerCapability := obj.(*ksstorage.ProvisionerCapability)
|
||||
storageClasses, err := c.storageClassLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Error("list StorageClass error when handler csiDriver", err)
|
||||
klog.Error("list StorageClass error when handle provisionerCapability", err)
|
||||
return
|
||||
}
|
||||
for _, storageClass := range storageClasses {
|
||||
if storageClass.Provisioner == csiDriver.Name {
|
||||
klog.V(4).Infof("enqueue StorageClass %s when handling csiDriver", storageClass.Name)
|
||||
if getProvisionerCapabilityName(storageClass.Provisioner) == provisionerCapability.Name {
|
||||
klog.V(4).Infof("enqueue StorageClass %s while handling provisionerCapability", storageClass.Name)
|
||||
c.enqueueStorageClass(storageClass)
|
||||
}
|
||||
}
|
||||
@@ -277,14 +273,16 @@ func (c *StorageCapabilityController) syncHandler(key string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// No capability because csi-plugin not installed
|
||||
// The corresponding ProvisionerCapability Object does not exist
|
||||
if capabilitySpec == nil {
|
||||
klog.Infof("StorageClass %s has no capability", name)
|
||||
klog.Infof("Can't get StorageClass %s's capability", name)
|
||||
err = c.updateStorageClassSnapshotSupported(storageClass, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.deleteStorageCapability(name)
|
||||
// Don't delete the already created SCC
|
||||
// as it might be created manually by user
|
||||
return nil
|
||||
}
|
||||
klog.Infof("StorageClass %s has capability %v", name, capabilitySpec)
|
||||
|
||||
@@ -315,11 +313,11 @@ func (c *StorageCapabilityController) syncHandler(key string) error {
|
||||
}
|
||||
|
||||
// Handle StorageClassCapability with the same name of StorageClass
|
||||
storageClassCapabilityExist, err := c.storageCapabilityLister.Get(storageClass.Name)
|
||||
storageClassCapabilityExist, err := c.storageClassCapabilityLister.Get(storageClass.Name)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
// If StorageClassCapability doesn't exist, create it
|
||||
storageClassCapabilityCreate := &capability.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}}
|
||||
storageClassCapabilityCreate := &ksstorage.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}}
|
||||
storageClassCapabilityCreate.Spec = *capabilitySpec
|
||||
klog.Info("Create StorageClassCapability: ", storageClassCapabilityCreate)
|
||||
_, err = c.storageClassCapabilityClient.Create(context.Background(), storageClassCapabilityCreate, metav1.CreateOptions{})
|
||||
@@ -355,7 +353,7 @@ func (c *StorageCapabilityController) updateStorageClassSnapshotSupported(storag
|
||||
}
|
||||
|
||||
func (c *StorageCapabilityController) deleteStorageCapability(name string) error {
|
||||
_, err := c.storageCapabilityLister.Get(name)
|
||||
_, err := c.storageClassCapabilityLister.Get(name)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
return nil
|
||||
@@ -381,7 +379,7 @@ func (c *StorageCapabilityController) deleteSnapshotClass(name string) error {
|
||||
return c.snapshotClassClient.Delete(context.Background(), name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner string) (*capability.StorageClassCapabilitySpec, error) {
|
||||
func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner string) (*ksstorage.StorageClassCapabilitySpec, error) {
|
||||
provisionerCapability, err := c.provisionerCapabilityLister.Get(getProvisionerCapabilityName(provisioner))
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
@@ -390,13 +388,13 @@ func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner stri
|
||||
return nil, err
|
||||
}
|
||||
klog.V(4).Infof("get provisioner capability:%s %s", provisioner, provisionerCapability.Name)
|
||||
capabilitySpec := &capability.StorageClassCapabilitySpec{
|
||||
capabilitySpec := &ksstorage.StorageClassCapabilitySpec{
|
||||
Features: provisionerCapability.Spec.Features,
|
||||
}
|
||||
return capabilitySpec, nil
|
||||
}
|
||||
|
||||
func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.StorageClass) (*capability.StorageClassCapabilitySpec, error) {
|
||||
func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.StorageClass) (*ksstorage.StorageClassCapabilitySpec, error) {
|
||||
// get from provisioner capability first
|
||||
klog.V(4).Info("get cap ", storageClass.Provisioner)
|
||||
capabilitySpec, err := c.capabilityFromProvisioner(storageClass.Provisioner)
|
||||
@@ -404,24 +402,10 @@ func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// csi of storage capability
|
||||
if capabilitySpec == nil {
|
||||
isCsi, err := c.isCSIStorage(storageClass.Provisioner)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if isCsi {
|
||||
capabilitySpec, err = csiCapability(c.csiAddressGetter(storageClass.Provisioner))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if capabilitySpec != nil {
|
||||
capabilitySpec.Provisioner = storageClass.Provisioner
|
||||
if storageClass.AllowVolumeExpansion == nil || !*storageClass.AllowVolumeExpansion {
|
||||
capabilitySpec.Features.Volume.Expand = capability.ExpandModeUnknown
|
||||
capabilitySpec.Features.Volume.Expand = ksstorage.ExpandModeUnknown
|
||||
}
|
||||
if !c.snapshotSupported {
|
||||
capabilitySpec.Features.Snapshot.Create = false
|
||||
@@ -431,22 +415,6 @@ func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.
|
||||
return capabilitySpec, nil
|
||||
}
|
||||
|
||||
func (c *StorageCapabilityController) isCSIStorage(provisioner string) (bool, error) {
|
||||
_, err := c.csiDriverLister.Get(provisioner)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// this is used for test of CSIDriver on windows
|
||||
func (c *StorageCapabilityController) setCSIAddressGetter(getter csiAddressGetter) {
|
||||
c.csiAddressGetter = getter
|
||||
}
|
||||
|
||||
func (c *StorageCapabilityController) snapshotAllowed() bool {
|
||||
return c.snapshotSupported && c.snapshotClassClient != nil && c.snapshotClassLister != nil && c.snapshotClassSynced != nil
|
||||
}
|
||||
@@ -464,10 +432,6 @@ func SnapshotSupported(discoveryInterface discovery.DiscoveryInterface) bool {
|
||||
return ver.AtLeast(minVer)
|
||||
}
|
||||
|
||||
func csiAddress(provisioner string) string {
|
||||
return fmt.Sprintf(csiAddressFormat, provisioner)
|
||||
}
|
||||
|
||||
func getProvisionerCapabilityName(provisioner string) string {
|
||||
return strings.NewReplacer(".", "-", "/", "-").Replace(provisioner)
|
||||
}
|
||||
|
||||
@@ -19,11 +19,8 @@
|
||||
package capability
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
//"github.com/google/go-cmp/cmp"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -32,7 +29,6 @@ import (
|
||||
snapfake "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned/fake"
|
||||
snapinformers "github.com/kubernetes-csi/external-snapshotter/client/v3/informers/externalversions"
|
||||
storagev1 "k8s.io/api/storage/v1"
|
||||
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@@ -59,7 +55,7 @@ type fixture struct {
|
||||
snapshotClassClient *snapfake.Clientset
|
||||
ksClient *ksfake.Clientset
|
||||
// Objects from here preload into NewSimpleFake.
|
||||
storageObjects []runtime.Object // include StorageClass and CSIDriver
|
||||
storageObjects []runtime.Object // include StorageClass
|
||||
snapshotClassObjects []runtime.Object
|
||||
capabilityObjects []runtime.Object // include StorageClassCapability and ProvisionerCapability
|
||||
// Objects to put in the store.
|
||||
@@ -67,19 +63,14 @@ type fixture struct {
|
||||
snapshotClassLister []*snapbeta1.VolumeSnapshotClass
|
||||
storageClassCapabilityLister []*ksv1alpha1.StorageClassCapability
|
||||
provisionerCapabilityLister []*ksv1alpha1.ProvisionerCapability
|
||||
csiDriverLister []*storagev1beta1.CSIDriver
|
||||
// Actions expected to happen on the client.
|
||||
actions []core.Action
|
||||
// CSI server
|
||||
runCSIServer bool
|
||||
fakeCSIServer *fakeCSIServer
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T, snapshotSupported bool, runCSIServer bool) *fixture {
|
||||
func newFixture(t *testing.T, snapshotSupported bool) *fixture {
|
||||
return &fixture{
|
||||
t: t,
|
||||
snapshotSupported: snapshotSupported,
|
||||
runCSIServer: runCSIServer,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,21 +95,11 @@ func (f *fixture) newController() (*StorageCapabilityController,
|
||||
f.snapshotSupported,
|
||||
f.snapshotClassClient.SnapshotV1beta1().VolumeSnapshotClasses(),
|
||||
snapshotInformers.Snapshot().V1beta1().VolumeSnapshotClasses(),
|
||||
k8sInformers.Storage().V1beta1().CSIDrivers(),
|
||||
)
|
||||
if f.runCSIServer {
|
||||
port := 30000 + rand.Intn(100)
|
||||
fakeCSIServer, address := newTestCSIServer(port)
|
||||
f.fakeCSIServer = fakeCSIServer
|
||||
c.setCSIAddressGetter(func(storageClassProvisioner string) string { return address })
|
||||
}
|
||||
|
||||
for _, storageClass := range f.storageClassLister {
|
||||
_ = k8sInformers.Storage().V1().StorageClasses().Informer().GetIndexer().Add(storageClass)
|
||||
}
|
||||
for _, csiDriver := range f.csiDriverLister {
|
||||
_ = k8sInformers.Storage().V1beta1().CSIDrivers().Informer().GetIndexer().Add(csiDriver)
|
||||
}
|
||||
for _, snapshotClass := range f.snapshotClassLister {
|
||||
_ = snapshotInformers.Snapshot().V1beta1().VolumeSnapshotClasses().Informer().GetIndexer().Add(snapshotClass)
|
||||
}
|
||||
@@ -135,11 +116,6 @@ func (f *fixture) newController() (*StorageCapabilityController,
|
||||
func (f *fixture) runController(scName string, startInformers bool, expectError bool) {
|
||||
c, k8sI, crdI, snapI := f.newController()
|
||||
|
||||
if f.runCSIServer {
|
||||
f.fakeCSIServer.run()
|
||||
defer f.fakeCSIServer.stop()
|
||||
}
|
||||
|
||||
if startInformers {
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
@@ -276,6 +252,26 @@ func newStorageClass(name string, provisioner string) *storagev1.StorageClass {
|
||||
}
|
||||
}
|
||||
|
||||
func newStorageClassCapabilitySpec() *ksv1alpha1.StorageClassCapabilitySpec {
|
||||
return &ksv1alpha1.StorageClassCapabilitySpec{
|
||||
Features: ksv1alpha1.CapabilityFeatures{
|
||||
Topology: false,
|
||||
Volume: ksv1alpha1.VolumeFeature{
|
||||
Create: true,
|
||||
Attach: false,
|
||||
List: false,
|
||||
Clone: true,
|
||||
Stats: true,
|
||||
Expand: ksv1alpha1.ExpandModeOffline,
|
||||
},
|
||||
Snapshot: ksv1alpha1.SnapshotFeature{
|
||||
Create: true,
|
||||
List: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newStorageClassCapability(storageClass *storagev1.StorageClass) *ksv1alpha1.StorageClassCapability {
|
||||
storageClassCapability := &ksv1alpha1.StorageClassCapability{}
|
||||
storageClassCapability.Name = storageClass.Name
|
||||
@@ -289,17 +285,9 @@ func newProvisionerCapability(storageClass *storagev1.StorageClass) *ksv1alpha1.
|
||||
provisionerCapability.Name = getProvisionerCapabilityName(storageClass.Provisioner)
|
||||
provisionerCapability.Spec.PluginInfo.Name = storageClass.Provisioner
|
||||
provisionerCapability.Spec.Features = newStorageClassCapabilitySpec().Features
|
||||
// ProvisionerCapability snapshot is always false
|
||||
provisionerCapability.Spec.Features.Snapshot.Create = false
|
||||
return provisionerCapability
|
||||
}
|
||||
|
||||
func newCSIDriver(storageClass *storagev1.StorageClass) *storagev1beta1.CSIDriver {
|
||||
csiDriver := &storagev1beta1.CSIDriver{}
|
||||
csiDriver.Name = storageClass.Provisioner
|
||||
return csiDriver
|
||||
}
|
||||
|
||||
func newSnapshotClass(storageClass *storagev1.StorageClass) *snapbeta1.VolumeSnapshotClass {
|
||||
return &snapbeta1.VolumeSnapshotClass{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
@@ -320,18 +308,19 @@ func getKey(sc *storagev1.StorageClass, t *testing.T) string {
|
||||
}
|
||||
|
||||
func TestCreateStorageClass(t *testing.T) {
|
||||
fixture := newFixture(t, true, true)
|
||||
fixture := newFixture(t, true)
|
||||
storageClass := newStorageClass("csi-example", "csi.example.com")
|
||||
storageClassUpdate := storageClass.DeepCopy()
|
||||
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "true"}
|
||||
provisionerCapability := newProvisionerCapability(storageClass)
|
||||
snapshotClass := newSnapshotClass(storageClass)
|
||||
storageClassCapability := newStorageClassCapability(storageClass)
|
||||
csiDriver := newCSIDriver(storageClass)
|
||||
|
||||
// Objects exist
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass)
|
||||
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
|
||||
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
|
||||
fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability)
|
||||
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
|
||||
|
||||
// Action expected
|
||||
fixture.expectCreateSnapshotClassAction(snapshotClass)
|
||||
@@ -342,28 +331,69 @@ func TestCreateStorageClass(t *testing.T) {
|
||||
fixture.run(getKey(storageClass, t))
|
||||
}
|
||||
|
||||
func TestCreateStorageClassWithoutProvisionerCapability(t *testing.T) {
|
||||
fixture := newFixture(t, true)
|
||||
storageClass := newStorageClass("csi-example", "csi.example.com")
|
||||
|
||||
// Objects exist
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass)
|
||||
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
|
||||
|
||||
storageClassUpdate := storageClass.DeepCopy()
|
||||
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
|
||||
fixture.expectUpdateStorageClassAction(storageClassUpdate)
|
||||
|
||||
// Run test
|
||||
fixture.run(getKey(storageClass, t))
|
||||
}
|
||||
|
||||
func TestUpdateStorageClass(t *testing.T) {
|
||||
storageClass := newStorageClass("csi-example", "csi.example.com")
|
||||
storageClass.Annotations = map[string]string{annotationSupportSnapshot: "true"}
|
||||
snapshotClass := newSnapshotClass(storageClass)
|
||||
storageClassCapabilityUpdate := newStorageClassCapability(storageClass)
|
||||
storageClassCapability := newStorageClassCapability(storageClass)
|
||||
provisionerCapability := newProvisionerCapability(storageClass)
|
||||
//old and new should have deference
|
||||
storageClassCapability.Spec.Features.Volume.Create = !storageClassCapability.Spec.Features.Volume.Create
|
||||
csiDriver := newCSIDriver(storageClass)
|
||||
|
||||
fixture := newFixture(t, true, true)
|
||||
fixture := newFixture(t, true)
|
||||
// Object exist
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass)
|
||||
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
|
||||
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
|
||||
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
|
||||
fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability, provisionerCapability)
|
||||
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
|
||||
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
|
||||
|
||||
// Action expected
|
||||
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate)
|
||||
|
||||
// Run test
|
||||
fixture.run(getKey(storageClass, t))
|
||||
}
|
||||
|
||||
func TestUpdateStorageClassWithoutProvisionerCapability(t *testing.T) {
|
||||
storageClass := newStorageClass("csi-example", "csi.example.com")
|
||||
storageClass.Annotations = map[string]string{annotationSupportSnapshot: "true"}
|
||||
storageClassUpdate := storageClass.DeepCopy()
|
||||
storageClassUpdate.Annotations[annotationSupportSnapshot] = "false"
|
||||
snapshotClass := newSnapshotClass(storageClass)
|
||||
storageClassCapability := newStorageClassCapability(storageClass)
|
||||
//old and new should have deference
|
||||
storageClassCapability.Spec.Features.Volume.Create = !storageClassCapability.Spec.Features.Volume.Create
|
||||
|
||||
fixture := newFixture(t, true)
|
||||
// Object exist
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass)
|
||||
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
|
||||
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
|
||||
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
|
||||
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
|
||||
fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability)
|
||||
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
|
||||
|
||||
// Action expected
|
||||
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate)
|
||||
fixture.expectUpdateStorageClassAction(storageClassUpdate)
|
||||
|
||||
// Run test
|
||||
fixture.run(getKey(storageClass, t))
|
||||
@@ -374,12 +404,9 @@ func TestDeleteStorageClass(t *testing.T) {
|
||||
snapshotClass := newSnapshotClass(storageClass)
|
||||
storageClassCapability := newStorageClassCapability(storageClass)
|
||||
|
||||
csiDriver := newCSIDriver(storageClass)
|
||||
|
||||
fixture := newFixture(t, true, true)
|
||||
fixture := newFixture(t, true)
|
||||
// Object exist
|
||||
fixture.storageObjects = append(fixture.storageObjects, csiDriver)
|
||||
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass)
|
||||
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
|
||||
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
|
||||
fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability)
|
||||
@@ -394,7 +421,8 @@ func TestDeleteStorageClass(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCreateStorageClassNotSupportSnapshot(t *testing.T) {
|
||||
fixture := newFixture(t, false, true)
|
||||
// K8S version < 1.17.0
|
||||
fixture := newFixture(t, false)
|
||||
storageClass := newStorageClass("csi-example", "csi.example.com")
|
||||
storageClassUpdate := storageClass.DeepCopy()
|
||||
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
|
||||
@@ -402,32 +430,31 @@ func TestCreateStorageClassNotSupportSnapshot(t *testing.T) {
|
||||
storageClassCapability.Spec.Features.Snapshot.Create = false
|
||||
storageClassCapability.Spec.Features.Snapshot.List = false
|
||||
provisionerCapability := newProvisionerCapability(storageClass)
|
||||
csiDriver := newCSIDriver(storageClass)
|
||||
|
||||
// Objects exist
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
|
||||
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
|
||||
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
|
||||
fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability)
|
||||
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
|
||||
|
||||
// Action expected
|
||||
fixture.expectUpdateStorageClassAction(storageClassUpdate)
|
||||
fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability)
|
||||
|
||||
// Run test
|
||||
fixture.run(getKey(storageClass, t))
|
||||
}
|
||||
|
||||
func TestCreateStorageClassInTree(t *testing.T) {
|
||||
// InTree Storage has no snapshot capability
|
||||
fixture := newFixture(t, true, true)
|
||||
storageClass := newStorageClass("csi-example", "csi.example.com")
|
||||
storageClassUpdate := storageClass.DeepCopy()
|
||||
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
|
||||
storageClassCapability := newStorageClassCapability(storageClass)
|
||||
storageClassCapability.Spec.Features.Snapshot.Create = false
|
||||
provisionerCapability := newProvisionerCapability(storageClass)
|
||||
|
||||
// Objects exist
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass)
|
||||
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
|
||||
fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability)
|
||||
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
|
||||
|
||||
// Action expected
|
||||
fixture.expectUpdateStorageClassAction(storageClassUpdate)
|
||||
fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability)
|
||||
|
||||
// Run test
|
||||
fixture.run(getKey(storageClass, t))
|
||||
}
|
||||
|
||||
func TestCreateStorageClassNotHaveSnapshotCap(t *testing.T) {
|
||||
// Storage has no snapshot capability
|
||||
fixture := newFixture(t, true)
|
||||
storageClass := newStorageClass("csi-example", "csi.example.com")
|
||||
storageClassUpdate := storageClass.DeepCopy()
|
||||
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
|
||||
storageClassCapability := newStorageClassCapability(storageClass)
|
||||
storageClassCapability.Spec.Features.Snapshot.Create = false
|
||||
provisionerCapability := newProvisionerCapability(storageClass)
|
||||
provisionerCapability.Spec.Features.Snapshot.Create = false
|
||||
|
||||
// Objects exist
|
||||
fixture.storageObjects = append(fixture.storageObjects, storageClass)
|
||||
|
||||
@@ -1,168 +0,0 @@
|
||||
/*
|
||||
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
*/
|
||||
package capability
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
|
||||
)
|
||||
|
||||
const (
|
||||
dialDuration = time.Second * 5
|
||||
requestDuration = time.Second * 10
|
||||
)
|
||||
|
||||
func csiCapability(csiAddress string) (*v1alpha1.StorageClassCapabilitySpec, error) {
|
||||
csiConn, err := connect(csiAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = csiConn.Close() }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestDuration)
|
||||
defer cancel()
|
||||
|
||||
spec := &v1alpha1.StorageClassCapabilitySpec{}
|
||||
err = addPluginCapabilities(ctx, csiConn, spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = addControllerCapabilities(ctx, csiConn, spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = addNodeCapabilities(ctx, csiConn, spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return spec, nil
|
||||
|
||||
}
|
||||
|
||||
func addPluginCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
|
||||
identityClient := csi.NewIdentityClient(conn)
|
||||
pluginCapabilitiesResponse, err := identityClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, capability := range pluginCapabilitiesResponse.GetCapabilities() {
|
||||
if capability == nil {
|
||||
continue
|
||||
}
|
||||
if capability.GetService().GetType() == csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS {
|
||||
spec.Features.Topology = true
|
||||
}
|
||||
volumeExpansion := capability.GetVolumeExpansion()
|
||||
if volumeExpansion != nil {
|
||||
switch volumeExpansion.GetType() {
|
||||
case csi.PluginCapability_VolumeExpansion_ONLINE:
|
||||
spec.Features.Volume.Expand = v1alpha1.ExpandModeOnline
|
||||
case csi.PluginCapability_VolumeExpansion_OFFLINE:
|
||||
spec.Features.Volume.Expand = v1alpha1.ExpandModeOffline
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func addControllerCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
|
||||
controllerClient := csi.NewControllerClient(conn)
|
||||
controllerCapabilitiesResponse, err := controllerClient.ControllerGetCapabilities(ctx, &csi.ControllerGetCapabilitiesRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, capability := range controllerCapabilitiesResponse.GetCapabilities() {
|
||||
switch capability.GetRpc().GetType() {
|
||||
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME:
|
||||
spec.Features.Volume.Create = true
|
||||
case csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME:
|
||||
spec.Features.Volume.Attach = true
|
||||
case csi.ControllerServiceCapability_RPC_LIST_VOLUMES:
|
||||
spec.Features.Volume.List = true
|
||||
case csi.ControllerServiceCapability_RPC_CLONE_VOLUME:
|
||||
spec.Features.Volume.Clone = true
|
||||
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
|
||||
spec.Features.Snapshot.Create = true
|
||||
case csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS:
|
||||
spec.Features.Snapshot.List = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func addNodeCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
|
||||
nodeClient := csi.NewNodeClient(conn)
|
||||
controllerCapabilitiesResponse, err := nodeClient.NodeGetCapabilities(ctx, &csi.NodeGetCapabilitiesRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, capability := range controllerCapabilitiesResponse.GetCapabilities() {
|
||||
switch capability.GetRpc().GetType() {
|
||||
case csi.NodeServiceCapability_RPC_GET_VOLUME_STATS:
|
||||
spec.Features.Volume.Stats = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connect address by GRPC
|
||||
func connect(address string) (*grpc.ClientConn, error) {
|
||||
dialOptions := []grpc.DialOption{
|
||||
grpc.WithInsecure(),
|
||||
}
|
||||
u, err := url.Parse(address)
|
||||
if err == nil && (!u.IsAbs() || u.Scheme == "unix") {
|
||||
dialOptions = append(dialOptions,
|
||||
grpc.WithDialer(
|
||||
func(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
return net.DialTimeout("unix", u.Path, timeout)
|
||||
}))
|
||||
}
|
||||
// This is necessary when connecting via TCP and does not hurt
|
||||
// when using Unix domain sockets. It ensures that gRPC detects a dead connection
|
||||
// in a timely manner.
|
||||
dialOptions = append(dialOptions,
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{PermitWithoutStream: true}))
|
||||
|
||||
conn, err := grpc.Dial(address, dialOptions...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), dialDuration)
|
||||
defer cancel()
|
||||
for {
|
||||
if !conn.WaitForStateChange(ctx, conn.GetState()) {
|
||||
return conn, errors.New("connection timed out")
|
||||
}
|
||||
if conn.GetState() == connectivity.Ready {
|
||||
return conn, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,190 +0,0 @@
|
||||
/*
|
||||
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
*/
|
||||
package capability
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/apimachinery/pkg/util/rand"
|
||||
"k8s.io/klog"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
|
||||
)
|
||||
|
||||
var DefaultControllerRPCType = []csi.ControllerServiceCapability_RPC_Type{
|
||||
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
|
||||
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
|
||||
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
|
||||
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
|
||||
}
|
||||
|
||||
var DefaultNodeRPCType = []csi.NodeServiceCapability_RPC_Type{
|
||||
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
|
||||
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
|
||||
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
|
||||
}
|
||||
|
||||
var DefaultPluginCapability = []*csi.PluginCapability{
|
||||
{
|
||||
Type: &csi.PluginCapability_Service_{
|
||||
Service: &csi.PluginCapability_Service{
|
||||
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: &csi.PluginCapability_VolumeExpansion_{
|
||||
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
|
||||
Type: csi.PluginCapability_VolumeExpansion_OFFLINE,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
type fakeCSIServer struct {
|
||||
csi.UnimplementedIdentityServer
|
||||
csi.UnimplementedControllerServer
|
||||
csi.UnimplementedNodeServer
|
||||
network string
|
||||
address string
|
||||
server *grpc.Server
|
||||
}
|
||||
|
||||
func newTestCSIServer(port int) (csiServer *fakeCSIServer, address string) {
|
||||
if runtime.GOOS == "windows" {
|
||||
address = fmt.Sprintf("localhost:%d", +port)
|
||||
csiServer = newFakeCSIServer("tcp", address)
|
||||
} else {
|
||||
address = filepath.Join(os.TempDir(), "csi.sock"+rand.String(4))
|
||||
csiServer = newFakeCSIServer("unix", address)
|
||||
address = "unix://" + address
|
||||
}
|
||||
return csiServer, address
|
||||
}
|
||||
|
||||
func newFakeCSIServer(network, address string) *fakeCSIServer {
|
||||
return &fakeCSIServer{
|
||||
network: network,
|
||||
address: address,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeCSIServer) run() {
|
||||
listener, err := net.Listen(f.network, f.address)
|
||||
if err != nil {
|
||||
klog.Error("fake CSI server listen failed, ", err)
|
||||
return
|
||||
}
|
||||
server := grpc.NewServer()
|
||||
csi.RegisterIdentityServer(server, f)
|
||||
csi.RegisterControllerServer(server, f)
|
||||
csi.RegisterNodeServer(server, f)
|
||||
go func() {
|
||||
err = server.Serve(listener)
|
||||
if err != nil && !strings.Contains(err.Error(), "stopped") {
|
||||
klog.Error("fake CSI server serve failed, ", err)
|
||||
}
|
||||
}()
|
||||
f.server = server
|
||||
}
|
||||
|
||||
func (f *fakeCSIServer) stop() {
|
||||
if f.server != nil {
|
||||
f.server.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (*fakeCSIServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
|
||||
return &csi.GetPluginCapabilitiesResponse{Capabilities: DefaultPluginCapability}, nil
|
||||
}
|
||||
|
||||
func (*fakeCSIServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
|
||||
var capabilities []*csi.ControllerServiceCapability
|
||||
for _, rpcType := range DefaultControllerRPCType {
|
||||
capability := &csi.ControllerServiceCapability{
|
||||
Type: &csi.ControllerServiceCapability_Rpc{
|
||||
Rpc: &csi.ControllerServiceCapability_RPC{
|
||||
Type: rpcType,
|
||||
},
|
||||
},
|
||||
}
|
||||
capabilities = append(capabilities, capability)
|
||||
}
|
||||
return &csi.ControllerGetCapabilitiesResponse{Capabilities: capabilities}, nil
|
||||
}
|
||||
|
||||
func (*fakeCSIServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
|
||||
var capabilities []*csi.NodeServiceCapability
|
||||
for _, rpcType := range DefaultNodeRPCType {
|
||||
capability := &csi.NodeServiceCapability{
|
||||
Type: &csi.NodeServiceCapability_Rpc{
|
||||
Rpc: &csi.NodeServiceCapability_RPC{
|
||||
Type: rpcType,
|
||||
},
|
||||
},
|
||||
}
|
||||
capabilities = append(capabilities, capability)
|
||||
}
|
||||
return &csi.NodeGetCapabilitiesResponse{Capabilities: capabilities}, nil
|
||||
}
|
||||
|
||||
func Test_CSICapability(t *testing.T) {
|
||||
fakeCSIServer, address := newTestCSIServer(30087)
|
||||
fakeCSIServer.run()
|
||||
defer fakeCSIServer.stop()
|
||||
|
||||
specGot, err := csiCapability(address)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
specExpected := newStorageClassCapabilitySpec()
|
||||
if diff := cmp.Diff(specGot, specExpected); diff != "" {
|
||||
t.Errorf("%T differ (-got, +want): %s", specExpected, diff)
|
||||
}
|
||||
}
|
||||
|
||||
func newStorageClassCapabilitySpec() *v1alpha1.StorageClassCapabilitySpec {
|
||||
return &v1alpha1.StorageClassCapabilitySpec{
|
||||
Features: v1alpha1.CapabilityFeatures{
|
||||
Topology: false,
|
||||
Volume: v1alpha1.VolumeFeature{
|
||||
Create: true,
|
||||
Attach: false,
|
||||
List: false,
|
||||
Clone: true,
|
||||
Stats: true,
|
||||
Expand: v1alpha1.ExpandModeOffline,
|
||||
},
|
||||
Snapshot: v1alpha1.SnapshotFeature{
|
||||
Create: true,
|
||||
List: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user