Merge pull request #2149 from min-zh/snapshot

storage capability
This commit is contained in:
KubeSphere CI Bot
2020-06-04 12:47:37 +08:00
committed by GitHub
84 changed files with 19178 additions and 110 deletions

View File

@@ -0,0 +1,313 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package capability
import (
"fmt"
"os"
"reflect"
"time"
snapapi "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
snapinformers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/informers/externalversions/volumesnapshot/v1beta1"
snaplisters "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/listers/volumesnapshot/v1beta1"
v1strorage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
scinformers "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
sclisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
crdapi "kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
clientset "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
crdscheme "kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
storageinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/storage/v1alpha1"
crdlisters "kubesphere.io/kubesphere/pkg/client/listers/storage/v1alpha1"
)
const (
minKubernetesVersion = "v1.17.0"
CSIAddressFormat = "/var/lib/kubelet/plugins/%s/csi.sock"
)
type csiAddressGetter func(storageClassProvisioner string) string
type StorageCapabilityController struct {
k8sClient kubernetes.Interface
storageClassCapabilityClient clientset.Interface
storageClassLister sclisters.StorageClassLister
storageClassSynced cache.InformerSynced
snapshotClassLister snaplisters.VolumeSnapshotClassLister
snapshotClassSynced cache.InformerSynced
storageClassCapabilityLister crdlisters.StorageClassCapabilityLister
storageClassCapabilitySynced cache.InformerSynced
workQueue workqueue.RateLimitingInterface
csiAddressGetter csiAddressGetter
}
// This controller is responsible to watch StorageClass, SnapshotClass.
// And then update StorageClassCapability CRD resource object to the newest status.
func NewController(
k8sClient kubernetes.Interface,
storageClassCapabilityClient clientset.Interface,
storageClassInformer scinformers.StorageClassInformer,
snapshotClassInformer snapinformers.VolumeSnapshotClassInformer,
storageClassCapabilityInformer storageinformers.StorageClassCapabilityInformer,
csiAddressGetter csiAddressGetter,
) *StorageCapabilityController {
utilruntime.Must(crdscheme.AddToScheme(scheme.Scheme))
controller := &StorageCapabilityController{
k8sClient: k8sClient,
storageClassCapabilityClient: storageClassCapabilityClient,
storageClassLister: storageClassInformer.Lister(),
storageClassSynced: storageClassInformer.Informer().HasSynced,
snapshotClassLister: snapshotClassInformer.Lister(),
snapshotClassSynced: snapshotClassInformer.Informer().HasSynced,
storageClassCapabilityLister: storageClassCapabilityInformer.Lister(),
storageClassCapabilitySynced: storageClassCapabilityInformer.Informer().HasSynced,
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StorageClasses"),
csiAddressGetter: csiAddressGetter,
}
storageClassInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueStorageClass,
UpdateFunc: func(old, new interface{}) {
newStorageClass := new.(*v1strorage.StorageClass)
oldStorageClass := old.(*v1strorage.StorageClass)
if newStorageClass.ResourceVersion == oldStorageClass.ResourceVersion {
return
}
controller.enqueueStorageClass(newStorageClass)
},
DeleteFunc: controller.enqueueStorageClass,
})
snapshotClassInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueSnapshotClass,
UpdateFunc: func(old, new interface{}) {
return
},
DeleteFunc: controller.enqueueSnapshotClass,
})
return controller
}
func (c *StorageCapabilityController) Start(stopCh <-chan struct{}) error {
return c.Run(5, stopCh)
}
func (c *StorageCapabilityController) Run(threadCnt int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workQueue.ShutDown()
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.storageClassSynced, c.snapshotClassSynced, c.storageClassCapabilitySynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < threadCnt; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
return nil
}
func (c *StorageCapabilityController) enqueueStorageClass(obj interface{}) {
storageClass := obj.(*v1strorage.StorageClass)
if !fileExist(c.csiAddressGetter(storageClass.Provisioner)) {
klog.V(4).Infof("CSI address of storage class: %s, provisioner :%s not exist", storageClass.Name, storageClass.Provisioner)
return
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workQueue.Add(key)
}
func (c *StorageCapabilityController) enqueueSnapshotClass(obj interface{}) {
snapshotClass := obj.(*snapapi.VolumeSnapshotClass)
if !fileExist(c.csiAddressGetter(snapshotClass.Driver)) {
klog.V(4).Infof("CSI address of snapshot class: %s, driver:%s not exist", snapshotClass.Name, snapshotClass.Driver)
return
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workQueue.Add(key)
}
func (c *StorageCapabilityController) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *StorageCapabilityController) processNextWorkItem() bool {
obj, shutdown := c.workQueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.workQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.workQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workQueue but got %#v", obj))
return nil
}
if err := c.syncHandler(key); err != nil {
c.workQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.workQueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
// When creating a new storage class, the controller will create a new storage capability object.
// When updating storage class, the controller will update or create the storage capability object.
// When deleting storage class, the controller will delete storage capability object.
func (c *StorageCapabilityController) syncHandler(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get StorageClass
storageClass, err := c.storageClassLister.Get(name)
klog.V(4).Infof("Get storageClass %s: entity %v", name, storageClass)
if err != nil {
if errors.IsNotFound(err) {
_, err = c.storageClassCapabilityLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return c.storageClassCapabilityClient.StorageV1alpha1().StorageClassCapabilities().Delete(name, &metav1.DeleteOptions{})
}
return err
}
// Get SnapshotClass
snapshotClassCreated := true
_, err = c.snapshotClassLister.Get(storageClass.Name)
if err != nil {
if errors.IsNotFound(err) {
snapshotClassCreated = false
} else {
return err
}
}
// Get exist StorageClassCapability
storageClassCapabilityExist, err := c.storageClassCapabilityLister.Get(storageClass.Name)
if errors.IsNotFound(err) {
// If the resource doesn't exist, we'll create it
klog.V(4).Infof("Create StorageClassProvisioner %s", storageClass.GetName())
storageClassCapabilityCreate := &crdapi.StorageClassCapability{ObjectMeta: metav1.ObjectMeta{Name: storageClass.Name}}
err = c.addSpec(&storageClassCapabilityCreate.Spec, storageClass, snapshotClassCreated)
if err != nil {
return err
}
klog.V(4).Info("Create StorageClassCapability: ", storageClassCapabilityCreate)
_, err = c.storageClassCapabilityClient.StorageV1alpha1().StorageClassCapabilities().Create(storageClassCapabilityCreate)
return err
}
if err != nil {
return err
}
// If the resource exist, we can update it.
storageClassCapabilityUpdate := storageClassCapabilityExist.DeepCopy()
err = c.addSpec(&storageClassCapabilityUpdate.Spec, storageClass, snapshotClassCreated)
if err != nil {
return err
}
if !reflect.DeepEqual(storageClassCapabilityExist, storageClassCapabilityUpdate) {
klog.V(4).Info("Update StorageClassCapability: ", storageClassCapabilityUpdate)
_, err = c.storageClassCapabilityClient.StorageV1alpha1().StorageClassCapabilities().Update(storageClassCapabilityUpdate)
return err
}
return nil
}
func (c *StorageCapabilityController) IsValidKubernetesVersion() bool {
minVer := version.MustParseGeneric(minKubernetesVersion)
rawVer, err := c.k8sClient.Discovery().ServerVersion()
if err != nil {
return false
}
ver, err := version.ParseSemantic(rawVer.String())
if err != nil {
return false
}
return ver.AtLeast(minVer)
}
func (c *StorageCapabilityController) addSpec(spec *crdapi.StorageClassCapabilitySpec, storageClass *v1strorage.StorageClass, snapshotClassCreated bool) error {
csiCapability, err := csiCapability(c.csiAddressGetter(storageClass.Provisioner))
if err != nil {
return err
}
spec.Provisioner = storageClass.Provisioner
spec.Features.Volume = csiCapability.Features.Volume
spec.Features.Topology = csiCapability.Features.Topology
if *storageClass.AllowVolumeExpansion {
spec.Features.Volume.Expand = csiCapability.Features.Volume.Expand
} else {
spec.Features.Volume.Expand = crdapi.ExpandModeUnknown
}
if snapshotClassCreated {
spec.Features.Snapshot = csiCapability.Features.Snapshot
} else {
spec.Features.Snapshot.Create = false
spec.Features.Snapshot.List = false
}
return nil
}
func fileExist(name string) bool {
_, err := os.Stat(name)
return !os.IsNotExist(err)
}

View File

@@ -0,0 +1,340 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package capability
import (
"github.com/google/go-cmp/cmp"
snapbeta1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
snapfake "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned/fake"
snapinformers "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/informers/externalversions"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
kubeinformers "k8s.io/client-go/informers"
k8sfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
crdv1alpha1 "kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
crdfake "kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
crdinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"reflect"
"testing"
"time"
)
var (
alwaysReady = func() bool { return true }
noReSyncPeriodFunc = func() time.Duration { return 0 }
)
type fixture struct {
t *testing.T
// Clients
k8sClient *k8sfake.Clientset
snapshotClassClient *snapfake.Clientset
storageClassCapabilitiesClient *crdfake.Clientset
// Objects from here preload into NewSimpleFake.
storageClassObjects []runtime.Object
snapshotClassObjects []runtime.Object
storageClassCapabilityObjects []runtime.Object
// Objects to put in the store.
storageClassLister []*storagev1.StorageClass
snapshotClassLister []*snapbeta1.VolumeSnapshotClass
storageClassCapabilityLister []*crdv1alpha1.StorageClassCapability
// Actions expected to happen on the client.
storageClassCapabilitiesActions []core.Action
// CSI server
fakeCSIServer *fakeCSIServer
}
func newFixture(t *testing.T) *fixture {
return &fixture{
t: t,
}
}
func (f *fixture) newController() (*StorageCapabilityController, kubeinformers.SharedInformerFactory,
crdinformers.SharedInformerFactory, snapinformers.SharedInformerFactory) {
fakeCSIServer, address := newTestCSIServer()
f.fakeCSIServer = fakeCSIServer
f.k8sClient = k8sfake.NewSimpleClientset(f.storageClassObjects...)
f.storageClassCapabilitiesClient = crdfake.NewSimpleClientset(f.storageClassCapabilityObjects...)
f.snapshotClassClient = snapfake.NewSimpleClientset(f.snapshotClassObjects...)
k8sI := kubeinformers.NewSharedInformerFactory(f.k8sClient, noReSyncPeriodFunc())
crdI := crdinformers.NewSharedInformerFactory(f.storageClassCapabilitiesClient, noReSyncPeriodFunc())
snapI := snapinformers.NewSharedInformerFactory(f.snapshotClassClient, noReSyncPeriodFunc())
c := NewController(
f.k8sClient,
f.storageClassCapabilitiesClient,
k8sI.Storage().V1().StorageClasses(),
snapI.Snapshot().V1beta1().VolumeSnapshotClasses(),
crdI.Storage().V1alpha1().StorageClassCapabilities(),
func(storageClassProvisioner string) string { return address },
)
for _, storageClass := range f.storageClassLister {
_ = k8sI.Storage().V1().StorageClasses().Informer().GetIndexer().Add(storageClass)
}
for _, snapshotClass := range f.snapshotClassLister {
_ = snapI.Snapshot().V1beta1().VolumeSnapshotClasses().Informer().GetIndexer().Add(snapshotClass)
}
for _, storageClassCapability := range f.storageClassCapabilityLister {
_ = crdI.Storage().V1alpha1().StorageClassCapabilities().Informer().GetIndexer().Add(storageClassCapability)
}
return c, k8sI, crdI, snapI
}
func (f *fixture) runController(scName string, startInformers bool, expectError bool) {
c, k8sI, crdI, snapI := f.newController()
f.fakeCSIServer.run()
defer f.fakeCSIServer.stop()
if startInformers {
stopCh := make(chan struct{})
defer close(stopCh)
k8sI.Start(stopCh)
crdI.Start(stopCh)
snapI.Start(stopCh)
}
c.storageClassSynced = alwaysReady
c.snapshotClassSynced = alwaysReady
c.storageClassCapabilitySynced = alwaysReady
err := c.syncHandler(scName)
if !expectError && err != nil {
f.t.Errorf("error syncing: %v", err)
} else if expectError && err == nil {
f.t.Error("expected error syncing, got nil")
}
actions := filterInformerActions(f.storageClassCapabilitiesClient.Actions())
for i, action := range actions {
if len(f.storageClassCapabilitiesActions) < i+1 {
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.storageClassCapabilitiesActions), actions[i:])
break
}
expectedAction := f.storageClassCapabilitiesActions[i]
checkAction(expectedAction, action, f.t)
}
}
func (f *fixture) run(scName string) {
f.runController(scName, true, false)
}
func (f *fixture) expectCreateStorageClassCapabilitiesAction(storageClassCapability *crdv1alpha1.StorageClassCapability) {
f.storageClassCapabilitiesActions = append(f.storageClassCapabilitiesActions, core.NewCreateAction(
schema.GroupVersionResource{Resource: "storageclasscapabilities"}, storageClassCapability.Namespace, storageClassCapability))
}
func (f *fixture) expectUpdateStorageClassCapabilitiesAction(storageClassCapability *crdv1alpha1.StorageClassCapability) {
f.storageClassCapabilitiesActions = append(f.storageClassCapabilitiesActions, core.NewUpdateAction(
schema.GroupVersionResource{Resource: "storageclasscapabilities"}, storageClassCapability.Namespace, storageClassCapability))
}
func (f *fixture) expectDeleteStorageClassCapabilitiesAction(storageClassCapability *crdv1alpha1.StorageClassCapability) {
f.storageClassCapabilitiesActions = append(f.storageClassCapabilitiesActions, core.NewDeleteAction(
schema.GroupVersionResource{Resource: "storageclasscapabilities"}, storageClassCapability.Namespace, storageClassCapability.Name))
}
// filterInformerActions filters list and watch actions for testing resources.
// Since list and watch don't change resource state we can filter it to lower
// nose level in our tests.
func filterInformerActions(actions []core.Action) []core.Action {
var ret []core.Action
for _, action := range actions {
if action.GetVerb() == "list" || action.GetVerb() == "watch" {
continue
}
ret = append(ret, action)
}
return ret
}
// checkAction verifies that expected and actual actions are equal and both have
// same attached resources
func checkAction(expected, actual core.Action, t *testing.T) {
if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) {
t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual)
return
}
if reflect.TypeOf(actual) != reflect.TypeOf(expected) {
t.Errorf("Action has wrong type. Expected: %t. Got: %t", expected, actual)
return
}
switch a := actual.(type) {
case core.CreateActionImpl:
e, _ := expected.(core.CreateActionImpl)
expObject := e.GetObject()
object := a.GetObject()
if difference := cmp.Diff(object, expObject); len(difference) > 0 {
t.Errorf("[CreateAction] %T differ (-got, +want): %s", expObject, difference)
}
case core.UpdateActionImpl:
e, _ := expected.(core.UpdateActionImpl)
expObject := e.GetObject()
object := a.GetObject()
if difference := cmp.Diff(object, expObject); len(difference) > 0 {
t.Errorf("[UpdateAction] %T differ (-got, +want): %s", expObject, difference)
}
case core.PatchActionImpl:
e, _ := expected.(core.PatchActionImpl)
expPatch := e.GetPatch()
patch := a.GetPatch()
if !reflect.DeepEqual(expPatch, patch) {
t.Errorf("Action %s %s has wrong patch\nDiff:\n %s",
a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintSideBySide(expPatch, patch))
}
case core.DeleteActionImpl:
e, _ := expected.(core.DeleteActionImpl)
if difference := cmp.Diff(e.Name, a.Name); len(difference) > 0 {
t.Errorf("[UpdateAction] %T differ (-got, +want): %s", e.Name, difference)
}
default:
t.Errorf("Uncaptured Action %s %s, you should explicitly add a case to capture it",
actual.GetVerb(), actual.GetResource().Resource)
}
}
func newStorageClass(name string, provisioner string) *storagev1.StorageClass {
isExpansion := true
return &storagev1.StorageClass{
ObjectMeta: v1.ObjectMeta{
Name: name,
},
Provisioner: provisioner,
AllowVolumeExpansion: &isExpansion,
}
}
func newStorageClassCapability(storageClass *storagev1.StorageClass) *crdv1alpha1.StorageClassCapability {
storageClassCapability := &crdv1alpha1.StorageClassCapability{}
storageClassCapability.Name = storageClass.Name
storageClassCapability.Spec = *newStorageClassCapabilitySpec()
storageClassCapability.Spec.Provisioner = storageClass.Provisioner
return storageClassCapability
}
func newSnapshotClass(storageClass *storagev1.StorageClass) *snapbeta1.VolumeSnapshotClass {
return &snapbeta1.VolumeSnapshotClass{
ObjectMeta: v1.ObjectMeta{
Name: storageClass.Name,
},
Driver: storageClass.Provisioner,
}
}
func getKey(sc *storagev1.StorageClass, t *testing.T) string {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(sc)
if err != nil {
t.Errorf("Unexpected error getting key for %v: %v", sc.Name, err)
return ""
}
return key
}
func TestCreateStorageClass(t *testing.T) {
fixture := newFixture(t)
storageClass := newStorageClass("csi-example", "csi.example.com")
snapshotClass := newSnapshotClass(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
// Objects exist
fixture.storageClassObjects = append(fixture.storageClassObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
// Action expected
fixture.expectCreateStorageClassCapabilitiesAction(storageClassCapability)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestUpdateStorageClass(t *testing.T) {
storageClass := newStorageClass("csi-example", "csi.example.com")
snapshotClass := newSnapshotClass(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
fixture := newFixture(t)
// Object exist
fixture.storageClassObjects = append(fixture.storageClassObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
fixture.storageClassCapabilityObjects = append(fixture.storageClassCapabilityObjects, storageClassCapability)
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
// Action expected
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapability)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestDeleteStorageClass(t *testing.T) {
storageClass := newStorageClass("csi-example", "csi.example.com")
snapshotClass := newSnapshotClass(storageClass)
storageClassCapability := newStorageClassCapability(storageClass)
fixture := newFixture(t)
// Object exist
fixture.snapshotClassObjects = append(fixture.snapshotClassObjects, snapshotClass)
fixture.snapshotClassLister = append(fixture.snapshotClassLister, snapshotClass)
fixture.storageClassCapabilityObjects = append(fixture.storageClassCapabilityObjects, storageClassCapability)
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
// Action expected
fixture.expectDeleteStorageClassCapabilitiesAction(storageClassCapability)
// Run test
fixture.run(getKey(storageClass, t))
}
func TestDeleteSnapshotClass(t *testing.T) {
storageClass := newStorageClass("csi-example", "csi.example.com")
storageClassCapability := newStorageClassCapability(storageClass)
fixture := newFixture(t)
// Object exist
fixture.storageClassCapabilityObjects = append(fixture.storageClassCapabilityObjects, storageClassCapability)
fixture.storageClassCapabilityLister = append(fixture.storageClassCapabilityLister, storageClassCapability)
fixture.storageClassObjects = append(fixture.storageClassObjects, storageClass)
fixture.storageClassLister = append(fixture.storageClassLister, storageClass)
// Action expected
storageClassCapabilityUpdate := storageClassCapability.DeepCopy()
storageClassCapabilityUpdate.Spec.Features.Snapshot.Create = false
storageClassCapabilityUpdate.Spec.Features.Snapshot.List = false
fixture.expectUpdateStorageClassCapabilitiesAction(storageClassCapabilityUpdate)
// Run test
fixture.run(getKey(storageClass, t))
}

View File

@@ -0,0 +1,166 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package capability
import (
"context"
"errors"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/keepalive"
"kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
"net"
"net/url"
"time"
)
const (
dialDuration = time.Second * 5
requestDuration = time.Second * 10
)
func csiCapability(csiAddress string) (*v1alpha1.StorageClassCapabilitySpec, error) {
csiConn, err := connect(csiAddress)
if err != nil {
return nil, err
}
defer func() { _ = csiConn.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), requestDuration)
defer cancel()
spec := &v1alpha1.StorageClassCapabilitySpec{}
err = addPluginCapabilities(ctx, csiConn, spec)
if err != nil {
return nil, err
}
err = addControllerCapabilities(ctx, csiConn, spec)
if err != nil {
return nil, err
}
err = addNodeCapabilities(ctx, csiConn, spec)
if err != nil {
return nil, err
}
return spec, nil
}
func addPluginCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
identityClient := csi.NewIdentityClient(conn)
pluginCapabilitiesResponse, err := identityClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{})
if err != nil {
return err
}
for _, capability := range pluginCapabilitiesResponse.GetCapabilities() {
if capability == nil {
continue
}
if capability.GetService().GetType() == csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS {
spec.Features.Topology = true
}
volumeExpansion := capability.GetVolumeExpansion()
if volumeExpansion != nil {
switch volumeExpansion.GetType() {
case csi.PluginCapability_VolumeExpansion_ONLINE:
spec.Features.Volume.Expand = v1alpha1.ExpandModeOnline
case csi.PluginCapability_VolumeExpansion_OFFLINE:
spec.Features.Volume.Expand = v1alpha1.ExpandModeOffline
}
}
}
return nil
}
func addControllerCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
controllerClient := csi.NewControllerClient(conn)
controllerCapabilitiesResponse, err := controllerClient.ControllerGetCapabilities(ctx, &csi.ControllerGetCapabilitiesRequest{})
if err != nil {
return err
}
for _, capability := range controllerCapabilitiesResponse.GetCapabilities() {
switch capability.GetRpc().GetType() {
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME:
spec.Features.Volume.Create = true
case csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME:
spec.Features.Volume.Attach = true
case csi.ControllerServiceCapability_RPC_LIST_VOLUMES:
spec.Features.Volume.List = true
case csi.ControllerServiceCapability_RPC_CLONE_VOLUME:
spec.Features.Volume.Clone = true
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
spec.Features.Snapshot.Create = true
case csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS:
spec.Features.Snapshot.List = true
}
}
return nil
}
func addNodeCapabilities(ctx context.Context, conn *grpc.ClientConn, spec *v1alpha1.StorageClassCapabilitySpec) error {
nodeClient := csi.NewNodeClient(conn)
controllerCapabilitiesResponse, err := nodeClient.NodeGetCapabilities(ctx, &csi.NodeGetCapabilitiesRequest{})
if err != nil {
return err
}
for _, capability := range controllerCapabilitiesResponse.GetCapabilities() {
switch capability.GetRpc().GetType() {
case csi.NodeServiceCapability_RPC_GET_VOLUME_STATS:
spec.Features.Volume.Stats = true
}
}
return nil
}
// Connect address by GRPC
func connect(address string) (*grpc.ClientConn, error) {
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
}
u, err := url.Parse(address)
if err == nil && (!u.IsAbs() || u.Scheme == "unix") {
dialOptions = append(dialOptions,
grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", u.Path, timeout)
}))
}
// This is necessary when connecting via TCP and does not hurt
// when using Unix domain sockets. It ensures that gRPC detects a dead connection
// in a timely manner.
dialOptions = append(dialOptions,
grpc.WithKeepaliveParams(keepalive.ClientParameters{PermitWithoutStream: true}))
conn, err := grpc.Dial(address, dialOptions...)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), dialDuration)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
return conn, errors.New("connection timed out")
}
if conn.GetState() == connectivity.Ready {
return conn, nil
}
}
}

