remove storage capability auto detection

Signed-off-by: dkven <dkvvven@gmail.com>
This commit is contained in:
dkven
2021-06-09 00:10:47 +08:00
parent 2cc897534d
commit eadf8cc4c8
9 changed files with 191 additions and 526 deletions

View File

@@ -141,7 +141,6 @@ func addControllers(
capability.SnapshotSupported(client.Kubernetes().Discovery()),
client.Snapshot().SnapshotV1beta1().VolumeSnapshotClasses(),
informerFactory.SnapshotSharedInformerFactory().Snapshot().V1beta1().VolumeSnapshotClasses(),
kubernetesInformer.Storage().V1beta1().CSIDrivers(),
)
volumeExpansionController := expansion.NewVolumeExpansionController(

View File

@@ -0,0 +1,20 @@
apiVersion: storage.kubesphere.io/v1alpha1
kind: ProvisionerCapability
metadata:
name: disk-csi-qingcloud-com
spec:
pluginInfo:
name: disk.csi.qingcloud.com
version: ""
features:
topology: true
snapshot:
create: true
list: false
volume:
attach: true
clone: true
create: true
expandMode: OFFLINE
list: false
stats: true

View File

@@ -0,0 +1,20 @@
apiVersion: storage.kubesphere.io/v1alpha1
kind: ProvisionerCapability
metadata:
name: neonsan-csi-qingstor-com
spec:
pluginInfo:
name: neonsan.csi.qingstor.com
version: ""
features:
topology: false
snapshot:
create: true
list: false
volume:
attach: true
clone: true
create: true
expandMode: OFFLINE
list: false
stats: true

View File

@@ -50,8 +50,6 @@ spec:
name: kubesphere-config
- mountPath: /tmp/k8s-webhook-server/serving-certs
name: webhook-secret
- mountPath: /var/lib/kubelet/plugins/
name: kubelet-plugin
- mountPath: /etc/localtime
name: host-time
dnsPolicy: ClusterFirst
@@ -68,10 +66,6 @@ spec:
secret:
defaultMode: 420
secretName: ks-controller-manager-webhook-cert
- name: kubelet-plugin
hostPath:
path: /var/lib/kubelet/plugins/
type: DirectoryOrCreate
- hostPath:
path: /etc/localtime
type: ""

1
go.mod
View File

@@ -15,7 +15,6 @@ require (
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535
github.com/aws/aws-sdk-go v1.33.12
github.com/beevik/etree v1.1.0
github.com/container-storage-interface/spec v1.2.0
github.com/containernetworking/cni v0.8.0
github.com/coreos/go-oidc v2.1.0+incompatible
github.com/davecgh/go-spew v1.1.1

View File

@@ -35,43 +35,37 @@ import (
snapinformers "github.com/kubernetes-csi/external-snapshotter/client/v3/informers/externalversions/volumesnapshot/v1beta1"
snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v3/listers/volumesnapshot/v1beta1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
storageclient "k8s.io/client-go/kubernetes/typed/storage/v1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
capability "kubesphere.io/api/storage/v1alpha1"
ksstorage "kubesphere.io/api/storage/v1alpha1"
crdscheme "kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
capabilityclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/storage/v1alpha1"
capabilityinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1"
capabilitylisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1"
ksstorageclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/storage/v1alpha1"
ksstorageinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1"
ksstoragelisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1"
)
const (
minSnapshotSupportedVersion = "v1.17.0"
csiAddressFormat = "/var/lib/kubelet/plugins/%s/csi.sock"
annotationSupportSnapshot = "storageclass.kubesphere.io/support-snapshot"
)
type csiAddressGetter func(storageClassProvisioner string) string
type StorageCapabilityController struct {
storageClassCapabilityClient capabilityclient.StorageClassCapabilityInterface
storageCapabilityLister capabilitylisters.StorageClassCapabilityLister
storageClassCapabilityClient ksstorageclient.StorageClassCapabilityInterface
storageClassCapabilityLister ksstoragelisters.StorageClassCapabilityLister
storageClassCapabilitySynced cache.InformerSynced
provisionerCapabilityLister capabilitylisters.ProvisionerCapabilityLister
provisionerCapabilityLister ksstoragelisters.ProvisionerCapabilityLister
provisionerCapabilitySynced cache.InformerSynced
storageClassClient storageclient.StorageClassInterface
@@ -83,42 +77,33 @@ type StorageCapabilityController struct {
snapshotClassLister snapshotlisters.VolumeSnapshotClassLister
snapshotClassSynced cache.InformerSynced
csiDriverLister storagelistersv1beta1.CSIDriverLister
csiDriverSynced cache.InformerSynced
csiAddressGetter csiAddressGetter
workQueue workqueue.RateLimitingInterface
}
// This controller is responsible to watch StorageClass, SnapshotClass.
// This controller is responsible to watch StorageClass/ProvisionerCapability.
// And then update StorageClassCapability CRD resource object to the newest status.
func NewController(
capabilityClient capabilityclient.StorageClassCapabilityInterface,
capabilityInformer capabilityinformers.Interface,
storageClassCapabilityClient ksstorageclient.StorageClassCapabilityInterface,
ksStorageInformer ksstorageinformers.Interface,
storageClassClient storageclient.StorageClassInterface,
storageClassInformer storageinformersv1.StorageClassInformer,
snapshotSupported bool,
snapshotClassClient snapshotclient.VolumeSnapshotClassInterface,
snapshotClassInformer snapinformers.VolumeSnapshotClassInformer,
csiDriverInformer storageinformersv1beta1.CSIDriverInformer,
) *StorageCapabilityController {
utilruntime.Must(crdscheme.AddToScheme(scheme.Scheme))
controller := &StorageCapabilityController{
storageClassCapabilityClient: capabilityClient,
storageCapabilityLister: capabilityInformer.StorageClassCapabilities().Lister(),
storageClassCapabilitySynced: capabilityInformer.StorageClassCapabilities().Informer().HasSynced,
provisionerCapabilityLister: capabilityInformer.ProvisionerCapabilities().Lister(),
provisionerCapabilitySynced: capabilityInformer.ProvisionerCapabilities().Informer().HasSynced,
storageClassCapabilityClient: storageClassCapabilityClient,
storageClassCapabilityLister: ksStorageInformer.StorageClassCapabilities().Lister(),
storageClassCapabilitySynced: ksStorageInformer.StorageClassCapabilities().Informer().HasSynced,
provisionerCapabilityLister: ksStorageInformer.ProvisionerCapabilities().Lister(),
provisionerCapabilitySynced: ksStorageInformer.ProvisionerCapabilities().Informer().HasSynced,
storageClassClient: storageClassClient,
storageClassLister: storageClassInformer.Lister(),
storageClassSynced: storageClassInformer.Informer().HasSynced,
snapshotSupported: snapshotSupported,
csiDriverLister: csiDriverInformer.Lister(),
csiDriverSynced: csiDriverInformer.Informer().HasSynced,
csiAddressGetter: csiAddress,
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StorageClasses"),
}
@@ -141,12 +126,24 @@ func NewController(
DeleteFunc: controller.enqueueStorageClass,
})
csiDriverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handlerCSIDriver,
// ProvisionerCapability acts as a value source of its relevant StorageClassCapabilities
// so when a PC is created/updated, the corresponding SCCs should be created(if not exists)/updated
// we achive this by simply enqueueing the StorageClasses of the same provisioner
// but don't overdo by cascade deleting the SCCs when a PC is deleted
// since the role of PCs is more like a template rather than owner to SCCs
// This is a backward compatible fix to remove the useless auto detection of SCCs
// in the future, we will only keep ProvisionerCapability and remove the StorageClassCapability CRD entirely
ksStorageInformer.ProvisionerCapabilities().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleProvisionerCapability,
UpdateFunc: func(oldObj, newObj interface{}) {
return
newPC := newObj.(*ksstorage.ProvisionerCapability)
oldPC := oldObj.(*ksstorage.ProvisionerCapability)
if newPC.ResourceVersion == oldPC.ResourceVersion {
return
}
controller.handleProvisionerCapability(newObj)
},
DeleteFunc: controller.handlerCSIDriver,
})
return controller
@@ -166,7 +163,6 @@ func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{})
c.storageClassCapabilitySynced,
c.provisionerCapabilitySynced,
c.storageClassSynced,
c.csiDriverSynced,
}
if c.snapshotAllowed() {
@@ -186,16 +182,16 @@ func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{})
return nil
}
func (c *StorageCapabilityController) handlerCSIDriver(obj interface{}) {
csiDriver := obj.(*storagev1beta1.CSIDriver)
func (c *StorageCapabilityController) handleProvisionerCapability(obj interface{}) {
provisionerCapability := obj.(*ksstorage.ProvisionerCapability)
storageClasses, err := c.storageClassLister.List(labels.Everything())
if err != nil {
klog.Error("list StorageClass error when handler csiDriver", err)
klog.Error("list StorageClass error when handle provisionerCapability", err)
return
}
for _, storageClass := range storageClasses {
if storageClass.Provisioner == csiDriver.Name {
klog.V(4).Infof("enqueue StorageClass %s when handling csiDriver", storageClass.Name)
if getProvisionerCapabilityName(storageClass.Provisioner) == provisionerCapability.Name {
klog.V(4).Infof("enqueue StorageClass %s while handling provisionerCapability", storageClass.Name)
c.enqueueStorageClass(storageClass)
}
}
@@ -278,14 +274,16 @@ func (c *StorageCapabilityController) syncHandler(key string) error {
if err != nil {
return err
}
// No capability because csi-plugin not installed
// The corresponding ProvisionerCapability Object does not exist
if capabilitySpec == nil {
klog.Infof("StorageClass %s has no capability", name)
klog.Infof("Can't get StorageClass %s's capability", name)
err = c.updateStorageClassSnapshotSupported(storageClass, false)
if err != nil {
return err
}
return c.deleteStorageCapability(name)
// Don't delete the already created SCC
// as it might be created manually by user
return nil
}
klog.Infof("StorageClass %s has capability %v", name, capabilitySpec)
@@ -316,11 +314,11 @@ func (c *StorageCapabilityController) syncHandler(key string) error {
}
// Handle StorageClassCapability with the same name of StorageClass
storageClassCapabilityExist, err := c.storageCapabilityLister.Get(storageClass.Name)
storageClassCapabilityExist, err := c.storageClassCapabilityLister.Get(storageClass.Name)
if err != nil {
if errors.IsNotFound(err) {
// If StorageClassCapability doesn't exist, create it
storageClassCapabilityCreate := &capability.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}}
storageClassCapabilityCreate := &ksstorage.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}}
storageClassCapabilityCreate.Spec = *capabilitySpec
klog.Info("Create StorageClassCapability: ", storageClassCapabilityCreate)
_, err = c.storageClassCapabilityClient.Create(context.Background(), storageClassCapabilityCreate, metav1.CreateOptions{})
@@ -356,7 +354,7 @@ func (c *StorageCapabilityController) updateStorageClassSnapshotSupported(storag
}
func (c *StorageCapabilityController) deleteStorageCapability(name string) error {
_, err := c.storageCapabilityLister.Get(name)
_, err := c.storageClassCapabilityLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
@@ -382,7 +380,7 @@ func (c *StorageCapabilityController) deleteSnapshotClass(name string) error {
return c.snapshotClassClient.Delete(context.Background(), name, metav1.DeleteOptions{})
}
func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner string) (*capability.StorageClassCapabilitySpec, error) {
func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner string) (*ksstorage.StorageClassCapabilitySpec, error) {
provisionerCapability, err := c.provisionerCapabilityLister.Get(getProvisionerCapabilityName(provisioner))
if err != nil {
if errors.IsNotFound(err) {
@@ -391,13 +389,13 @@ func (c *StorageCapabilityController) capabilityFromProvisioner(provisioner stri
return nil, err
}
klog.V(4).Infof("get provisioner capability:%s %s", provisioner, provisionerCapability.Name)
capabilitySpec := &capability.StorageClassCapabilitySpec{
capabilitySpec := &ksstorage.StorageClassCapabilitySpec{
Features: provisionerCapability.Spec.Features,
}
return capabilitySpec, nil
}
func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.StorageClass) (*capability.StorageClassCapabilitySpec, error) {
func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.StorageClass) (*ksstorage.StorageClassCapabilitySpec, error) {
// get from provisioner capability first
klog.V(4).Info("get cap ", storageClass.Provisioner)
capabilitySpec, err := c.capabilityFromProvisioner(storageClass.Provisioner)
@@ -405,24 +403,10 @@ func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.
return nil, err
}
// csi of storage capability
if capabilitySpec == nil {
isCsi, err := c.isCSIStorage(storageClass.Provisioner)
if err != nil {
return nil, err
}
if isCsi {
capabilitySpec, err = csiCapability(c.csiAddressGetter(storageClass.Provisioner))
if err != nil {
return nil, err
}
}
}
if capabilitySpec != nil {
capabilitySpec.Provisioner = storageClass.Provisioner
if storageClass.AllowVolumeExpansion == nil || !*storageClass.AllowVolumeExpansion {
capabilitySpec.Features.Volume.Expand = capability.ExpandModeUnknown
capabilitySpec.Features.Volume.Expand = ksstorage.ExpandModeUnknown
}
if !c.snapshotSupported {
capabilitySpec.Features.Snapshot.Create = false
@@ -432,22 +416,6 @@ func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.
return capabilitySpec, nil
}
func (c *StorageCapabilityController) isCSIStorage(provisioner string) (bool, error) {
_, err := c.csiDriverLister.Get(provisioner)
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
}
// this is used for test of CSIDriver on windows
func (c *StorageCapabilityController) setCSIAddressGetter(getter csiAddressGetter) {
c.csiAddressGetter = getter
}
func (c *StorageCapabilityController) snapshotAllowed() bool {
return c.snapshotSupported && c.snapshotClassClient != nil && c.snapshotClassLister != nil && c.snapshotClassSynced != nil
}
@@ -465,10 +433,6 @@ func SnapshotSupported(discoveryInterface discovery.DiscoveryInterface) bool {
return ver.AtLeast(minVer)
}
func csiAddress(provisioner string) string {
return fmt.Sprintf(csiAddressFormat, provisioner)
}
func getProvisionerCapabilityName(provisioner string) string {
return strings.NewReplacer(".", "-", "/", "-").Replace(provisioner)
}

View File

@@ -19,11 +19,8 @@
package capability
import (
"math/rand"
"github.com/google/go-cmp/cmp"
//"github.com/google/go-cmp/cmp"
"reflect"
"testing"
"time"
@@ -32,7 +29,6 @@ import (
snapfake "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned/fake"
snapinformers "github.com/kubernetes-csi/external-snapshotter/client/v3/informers/externalversions"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -60,7 +56,7 @@ type fixture struct {
snapshotClassClient *snapfake.Clientset
ksClient *ksfake.Clientset
// Objects from here preload into NewSimpleFake.
storageObjects []runtime.Object // include StorageClass and CSIDriver
storageObjects []runtime.Object // include StorageClass
snapshotClassObjects []runtime.Object
capabilityObjects []runtime.Object // include StorageClassCapability and ProvisionerCapability
// Objects to put in the store.
@@ -68,19 +64,14 @@ type fixture struct {
snapshotClassLister []*snapbeta1.VolumeSnapshotClass
storageClassCapabilityLister []*ksv1alpha1.StorageClassCapability
provisionerCapabilityLister []*ksv1alpha1.ProvisionerCapability
csiDriverLister []*storagev1beta1.CSIDriver
// Actions expected to happen on the client.
actions []core.Action
// CSI server
runCSIServer bool
fakeCSIServer *fakeCSIServer
}
func newFixture(t *testing.T, snapshotSupported bool, runCSIServer bool) *fixture {
func newFixture(t *testing.T, snapshotSupported bool) *fixture {
return &fixture{
t: t,
snapshotSupported: snapshotSupported,
runCSIServer: runCSIServer,
}
}
@@ -105,21 +96,11 @@ func (f *fixture) newController() (*StorageCapabilityController,
f.snapshotSupported,
f.snapshotClassClient.SnapshotV1beta1().VolumeSnapshotClasses(),
snapshotInformers.Snapshot().V1beta1().VolumeSnapshotClasses(),
k8sInformers.Storage().V1beta1().CSIDrivers(),
)
if f.runCSIServer {
port := 30000 + rand.Intn(100)
fakeCSIServer, address := newTestCSIServer(port)
f.fakeCSIServer = fakeCSIServer
c.setCSIAddressGetter(func(storageClassProvisioner string) string { return address })
}
for _, storageClass := range f.storageClassLister {
_ = k8sInformers.Storage().V1().StorageClasses().Informer().GetIndexer().Add(storageClass)
}
for _, csiDriver := range f.csiDriverLister {
_ = k8sInformers.Storage().V1beta1().CSIDrivers().Informer().GetIndexer().Add(csiDriver)
}
for _, snapshotClass := range f.snapshotClassLister {
_ = snapshotInformers.Snapshot().V1beta1().VolumeSnapshotClasses().Informer().GetIndexer().Add(snapshotClass)
}
@@ -136,11 +117,6 @@ func (f *fixture) newController() (*StorageCapabilityController,
func (f *fixture) runController(scName string, startInformers bool, expectError bool) {
c, k8sI, crdI, snapI := f.newController()
if f.runCSIServer {
f.fakeCSIServer.run()
defer f.fakeCSIServer.stop()
}
if startInformers {
stopCh := make(chan struct{})
defer close(stopCh)
@@ -277,6 +253,26 @@ func newStorageClass(name string, provisioner string) *storagev1.StorageClass {
}
}
func newStorageClassCapabilitySpec() *ksv1alpha1.StorageClassCapabilitySpec {
return &ksv1alpha1.StorageClassCapabilitySpec{
Features: ksv1alpha1.CapabilityFeatures{
Topology: false,
Volume: ksv1alpha1.VolumeFeature{
Create: true,
Attach: false,
List: false,
Clone: true,
Stats: true,
Expand: ksv1alpha1.ExpandModeOffline,
},
Snapshot: ksv1alpha1.SnapshotFeature{
Create: true,
List: false,
},
},
}
}
func newStorageClassCapability(storageClass *storagev1.StorageClass) *ksv1alpha1.StorageClassCapability {
storageClassCapability := &ksv1alpha1.StorageClassCapability{}
storageClassCapability.Name = storageClass.Name
@@ -290,17 +286,9 @@ func newProvisionerCapability(storageClass *storagev1.StorageClass) *ksv1alpha1.
provisionerCapability.Name = getProvisionerCapabilityName(storageClass.Provisioner)
provisionerCapability.Spec.PluginInfo.Name = storageClass.Provisioner
provisionerCapability.Spec.Features = newStorageClassCapabilitySpec().Features
// ProvisionerCapability snapshot is always false
provisionerCapability.Spec.Features.Snapshot.Create = false
return provisionerCapability
}
func newCSIDriver(storageClass *storagev1.StorageClass) *storagev1beta1.CSIDriver {
csiDriver := &storagev1beta1.CSIDriver{}
csiDriver.Name = storageClass.Provisioner
return csiDriver
}
func newSnapshotClass(storageClass *storagev1.StorageClass) *snapbeta1.VolumeSnapshotClass {
return &snapbeta1.VolumeSnapshotClass{
ObjectMeta: v1.ObjectMeta{
@@ -321,18 +309,19 @@ func getKey(sc *storagev1.StorageClass, t *testing.T) string {
}
func TestCreateStorageClass(t *testing.T) {
fixture := newFixture(t, true, true)
fixture := newFixture(t, true)
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "true"}
provisionerCapability := newProvisionerCapability(storageClass)
snapshotClass := newSnapshotClass(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
csiDriver := newCSIDriver(storageClass)
// Objects exist
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
fixture.storageObjects = append(fixture.storageObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability)
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
// Action expected
fixture.expectCreateSnapshotClassAction(snapshotClass)
@@ -343,28 +332,69 @@ func TestCreateStorageClass(t *testing.T) {
fixture.run(getKey(storageClass, t))
}
func TestCreateStorageClassWithoutProvisionerCapability(t *testing.T) {
fixture := newFixture(t, true)
storageClass := newStorageClass("csi-example", "csi.example.com")
// Objects exist
fixture.storageObjects = append(fixture.storageObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
fixture.expectUpdateStorageClassAction(storageClassUpdate)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestUpdateStorageClass(t *testing.T) {
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClass.Annotations = map[string]string{annotationSupportSnapshot: "true"}
snapshotClass := newSnapshotClass(storageClass)
storageClassCapabilityUpdate := newStorageClassCapability(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
provisionerCapability := newProvisionerCapability(storageClass)
//old and new should have deference
storageClassCapability.Spec.Features.Volume.Create = !storageClassCapability.Spec.Features.Volume.Create
csiDriver := newCSIDriver(storageClass)
fixture := newFixture(t, true, true)
fixture := newFixture(t, true)
// Object exist
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
fixture.storageObjects = append(fixture.storageObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability, provisionerCapability)
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
// Action expected
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestUpdateStorageClassWithoutProvisionerCapability(t *testing.T) {
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClass.Annotations = map[string]string{annotationSupportSnapshot: "true"}
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations[annotationSupportSnapshot] = "false"
snapshotClass := newSnapshotClass(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
//old and new should have deference
storageClassCapability.Spec.Features.Volume.Create = !storageClassCapability.Spec.Features.Volume.Create
fixture := newFixture(t, true)
// Object exist
fixture.storageObjects = append(fixture.storageObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability)
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
// Action expected
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate)
fixture.expectUpdateStorageClassAction(storageClassUpdate)
// Run test
fixture.run(getKey(storageClass, t))
@@ -375,12 +405,9 @@ func TestDeleteStorageClass(t *testing.T) {
snapshotClass := newSnapshotClass(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
csiDriver := newCSIDriver(storageClass)
fixture := newFixture(t, true, true)
fixture := newFixture(t, true)
// Object exist
fixture.storageObjects = append(fixture.storageObjects, csiDriver)
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
fixture.storageObjects = append(fixture.storageObjects, storageClass)
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability)
@@ -395,7 +422,8 @@ func TestDeleteStorageClass(t *testing.T) {
}
func TestCreateStorageClassNotSupportSnapshot(t *testing.T) {
fixture := newFixture(t, false, true)
// K8S version < 1.17.0
fixture := newFixture(t, false)
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
@@ -403,32 +431,31 @@ func TestCreateStorageClassNotSupportSnapshot(t *testing.T) {
storageClassCapability.Spec.Features.Snapshot.Create = false
storageClassCapability.Spec.Features.Snapshot.List = false
provisionerCapability := newProvisionerCapability(storageClass)
csiDriver := newCSIDriver(storageClass)
// Objects exist
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability)
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
// Action expected
fixture.expectUpdateStorageClassAction(storageClassUpdate)
fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestCreateStorageClassInTree(t *testing.T) {
// InTree Storage has no snapshot capability
fixture := newFixture(t, true, true)
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
storageClassCapability := newStorageClassCapability(storageClass)
storageClassCapability.Spec.Features.Snapshot.Create = false
provisionerCapability := newProvisionerCapability(storageClass)
// Objects exist
fixture.storageObjects = append(fixture.storageObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability)
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
// Action expected
fixture.expectUpdateStorageClassAction(storageClassUpdate)
fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestCreateStorageClassNotHaveSnapshotCap(t *testing.T) {
// Storage has no snapshot capability
fixture := newFixture(t, true)
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
storageClassCapability := newStorageClassCapability(storageClass)
storageClassCapability.Spec.Features.Snapshot.Create = false
provisionerCapability := newProvisionerCapability(storageClass)
provisionerCapability.Spec.Features.Snapshot.Create = false
// Objects exist
fixture.storageObjects = append(fixture.storageObjects, storageClass)

View File

@@ -1,168 +0,0 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package capability
import (
"context"
"errors"
"net"
"net/url"
"time"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/keepalive"
"kubesphere.io/api/storage/v1alpha1"
)
const (
dialDuration = time.Second * 5
requestDuration = time.Second * 10
)
func csiCapability(csiAddress string) (*v1alpha1.StorageClassCapabilitySpec, error) {
csiConn, err := connect(csiAddress)
if err != nil {
return nil, err
}
defer func() { _ = csiConn.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), requestDuration)
defer cancel()
spec := &v1alpha1.StorageClassCapabilitySpec{}
err = addPluginCapabilities(ctx, csiConn, spec)
if err != nil {
return nil, err
}
err = addControllerCapabilities(ctx, csiConn, spec)
if err != nil {
return nil, err
}
err = addNodeCapabilities(ctx, csiConn, spec)
if err != nil {
return nil, err
}
return spec, nil
}
func addPluginCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
identityClient := csi.NewIdentityClient(conn)
pluginCapabilitiesResponse, err := identityClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{})
if err != nil {
return err
}
for _, capability := range pluginCapabilitiesResponse.GetCapabilities() {
if capability == nil {
continue
}
if capability.GetService().GetType() == csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS {
spec.Features.Topology = true
}
volumeExpansion := capability.GetVolumeExpansion()
if volumeExpansion != nil {
switch volumeExpansion.GetType() {
case csi.PluginCapability_VolumeExpansion_ONLINE:
spec.Features.Volume.Expand = v1alpha1.ExpandModeOnline
case csi.PluginCapability_VolumeExpansion_OFFLINE:
spec.Features.Volume.Expand = v1alpha1.ExpandModeOffline
}
}
}
return nil
}
func addControllerCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
controllerClient := csi.NewControllerClient(conn)
controllerCapabilitiesResponse, err := controllerClient.ControllerGetCapabilities(ctx, &csi.ControllerGetCapabilitiesRequest{})
if err != nil {
return err
}
for _, capability := range controllerCapabilitiesResponse.GetCapabilities() {
switch capability.GetRpc().GetType() {
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME:
spec.Features.Volume.Create = true
case csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME:
spec.Features.Volume.Attach = true
case csi.ControllerServiceCapability_RPC_LIST_VOLUMES:
spec.Features.Volume.List = true
case csi.ControllerServiceCapability_RPC_CLONE_VOLUME:
spec.Features.Volume.Clone = true
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
spec.Features.Snapshot.Create = true
case csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS:
spec.Features.Snapshot.List = true
}
}
return nil
}
func addNodeCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
nodeClient := csi.NewNodeClient(conn)
controllerCapabilitiesResponse, err := nodeClient.NodeGetCapabilities(ctx, &csi.NodeGetCapabilitiesRequest{})
if err != nil {
return err
}
for _, capability := range controllerCapabilitiesResponse.GetCapabilities() {
switch capability.GetRpc().GetType() {
case csi.NodeServiceCapability_RPC_GET_VOLUME_STATS:
spec.Features.Volume.Stats = true
}
}
return nil
}
// Connect address by GRPC
func connect(address string) (*grpc.ClientConn, error) {
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
}
u, err := url.Parse(address)
if err == nil && (!u.IsAbs() || u.Scheme == "unix") {
dialOptions = append(dialOptions,
grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", u.Path, timeout)
}))
}
// This is necessary when connecting via TCP and does not hurt
// when using Unix domain sockets. It ensures that gRPC detects a dead connection
// in a timely manner.
dialOptions = append(dialOptions,
grpc.WithKeepaliveParams(keepalive.ClientParameters{PermitWithoutStream: true}))
conn, err := grpc.Dial(address, dialOptions...)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), dialDuration)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
return conn, errors.New("connection timed out")
}
if conn.GetState() == connectivity.Ready {
return conn, nil
}
}
}

View File

@@ -1,190 +0,0 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package capability
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/klog"
"kubesphere.io/api/storage/v1alpha1"
)
var DefaultControllerRPCType = []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
}
var DefaultNodeRPCType = []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
}
var DefaultPluginCapability = []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
{
Type: &csi.PluginCapability_VolumeExpansion_{
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
Type: csi.PluginCapability_VolumeExpansion_OFFLINE,
},
},
},
}
type fakeCSIServer struct {
csi.UnimplementedIdentityServer
csi.UnimplementedControllerServer
csi.UnimplementedNodeServer
network string
address string
server *grpc.Server
}
func newTestCSIServer(port int) (csiServer *fakeCSIServer, address string) {
if runtime.GOOS == "windows" {
address = fmt.Sprintf("localhost:%d", +port)
csiServer = newFakeCSIServer("tcp", address)
} else {
address = filepath.Join(os.TempDir(), "csi.sock"+rand.String(4))
csiServer = newFakeCSIServer("unix", address)
address = "unix://" + address
}
return csiServer, address
}
func newFakeCSIServer(network, address string) *fakeCSIServer {
return &fakeCSIServer{
network: network,
address: address,
}
}
func (f *fakeCSIServer) run() {
listener, err := net.Listen(f.network, f.address)
if err != nil {
klog.Error("fake CSI server listen failed, ", err)
return
}
server := grpc.NewServer()
csi.RegisterIdentityServer(server, f)
csi.RegisterControllerServer(server, f)
csi.RegisterNodeServer(server, f)
go func() {
err = server.Serve(listener)
if err != nil && !strings.Contains(err.Error(), "stopped") {
klog.Error("fake CSI server serve failed, ", err)
}
}()
f.server = server
}
func (f *fakeCSIServer) stop() {
if f.server != nil {
f.server.Stop()
}
}
func (*fakeCSIServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{Capabilities: DefaultPluginCapability}, nil
}
func (*fakeCSIServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
var capabilities []*csi.ControllerServiceCapability
for _, rpcType := range DefaultControllerRPCType {
capability := &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: rpcType,
},
},
}
capabilities = append(capabilities, capability)
}
return &csi.ControllerGetCapabilitiesResponse{Capabilities: capabilities}, nil
}
func (*fakeCSIServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
var capabilities []*csi.NodeServiceCapability
for _, rpcType := range DefaultNodeRPCType {
capability := &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: rpcType,
},
},
}
capabilities = append(capabilities, capability)
}
return &csi.NodeGetCapabilitiesResponse{Capabilities: capabilities}, nil
}
func Test_CSICapability(t *testing.T) {
fakeCSIServer, address := newTestCSIServer(30087)
fakeCSIServer.run()
defer fakeCSIServer.stop()
specGot, err := csiCapability(address)
if err != nil {
t.Error(err)
}
specExpected := newStorageClassCapabilitySpec()
if diff := cmp.Diff(specGot, specExpected); diff != "" {
t.Errorf("%T differ (-got, +want): %s", specExpected, diff)
}
}
func newStorageClassCapabilitySpec() *v1alpha1.StorageClassCapabilitySpec {
return &v1alpha1.StorageClassCapabilitySpec{
Features: v1alpha1.CapabilityFeatures{
Topology: false,
Volume: v1alpha1.VolumeFeature{
Create: true,
Attach: false,
List: false,
Clone: true,
Stats: true,
Expand: v1alpha1.ExpandModeOffline,
},
Snapshot: v1alpha1.SnapshotFeature{
Create: true,
List: false,
},
},
}
}