Fix: nil s3Client of attachment api

Signed-off-by: LiHui <andrewli@yunify.com>

format code

Signed-off-by: LiHui <andrewli@yunify.com>

Fix: delete helmRelease on host when delete member cluster

Signed-off-by: LiHui <andrewli@yunify.com>

Fix: modify repo credential

Signed-off-by: LiHui <andrewli@yunify.com>

remove not exitsts charts from helm repo

Signed-off-by: LiHui <andrewli@yunify.com>
This commit is contained in:
LiHui
2021-03-14 15:31:31 +08:00
parent 686b180f3f
commit dd8429c542
17 changed files with 406 additions and 319 deletions

View File

@@ -19,37 +19,29 @@ package helmrelease
import (
"context"
"errors"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apis/application/v1alpha1"
clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
"kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix/helmrepoindex"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix/helmwrapper"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"kubesphere.io/kubesphere/pkg/utils/clusterclient"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
"kubesphere.io/kubesphere/pkg/utils/stringutils"
"math"
"path"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"strings"
"time"
)
const (
HelmReleaseFinalizer = "helmrelease.application.kubesphere.io"
IndexerName = "clusterNamespace"
)
var (
@@ -58,6 +50,7 @@ var (
ErrAppVersionDataIsEmpty = errors.New("app version data is empty")
ErrGetAppVersionFailed = errors.New("get app version failed")
ErrLoadChartFailed = errors.New("load chart failed")
ErrS3Config = errors.New("invalid s3 config")
ErrLoadChartFromStorageFailed = errors.New("load chart from storage failed")
)
@@ -65,14 +58,16 @@ var _ reconcile.Reconciler = &ReconcileHelmRelease{}
// ReconcileWorkspace reconciles a Workspace object
type ReconcileHelmRelease struct {
StorageClient s3.Interface
KsFactory externalversions.SharedInformerFactory
clusterClients clusterclient.ClusterClients
StorageClient s3.Interface
KsFactory externalversions.SharedInformerFactory
client.Client
recorder record.EventRecorder
// mock helm install && uninstall
helmMock bool
informer cache.SharedIndexInformer
clusterClients clusterclient.ClusterClients
MultiClusterEnable bool
}
//
@@ -111,8 +106,29 @@ func (r *ReconcileHelmRelease) Reconcile(request reconcile.Request) (reconcile.R
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object.
if !sliceutil.HasString(instance.ObjectMeta.Finalizers, HelmReleaseFinalizer) {
clusterName := instance.GetRlsCluster()
if r.MultiClusterEnable && clusterName != "" {
clusterInfo, err := r.clusterClients.Get(clusterName)
if err != nil {
// cluster not exists, delete the crd
klog.Warningf("cluster %s not found, delete the helm release %s/%s",
clusterName, instance.GetRlsNamespace(), instance.GetTrueName())
return reconcile.Result{}, r.Delete(context.TODO(), instance)
}
// Host cluster will self-healing, delete host cluster won't cause deletion of helm release
if !r.clusterClients.IsHostCluster(clusterInfo) {
// add owner References
instance.OwnerReferences = append(instance.OwnerReferences, metav1.OwnerReference{
APIVersion: clusterv1alpha1.SchemeGroupVersion.String(),
Kind: clusterv1alpha1.ResourceKindCluster,
Name: clusterInfo.Name,
UID: clusterInfo.UID,
})
}
}
instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, HelmReleaseFinalizer)
// add owner References
if err := r.Update(context.Background(), instance); err != nil {
return reconcile.Result{}, err
}
@@ -146,67 +162,17 @@ func (r *ReconcileHelmRelease) Reconcile(request reconcile.Request) (reconcile.R
return r.reconcile(instance)
}
func (r *ReconcileHelmRelease) GetChartData(rls *v1alpha1.HelmRelease) (chartName string, chartData []byte, err error) {
if rls.Spec.RepoId != "" && rls.Spec.RepoId != v1alpha1.AppStoreRepoId {
// load chart data from helm repo
repo := v1alpha1.HelmRepo{}
err := r.Get(context.TODO(), types.NamespacedName{Name: rls.Spec.RepoId}, &repo)
if err != nil {
klog.Errorf("get helm repo %s failed, error: %v", rls.Spec.RepoId, err)
return chartName, chartData, ErrGetRepoFailed
}
index, err := helmrepoindex.ByteArrayToSavedIndex([]byte(repo.Status.Data))
if version := index.GetApplicationVersion(rls.Spec.ApplicationId, rls.Spec.ApplicationVersionId); version != nil {
url := version.Spec.URLs[0]
if !(strings.HasPrefix(url, "https://") || strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "s3://")) {
url = repo.Spec.Url + "/" + url
}
buf, err := helmrepoindex.LoadChart(context.TODO(), url, &repo.Spec.Credential)
if err != nil {
klog.Infof("load chart failed, error: %s", err)
return chartName, chartData, ErrLoadChartFailed
}
chartData = buf.Bytes()
chartName = version.Name
} else {
klog.Errorf("get app version: %s failed", rls.Spec.ApplicationVersionId)
return chartName, chartData, ErrGetAppVersionFailed
}
} else {
// load chart data from helm application version
appVersion := &v1alpha1.HelmApplicationVersion{}
err = r.Get(context.TODO(), types.NamespacedName{Name: rls.Spec.ApplicationVersionId}, appVersion)
if err != nil {
klog.Errorf("get app version %s failed, error: %v", rls.Spec.ApplicationVersionId, err)
return chartName, chartData, ErrGetAppVersionFailed
}
chartData, err = r.StorageClient.Read(path.Join(appVersion.GetWorkspace(), appVersion.Name))
if err != nil {
klog.Errorf("load chart from storage failed, error: %s", err)
return chartName, chartData, ErrLoadChartFromStorageFailed
}
chartName = appVersion.GetTrueName()
}
return
}
// Check the state of the instance then decide what to do.
func (r *ReconcileHelmRelease) reconcile(instance *v1alpha1.HelmRelease) (reconcile.Result, error) {
if instance.Status.State == v1alpha1.HelmStatusActive && instance.Status.Version == instance.Spec.Version {
// check release status
return reconcile.Result{
// recheck release status after 10 minutes
RequeueAfter: 10 * time.Minute,
}, nil
// todo check release status
return reconcile.Result{}, nil
}
ft := failedTimes(instance.Status.DeployStatus)
if v1alpha1.HelmStatusFailed == instance.Status.State && ft > 0 {
// exponential backoff, max delay 180s
// failed too much times, exponential backoff, max delay 180s
retryAfter := time.Duration(math.Min(math.Exp2(float64(ft)), 180)) * time.Second
var lastDeploy time.Time
@@ -226,16 +192,18 @@ func (r *ReconcileHelmRelease) reconcile(instance *v1alpha1.HelmRelease) (reconc
// no operation
return reconcile.Result{}, nil
case v1alpha1.HelmStatusActive:
// Release used to be active, but instance.Status.Version not equal to instance.Spec.Version
instance.Status.State = v1alpha1.HelmStatusUpgrading
// Update the state first.
err = r.Status().Update(context.TODO(), instance)
return reconcile.Result{}, err
case v1alpha1.HelmStatusCreating:
// create new release
err = r.createOrUpgradeHelmRelease(instance, false)
case v1alpha1.HelmStatusFailed:
// check failed times
err = r.createOrUpgradeHelmRelease(instance, false)
case v1alpha1.HelmStatusUpgrading:
// We can update the release now.
err = r.createOrUpgradeHelmRelease(instance, true)
case v1alpha1.HelmStatusRollbacking:
// TODO: rollback helm release
@@ -260,6 +228,7 @@ func (r *ReconcileHelmRelease) reconcile(instance *v1alpha1.HelmRelease) (reconc
instance.Status.LastDeployed = &now
if len(instance.Status.DeployStatus) > 0 {
instance.Status.DeployStatus = append([]v1alpha1.HelmReleaseDeployStatus{deployStatus}, instance.Status.DeployStatus...)
// At most ten records will be saved.
if len(instance.Status.DeployStatus) >= 10 {
instance.Status.DeployStatus = instance.Status.DeployStatus[:10:10]
}
@@ -301,7 +270,7 @@ func (r *ReconcileHelmRelease) createOrUpgradeHelmRelease(rls *v1alpha1.HelmRele
clusterName := rls.GetRlsCluster()
var clusterConfig string
if clusterName != "" && r.KsFactory != nil {
if r.MultiClusterEnable && clusterName != "" {
clusterConfig, err = r.clusterClients.GetClusterKubeconfig(clusterName)
if err != nil {
klog.Errorf("get cluster %s config failed", clusterConfig)
@@ -327,6 +296,7 @@ func (r *ReconcileHelmRelease) createOrUpgradeHelmRelease(rls *v1alpha1.HelmRele
}
func (r *ReconcileHelmRelease) uninstallHelmRelease(rls *v1alpha1.HelmRelease) error {
if rls.Status.State != v1alpha1.HelmStatusDeleting {
rls.Status.State = v1alpha1.HelmStatusDeleting
rls.Status.LastUpdate = metav1.Now()
@@ -339,12 +309,20 @@ func (r *ReconcileHelmRelease) uninstallHelmRelease(rls *v1alpha1.HelmRelease) e
clusterName := rls.GetRlsCluster()
var clusterConfig string
var err error
if clusterName != "" && r.KsFactory != nil {
clusterConfig, err = r.clusterClients.GetClusterKubeconfig(clusterName)
if r.MultiClusterEnable && clusterName != "" {
clusterInfo, err := r.clusterClients.Get(clusterName)
if err != nil {
klog.Errorf("get cluster %s config failed", clusterConfig)
return err
klog.V(2).Infof("cluster %s was deleted, skip helm release uninstall", clusterName)
return nil
}
// If user deletes helmRelease first and then delete cluster immediately, this may cause helm resources leak.
if clusterInfo.DeletionTimestamp != nil {
klog.V(2).Infof("cluster %s is deleting, skip helm release uninstall", clusterName)
return nil
}
clusterConfig = string(clusterInfo.Spec.Connection.KubeConfig)
}
hw := helmwrapper.NewHelmWrapper(clusterConfig, rls.GetRlsNamespace(), rls.Spec.Name, helmwrapper.SetMock(r.helmMock))
@@ -359,118 +337,11 @@ func (r *ReconcileHelmRelease) uninstallHelmRelease(rls *v1alpha1.HelmRelease) e
func (r *ReconcileHelmRelease) SetupWithManager(mgr ctrl.Manager) error {
r.Client = mgr.GetClient()
if r.KsFactory != nil {
if r.KsFactory != nil && r.MultiClusterEnable {
r.clusterClients = clusterclient.NewClusterClient(r.KsFactory.Cluster().V1alpha1().Clusters())
r.informer = r.KsFactory.Application().V1alpha1().HelmReleases().Informer()
err := r.informer.AddIndexers(map[string]cache.IndexFunc{
IndexerName: func(obj interface{}) ([]string, error) {
rls := obj.(*v1alpha1.HelmRelease)
return []string{fmt.Sprintf("%s/%s", rls.GetRlsCluster(), rls.GetRlsNamespace())}, nil
},
})
if err != nil {
return err
}
go func() {
<-mgr.Elected()
go r.cleanHelmReleaseWhenNamespaceDeleted()
}()
}
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.HelmRelease{}).
Complete(r)
}
func (r *ReconcileHelmRelease) getClusterConfig(cluster string) (string, error) {
if cluster == "" {
return "", nil
}
clusterConfig, err := r.clusterClients.GetClusterKubeconfig(cluster)
if err != nil {
klog.Errorf("get cluster %s config failed", clusterConfig)
return "", err
}
return clusterConfig, nil
}
// When namespace have been removed from member cluster, we need clean all
// the helmRelease from the host cluster.
func (r *ReconcileHelmRelease) cleanHelmReleaseWhenNamespaceDeleted() {
ticker := time.NewTicker(2 * time.Minute)
for _ = range ticker.C {
keys := r.informer.GetIndexer().ListIndexFuncValues(IndexerName)
for _, clusterNs := range keys {
klog.V(4).Infof("clean resource in %s", clusterNs)
parts := stringutils.Split(clusterNs, "/")
if len(parts) == 2 {
cluster, ns := parts[0], parts[1]
items, err := r.informer.GetIndexer().ByIndex(IndexerName, clusterNs)
if err != nil {
klog.Errorf("get items from index failed, error: %s", err)
continue
}
kubeconfig, err := r.getClusterConfig(cluster)
if err != nil {
klog.Errorf("get cluster %s config failed, error: %s", cluster, err)
continue
}
// connect to member or host cluster
var restConfig *restclient.Config
if kubeconfig == "" {
restConfig, err = restclient.InClusterConfig()
} else {
cc, err := clientcmd.NewClientConfigFromBytes([]byte(kubeconfig))
if err != nil {
klog.Errorf("get client config for cluster %s failed, error: %s", cluster, err)
continue
}
restConfig, err = cc.ClientConfig()
}
if err != nil {
klog.Errorf("build rest config for cluster %s failed, error: %s", cluster, err)
continue
}
clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
klog.Errorf("create client set failed, error: %s", err)
continue
}
// check namespace exists or not
namespace, err := clientSet.CoreV1().Namespaces().Get(context.TODO(), ns, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(2).Infof("delete all helm release in %s", clusterNs)
for ind := range items {
rls := items[ind].(*v1alpha1.HelmRelease)
err := r.Client.Delete(context.TODO(), rls)
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("delete release %s failed", rls.Name)
}
}
} else {
klog.Errorf("get namespace %s from cluster %s failed, error: %s", ns, cluster, err)
continue
}
} else {
for ind := range items {
rls := items[ind].(*v1alpha1.HelmRelease)
if namespace.CreationTimestamp.After(rls.CreationTimestamp.Time) {
klog.V(2).Infof("delete helm release %s in %s", rls.Namespace, clusterNs)
// todo, namespace is newer than helmRelease, should we delete the helmRelease
}
}
}
}
}
}
}