View File

@@ -0,0 +1,187 @@
/*
Copyright 2020 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package capability
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apis/storage/v1alpha1"
"net"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
)
var DefaultControllerRPCType = []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
}
var DefaultNodeRPCType = []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
}
var DefaultPluginCapability = []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
{
Type: &csi.PluginCapability_VolumeExpansion_{
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
Type: csi.PluginCapability_VolumeExpansion_OFFLINE,
},
},
},
}
type fakeCSIServer struct {
csi.UnimplementedIdentityServer
csi.UnimplementedControllerServer
csi.UnimplementedNodeServer
network string
address string
server *grpc.Server
}
func newTestCSIServer() (csiServer *fakeCSIServer, address string) {
if runtime.GOOS == "windows" {
address = "localhost:38886"
csiServer = newFakeCSIServer("tcp", address)
} else {
address = filepath.Join(os.TempDir(), "csi.sock"+rand.String(4))
csiServer = newFakeCSIServer("unix", address)
address = "unix://" + address
}
return csiServer, address
}
func newFakeCSIServer(network, address string) *fakeCSIServer {
return &fakeCSIServer{
network: network,
address: address,
}
}
func (f *fakeCSIServer) run() {
listener, err := net.Listen(f.network, f.address)
if err != nil {
klog.Error("fake CSI server listen failed, ", err)
return
}
server := grpc.NewServer()
csi.RegisterIdentityServer(server, f)
csi.RegisterControllerServer(server, f)
csi.RegisterNodeServer(server, f)
go func() {
err = server.Serve(listener)
if err != nil && !strings.Contains(err.Error(), "stopped") {
klog.Error("fake CSI server serve failed, ", err)
}
}()
f.server = server
}
func (f *fakeCSIServer) stop() {
if f.server != nil {
f.server.Stop()
}
}
func (*fakeCSIServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{Capabilities: DefaultPluginCapability}, nil
}
func (*fakeCSIServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
var capabilities []*csi.ControllerServiceCapability
for _, rpcType := range DefaultControllerRPCType {
capability := &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: rpcType,
},
},
}
capabilities = append(capabilities, capability)
}
return &csi.ControllerGetCapabilitiesResponse{Capabilities: capabilities}, nil
}
func (*fakeCSIServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
var capabilities []*csi.NodeServiceCapability
for _, rpcType := range DefaultNodeRPCType {
capability := &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: rpcType,
},
},
}
capabilities = append(capabilities, capability)
}
return &csi.NodeGetCapabilitiesResponse{Capabilities: capabilities}, nil
}
func Test_CSICapability(t *testing.T) {
fakeCSIServer, address := newTestCSIServer()
fakeCSIServer.run()
defer fakeCSIServer.stop()
specGot, err := csiCapability(address)
if err != nil {
t.Error(err)
}
specExpected := newStorageClassCapabilitySpec()
if diff := cmp.Diff(specGot, specExpected); diff != "" {
t.Errorf("%T differ (-got, +want): %s", specExpected, diff)
}
}
func newStorageClassCapabilitySpec() *v1alpha1.StorageClassCapabilitySpec {
return &v1alpha1.StorageClassCapabilitySpec{
Features: v1alpha1.StorageClassCapabilitySpecFeatures{
Topology: false,
Volume: v1alpha1.StorageClassCapabilitySpecFeaturesVolume{
Create: true,
Attach: false,
List: false,
Clone: true,
Stats: true,
Expand: v1alpha1.ExpandModeOffline,
},
Snapshot: v1alpha1.StorageClassCapabilitySpecFeaturesSnapshot{
Create: true,
List: false,
},
},
}
}