Merge pull request #3465 from xyz-li/app-fix

Fix nil pointer and delete helmRelease
KubeSphere CI Bot
2021-03-19 14:20:05 +08:00
committed by GitHub
17 changed files with 406 additions and 319 deletions


@@ -0,0 +1,78 @@
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helmrelease

import (
"context"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apis/application/v1alpha1"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix/helmrepoindex"
"path"
"strings"
)

func (r *ReconcileHelmRelease) GetChartData(rls *v1alpha1.HelmRelease) (chartName string, chartData []byte, err error) {
if rls.Spec.RepoId != "" && rls.Spec.RepoId != v1alpha1.AppStoreRepoId {
// load chart data from helm repo
repo := v1alpha1.HelmRepo{}
err := r.Get(context.TODO(), types.NamespacedName{Name: rls.Spec.RepoId}, &repo)
if err != nil {
klog.Errorf("get helm repo %s failed, error: %v", rls.Spec.RepoId, err)
return chartName, chartData, ErrGetRepoFailed
}
index, err := helmrepoindex.ByteArrayToSavedIndex([]byte(repo.Status.Data))
if err != nil {
// guard the decode error: a nil index would panic in GetApplicationVersion below
klog.Errorf("parse index of repo %s failed, error: %v", rls.Spec.RepoId, err)
return chartName, chartData, ErrGetRepoFailed
}
if version := index.GetApplicationVersion(rls.Spec.ApplicationId, rls.Spec.ApplicationVersionId); version != nil {
url := version.Spec.URLs[0]
if !(strings.HasPrefix(url, "https://") || strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "s3://")) {
url = repo.Spec.Url + "/" + url
}
buf, err := helmrepoindex.LoadChart(context.TODO(), url, &repo.Spec.Credential)
if err != nil {
klog.Errorf("load chart failed, error: %v", err)
return chartName, chartData, ErrLoadChartFailed
}
chartData = buf.Bytes()
chartName = version.Name
} else {
klog.Errorf("get app version: %s failed", rls.Spec.ApplicationVersionId)
return chartName, chartData, ErrGetAppVersionFailed
}
} else {
// load chart data from helm application version
appVersion := &v1alpha1.HelmApplicationVersion{}
err = r.Get(context.TODO(), types.NamespacedName{Name: rls.Spec.ApplicationVersionId}, appVersion)
if err != nil {
klog.Errorf("get app version %s failed, error: %v", rls.Spec.ApplicationVersionId, err)
return chartName, chartData, ErrGetAppVersionFailed
}
if r.StorageClient == nil {
return "", nil, ErrS3Config
}
chartData, err = r.StorageClient.Read(path.Join(appVersion.GetWorkspace(), appVersion.Name))
if err != nil {
klog.Errorf("load chart from storage failed, error: %s", err)
return chartName, chartData, ErrLoadChartFromStorageFailed
}
chartName = appVersion.GetTrueName()
}
return
}
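As an aside, the URL handling in GetChartData follows the Helm repo-index convention: an index entry may carry an absolute chart URL, or a path relative to the repository URL. A minimal standalone sketch of that rule (resolveChartURL is our name, not part of the KubeSphere source):

package main

import (
	"fmt"
	"strings"
)

// resolveChartURL joins a repo-index entry onto the repo URL unless the
// entry is already an absolute http(s) or s3 URL.
func resolveChartURL(repoURL, entry string) string {
	if strings.HasPrefix(entry, "https://") ||
		strings.HasPrefix(entry, "http://") ||
		strings.HasPrefix(entry, "s3://") {
		return entry // already absolute, use as-is
	}
	return repoURL + "/" + entry // relative path: join onto the repo URL
}

func main() {
	fmt.Println(resolveChartURL("https://charts.example.com", "nginx-1.2.3.tgz"))
	// -> https://charts.example.com/nginx-1.2.3.tgz
	fmt.Println(resolveChartURL("https://charts.example.com", "s3://bucket/nginx-1.2.3.tgz"))
	// -> s3://bucket/nginx-1.2.3.tgz
}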


@@ -19,37 +19,29 @@ package helmrelease
import (
"context"
"errors"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apis/application/v1alpha1"
clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
"kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix/helmrepoindex"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix/helmwrapper"
"kubesphere.io/kubesphere/pkg/simple/client/s3"
"kubesphere.io/kubesphere/pkg/utils/clusterclient"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
"kubesphere.io/kubesphere/pkg/utils/stringutils"
"math"
"path"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"strings"
"time"
)
const (
HelmReleaseFinalizer = "helmrelease.application.kubesphere.io"
IndexerName = "clusterNamespace"
)
var (
@@ -58,6 +50,7 @@ var (
ErrAppVersionDataIsEmpty = errors.New("app version data is empty")
ErrGetAppVersionFailed = errors.New("get app version failed")
ErrLoadChartFailed = errors.New("load chart failed")
ErrS3Config = errors.New("invalid s3 config")
ErrLoadChartFromStorageFailed = errors.New("load chart from storage failed")
)
@@ -65,14 +58,16 @@ var _ reconcile.Reconciler = &ReconcileHelmRelease{}
// ReconcileHelmRelease reconciles a HelmRelease object
type ReconcileHelmRelease struct {
StorageClient s3.Interface
KsFactory externalversions.SharedInformerFactory
client.Client
recorder record.EventRecorder
// mock helm install && uninstall
helmMock bool
informer cache.SharedIndexInformer
clusterClients clusterclient.ClusterClients
MultiClusterEnable bool
}
//
@@ -111,8 +106,29 @@ func (r *ReconcileHelmRelease) Reconcile(request reconcile.Request) (reconcile.R
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object.
if !sliceutil.HasString(instance.ObjectMeta.Finalizers, HelmReleaseFinalizer) {
clusterName := instance.GetRlsCluster()
if r.MultiClusterEnable && clusterName != "" {
clusterInfo, err := r.clusterClients.Get(clusterName)
if err != nil {
// the cluster no longer exists, so delete the CRD
klog.Warningf("cluster %s not found, delete the helm release %s/%s",
clusterName, instance.GetRlsNamespace(), instance.GetTrueName())
return reconcile.Result{}, r.Delete(context.TODO(), instance)
}
// The host cluster is self-healing; deleting the host cluster must not trigger deletion of the helm release
if !r.clusterClients.IsHostCluster(clusterInfo) {
// add owner References
instance.OwnerReferences = append(instance.OwnerReferences, metav1.OwnerReference{
APIVersion: clusterv1alpha1.SchemeGroupVersion.String(),
Kind: clusterv1alpha1.ResourceKindCluster,
Name: clusterInfo.Name,
UID: clusterInfo.UID,
})
}
}
instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, HelmReleaseFinalizer)
if err := r.Update(context.Background(), instance); err != nil {
return reconcile.Result{}, err
}
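The finalizer registered above is what lets the controller run its uninstall logic before the object disappears. A minimal self-contained sketch of the add/remove lifecycle; hasString and removeString are hypothetical stand-ins for the sliceutil helpers, not the KubeSphere implementations:

package main

import "fmt"

func hasString(s []string, v string) bool {
	for _, x := range s {
		if x == v {
			return true
		}
	}
	return false
}

func removeString(s []string, v string) []string {
	var out []string
	for _, x := range s {
		if x != v {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	const finalizer = "helmrelease.application.kubesphere.io"
	var finalizers []string

	// On first reconcile: register the finalizer so deletion blocks
	// until the controller has uninstalled the release.
	if !hasString(finalizers, finalizer) {
		finalizers = append(finalizers, finalizer)
	}
	fmt.Println(finalizers) // [helmrelease.application.kubesphere.io]

	// After a successful uninstall: drop the finalizer so the API
	// server can finish deleting the object.
	finalizers = removeString(finalizers, finalizer)
	fmt.Println(finalizers) // []
}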
@@ -146,67 +162,17 @@ func (r *ReconcileHelmRelease) Reconcile(request reconcile.Request) (reconcile.R
return r.reconcile(instance)
}
// Check the state of the instance, then decide what to do.
func (r *ReconcileHelmRelease) reconcile(instance *v1alpha1.HelmRelease) (reconcile.Result, error) {
if instance.Status.State == v1alpha1.HelmStatusActive && instance.Status.Version == instance.Spec.Version {
// todo check release status
return reconcile.Result{}, nil
}
ft := failedTimes(instance.Status.DeployStatus)
if v1alpha1.HelmStatusFailed == instance.Status.State && ft > 0 {
// failed too many times; exponential backoff, max delay 180s
retryAfter := time.Duration(math.Min(math.Exp2(float64(ft)), 180)) * time.Second
var lastDeploy time.Time
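For reference, the backoff above yields a delay of 2^ft seconds, capped at 180s. A runnable sketch of that formula (retryDelay is our name, not from the source):

package main

import (
	"fmt"
	"math"
	"time"
)

// retryDelay reproduces the controller's backoff: 2^failures seconds, capped at 180s.
func retryDelay(failures int) time.Duration {
	return time.Duration(math.Min(math.Exp2(float64(failures)), 180)) * time.Second
}

func main() {
	for _, ft := range []int{1, 3, 6, 8, 20} {
		fmt.Println(ft, retryDelay(ft)) // 2s, 8s, 1m4s, 3m0s, 3m0s
	}
}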
@@ -226,16 +192,18 @@ func (r *ReconcileHelmRelease) reconcile(instance *v1alpha1.HelmRelease) (reconc
// no operation
return reconcile.Result{}, nil
case v1alpha1.HelmStatusActive:
// The release used to be active, but instance.Status.Version no longer matches instance.Spec.Version, so an upgrade is pending
instance.Status.State = v1alpha1.HelmStatusUpgrading
// Update the state first.
err = r.Status().Update(context.TODO(), instance)
return reconcile.Result{}, err
case v1alpha1.HelmStatusCreating:
// create new release
err = r.createOrUpgradeHelmRelease(instance, false)
case v1alpha1.HelmStatusFailed:
// the release failed before; retry it
err = r.createOrUpgradeHelmRelease(instance, false)
case v1alpha1.HelmStatusUpgrading:
// We can update the release now.
err = r.createOrUpgradeHelmRelease(instance, true)
case v1alpha1.HelmStatusRollbacking:
// TODO: rollback helm release
@@ -260,6 +228,7 @@ func (r *ReconcileHelmRelease) reconcile(instance *v1alpha1.HelmRelease) (reconc
instance.Status.LastDeployed = &now
if len(instance.Status.DeployStatus) > 0 {
instance.Status.DeployStatus = append([]v1alpha1.HelmReleaseDeployStatus{deployStatus}, instance.Status.DeployStatus...)
// At most ten records will be saved.
if len(instance.Status.DeployStatus) >= 10 {
instance.Status.DeployStatus = instance.Status.DeployStatus[:10:10]
}
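The [:10:10] full slice expression above trims the history to the ten newest records and also caps capacity at ten, so a later append allocates a fresh backing array instead of clobbering the trimmed-off entries. A quick demonstration:

package main

import "fmt"

func main() {
	history := make([]string, 12)
	for i := range history {
		history[i] = fmt.Sprintf("deploy-%d", i)
	}
	if len(history) >= 10 {
		// three-index slice: len == 10 and cap == 10
		history = history[:10:10]
	}
	fmt.Println(len(history), cap(history)) // 10 10
	// append now copies to a new backing array rather than
	// overwriting element 10 of the original array
	history = append(history, "deploy-new")
	fmt.Println(len(history), cap(history)) // 11 and a new, larger capacity
}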
@@ -301,7 +270,7 @@ func (r *ReconcileHelmRelease) createOrUpgradeHelmRelease(rls *v1alpha1.HelmRele
clusterName := rls.GetRlsCluster()
var clusterConfig string
if clusterName != "" && r.KsFactory != nil {
if r.MultiClusterEnable && clusterName != "" {
clusterConfig, err = r.clusterClients.GetClusterKubeconfig(clusterName)
if err != nil {
klog.Errorf("get cluster %s config failed", clusterConfig)
@@ -327,6 +296,7 @@ func (r *ReconcileHelmRelease) createOrUpgradeHelmRelease(rls *v1alpha1.HelmRele
}
func (r *ReconcileHelmRelease) uninstallHelmRelease(rls *v1alpha1.HelmRelease) error {
if rls.Status.State != v1alpha1.HelmStatusDeleting {
rls.Status.State = v1alpha1.HelmStatusDeleting
rls.Status.LastUpdate = metav1.Now()
@@ -339,12 +309,20 @@ func (r *ReconcileHelmRelease) uninstallHelmRelease(rls *v1alpha1.HelmRelease) e
clusterName := rls.GetRlsCluster()
var clusterConfig string
var err error
if clusterName != "" && r.KsFactory != nil {
clusterConfig, err = r.clusterClients.GetClusterKubeconfig(clusterName)
if r.MultiClusterEnable && clusterName != "" {
clusterInfo, err := r.clusterClients.Get(clusterName)
if err != nil {
klog.Errorf("get cluster %s config failed", clusterConfig)
return err
klog.V(2).Infof("cluster %s was deleted, skip helm release uninstall", clusterName)
return nil
}
// If the user deletes the helmRelease first and then deletes the cluster immediately, the helm resources in the member cluster may leak.
if clusterInfo.DeletionTimestamp != nil {
klog.V(2).Infof("cluster %s is deleting, skip helm release uninstall", clusterName)
return nil
}
clusterConfig = string(clusterInfo.Spec.Connection.KubeConfig)
}
hw := helmwrapper.NewHelmWrapper(clusterConfig, rls.GetRlsNamespace(), rls.Spec.Name, helmwrapper.SetMock(r.helmMock))
@@ -359,118 +337,11 @@ func (r *ReconcileHelmRelease) uninstallHelmRelease(rls *v1alpha1.HelmRelease) e
func (r *ReconcileHelmRelease) SetupWithManager(mgr ctrl.Manager) error {
r.Client = mgr.GetClient()
if r.KsFactory != nil && r.MultiClusterEnable {
r.clusterClients = clusterclient.NewClusterClient(r.KsFactory.Cluster().V1alpha1().Clusters())
r.informer = r.KsFactory.Application().V1alpha1().HelmReleases().Informer()
err := r.informer.AddIndexers(map[string]cache.IndexFunc{
IndexerName: func(obj interface{}) ([]string, error) {
rls := obj.(*v1alpha1.HelmRelease)
return []string{fmt.Sprintf("%s/%s", rls.GetRlsCluster(), rls.GetRlsNamespace())}, nil
},
})
if err != nil {
return err
}
go func() {
<-mgr.Elected()
go r.cleanHelmReleaseWhenNamespaceDeleted()
}()
}
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.HelmRelease{}).
Complete(r)
}
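SetupWithManager registers a custom indexer keyed by cluster/namespace so the cleanup loop below can fetch every release of one namespace in a single lookup. A minimal standalone sketch of the same pattern using client-go's cache.Indexer; the rls struct here is a hypothetical stand-in for *v1alpha1.HelmRelease:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// rls is a hypothetical stand-in for *v1alpha1.HelmRelease.
type rls struct{ name, cluster, namespace string }

func main() {
	idx := cache.NewIndexer(
		// primary key: the release name
		func(obj interface{}) (string, error) { return obj.(*rls).name, nil },
		cache.Indexers{
			"clusterNamespace": func(obj interface{}) ([]string, error) {
				r := obj.(*rls)
				return []string{fmt.Sprintf("%s/%s", r.cluster, r.namespace)}, nil
			},
		},
	)
	_ = idx.Add(&rls{"redis-1", "member1", "default"})
	_ = idx.Add(&rls{"mysql-2", "member1", "default"})
	_ = idx.Add(&rls{"nginx-3", "member2", "prod"})

	// every release living in cluster "member1", namespace "default"
	items, _ := idx.ByIndex("clusterNamespace", "member1/default")
	fmt.Println(len(items)) // 2
}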
func (r *ReconcileHelmRelease) getClusterConfig(cluster string) (string, error) {
if cluster == "" {
return "", nil
}
clusterConfig, err := r.clusterClients.GetClusterKubeconfig(cluster)
if err != nil {
klog.Errorf("get cluster %s config failed", clusterConfig)
return "", err
}
return clusterConfig, nil
}
// When a namespace has been removed from a member cluster, we need to clean
// up all of its helmReleases from the host cluster.
func (r *ReconcileHelmRelease) cleanHelmReleaseWhenNamespaceDeleted() {
ticker := time.NewTicker(2 * time.Minute)
for range ticker.C {
keys := r.informer.GetIndexer().ListIndexFuncValues(IndexerName)
for _, clusterNs := range keys {
klog.V(4).Infof("clean resource in %s", clusterNs)
parts := stringutils.Split(clusterNs, "/")
if len(parts) == 2 {
cluster, ns := parts[0], parts[1]
items, err := r.informer.GetIndexer().ByIndex(IndexerName, clusterNs)
if err != nil {
klog.Errorf("get items from index failed, error: %s", err)
continue
}
kubeconfig, err := r.getClusterConfig(cluster)
if err != nil {
klog.Errorf("get cluster %s config failed, error: %s", cluster, err)
continue
}
// connect to member or host cluster
var restConfig *restclient.Config
if kubeconfig == "" {
restConfig, err = restclient.InClusterConfig()
} else {
cc, ccErr := clientcmd.NewClientConfigFromBytes([]byte(kubeconfig))
if ccErr != nil {
klog.Errorf("get client config for cluster %s failed, error: %s", cluster, ccErr)
continue
}
// assign to the outer err so the check below also sees ClientConfig failures
// (a := here would shadow err and silently drop the error)
restConfig, err = cc.ClientConfig()
}
if err != nil {
klog.Errorf("build rest config for cluster %s failed, error: %s", cluster, err)
continue
}
clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
klog.Errorf("create client set failed, error: %s", err)
continue
}
// check whether the namespace still exists
namespace, err := clientSet.CoreV1().Namespaces().Get(context.TODO(), ns, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(2).Infof("delete all helm release in %s", clusterNs)
for ind := range items {
rls := items[ind].(*v1alpha1.HelmRelease)
err := r.Client.Delete(context.TODO(), rls)
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("delete release %s failed", rls.Name)
}
}
} else {
klog.Errorf("get namespace %s from cluster %s failed, error: %s", ns, cluster, err)
continue
}
} else {
for ind := range items {
rls := items[ind].(*v1alpha1.HelmRelease)
if namespace.CreationTimestamp.After(rls.CreationTimestamp.Time) {
// The namespace is newer than the helmRelease, so the original
// namespace was deleted and recreated.
// TODO: should we delete the helmRelease in this case?
klog.V(2).Infof("helm release %s in %s is older than its namespace", rls.Name, clusterNs)
}
}
}
}
}
}
}
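The sweep above boils down to: for each cluster/namespace index key, ask the target cluster whether the namespace still exists, and delete the recorded releases if it does not. A compact, runnable sketch of that core step against client-go's fake clientset; cleanOrphans and deleteRelease are our names, not KubeSphere's:

package main

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// cleanOrphans deletes the releases recorded under namespaces that no
// longer exist in the target cluster.
func cleanOrphans(ctx context.Context, cs kubernetes.Interface,
	releasesByNs map[string][]string, deleteRelease func(name string) error) {
	for ns, releases := range releasesByNs {
		_, err := cs.CoreV1().Namespaces().Get(ctx, ns, metav1.GetOptions{})
		switch {
		case apierrors.IsNotFound(err):
			// the namespace is gone: everything recorded under it is orphaned
			for _, name := range releases {
				if derr := deleteRelease(name); derr != nil {
					fmt.Printf("delete release %s failed: %v\n", name, derr)
				}
			}
		case err != nil:
			// transient API error: skip this round, the next tick will retry
			fmt.Printf("get namespace %s failed: %v\n", ns, err)
		}
	}
}

func main() {
	cs := fake.NewSimpleClientset() // fake cluster with no namespaces
	cleanOrphans(context.TODO(), cs,
		map[string][]string{"demo": {"redis-1"}},
		func(name string) error { fmt.Println("deleting", name); return nil })
	// Output: deleting redis-1
}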