Files
kubesphere/pkg/controller/storage/expansion/expansion_controller.go
Xin Wang 55483e6578 add expand volume controller
Signed-off-by: Xin Wang <wileywang@yunify.com>
2019-09-16 14:20:07 +08:00

506 lines
16 KiB
Go

/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expansion
import (
"errors"
"fmt"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsv1informers "k8s.io/client-go/informers/apps/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
storagev1informer "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
listerappsv1 "k8s.io/client-go/listers/apps/v1"
listercorev1 "k8s.io/client-go/listers/core/v1"
listerstoragev1 "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"time"
)
// controllerAgentName is used as the event source component for events
// recorded by this controller.
const controllerAgentName = "expansion-controller"

var (
	// supportedProvisioner lists the StorageClass provisioners whose PVCs
	// this controller will act on.
	supportedProvisioner = []string{"disk.csi.qingcloud.com", "csi-qingcloud"}

	// retryTime is the backoff used while polling a PVC after its workload
	// has been scaled down: it starts at one second, doubles each step, and
	// allows up to 12 steps.
	retryTime = wait.Backoff{
		Steps:    12,
		Duration: time.Second,
		Factor:   2,
	}
)
// VolumeExpansionController watches PVCs whose requested storage exceeds
// their current capacity and, for supported StorageClasses, scales the single
// workload mounting such a PVC down and back up so the resize can complete.
type VolumeExpansionController struct {
	// kubeclientset is used for scale-subresource updates and event recording.
	kubeclientset kubernetes.Interface

	// PersistentVolumeClaim cache.
	pvcLister listercorev1.PersistentVolumeClaimLister
	pvcSynced cache.InformerSynced

	// StorageClass cache.
	classLister listerstoragev1.StorageClassLister
	classSynced cache.InformerSynced

	// Pod cache.
	podLister listercorev1.PodLister
	podSynced cache.InformerSynced

	// Deployment cache.
	deployLister listerappsv1.DeploymentLister
	deploySynced cache.InformerSynced

	// ReplicaSet cache, used to walk from a Pod to its Deployment.
	rsLister listerappsv1.ReplicaSetLister
	rsSynced cache.InformerSynced

	// StatefulSet cache.
	stsLister listerappsv1.StatefulSetLister
	stsSynced cache.InformerSynced

	// workqueue holds "namespace/name" keys of PVCs awaiting processing.
	workqueue workqueue.RateLimitingInterface
	// recorder emits Kubernetes events on behalf of this controller.
	recorder record.EventRecorder
}
// NewVolumeExpansionController returns a new volume expansion controller.
//
// Only PVCs whose storage request exceeds their reported capacity are
// considered. Both additions (including the informer's initial list) and
// updates that raise the request are routed through handleObject, which
// filters on the StorageClass (expansion allowed, supported provisioner)
// before anything is enqueued.
func NewVolumeExpansionController(
	kubeclientset kubernetes.Interface,
	pvcInformer corev1informers.PersistentVolumeClaimInformer,
	classInformer storagev1informer.StorageClassInformer,
	podInformer corev1informers.PodInformer,
	deployInformer appsv1informers.DeploymentInformer,
	rsInformer appsv1informers.ReplicaSetInformer,
	stsInformer appsv1informers.StatefulSetInformer) *VolumeExpansionController {
	klog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

	controller := &VolumeExpansionController{
		kubeclientset: kubeclientset,
		pvcLister:     pvcInformer.Lister(),
		pvcSynced:     pvcInformer.Informer().HasSynced,
		classLister:   classInformer.Lister(),
		classSynced:   classInformer.Informer().HasSynced,
		podLister:     podInformer.Lister(),
		podSynced:     podInformer.Informer().HasSynced,
		deployLister:  deployInformer.Lister(),
		deploySynced:  deployInformer.Informer().HasSynced,
		rsLister:      rsInformer.Lister(),
		rsSynced:      rsInformer.Informer().HasSynced,
		stsLister:     stsInformer.Lister(),
		stsSynced:     stsInformer.Informer().HasSynced,
		workqueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "expansion"),
		recorder:      recorder,
	}

	// pendingResize reports whether the claim requests more storage than its
	// status currently reports.
	pendingResize := func(pvc *corev1.PersistentVolumeClaim) bool {
		requested := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
		return requested.Cmp(pvc.Status.Capacity[corev1.ResourceStorage]) > 0
	}

	klog.V(2).Info("Setting up event handlers")
	pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// Additions must pass the same StorageClass filter as updates;
		// enqueueing them directly would scale down workloads of PVCs that use
		// unsupported provisioners. The cheap size check runs first so that
		// StorageClass lookups happen only for claims with a pending resize.
		AddFunc: func(obj interface{}) {
			pvc, ok := obj.(*corev1.PersistentVolumeClaim)
			if !ok {
				return
			}
			if pendingResize(pvc) {
				controller.handleObject(obj)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPVC, ok := oldObj.(*corev1.PersistentVolumeClaim)
			if !ok {
				return
			}
			newPVC, ok := newObj.(*corev1.PersistentVolumeClaim)
			if !ok {
				return
			}
			oldSize := oldPVC.Spec.Resources.Requests[corev1.ResourceStorage]
			newSize := newPVC.Spec.Resources.Requests[corev1.ResourceStorage]
			// React only when this update raised the request and the claim
			// has not yet reached the new size.
			if newSize.Cmp(oldSize) > 0 && pendingResize(newPVC) {
				controller.handleObject(newObj)
			}
		},
		DeleteFunc: controller.enqueuePVC,
	})
	return controller
}
// Start runs the controller with a fixed pool of five workers. It blocks until
// stopCh is closed and returns an error only if the informer caches fail to
// sync.
func (c *VolumeExpansionController) Start(stopCh <-chan struct{}) error {
	const workers = 5
	return c.Run(workers, stopCh)
}
// Run waits for every informer cache to sync, launches threadiness workers
// that drain the work queue, and then blocks until stopCh is closed. The work
// queue is shut down when Run returns, which makes the workers exit.
func (c *VolumeExpansionController) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	klog.V(2).Info("Starting expand volume controller")
	klog.V(2).Info("Waiting for informer caches to sync")
	informersSynced := []cache.InformerSynced{
		c.pvcSynced,
		c.classSynced,
		c.podSynced,
		c.deploySynced,
		c.rsSynced,
		c.stsSynced,
	}
	if !cache.WaitForCacheSync(stopCh, informersSynced...) {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.V(2).Info("Starting workers")
	for remaining := threadiness; remaining > 0; remaining-- {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	klog.V(2).Info("Started workers")

	<-stopCh
	klog.V(2).Info("Shutting down workers")
	return nil
}
// runWorker keeps processing work items until the queue reports shutdown.
func (c *VolumeExpansionController) runWorker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// processNextWorkItem takes one item off the work queue and passes its key to
// syncHandler. Keys that fail are re-added with rate limiting; keys that
// succeed, and items that are not strings, are forgotten. It returns false
// only when the queue has been shut down.
func (c *VolumeExpansionController) processNextWorkItem() bool {
	item, quit := c.workqueue.Get()
	if quit {
		return false
	}

	handle := func() error {
		// Done must be called for every item obtained from Get so the queue
		// stops tracking it as in-flight.
		defer c.workqueue.Done(item)

		key, isString := item.(string)
		if !isString {
			// A malformed item can never succeed, so drop it rather than retry.
			c.workqueue.Forget(item)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
			return nil
		}

		if syncErr := c.syncHandler(key); syncErr != nil {
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, syncErr.Error())
		}

		c.workqueue.Forget(item)
		klog.V(2).Infof("Successfully synced '%s'", key)
		return nil
	}

	if err := handle(); err != nil {
		utilruntime.HandleError(err)
	}
	return true
}
// syncHandler re-attaches the PVC identified by key ("namespace/name") to the
// workload that mounts it, so that a pending resize can complete:
//
//  1. Find the workload (Deployment or StatefulSet) controlling the single Pod
//     that mounts the PVC. If no Pod, or more than one Pod, mounts the PVC,
//     the controller does nothing.
//  2. Verify the workload type is supported.
//  3. Scale the workload down to zero replicas.
//  4. Poll the PVC (bounded by retryTime) until isWaitingScaleUp reports it
//     is ready.
//  5. Scale the workload back up to its original replica count, retrying
//     in place on failure.
//
// A returned error causes the key to be requeued with rate limiting.
func (c *VolumeExpansionController) syncHandler(key string) error {
	klog.V(5).Infof("syncHandler: handle %s", key)
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get PVC source
	pvc, err := c.pvcLister.PersistentVolumeClaims(namespace).Get(name)
	if err != nil {
		// The PVC resource may no longer exist, in which case we stop processing.
		if apierrors.IsNotFound(err) {
			klog.V(4).Infof("PVC '%s' in work queue no longer exists", key)
			utilruntime.HandleError(fmt.Errorf("PVC '%s' in work queue no longer exists", key))
			return nil
		}
		return err
	}

	// Step 1: find the workload mounting the PVC.
	workload, err := c.findWorkload(name, namespace)
	if err != nil {
		return err
	}
	if workload == nil {
		klog.V(4).Infof("Cannot find any Pods mounting PVC %s", key)
		return nil
	}
	klog.V(5).Infof("Find workload %T pvc name %s", workload, pvc.GetName())

	// Step 2: only StatefulSets and Deployments can be scaled.
	switch w := workload.(type) {
	case *appsv1.StatefulSet:
		klog.V(5).Infof("Find StatefulSet %s", w.GetName())
	case *appsv1.Deployment:
		klog.V(5).Infof("Find Deployment %s", w.GetName())
	default:
		klog.Errorf("Unsupported workload type %T", workload)
		return nil
	}

	// Step 3: scale the workload to 0.
	if err = c.scaleDown(workload, pvc.GetNamespace()); err != nil {
		klog.V(2).Infof("scale down PVC %s mounted workloads failed %s", key, err.Error())
		return err
	}
	klog.V(2).Infof("Scale down PVC %s mounted workloads succeed", key)

	// Step 4: wait until the PVC is ready for the workload to come back.
	err = retry.RetryOnConflict(retryTime, func() error {
		klog.V(4).Info("waiting for PVC filesystem expansion")
		if !c.isWaitingScaleUp(name, namespace) {
			// Returning a Conflict makes RetryOnConflict back off and poll again.
			return apierrors.NewConflict(schema.GroupResource{Resource: "PersistentVolumeClaim"}, key,
				errors.New("waiting for scaling down and expanding disk"))
		}
		return nil
	})
	klog.V(5).Info("after waiting")
	if err != nil {
		// Scale up regardless: leaving the workload at zero replicas is worse
		// than restarting it before the resize has been observed.
		klog.Errorf("Waiting timeout, error: %s", err.Error())
	}

	// Step 5: scale back up. Failures are retried here rather than by
	// requeueing: with the workload at zero replicas no Pod mounts the PVC, so
	// a requeued sync could no longer find the workload via findWorkload and
	// would leave it scaled down.
	scaleUpBackoff := wait.Backoff{
		Duration: time.Second,
		Factor:   2,
		Steps:    5,
	}
	var scaleUpErr error
	// The condition never returns an error, so ExponentialBackoff can only
	// fail by timing out; scaleUpErr records the last underlying failure.
	_ = wait.ExponentialBackoff(scaleUpBackoff, func() (bool, error) {
		scaleUpErr = c.scaleUp(workload, namespace)
		if scaleUpErr != nil {
			klog.V(4).Infof("scale up attempt for PVC %s failed: %v", key, scaleUpErr)
			return false, nil
		}
		return true, nil
	})
	if scaleUpErr != nil {
		klog.V(2).Infof("Scale up PVC %s mounted workloads failed %s", key, scaleUpErr.Error())
		return scaleUpErr
	}
	klog.V(2).Infof("Scale up PVC %s mounted workloads succeed", key)
	return nil
}
// handleObject takes an object delivered by the PVC informer (a PVC, or a
// cache.DeletedFinalStateUnknown tombstone wrapping one) and enqueues the PVC
// only if its StorageClass allows volume expansion and uses one of the
// provisioners in supportedProvisioner.
// In KS 2.1, the controller only supports disk.csi.qingcloud.com and
// csi-qingcloud as storageclass provisioner.
func (c *VolumeExpansionController) handleObject(obj interface{}) {
	object, ok := obj.(metav1.Object)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
			return
		}
		klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
	}
	klog.V(4).Infof("Processing object: %s", object.GetName())

	// Use the (possibly tombstone-recovered) object, and reject non-PVC input
	// instead of panicking on an unchecked type assertion.
	pvc, ok := object.(*corev1.PersistentVolumeClaim)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expected PersistentVolumeClaim but got %T", object))
		return
	}

	// Check storage class
	// In KS 2.1, we only support disk.csi.qingcloud.com as storageclass provisioner.
	class := c.getStorageClass(pvc)
	if class == nil {
		return
	}
	klog.V(4).Infof("Get PVC %s SC was %s", pvc.String(), class.String())

	// AllowVolumeExpansion is an optional *bool; an unset field is treated as
	// "expansion not allowed" rather than dereferenced (which would panic).
	if class.AllowVolumeExpansion == nil || !*class.AllowVolumeExpansion {
		return
	}

	for _, p := range supportedProvisioner {
		if class.Provisioner == p {
			klog.V(5).Infof("enqueue PVC %s", claimToClaimKey(pvc))
			c.enqueuePVC(pvc)
			return
		}
	}
}
// getStorageClass resolves the StorageClass named by pvc from the informer
// cache. It returns nil when pvc is nil, when the claim names no StorageClass,
// or when the cache lookup fails; the last two cases are logged at V(4).
func (c *VolumeExpansionController) getStorageClass(pvc *corev1.PersistentVolumeClaim) *storagev1.StorageClass {
	if pvc == nil {
		return nil
	}

	className := getPersistentVolumeClaimClass(pvc)
	if len(className) == 0 {
		klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s",
			claimToClaimKey(pvc))
		return nil
	}

	sc, lookupErr := c.classLister.Get(className)
	if lookupErr == nil {
		return sc
	}
	klog.V(4).Infof("failed to expand PVC: %s with error: %v", claimToClaimKey(pvc), lookupErr)
	return nil
}
// enqueuePVC adds the namespace/name key of a PVC to the work queue, but only
// when the claim is Bound and its storage request exceeds its reported
// capacity. Anything that is not a *corev1.PersistentVolumeClaim (including
// deletion tombstones) is ignored.
func (c *VolumeExpansionController) enqueuePVC(obj interface{}) {
	claim, isPVC := obj.(*corev1.PersistentVolumeClaim)
	if !isPVC || claim.Status.Phase != corev1.ClaimBound {
		return
	}

	requested := claim.Spec.Resources.Requests[corev1.ResourceStorage]
	if requested.Cmp(claim.Status.Capacity[corev1.ResourceStorage]) <= 0 {
		return
	}

	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(claim)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", claim, err))
		return
	}
	c.workqueue.Add(key)
}
// findWorkload locates what mounts the PVC named pvc in namespace. It returns:
//   - the *appsv1.StatefulSet or *appsv1.Deployment controlling the mounting
//     Pod, as resolved through findPodParent;
//   - the *corev1.Pod itself when that Pod has no controller;
//   - (nil, nil) when zero or several Pods mount the PVC, or when the Pod's
//     controller is of any other kind.
func (c *VolumeExpansionController) findWorkload(pvc, namespace string) (workloadPtr interface{}, err error) {
	pods, err := c.podLister.Pods(namespace).List(labels.Everything())
	klog.V(4).Infof("podlist len %d", len(pods))
	if err != nil {
		return nil, err
	}

	mounting := getPodMountPVC(pods, pvc)
	klog.V(4).Infof("Get %d pods mounting PVC", len(mounting))
	// In KS 2.1, automatic re-attach only handles a PVC mounted by exactly one Pod.
	if len(mounting) != 1 {
		return nil, nil
	}
	pod := mounting[0]

	klog.V(4).Info("Find pod parent")
	owner, err := c.findPodParent(pod)
	if err != nil {
		return nil, err
	}
	if owner == nil {
		// An unmanaged, standalone Pod.
		return pod, nil
	}

	klog.V(4).Infof("OwnerRef kind %s", owner.Kind)
	switch owner.Kind {
	case "StatefulSet":
		return c.stsLister.StatefulSets(namespace).Get(owner.Name)
	case "Deployment":
		return c.deployLister.Deployments(namespace).Get(owner.Name)
	}
	return nil, nil
}
// findPodParent returns the controller reference that findWorkload should
// resolve for pod:
//   - nil when pod is nil or has no controller;
//   - for a Pod controlled by a ReplicaSet, the ReplicaSet's own controller
//     reference (for example a Deployment) if it has one, otherwise the
//     ReplicaSet reference itself;
//   - the StatefulSet reference for a Pod controlled by a StatefulSet;
//   - an empty, non-nil OwnerReference for any other controller kind, so the
//     caller does not mistake the Pod for an unmanaged one.
func (c *VolumeExpansionController) findPodParent(pod *corev1.Pod) (*metav1.OwnerReference, error) {
	if pod == nil {
		return nil, nil
	}

	ownerRef := metav1.GetControllerOf(pod)
	if ownerRef == nil {
		return nil, nil
	}

	switch ownerRef.Kind {
	case "StatefulSet":
		return ownerRef, nil
	case "ReplicaSet":
		// get deploy
		rs, err := c.rsLister.ReplicaSets(pod.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			return nil, err
		}
		if parent := metav1.GetControllerOf(rs); parent != nil {
			return parent, nil
		}
		return ownerRef, nil
	default:
		return &metav1.OwnerReference{}, nil
	}
}
// scaleDown sets the replica count of a Deployment or StatefulSet to zero via
// its scale subresource in namespace. Any other workload type yields an error.
func (c *VolumeExpansionController) scaleDown(workload interface{}, namespace string) error {
	// zeroScale builds a Scale object targeting obj with zero replicas.
	zeroScale := func(obj metav1.Object) *autoscalingv1.Scale {
		return &autoscalingv1.Scale{
			ObjectMeta: metav1.ObjectMeta{
				Name:      obj.GetName(),
				Namespace: obj.GetNamespace(),
			},
			Spec: autoscalingv1.ScaleSpec{Replicas: 0},
		}
	}

	var err error
	switch w := workload.(type) {
	case *appsv1.Deployment:
		_, err = c.kubeclientset.AppsV1().Deployments(namespace).UpdateScale(w.GetName(), zeroScale(w))
	case *appsv1.StatefulSet:
		_, err = c.kubeclientset.AppsV1().StatefulSets(namespace).UpdateScale(w.GetName(), zeroScale(w))
	default:
		err = fmt.Errorf("unsupported type %T", workload)
	}
	return err
}
// scaleUp restores a Deployment or StatefulSet to the replica count recorded
// in its Spec.Replicas, via the scale subresource in namespace. Any other
// workload type yields an error.
func (c *VolumeExpansionController) scaleUp(workload interface{}, namespace string) error {
	// scaleTo builds a Scale object targeting obj with the given replicas.
	scaleTo := func(obj metav1.Object, replicas int32) *autoscalingv1.Scale {
		return &autoscalingv1.Scale{
			ObjectMeta: metav1.ObjectMeta{
				Name:      obj.GetName(),
				Namespace: obj.GetNamespace(),
			},
			Spec: autoscalingv1.ScaleSpec{Replicas: replicas},
		}
	}

	var err error
	switch w := workload.(type) {
	case *appsv1.Deployment:
		desired := scaleTo(w, *w.Spec.Replicas)
		_, err = c.kubeclientset.AppsV1().Deployments(namespace).UpdateScale(w.GetName(), desired)
	case *appsv1.StatefulSet:
		desired := scaleTo(w, *w.Spec.Replicas)
		_, err = c.kubeclientset.AppsV1().StatefulSets(namespace).UpdateScale(w.GetName(), desired)
	default:
		err = fmt.Errorf("unsupported type %T", workload)
	}
	return err
}
// isWaitingScaleUp reports whether the PVC name/namespace is ready for its
// workload to be scaled back up. That is the case when either:
//   - the PVC carries the FileSystemResizePending condition, or
//   - its reported capacity already satisfies its (non-zero) storage request,
//     so there is nothing left to wait for.
//
// Without the second check, a resize that finishes without ever setting
// FileSystemResizePending would keep the workload at zero replicas for the
// whole polling window. A failed lookup is logged and reported as not ready.
func (c *VolumeExpansionController) isWaitingScaleUp(name, namespace string) bool {
	pvc, err := c.pvcLister.PersistentVolumeClaims(namespace).Get(name)
	if err != nil {
		klog.Errorf("Get PVC %s/%s error: %v", namespace, name, err)
		return false
	}
	if pvc == nil {
		return false
	}

	for _, condition := range pvc.Status.Conditions {
		if condition.Type == corev1.PersistentVolumeClaimFileSystemResizePending {
			return true
		}
	}

	requested := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
	capacity := pvc.Status.Capacity[corev1.ResourceStorage]
	return !requested.IsZero() && capacity.Cmp(requested) >= 0
}