Merge pull request #2265 from min-zh/storage-capability

capability for non CSI storage
This commit is contained in:
calvinyv
2020-07-06 19:36:22 +08:00
committed by GitHub
29 changed files with 1378 additions and 263 deletions

View File

@@ -20,82 +20,111 @@ package capability
import (
"fmt"
"os"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/discovery"
"reflect"
"strconv"
"strings"
"time"
snapapi "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
snapshotv1beta1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
snapshotclient "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned/typed/volumesnapshot/v1beta1"
snapinformers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/informers/externalversions/volumesnapshot/v1beta1"
snaplisters "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/listers/volumesnapshot/v1beta1"
v1strorage "k8s.io/api/storage/v1"
snapshotlisters "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/listers/volumesnapshot/v1beta1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
scinformers "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/kubernetes"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
sclisters "k8s.io/client-go/listers/storage/v1"
storageclient "k8s.io/client-go/kubernetes/typed/storage/v1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
crdapi "kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
clientset "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
capability "kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
crdscheme "kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
storageinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1"
crdlisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1"
capabilityclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned/typed/storage/v1alpha1"
capabilityinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1"
capabilitylisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1"
)
const (
minKubernetesVersion = "v1.17.0"
CSIAddressFormat = "/var/lib/kubelet/plugins/%s/csi.sock"
minSnapshotSupportedVersion = "v1.17.0"
csiAddressFormat = "/var/lib/kubelet/plugins/%s/csi.sock"
annotationSupportSnapshot = "storageclass.kubesphere.io/support-snapshot"
)
type csiAddressGetter func(storageClassProvisioner string) string
type StorageCapabilityController struct {
k8sClient kubernetes.Interface
storageClassCapabilityClient clientset.Interface
storageClassLister sclisters.StorageClassLister
storageClassSynced cache.InformerSynced
snapshotClassLister snaplisters.VolumeSnapshotClassLister
snapshotClassSynced cache.InformerSynced
storageClassCapabilityLister crdlisters.StorageClassCapabilityLister
storageClassCapabilityClient capabilityclient.StorageClassCapabilityInterface
storageCapabilityLister capabilitylisters.StorageClassCapabilityLister
storageClassCapabilitySynced cache.InformerSynced
workQueue workqueue.RateLimitingInterface
csiAddressGetter csiAddressGetter
provisionerCapabilityLister capabilitylisters.ProvisionerCapabilityLister
provisionerCapabilitySynced cache.InformerSynced
storageClassClient storageclient.StorageClassInterface
storageClassLister storagelistersv1.StorageClassLister
storageClassSynced cache.InformerSynced
snapshotSupported bool
snapshotClassClient snapshotclient.VolumeSnapshotClassInterface
snapshotClassLister snapshotlisters.VolumeSnapshotClassLister
snapshotClassSynced cache.InformerSynced
csiDriverLister storagelistersv1beta1.CSIDriverLister
csiDriverSynced cache.InformerSynced
csiAddressGetter csiAddressGetter
workQueue workqueue.RateLimitingInterface
}
// This controller is responsible to watch StorageClass, SnapshotClass.
// And then update StorageClassCapability CRD resource object to the newest status.
func NewController(
k8sClient kubernetes.Interface,
storageClassCapabilityClient clientset.Interface,
storageClassInformer scinformers.StorageClassInformer,
capabilityClient capabilityclient.StorageClassCapabilityInterface,
capabilityInformer capabilityinformers.Interface,
storageClassClient storageclient.StorageClassInterface,
storageClassInformer storageinformersv1.StorageClassInformer,
snapshotSupported bool,
snapshotClassClient snapshotclient.VolumeSnapshotClassInterface,
snapshotClassInformer snapinformers.VolumeSnapshotClassInformer,
storageClassCapabilityInformer storageinformers.StorageClassCapabilityInformer,
csiAddressGetter csiAddressGetter,
csiDriverInformer storageinformersv1beta1.CSIDriverInformer,
) *StorageCapabilityController {
utilruntime.Must(crdscheme.AddToScheme(scheme.Scheme))
controller := &StorageCapabilityController{
k8sClient: k8sClient,
storageClassCapabilityClient: storageClassCapabilityClient,
storageClassCapabilityClient: capabilityClient,
storageCapabilityLister: capabilityInformer.StorageClassCapabilities().Lister(),
storageClassCapabilitySynced: capabilityInformer.StorageClassCapabilities().Informer().HasSynced,
provisionerCapabilityLister: capabilityInformer.ProvisionerCapabilities().Lister(),
provisionerCapabilitySynced: capabilityInformer.ProvisionerCapabilities().Informer().HasSynced,
storageClassClient: storageClassClient,
storageClassLister: storageClassInformer.Lister(),
storageClassSynced: storageClassInformer.Informer().HasSynced,
snapshotSupported: snapshotSupported,
snapshotClassClient: snapshotClassClient,
snapshotClassLister: snapshotClassInformer.Lister(),
snapshotClassSynced: snapshotClassInformer.Informer().HasSynced,
storageClassCapabilityLister: storageClassCapabilityInformer.Lister(),
storageClassCapabilitySynced: storageClassCapabilityInformer.Informer().HasSynced,
csiDriverLister: csiDriverInformer.Lister(),
csiDriverSynced: csiDriverInformer.Informer().HasSynced,
csiAddressGetter: csiAddress,
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StorageClasses"),
csiAddressGetter: csiAddressGetter,
}
storageClassInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueStorageClass,
UpdateFunc: func(old, new interface{}) {
newStorageClass := new.(*v1strorage.StorageClass)
oldStorageClass := old.(*v1strorage.StorageClass)
newStorageClass := new.(*storagev1.StorageClass)
oldStorageClass := old.(*storagev1.StorageClass)
if newStorageClass.ResourceVersion == oldStorageClass.ResourceVersion {
return
}
@@ -103,13 +132,15 @@ func NewController(
},
DeleteFunc: controller.enqueueStorageClass,
})
snapshotClassInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueSnapshotClass,
UpdateFunc: func(old, new interface{}) {
csiDriverInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handlerCSIDriver,
UpdateFunc: func(oldObj, newObj interface{}) {
return
},
DeleteFunc: controller.enqueueSnapshotClass,
DeleteFunc: controller.handlerCSIDriver,
})
return controller
}
@@ -123,7 +154,14 @@ func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{})
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.storageClassSynced, c.snapshotClassSynced, c.storageClassCapabilitySynced); !ok {
cacheSyncs := []cache.InformerSynced{
c.storageClassCapabilitySynced,
c.provisionerCapabilitySynced,
c.storageClassSynced,
c.csiDriverSynced,
}
if ok := cache.WaitForCacheSync(stopCh, cacheSyncs...); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
@@ -136,27 +174,22 @@ func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{})
return nil
}
func (c *StorageCapabilityController) enqueueStorageClass(obj interface{}) {
storageClass := obj.(*v1strorage.StorageClass)
if !fileExist(c.csiAddressGetter(storageClass.Provisioner)) {
klog.V(4).Infof("CSI address of storage class: %s, provisioner :%s not exist", storageClass.Name, storageClass.Provisioner)
func (c *StorageCapabilityController) handlerCSIDriver(obj interface{}) {
csiDriver := obj.(*storagev1beta1.CSIDriver)
storageClasses, err := c.storageClassLister.List(labels.Everything())
if err != nil {
klog.Error("list StorageClass error when handler csiDriver", err)
return
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
for _, storageClass := range storageClasses {
if storageClass.Provisioner == csiDriver.Name {
klog.Info("enqueue StorageClass when handler csiDriver", storageClass)
c.enqueueStorageClass(storageClass)
}
}
c.workQueue.Add(key)
}
func (c *StorageCapabilityController) enqueueSnapshotClass(obj interface{}) {
snapshotClass := obj.(*snapapi.VolumeSnapshotClass)
if !fileExist(c.csiAddressGetter(snapshotClass.Driver)) {
klog.V(4).Infof("CSI address of snapshot class: %s, driver:%s not exist", snapshotClass.Name, snapshotClass.Driver)
return
}
func (c *StorageCapabilityController) enqueueStorageClass(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
@@ -214,67 +247,191 @@ func (c *StorageCapabilityController) syncHandler(key string) error {
// Get StorageClass
storageClass, err := c.storageClassLister.Get(name)
klog.V(4).Infof("Get storageClass %s: entity %v", name, storageClass)
if err != nil {
// StorageClass has been deleted, delete StorageClassCapability and VolumeSnapshotClass
if errors.IsNotFound(err) {
_, err = c.storageClassCapabilityLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
if c.snapshotSupported {
err = c.deleteSnapshotClass(name)
if err != nil {
return err
}
return err
}
return c.storageClassCapabilityClient.StorageV1alpha1().StorageClassCapabilities().Delete(name, &metav1.DeleteOptions{})
return c.deleteStorageCapability(name)
}
return err
}
// Get SnapshotClass
snapshotClassCreated := true
_, err = c.snapshotClassLister.Get(storageClass.Name)
// Get capability spec
capabilitySpec, err := c.getCapabilitySpec(storageClass)
if err != nil {
if errors.IsNotFound(err) {
snapshotClassCreated = false
} else {
return err
}
return err
}
// Get exist StorageClassCapability
storageClassCapabilityExist, err := c.storageClassCapabilityLister.Get(storageClass.Name)
if errors.IsNotFound(err) {
// If the resource doesn't exist, we'll create it
klog.V(4).Infof("Create StorageClassProvisioner %s", storageClass.GetName())
storageClassCapabilityCreate := &crdapi.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}}
err = c.addSpec(&storageClassCapabilityCreate.Spec, storageClass, snapshotClassCreated)
// No capability because csi-plugin not installed
if capabilitySpec == nil {
klog.Infof("StorageClass %s has no capability", name)
err = c.updateStorageClassSnapshotSupported(storageClass, false)
if err != nil {
return err
}
klog.V(4).Info("Create StorageClassCapability: ", storageClassCapabilityCreate)
_, err = c.storageClassCapabilityClient.StorageV1alpha1().StorageClassCapabilities().Create(storageClassCapabilityCreate)
return err
return c.deleteStorageCapability(name)
}
klog.Infof("StorageClass %s has capability %v", name, capabilitySpec)
// Handle VolumeSnapshotClass with same name of StorageClass
// annotate "support-snapshot" of StorageClass
withSnapshotCapability := false
if c.snapshotSupported && capabilitySpec.Features.Snapshot.Create {
_, err = c.snapshotClassLister.Get(name)
if err != nil {
// If VolumeSnapshotClass not exist, create it
if errors.IsNotFound(err) {
volumeSnapshotClassCreate := &snapshotv1beta1.VolumeSnapshotClass{
ObjectMeta: metav1.ObjectMeta{Name: name},
Driver: storageClass.Provisioner,
DeletionPolicy: snapshotv1beta1.VolumeSnapshotContentDelete,
}
_, err = c.snapshotClassClient.Create(volumeSnapshotClassCreate)
if err != nil {
return err
}
}
}
withSnapshotCapability = true
}
err = c.updateStorageClassSnapshotSupported(storageClass, withSnapshotCapability)
if err != nil {
return err
}
// If the resource exist, we can update it.
storageClassCapabilityUpdate := storageClassCapabilityExist.DeepCopy()
err = c.addSpec(&storageClassCapabilityUpdate.Spec, storageClass, snapshotClassCreated)
// Handle StorageClassCapability with the same name of StorageClass
storageClassCapabilityExist, err := c.storageCapabilityLister.Get(storageClass.Name)
if err != nil {
if errors.IsNotFound(err) {
// If StorageClassCapability doesn't exist, create it
storageClassCapabilityCreate := &capability.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}}
storageClassCapabilityCreate.Spec = *capabilitySpec
klog.Info("Create StorageClassCapability: ", storageClassCapabilityCreate)
_, err = c.storageClassCapabilityClient.Create(storageClassCapabilityCreate)
return err
}
return err
}
// If StorageClassCapability exist, update it.
storageClassCapabilityUpdate := storageClassCapabilityExist.DeepCopy()
storageClassCapabilityUpdate.Spec = *capabilitySpec
if !reflect.DeepEqual(storageClassCapabilityExist, storageClassCapabilityUpdate) {
klog.V(4).Info("Update StorageClassCapability: ", storageClassCapabilityUpdate)
_, err = c.storageClassCapabilityClient.StorageV1alpha1().StorageClassCapabilities().Update(storageClassCapabilityUpdate)
klog.Info("Update StorageClassCapability: ", storageClassCapabilityUpdate)
_, err = c.storageClassCapabilityClient.Update(storageClassCapabilityUpdate)
return err
}
return nil
}
func (c *StorageCapabilityController) IsValidKubernetesVersion() bool {
minVer := version.MustParseGeneric(minKubernetesVersion)
rawVer, err := c.k8sClient.Discovery().ServerVersion()
func (c *StorageCapabilityController) updateStorageClassSnapshotSupported(storageClass *storagev1.StorageClass, snapshotSupported bool) error {
if storageClass.Annotations == nil {
storageClass.Annotations = make(map[string]string)
}
snapshotSupportedAnnotated, err := strconv.ParseBool(storageClass.Annotations[annotationSupportSnapshot])
// err != nil means annotationSupportSnapshot is not illegal, include empty
if err != nil || snapshotSupported != snapshotSupportedAnnotated {
storageClass.Annotations[annotationSupportSnapshot] = strconv.FormatBool(snapshotSupported)
_, err = c.storageClassClient.Update(storageClass)
if err != nil {
return err
}
}
return nil
}
func (c *StorageCapabilityController) deleteStorageCapability(name string) error {
_, err := c.storageCapabilityLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
klog.Infof("Delete StorageClassCapability %s", name)
return c.storageClassCapabilityClient.Delete(name, &metav1.DeleteOptions{})
}
func (c *StorageCapabilityController) deleteSnapshotClass(name string) error {
if !c.snapshotSupported {
return nil
}
_, err := c.snapshotClassLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
klog.Infof("Delete SnapshotClass %s", name)
return c.snapshotClassClient.Delete(name, &metav1.DeleteOptions{})
}
func (c *StorageCapabilityController) nonCSICapability(provisioner string) (*capability.StorageClassCapabilitySpec, error) {
provisionerCapability, err := c.provisionerCapabilityLister.Get(getProvisionerCapabilityName(provisioner))
if err != nil {
if errors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
capabilitySpec := &capability.StorageClassCapabilitySpec{
Features: provisionerCapability.Spec.Features,
}
return capabilitySpec, nil
}
func (c *StorageCapabilityController) getCapabilitySpec(storageClass *storagev1.StorageClass) (*capability.StorageClassCapabilitySpec, error) {
isCsi, err := c.isCSIStorage(storageClass.Provisioner)
if err != nil {
return nil, err
}
var capabilitySpec *capability.StorageClassCapabilitySpec
if isCsi {
capabilitySpec, err = csiCapability(c.csiAddressGetter(storageClass.Provisioner))
} else {
capabilitySpec, err = c.nonCSICapability(storageClass.Provisioner)
}
if err != nil {
return nil, err
}
if capabilitySpec != nil {
capabilitySpec.Provisioner = storageClass.Provisioner
if storageClass.AllowVolumeExpansion == nil || !*storageClass.AllowVolumeExpansion {
capabilitySpec.Features.Volume.Expand = capability.ExpandModeUnknown
}
if !c.snapshotSupported {
capabilitySpec.Features.Snapshot.Create = false
capabilitySpec.Features.Snapshot.List = false
}
}
return capabilitySpec, nil
}
func (c *StorageCapabilityController) isCSIStorage(provisioner string) (bool, error) {
_, err := c.csiDriverLister.Get(provisioner)
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
}
// this is used for test of CSIDriver on windows
func (c *StorageCapabilityController) setCSIAddressGetter(getter csiAddressGetter) {
c.csiAddressGetter = getter
}
func SnapshotSupported(discoveryInterface discovery.DiscoveryInterface) bool {
minVer := version.MustParseGeneric(minSnapshotSupportedVersion)
rawVer, err := discoveryInterface.ServerVersion()
if err != nil {
return false
}
@@ -285,29 +442,10 @@ func (c *StorageCapabilityController) IsValidKubernetesVersion() bool {
return ver.AtLeast(minVer)
}
func (c *StorageCapabilityController) addSpec(spec *crdapi.StorageClassCapabilitySpec, storageClass *v1strorage.StorageClass, snapshotClassCreated bool) error {
csiCapability, err := csiCapability(c.csiAddressGetter(storageClass.Provisioner))
if err != nil {
return err
}
spec.Provisioner = storageClass.Provisioner
spec.Features.Volume = csiCapability.Features.Volume
spec.Features.Topology = csiCapability.Features.Topology
if *storageClass.AllowVolumeExpansion {
spec.Features.Volume.Expand = csiCapability.Features.Volume.Expand
} else {
spec.Features.Volume.Expand = crdapi.ExpandModeUnknown
}
if snapshotClassCreated {
spec.Features.Snapshot = csiCapability.Features.Snapshot
} else {
spec.Features.Snapshot.Create = false
spec.Features.Snapshot.List = false
}
return nil
func csiAddress(provisioner string) string {
return fmt.Sprintf(csiAddressFormat, provisioner)
}
func fileExist(name string) bool {
_, err := os.Stat(name)
return !os.IsNotExist(err)
func getProvisionerCapabilityName(provisioner string) string {
return strings.NewReplacer(".", "-", "/", "-").Replace(provisioner)
}

View File

@@ -20,98 +20,122 @@ package capability
import (
"github.com/google/go-cmp/cmp"
"math/rand"
//"github.com/google/go-cmp/cmp"
snapbeta1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
snapfake "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned/fake"
snapinformers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/informers/externalversions"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
kubeinformers "k8s.io/client-go/informers"
k8sinformers "k8s.io/client-go/informers"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
crdv1alpha1 "kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
crdfake "kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
crdinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
ksv1alpha1 "kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
ksfake "kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"reflect"
"testing"
"time"
)
var (
alwaysReady = func() bool { return true }
noReSyncPeriodFunc = func() time.Duration { return 0 }
)
type fixture struct {
t *testing.T
t *testing.T
snapshotSupported bool
// Clients
k8sClient *k8sfake.Clientset
snapshotClassClient *snapfake.Clientset
storageClassCapabilitiesClient *crdfake.Clientset
k8sClient *k8sfake.Clientset
snapshotClassClient *snapfake.Clientset
ksClient *ksfake.Clientset
// Objects from here preload into NewSimpleFake.
storageClassObjects []runtime.Object
snapshotClassObjects []runtime.Object
storageClassCapabilityObjects []runtime.Object
storageObjects []runtime.Object // include StorageClass and CSIDriver
snapshotClassObjects []runtime.Object
capabilityObjects []runtime.Object // include StorageClassCapability and ProvisionerCapability
// Objects to put in the store.
storageClassLister []*storagev1.StorageClass
snapshotClassLister []*snapbeta1.VolumeSnapshotClass
storageClassCapabilityLister []*crdv1alpha1.StorageClassCapability
storageClassCapabilityLister []*ksv1alpha1.StorageClassCapability
provisionerCapabilityLister []*ksv1alpha1.ProvisionerCapability
csiDriverLister []*storagev1beta1.CSIDriver
// Actions expected to happen on the client.
storageClassCapabilitiesActions []core.Action
actions []core.Action
// CSI server
runCSIServer bool
fakeCSIServer *fakeCSIServer
}
func newFixture(t *testing.T) *fixture {
func newFixture(t *testing.T, snapshotSupported bool, runCSIServer bool) *fixture {
return &fixture{
t: t,
t: t,
snapshotSupported: snapshotSupported,
runCSIServer: runCSIServer,
}
}
func (f *fixture) newController() (*StorageCapabilityController, kubeinformers.SharedInformerFactory,
crdinformers.SharedInformerFactory, snapinformers.SharedInformerFactory) {
func (f *fixture) newController() (*StorageCapabilityController,
k8sinformers.SharedInformerFactory,
ksinformers.SharedInformerFactory,
snapinformers.SharedInformerFactory) {
fakeCSIServer, address := newTestCSIServer()
f.fakeCSIServer = fakeCSIServer
f.k8sClient = k8sfake.NewSimpleClientset(f.storageClassObjects...)
f.storageClassCapabilitiesClient = crdfake.NewSimpleClientset(f.storageClassCapabilityObjects...)
f.k8sClient = k8sfake.NewSimpleClientset(f.storageObjects...)
f.ksClient = ksfake.NewSimpleClientset(f.capabilityObjects...)
f.snapshotClassClient = snapfake.NewSimpleClientset(f.snapshotClassObjects...)
k8sI := kubeinformers.NewSharedInformerFactory(f.k8sClient, noReSyncPeriodFunc())
crdI := crdinformers.NewSharedInformerFactory(f.storageClassCapabilitiesClient, noReSyncPeriodFunc())
snapI := snapinformers.NewSharedInformerFactory(f.snapshotClassClient, noReSyncPeriodFunc())
k8sInformers := k8sinformers.NewSharedInformerFactory(f.k8sClient, noReSyncPeriodFunc())
ksInformers := ksinformers.NewSharedInformerFactory(f.ksClient, noReSyncPeriodFunc())
snapshotInformers := snapinformers.NewSharedInformerFactory(f.snapshotClassClient, noReSyncPeriodFunc())
c := NewController(
f.k8sClient,
f.storageClassCapabilitiesClient,
k8sI.Storage().V1().StorageClasses(),
snapI.Snapshot().V1beta1().VolumeSnapshotClasses(),
crdI.Storage().V1alpha1().StorageClassCapabilities(),
func(storageClassProvisioner string) string { return address },
f.ksClient.StorageV1alpha1().StorageClassCapabilities(),
ksInformers.Storage().V1alpha1(),
f.k8sClient.StorageV1().StorageClasses(),
k8sInformers.Storage().V1().StorageClasses(),
f.snapshotSupported,
f.snapshotClassClient.SnapshotV1beta1().VolumeSnapshotClasses(),
snapshotInformers.Snapshot().V1beta1().VolumeSnapshotClasses(),
k8sInformers.Storage().V1beta1().CSIDrivers(),
)
if f.runCSIServer {
port := 30000 + rand.Intn(100)
fakeCSIServer, address := newTestCSIServer(port)
f.fakeCSIServer = fakeCSIServer
c.setCSIAddressGetter(func(storageClassProvisioner string) string { return address })
}
for _, storageClass := range f.storageClassLister {
_ = k8sI.Storage().V1().StorageClasses().Informer().GetIndexer().Add(storageClass)
_ = k8sInformers.Storage().V1().StorageClasses().Informer().GetIndexer().Add(storageClass)
}
for _, csiDriver := range f.csiDriverLister {
_ = k8sInformers.Storage().V1beta1().CSIDrivers().Informer().GetIndexer().Add(csiDriver)
}
for _, snapshotClass := range f.snapshotClassLister {
_ = snapI.Snapshot().V1beta1().VolumeSnapshotClasses().Informer().GetIndexer().Add(snapshotClass)
_ = snapshotInformers.Snapshot().V1beta1().VolumeSnapshotClasses().Informer().GetIndexer().Add(snapshotClass)
}
for _, storageClassCapability := range f.storageClassCapabilityLister {
_ = crdI.Storage().V1alpha1().StorageClassCapabilities().Informer().GetIndexer().Add(storageClassCapability)
_ = ksInformers.Storage().V1alpha1().StorageClassCapabilities().Informer().GetIndexer().Add(storageClassCapability)
}
for _, provisionerCapability := range f.provisionerCapabilityLister {
_ = ksInformers.Storage().V1alpha1().ProvisionerCapabilities().Informer().GetIndexer().Add(provisionerCapability)
}
return c, k8sI, crdI, snapI
return c, k8sInformers, ksInformers, snapshotInformers
}
func (f *fixture) runController(scName string, startInformers bool, expectError bool) {
c, k8sI, crdI, snapI := f.newController()
f.fakeCSIServer.run()
defer f.fakeCSIServer.stop()
if f.runCSIServer {
f.fakeCSIServer.run()
defer f.fakeCSIServer.stop()
}
if startInformers {
stopCh := make(chan struct{})
@@ -120,9 +144,6 @@ func (f *fixture) runController(scName string, startInformers bool, expectError
crdI.Start(stopCh)
snapI.Start(stopCh)
}
c.storageClassSynced = alwaysReady
c.snapshotClassSynced = alwaysReady
c.storageClassCapabilitySynced = alwaysReady
err := c.syncHandler(scName)
if !expectError && err != nil {
@@ -131,13 +152,17 @@ func (f *fixture) runController(scName string, startInformers bool, expectError
f.t.Error("expected error syncing, got nil")
}
actions := filterInformerActions(f.storageClassCapabilitiesClient.Actions())
for i, action := range actions {
if len(f.storageClassCapabilitiesActions) < i+1 {
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.storageClassCapabilitiesActions), actions[i:])
break
}
expectedAction := f.storageClassCapabilitiesActions[i]
var actions []core.Action
actions = append(actions, f.snapshotClassClient.Actions()...)
actions = append(actions, f.k8sClient.Actions()...)
actions = append(actions, f.ksClient.Actions()...)
filerActions := filterInformerActions(actions)
if len(filerActions) != len(f.actions) {
f.t.Errorf("count of actions: differ (-got, +want): %s", cmp.Diff(filerActions, f.actions))
return
}
for i, action := range filerActions {
expectedAction := f.actions[i]
checkAction(expectedAction, action, f.t)
}
}
@@ -146,21 +171,36 @@ func (f *fixture) run(scName string) {
f.runController(scName, true, false)
}
func (f *fixture) expectCreateStorageClassCapabilitiesAction(storageClassCapability *crdv1alpha1.StorageClassCapability) {
f.storageClassCapabilitiesActions = append(f.storageClassCapabilitiesActions, core.NewCreateAction(
func (f *fixture) expectCreateStorageClassCapabilitiesAction(storageClassCapability *ksv1alpha1.StorageClassCapability) {
f.actions = append(f.actions, core.NewCreateAction(
schema.GroupVersionResource{Resource: "storageclasscapabilities"}, storageClassCapability.Namespace, storageClassCapability))
}
func (f *fixture) expectUpdateStorageClassCapabilitiesAction(storageClassCapability *crdv1alpha1.StorageClassCapability) {
f.storageClassCapabilitiesActions = append(f.storageClassCapabilitiesActions, core.NewUpdateAction(
func (f *fixture) expectUpdateStorageClassCapabilitiesAction(storageClassCapability *ksv1alpha1.StorageClassCapability) {
f.actions = append(f.actions, core.NewUpdateAction(
schema.GroupVersionResource{Resource: "storageclasscapabilities"}, storageClassCapability.Namespace, storageClassCapability))
}
func (f *fixture) expectDeleteStorageClassCapabilitiesAction(storageClassCapability *crdv1alpha1.StorageClassCapability) {
f.storageClassCapabilitiesActions = append(f.storageClassCapabilitiesActions, core.NewDeleteAction(
func (f *fixture) expectDeleteStorageClassCapabilitiesAction(storageClassCapability *ksv1alpha1.StorageClassCapability) {
f.actions = append(f.actions, core.NewDeleteAction(
schema.GroupVersionResource{Resource: "storageclasscapabilities"}, storageClassCapability.Namespace, storageClassCapability.Name))
}
func (f *fixture) expectUpdateStorageClassAction(storageClass *storagev1.StorageClass) {
f.actions = append(f.actions, core.NewUpdateAction(
schema.GroupVersionResource{Resource: "storageclasses"}, storageClass.Namespace, storageClass))
}
func (f *fixture) expectCreateSnapshotClassAction(snapshotClass *snapbeta1.VolumeSnapshotClass) {
f.actions = append(f.actions, core.NewCreateAction(
schema.GroupVersionResource{Resource: "volumesnapshotclasses"}, snapshotClass.Namespace, snapshotClass))
}
func (f *fixture) expectDeleteSnapshotClassAction(snapshotClass *snapbeta1.VolumeSnapshotClass) {
f.actions = append(f.actions, core.NewDeleteAction(
schema.GroupVersionResource{Resource: "volumesnapshotclasses"}, snapshotClass.Namespace, snapshotClass.Name))
}
// filterInformerActions filters list and watch actions for testing resources.
// Since list and watch don't change resource state we can filter it to lower
// nose level in our tests.
@@ -179,7 +219,7 @@ func filterInformerActions(actions []core.Action) []core.Action {
// same attached resources
func checkAction(expected, actual core.Action, t *testing.T) {
if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) {
t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual)
t.Errorf("\nExpected\n\t%#v\ngot\n\t%#v", expected, actual)
return
}
@@ -233,20 +273,37 @@ func newStorageClass(name string, provisioner string) *storagev1.StorageClass {
}
}
func newStorageClassCapability(storageClass *storagev1.StorageClass) *crdv1alpha1.StorageClassCapability {
storageClassCapability := &crdv1alpha1.StorageClassCapability{}
func newStorageClassCapability(storageClass *storagev1.StorageClass) *ksv1alpha1.StorageClassCapability {
storageClassCapability := &ksv1alpha1.StorageClassCapability{}
storageClassCapability.Name = storageClass.Name
storageClassCapability.Spec = *newStorageClassCapabilitySpec()
storageClassCapability.Spec.Provisioner = storageClass.Provisioner
return storageClassCapability
}
func newProvisionerCapability(storageClass *storagev1.StorageClass) *ksv1alpha1.ProvisionerCapability {
provisionerCapability := &ksv1alpha1.ProvisionerCapability{}
provisionerCapability.Name = getProvisionerCapabilityName(storageClass.Provisioner)
provisionerCapability.Spec.PluginInfo.Name = storageClass.Provisioner
provisionerCapability.Spec.Features = newStorageClassCapabilitySpec().Features
// ProvisionerCapability snapshot is always false
provisionerCapability.Spec.Features.Snapshot.Create = false
return provisionerCapability
}
func newCSIDriver(storageClass *storagev1.StorageClass) *storagev1beta1.CSIDriver {
csiDriver := &storagev1beta1.CSIDriver{}
csiDriver.Name = storageClass.Provisioner
return csiDriver
}
func newSnapshotClass(storageClass *storagev1.StorageClass) *snapbeta1.VolumeSnapshotClass {
return &snapbeta1.VolumeSnapshotClass{
ObjectMeta: v1.ObjectMeta{
Name: storageClass.Name,
},
Driver: storageClass.Provisioner,
Driver: storageClass.Provisioner,
DeletionPolicy: snapbeta1.VolumeSnapshotContentDelete,
}
}
@@ -260,18 +317,22 @@ func getKey(sc *storagev1.StorageClass, t *testing.T) string {
}
func TestCreateStorageClass(t *testing.T) {
fixture := newFixture(t)
fixture := newFixture(t, true, true)
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "true"}
snapshotClass := newSnapshotClass(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
csiDriver := newCSIDriver(storageClass)
// Objects exist
fixture.storageClassObjects = append(fixture.storageClassObjects, storageClass)
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
// Action expected
fixture.expectCreateSnapshotClassAction(snapshotClass)
fixture.expectUpdateStorageClassAction(storageClassUpdate)
fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability)
// Run test
@@ -280,20 +341,26 @@ func TestCreateStorageClass(t *testing.T) {
func TestUpdateStorageClass(t *testing.T) {
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClass.Annotations = map[string]string{annotationSupportSnapshot: "true"}
snapshotClass := newSnapshotClass(storageClass)
storageClassCapabilityUpdate := newStorageClassCapability(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
//old and new should have deference
storageClassCapability.Spec.Features.Volume.Create = !storageClassCapability.Spec.Features.Volume.Create
csiDriver := newCSIDriver(storageClass)
fixture := newFixture(t)
fixture := newFixture(t, true, true)
// Object exist
fixture.storageClassObjects = append(fixture.storageClassObjects, storageClass)
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
fixture.storageClassCapabilityObjects = append(fixture.storageClassCapabilityObjects, storageClassCapability)
fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability)
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
// Action expected
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapability)
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate)
// Run test
fixture.run(getKey(storageClass, t))
@@ -304,36 +371,70 @@ func TestDeleteStorageClass(t *testing.T) {
snapshotClass := newSnapshotClass(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
fixture := newFixture(t)
csiDriver := newCSIDriver(storageClass)
fixture := newFixture(t, true, true)
// Object exist
fixture.storageObjects = append(fixture.storageObjects, csiDriver)
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
fixture.storageClassCapabilityObjects = append(fixture.storageClassCapabilityObjects, storageClassCapability)
fixture.capabilityObjects = append(fixture.capabilityObjects, storageClassCapability)
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
// Action expected
fixture.expectDeleteSnapshotClassAction(snapshotClass)
fixture.expectDeleteStorageClassCapabilitiesAction(storageClassCapability)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestDeleteSnapshotClass(t *testing.T) {
func TestCreateStorageClassNotSupportSnapshot(t *testing.T) {
fixture := newFixture(t, false, true)
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
storageClassCapability := newStorageClassCapability(storageClass)
storageClassCapability.Spec.Features.Snapshot.Create = false
storageClassCapability.Spec.Features.Snapshot.List = false
provisionerCapability := newProvisionerCapability(storageClass)
csiDriver := newCSIDriver(storageClass)
fixture := newFixture(t)
// Object exist
fixture.storageClassCapabilityObjects = append(fixture.storageClassCapabilityObjects, storageClassCapability)
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
fixture.storageClassObjects = append(fixture.storageClassObjects, storageClass)
// Objects exist
fixture.storageObjects = append(fixture.storageObjects, storageClass, csiDriver)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.csiDriverLister = append(fixture.csiDriverLister, csiDriver)
fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability)
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
// Action expected
storageClassCapabilityUpdate := storageClassCapability.DeepCopy()
storageClassCapabilityUpdate.Spec.Features.Snapshot.Create = false
storageClassCapabilityUpdate.Spec.Features.Snapshot.List = false
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate)
fixture.expectUpdateStorageClassAction(storageClassUpdate)
fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestCreateStorageClassInTree(t *testing.T) {
// InTree Storage has no snapshot capability
fixture := newFixture(t, true, true)
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClassUpdate := storageClass.DeepCopy()
storageClassUpdate.Annotations = map[string]string{annotationSupportSnapshot: "false"}
storageClassCapability := newStorageClassCapability(storageClass)
storageClassCapability.Spec.Features.Snapshot.Create = false
provisionerCapability := newProvisionerCapability(storageClass)
// Objects exist
fixture.storageObjects = append(fixture.storageObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.capabilityObjects = append(fixture.capabilityObjects, provisionerCapability)
fixture.provisionerCapabilityLister = append(fixture.provisionerCapabilityLister, provisionerCapability)
// Action expected
fixture.expectUpdateStorageClassAction(storageClassUpdate)
fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability)
// Run test
fixture.run(getKey(storageClass, t))

View File

@@ -19,6 +19,7 @@ package capability
import (
"context"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
@@ -72,9 +73,9 @@ type fakeCSIServer struct {
server *grpc.Server
}
func newTestCSIServer() (csiServer *fakeCSIServer, address string) {
func newTestCSIServer(port int) (csiServer *fakeCSIServer, address string) {
if runtime.GOOS == "windows" {
address = "localhost:38886"
address = fmt.Sprintf("localhost:%d", +port)
csiServer = newFakeCSIServer("tcp", address)
} else {
address = filepath.Join(os.TempDir(), "csi.sock"+rand.String(4))
@@ -151,7 +152,7 @@ func (*fakeCSIServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
}
func Test_CSICapability(t *testing.T) {
fakeCSIServer, address := newTestCSIServer()
fakeCSIServer, address := newTestCSIServer(30087)
fakeCSIServer.run()
defer fakeCSIServer.stop()
@@ -168,9 +169,9 @@ func Test_CSICapability(t *testing.T) {
func newStorageClassCapabilitySpec() *v1alpha1.StorageClassCapabilitySpec {
return &v1alpha1.StorageClassCapabilitySpec{
Features: v1alpha1.StorageClassCapabilitySpecFeatures{
Features: v1alpha1.CapabilityFeatures{
Topology: false,
Volume: v1alpha1.StorageClassCapabilitySpecFeaturesVolume{
Volume: v1alpha1.VolumeFeature{
Create: true,
Attach: false,
List: false,
@@ -178,7 +179,7 @@ func newStorageClassCapabilitySpec() *v1alpha1.StorageClassCapabilitySpec {
Stats: true,
Expand: v1alpha1.ExpandModeOffline,
},
Snapshot: v1alpha1.StorageClassCapabilitySpecFeaturesSnapshot{
Snapshot: v1alpha1.SnapshotFeature{
Create: true,
List: false,
},