Merge pull request #3694 from xyz-li/app-fix
check release resources status
This commit is contained in:
@@ -264,6 +264,9 @@ func run(s *options.KubeSphereControllerManagerOptions, stopCh <-chan struct{})
|
||||
StorageClient: opS3Client,
|
||||
KsFactory: informerFactory.KubeSphereSharedInformerFactory(),
|
||||
MultiClusterEnable: s.MultiClusterOptions.Enable,
|
||||
WaitTime: s.OpenPitrixOptions.ReleaseControllerOptions.WaitTime,
|
||||
MaxConcurrent: s.OpenPitrixOptions.ReleaseControllerOptions.MaxConcurrent,
|
||||
StopChan: stopCh,
|
||||
}).SetupWithManager(mgr)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -98,14 +98,14 @@ spec:
|
||||
items:
|
||||
properties:
|
||||
deployTime:
|
||||
description: deploy time
|
||||
description: deploy time, upgrade time or check status time
|
||||
format: date-time
|
||||
type: string
|
||||
message:
|
||||
description: A human readable message indicating details about why the release is in this state.
|
||||
type: string
|
||||
state:
|
||||
description: deploy state
|
||||
description: current state of the release
|
||||
type: string
|
||||
required:
|
||||
- deployTime
|
||||
@@ -113,7 +113,7 @@ spec:
|
||||
type: object
|
||||
type: array
|
||||
lastDeployed:
|
||||
description: last successful deploy time
|
||||
description: last deploy time or upgrade time
|
||||
format: date-time
|
||||
type: string
|
||||
lastUpdate:
|
||||
|
||||
@@ -39,9 +39,9 @@ const (
|
||||
HelmStatusDeleting = "deleting"
|
||||
HelmStatusUpgrading = "upgrading"
|
||||
HelmStatusRollbacking = "rollbacking"
|
||||
HelmStatusPending = "pending"
|
||||
HelmStatusSuccessful = "successful"
|
||||
HelmStatusFailed = "failed"
|
||||
HelmStatusCreated = "created"
|
||||
HelmStatusUpgraded = "upgraded"
|
||||
|
||||
AttachmentTypeScreenshot = "screenshot"
|
||||
AttachmentTypeIcon = "icon"
|
||||
@@ -57,5 +57,7 @@ const (
|
||||
UncategorizedId = "ctg-uncategorized"
|
||||
AppStoreRepoId = "repo-helm"
|
||||
|
||||
ApplicationInstance = "app.kubesphere.io/instance"
|
||||
|
||||
OriginWorkspaceLabelKey = "kubesphere.io/workspace-origin"
|
||||
)
|
||||
|
||||
@@ -58,9 +58,9 @@ type HelmReleaseSpec struct {
|
||||
type HelmReleaseDeployStatus struct {
|
||||
// A human readable message indicating details about why the release is in this state.
|
||||
Message string `json:"message,omitempty"`
|
||||
// deploy state
|
||||
// current state of the release
|
||||
State string `json:"state"`
|
||||
// deploy time
|
||||
// deploy time, upgrade time or check status time
|
||||
Time metav1.Time `json:"deployTime"`
|
||||
}
|
||||
|
||||
@@ -76,7 +76,7 @@ type HelmReleaseStatus struct {
|
||||
DeployStatus []HelmReleaseDeployStatus `json:"deployStatus,omitempty"`
|
||||
// last update time
|
||||
LastUpdate metav1.Time `json:"lastUpdate,omitempty"`
|
||||
// last successful deploy time
|
||||
// last deploy time or upgrade time
|
||||
LastDeployed *metav1.Time `json:"lastDeployed,omitempty"`
|
||||
}
|
||||
|
||||
|
||||
@@ -108,6 +108,10 @@ func newTestConfig() (*Config, error) {
|
||||
SessionToken: "abcdefghijklmn",
|
||||
Bucket: "app",
|
||||
},
|
||||
ReleaseControllerOptions: &openpitrix.ReleaseControllerOptions{
|
||||
MaxConcurrent: 10,
|
||||
WaitTime: 30 * time.Second,
|
||||
},
|
||||
},
|
||||
NetworkOptions: &network.Options{
|
||||
EnableNetworkPolicy: true,
|
||||
|
||||
@@ -19,9 +19,12 @@ package helmrelease
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
@@ -44,6 +47,7 @@ import (
|
||||
|
||||
const (
|
||||
HelmReleaseFinalizer = "helmrelease.application.kubesphere.io"
|
||||
MaxBackoffTime = 15 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -65,20 +69,29 @@ type ReconcileHelmRelease struct {
|
||||
client.Client
|
||||
recorder record.EventRecorder
|
||||
// mock helm install && uninstall
|
||||
helmMock bool
|
||||
informer cache.SharedIndexInformer
|
||||
helmMock bool
|
||||
informer cache.SharedIndexInformer
|
||||
checkReleaseStatusBackoff *flowcontrol.Backoff
|
||||
|
||||
clusterClients clusterclient.ClusterClients
|
||||
MultiClusterEnable bool
|
||||
|
||||
MaxConcurrent int
|
||||
// wait time when check release is ready or not
|
||||
WaitTime time.Duration
|
||||
|
||||
StopChan <-chan struct{}
|
||||
}
|
||||
|
||||
//
|
||||
// <==>upgrading===================
|
||||
// | \
|
||||
// creating===>active=====>deleting=>deleted |
|
||||
// \ ^ / |
|
||||
// \ | /======> /
|
||||
// \=>failed<==========================
|
||||
// =========================>
|
||||
// ^ |
|
||||
// | <==upgraded<==upgrading================
|
||||
// | \ =========^ /
|
||||
// | | / |
|
||||
// creating=>created===>active=====>deleting=>deleted |
|
||||
// \ ^ / |
|
||||
// \ | /======> /
|
||||
// \=>failed<==========================
|
||||
// Reconcile reads that state of the cluster for a helmreleases object and makes changes based on the state read
|
||||
// and what is in the helmreleases.Spec
|
||||
// +kubebuilder:rbac:groups=application.kubesphere.io,resources=helmreleases,verbs=get;list;watch;create;update;patch;delete
|
||||
@@ -167,106 +180,156 @@ func (r *ReconcileHelmRelease) Reconcile(request reconcile.Request) (reconcile.R
|
||||
// Check the state of the instance then decide what to do.
|
||||
func (r *ReconcileHelmRelease) reconcile(instance *v1alpha1.HelmRelease) (reconcile.Result, error) {
|
||||
|
||||
if instance.Status.State == v1alpha1.HelmStatusActive && instance.Status.Version == instance.Spec.Version {
|
||||
// todo check release status
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
|
||||
ft := failedTimes(instance.Status.DeployStatus)
|
||||
if v1alpha1.HelmStatusFailed == instance.Status.State && ft > 0 {
|
||||
// failed too much times, exponential backoff, max delay 180s
|
||||
retryAfter := time.Duration(math.Min(math.Exp2(float64(ft)), 180)) * time.Second
|
||||
var lastDeploy time.Time
|
||||
|
||||
if instance.Status.LastDeployed != nil {
|
||||
lastDeploy = instance.Status.LastDeployed.Time
|
||||
} else {
|
||||
lastDeploy = instance.Status.LastUpdate.Time
|
||||
}
|
||||
if time.Now().Before(lastDeploy.Add(retryAfter)) {
|
||||
return reconcile.Result{RequeueAfter: retryAfter}, nil
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
switch instance.Status.State {
|
||||
case v1alpha1.HelmStatusDeleting:
|
||||
case v1alpha1.HelmStatusDeleting, v1alpha1.HelmStatusFailed:
|
||||
// no operation
|
||||
return reconcile.Result{}, nil
|
||||
case v1alpha1.HelmStatusActive:
|
||||
// Release used to be active, but instance.Status.Version not equal to instance.Spec.Version
|
||||
instance.Status.State = v1alpha1.HelmStatusUpgrading
|
||||
// Update the state first.
|
||||
err = r.Status().Update(context.TODO(), instance)
|
||||
return reconcile.Result{}, err
|
||||
if instance.Status.Version != instance.Spec.Version {
|
||||
instance.Status.State = v1alpha1.HelmStatusUpgrading
|
||||
// Update the state first.
|
||||
err = r.Status().Update(context.TODO(), instance)
|
||||
return reconcile.Result{}, err
|
||||
} else {
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
case v1alpha1.HelmStatusCreating:
|
||||
// create new release
|
||||
err = r.createOrUpgradeHelmRelease(instance, false)
|
||||
case v1alpha1.HelmStatusFailed:
|
||||
err = r.createOrUpgradeHelmRelease(instance, false)
|
||||
return r.createOrUpgradeHelmRelease(instance, false)
|
||||
case v1alpha1.HelmStatusUpgrading:
|
||||
// We can update the release now.
|
||||
err = r.createOrUpgradeHelmRelease(instance, true)
|
||||
return r.createOrUpgradeHelmRelease(instance, true)
|
||||
case v1alpha1.HelmStatusCreated, v1alpha1.HelmStatusUpgraded:
|
||||
if instance.Status.Version != instance.Spec.Version {
|
||||
// Start a new backoff.
|
||||
r.checkReleaseStatusBackoff.DeleteEntry(rlsBackoffKey(instance))
|
||||
|
||||
instance.Status.State = v1alpha1.HelmStatusUpgrading
|
||||
err = r.Status().Update(context.TODO(), instance)
|
||||
return reconcile.Result{}, err
|
||||
} else {
|
||||
retry, err := r.checkReleaseIsReady(instance)
|
||||
return reconcile.Result{RequeueAfter: retry}, err
|
||||
}
|
||||
case v1alpha1.HelmStatusRollbacking:
|
||||
// TODO: rollback helm release
|
||||
}
|
||||
|
||||
now := metav1.Now()
|
||||
var deployStatus v1alpha1.HelmReleaseDeployStatus
|
||||
if err != nil {
|
||||
instance.Status.State = v1alpha1.HelmStatusFailed
|
||||
instance.Status.Message = stringutils.ShortenString(err.Error(), v1alpha1.MsgLen)
|
||||
deployStatus.Message = instance.Status.Message
|
||||
deployStatus.State = v1alpha1.HelmStatusFailed
|
||||
} else {
|
||||
instance.Status.State = v1alpha1.StateActive
|
||||
instance.Status.Message = ""
|
||||
instance.Status.Version = instance.Spec.Version
|
||||
deployStatus.State = v1alpha1.HelmStatusSuccessful
|
||||
}
|
||||
|
||||
deployStatus.Time = now
|
||||
instance.Status.LastUpdate = now
|
||||
instance.Status.LastDeployed = &now
|
||||
if len(instance.Status.DeployStatus) > 0 {
|
||||
instance.Status.DeployStatus = append([]v1alpha1.HelmReleaseDeployStatus{deployStatus}, instance.Status.DeployStatus...)
|
||||
// At most ten records will be saved.
|
||||
if len(instance.Status.DeployStatus) >= 10 {
|
||||
instance.Status.DeployStatus = instance.Status.DeployStatus[:10:10]
|
||||
}
|
||||
} else {
|
||||
instance.Status.DeployStatus = append([]v1alpha1.HelmReleaseDeployStatus{deployStatus})
|
||||
}
|
||||
|
||||
err = r.Status().Update(context.TODO(), instance)
|
||||
if err != nil {
|
||||
return reconcile.Result{}, err
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
|
||||
func failedTimes(status []v1alpha1.HelmReleaseDeployStatus) int {
|
||||
count := 0
|
||||
for i := range status {
|
||||
if status[i].State == v1alpha1.HelmStatusFailed {
|
||||
count += 1
|
||||
}
|
||||
}
|
||||
return count
|
||||
func rlsBackoffKey(rls *v1alpha1.HelmRelease) string {
|
||||
return rls.Name
|
||||
}
|
||||
|
||||
func (r *ReconcileHelmRelease) createOrUpgradeHelmRelease(rls *v1alpha1.HelmRelease, upgrade bool) error {
|
||||
// doCheck check whether helm release's resources are ready or not.
|
||||
func (r *ReconcileHelmRelease) doCheck(rls *v1alpha1.HelmRelease) (retryAfter time.Duration, err error) {
|
||||
backoffKey := rlsBackoffKey(rls)
|
||||
clusterName := rls.GetRlsCluster()
|
||||
|
||||
var clusterConfig string
|
||||
if r.MultiClusterEnable && clusterName != "" {
|
||||
clusterConfig, err = r.clusterClients.GetClusterKubeconfig(clusterName)
|
||||
if err != nil {
|
||||
klog.Errorf("get cluster %s config failed", clusterConfig)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
hw := helmwrapper.NewHelmWrapper(clusterConfig, rls.GetRlsNamespace(), rls.Spec.Name,
|
||||
helmwrapper.SetMock(r.helmMock))
|
||||
|
||||
ready, err := hw.IsReleaseReady(r.WaitTime)
|
||||
|
||||
if err != nil {
|
||||
// release resources not ready
|
||||
klog.Errorf("check release %s/%s status failed, error: %s", rls.GetRlsNamespace(), rls.GetTrueName(), err)
|
||||
// check status next time
|
||||
r.checkReleaseStatusBackoff.Next(backoffKey, r.checkReleaseStatusBackoff.Clock.Now())
|
||||
retryAfter = r.checkReleaseStatusBackoff.Get(backoffKey)
|
||||
err := r.updateStatus(rls, rls.Status.State, err.Error())
|
||||
return retryAfter, err
|
||||
} else {
|
||||
klog.V(4).Infof("check release %s/%s status success, ready: %v", rls.GetRlsNamespace(), rls.GetTrueName(), ready)
|
||||
// install or upgrade success, remove the release from the queue.
|
||||
r.checkReleaseStatusBackoff.DeleteEntry(backoffKey)
|
||||
// Release resources are ready, it's active now.
|
||||
err := r.updateStatus(rls, v1alpha1.HelmStatusActive, "")
|
||||
// If update status failed, the controller need update the status next time.
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
// checkReleaseIsReady check whether helm release's are ready or not.
|
||||
// If retryAfter > 0 , then the controller will recheck it next time.
|
||||
func (r *ReconcileHelmRelease) checkReleaseIsReady(rls *v1alpha1.HelmRelease) (retryAfter time.Duration, err error) {
|
||||
backoffKey := rlsBackoffKey(rls)
|
||||
now := time.Now()
|
||||
if now.Sub(rls.Status.LastDeployed.Time) > MaxBackoffTime {
|
||||
klog.V(2).Infof("check release %s/%s too much times, ignore it", rls.GetRlsNamespace(), rls.GetTrueName())
|
||||
r.checkReleaseStatusBackoff.DeleteEntry(backoffKey)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if !r.checkReleaseStatusBackoff.IsInBackOffSinceUpdate(backoffKey, r.checkReleaseStatusBackoff.Clock.Now()) {
|
||||
klog.V(4).Infof("start to check release %s/%s status ", rls.GetRlsNamespace(), rls.GetTrueName())
|
||||
return r.doCheck(rls)
|
||||
} else {
|
||||
// backoff, check next time
|
||||
retryAfter := r.checkReleaseStatusBackoff.Get(backoffKey)
|
||||
klog.V(4).Infof("check release %s/%s status has been limited by backoff - %v remaining",
|
||||
rls.GetRlsNamespace(), rls.GetTrueName(), retryAfter)
|
||||
return retryAfter, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReconcileHelmRelease) updateStatus(rls *v1alpha1.HelmRelease, currentState, msg string) error {
|
||||
now := metav1.Now()
|
||||
var deployStatus v1alpha1.HelmReleaseDeployStatus
|
||||
rls.Status.Message = stringutils.ShortenString(msg, v1alpha1.MsgLen)
|
||||
|
||||
deployStatus.Message = stringutils.ShortenString(msg, v1alpha1.MsgLen)
|
||||
deployStatus.State = currentState
|
||||
deployStatus.Time = now
|
||||
|
||||
if rls.Status.State != currentState &&
|
||||
(currentState == v1alpha1.HelmStatusCreated || currentState == v1alpha1.HelmStatusUpgraded) {
|
||||
rls.Status.Version = rls.Spec.Version
|
||||
rls.Status.LastDeployed = &now
|
||||
}
|
||||
|
||||
rls.Status.State = currentState
|
||||
// record then new state
|
||||
rls.Status.DeployStatus = append([]v1alpha1.HelmReleaseDeployStatus{deployStatus}, rls.Status.DeployStatus...)
|
||||
|
||||
if len(rls.Status.DeployStatus) > 10 {
|
||||
rls.Status.DeployStatus = rls.Status.DeployStatus[:10:10]
|
||||
}
|
||||
|
||||
rls.Status.LastUpdate = now
|
||||
err := r.Status().Update(context.TODO(), rls)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// createOrUpgradeHelmRelease will run helm install to install a new release if upgrade is false,
|
||||
// run helm upgrade if upgrade is true
|
||||
func (r *ReconcileHelmRelease) createOrUpgradeHelmRelease(rls *v1alpha1.HelmRelease, upgrade bool) (reconcile.Result, error) {
|
||||
|
||||
// Install or upgrade release
|
||||
var chartData []byte
|
||||
var err error
|
||||
_, chartData, err = r.GetChartData(rls)
|
||||
if err != nil {
|
||||
return err
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
|
||||
if len(chartData) == 0 {
|
||||
klog.Errorf("empty chart data failed, release name %s, chart name: %s", rls.Name, rls.Spec.ChartName)
|
||||
return ErrAppVersionDataIsEmpty
|
||||
return reconcile.Result{}, ErrAppVersionDataIsEmpty
|
||||
}
|
||||
|
||||
clusterName := rls.GetRlsCluster()
|
||||
@@ -276,25 +339,36 @@ func (r *ReconcileHelmRelease) createOrUpgradeHelmRelease(rls *v1alpha1.HelmRele
|
||||
clusterConfig, err = r.clusterClients.GetClusterKubeconfig(clusterName)
|
||||
if err != nil {
|
||||
klog.Errorf("get cluster %s config failed", clusterConfig)
|
||||
return err
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// If clusterConfig is empty, this application will be installed in current host.
|
||||
hw := helmwrapper.NewHelmWrapper(clusterConfig, rls.GetRlsNamespace(), rls.Spec.Name,
|
||||
// We just add kubesphere.io/creator annotation now.
|
||||
helmwrapper.SetAnnotations(map[string]string{constants.CreatorAnnotationKey: rls.GetCreator()}),
|
||||
helmwrapper.SetLabels(map[string]string{
|
||||
v1alpha1.ApplicationInstance: rls.GetTrueName(),
|
||||
}),
|
||||
helmwrapper.SetMock(r.helmMock))
|
||||
var res helmwrapper.HelmRes
|
||||
|
||||
var currentState string
|
||||
if upgrade {
|
||||
res, err = hw.Upgrade(rls.Spec.ChartName, string(chartData), string(rls.Spec.Values))
|
||||
err = hw.Upgrade(rls.Spec.ChartName, string(chartData), string(rls.Spec.Values))
|
||||
currentState = v1alpha1.HelmStatusUpgraded
|
||||
} else {
|
||||
res, err = hw.Install(rls.Spec.ChartName, string(chartData), string(rls.Spec.Values))
|
||||
err = hw.Install(rls.Spec.ChartName, string(chartData), string(rls.Spec.Values))
|
||||
currentState = v1alpha1.HelmStatusCreated
|
||||
}
|
||||
|
||||
var msg string
|
||||
if err != nil {
|
||||
return errors.New(res.Message)
|
||||
// install or upgrade failed
|
||||
currentState = v1alpha1.HelmStatusFailed
|
||||
msg = err.Error()
|
||||
}
|
||||
return nil
|
||||
err = r.updateStatus(rls, currentState, msg)
|
||||
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
|
||||
func (r *ReconcileHelmRelease) uninstallHelmRelease(rls *v1alpha1.HelmRelease) error {
|
||||
@@ -329,12 +403,9 @@ func (r *ReconcileHelmRelease) uninstallHelmRelease(rls *v1alpha1.HelmRelease) e
|
||||
|
||||
hw := helmwrapper.NewHelmWrapper(clusterConfig, rls.GetRlsNamespace(), rls.Spec.Name, helmwrapper.SetMock(r.helmMock))
|
||||
|
||||
res, err := hw.Uninstall()
|
||||
err = hw.Uninstall()
|
||||
|
||||
if err != nil {
|
||||
return errors.New(res.Message)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *ReconcileHelmRelease) SetupWithManager(mgr ctrl.Manager) error {
|
||||
@@ -343,7 +414,12 @@ func (r *ReconcileHelmRelease) SetupWithManager(mgr ctrl.Manager) error {
|
||||
r.clusterClients = clusterclient.NewClusterClient(r.KsFactory.Cluster().V1alpha1().Clusters())
|
||||
}
|
||||
|
||||
// exponential backoff
|
||||
r.checkReleaseStatusBackoff = flowcontrol.NewBackOff(2*time.Second, MaxBackoffTime)
|
||||
go wait.Until(r.checkReleaseStatusBackoff.GC, 1*time.Minute, r.StopChan)
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrent}).
|
||||
For(&v1alpha1.HelmRelease{}).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
@@ -99,8 +99,11 @@ func (c *releaseOperator) UpgradeApplication(request UpgradeClusterRequest) erro
|
||||
return err
|
||||
}
|
||||
|
||||
if oldRls.Status.State != v1alpha1.HelmStatusActive {
|
||||
return errors.New("application is not active now")
|
||||
switch oldRls.Status.State {
|
||||
case v1alpha1.StateActive, v1alpha1.HelmStatusUpgraded, v1alpha1.HelmStatusCreated:
|
||||
// no operation
|
||||
default:
|
||||
return errors.New("can not upgrade application now")
|
||||
}
|
||||
|
||||
version, err := c.getAppVersion("", request.VersionId)
|
||||
|
||||
@@ -86,16 +86,16 @@ func (l HelmReleaseList) Len() int { return len(l) }
|
||||
func (l HelmReleaseList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
|
||||
func (l HelmReleaseList) Less(i, j int) bool {
|
||||
var t1, t2 time.Time
|
||||
if l[i].Status.LastUpdate.IsZero() {
|
||||
if l[i].Status.LastDeployed == nil {
|
||||
t1 = l[i].CreationTimestamp.Time
|
||||
} else {
|
||||
t1 = l[i].Status.LastUpdate.Time
|
||||
t1 = l[i].Status.LastDeployed.Time
|
||||
}
|
||||
|
||||
if l[j].Status.LastUpdate.IsZero() {
|
||||
if l[j].Status.LastDeployed == nil {
|
||||
t2 = l[j].CreationTimestamp.Time
|
||||
} else {
|
||||
t2 = l[j].Status.LastUpdate.Time
|
||||
t2 = l[j].Status.LastDeployed.Time
|
||||
}
|
||||
|
||||
if t1.After(t2) {
|
||||
@@ -221,14 +221,14 @@ func convertApplication(rls *v1alpha1.HelmRelease, rlsInfos []*resource.Info) *A
|
||||
cluster.Status = rls.Status.State
|
||||
cluster.Env = string(rls.Spec.Values)
|
||||
if cluster.Status == "" {
|
||||
cluster.Status = v1alpha1.HelmStatusPending
|
||||
cluster.Status = v1alpha1.HelmStatusCreating
|
||||
}
|
||||
cluster.AdditionalInfo = rls.Status.Message
|
||||
cluster.Description = rls.Spec.Description
|
||||
dt := strfmt.DateTime(rls.CreationTimestamp.Time)
|
||||
cluster.CreateTime = &dt
|
||||
if !rls.Status.LastUpdate.Time.IsZero() {
|
||||
ut := strfmt.DateTime(rls.Status.LastUpdate.Time)
|
||||
if rls.Status.LastDeployed != nil {
|
||||
ut := strfmt.DateTime(rls.Status.LastDeployed.Time)
|
||||
cluster.StatusTime = &ut
|
||||
} else {
|
||||
cluster.StatusTime = &dt
|
||||
@@ -236,6 +236,7 @@ func convertApplication(rls *v1alpha1.HelmRelease, rlsInfos []*resource.Info) *A
|
||||
cluster.AppId = rls.Spec.ApplicationId
|
||||
cluster.VersionId = rls.Spec.ApplicationVersionId
|
||||
cluster.Name = rls.GetTrueName()
|
||||
cluster.AdditionalInfo = rls.Status.Message
|
||||
|
||||
if rls.GetRlsCluster() != "" {
|
||||
cluster.RuntimeId = rls.GetRlsCluster()
|
||||
|
||||
@@ -26,6 +26,11 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"helm.sh/helm/v3/pkg/cli"
|
||||
|
||||
"helm.sh/helm/v3/pkg/kube"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
helmrelease "helm.sh/helm/v3/pkg/release"
|
||||
"k8s.io/klog"
|
||||
@@ -41,6 +46,8 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrorTimedOutToWaitResource = errors.New("timed out waiting for resources to be ready")
|
||||
|
||||
UninstallNotFoundFormat = "Error: uninstall: Release not loaded: %s: release: not found"
|
||||
StatusNotFoundFormat = "Error: release: not found"
|
||||
releaseExists = "release exists"
|
||||
@@ -62,12 +69,50 @@ type HelmRes struct {
|
||||
var _ HelmWrapper = &helmWrapper{}
|
||||
|
||||
type HelmWrapper interface {
|
||||
Install(chartName, chartData, values string) (HelmRes, error)
|
||||
Install(chartName, chartData, values string) error
|
||||
// upgrade a release
|
||||
Upgrade(chartName, chartData, values string) (HelmRes, error)
|
||||
Uninstall() (HelmRes, error)
|
||||
Upgrade(chartName, chartData, values string) error
|
||||
Uninstall() error
|
||||
// Get manifests
|
||||
Manifest() (string, error)
|
||||
|
||||
// IsReleaseReady check helm release is ready or not
|
||||
IsReleaseReady(timeout time.Duration) (bool, error)
|
||||
}
|
||||
|
||||
// IsReleaseReady check helm releases is ready or not
|
||||
// If the return values is (true, nil), then the resources are ready
|
||||
func (c *helmWrapper) IsReleaseReady(waitTime time.Duration) (bool, error) {
|
||||
|
||||
// Get the manifest to build resources
|
||||
manifest, err := c.Manifest()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var client *kube.Client
|
||||
if c.Kubeconfig == "" {
|
||||
client = kube.New(nil)
|
||||
} else {
|
||||
helmSettings := cli.New()
|
||||
helmSettings.KubeConfig = c.kubeConfigPath()
|
||||
client = kube.New(helmSettings.RESTClientGetter())
|
||||
}
|
||||
|
||||
client.Namespace = c.Namespace
|
||||
resources, err := client.Build(bytes.NewBufferString(manifest), true)
|
||||
|
||||
err = client.Wait(resources, waitTime)
|
||||
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if err == wait.ErrWaitTimeout {
|
||||
return false, ErrorTimedOutToWaitResource
|
||||
}
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (c *helmWrapper) Status() (status *helmrelease.Release, err error) {
|
||||
@@ -144,8 +189,7 @@ type helmWrapper struct {
|
||||
annotations map[string]string
|
||||
|
||||
// helm cmd path
|
||||
cmdPath string
|
||||
// base should be /dev/shm on linux
|
||||
cmdPath string
|
||||
base string
|
||||
workspaceSuffix string
|
||||
dryRun bool
|
||||
@@ -170,7 +214,7 @@ func (c *helmWrapper) chartPath() string {
|
||||
|
||||
func (c *helmWrapper) cleanup() {
|
||||
if err := os.RemoveAll(c.Workspace()); err != nil {
|
||||
klog.Errorf("remove dir %s faield, error: %s", c.Workspace(), err)
|
||||
klog.Errorf("remove dir %s failed, error: %s", c.Workspace(), err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,6 +239,13 @@ func SetAnnotations(annotations map[string]string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// extra labels added to all resources in chart
|
||||
func SetLabels(labels map[string]string) Option {
|
||||
return func(wrapper *helmWrapper) {
|
||||
wrapper.labels = labels
|
||||
}
|
||||
}
|
||||
|
||||
func SetMock(mock bool) Option {
|
||||
return func(wrapper *helmWrapper) {
|
||||
wrapper.mock = mock
|
||||
@@ -325,7 +376,7 @@ func (c *helmWrapper) createChart(chartName, chartData, values string) error {
|
||||
}
|
||||
|
||||
// helm uninstall
|
||||
func (c *helmWrapper) Uninstall() (res HelmRes, err error) {
|
||||
func (c *helmWrapper) Uninstall() (err error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
klog.V(2).Infof("run command end, namespace: %s, name: %s elapsed: %v", c.Namespace, c.ReleaseName, time.Now().Sub(start))
|
||||
@@ -373,49 +424,53 @@ func (c *helmWrapper) Uninstall() (res HelmRes, err error) {
|
||||
|
||||
if err != nil {
|
||||
eMsg := strings.TrimSpace(stderr.String())
|
||||
// release does not exist. It's ok.
|
||||
if fmt.Sprintf(UninstallNotFoundFormat, c.ReleaseName) == eMsg {
|
||||
return res, nil
|
||||
return nil
|
||||
}
|
||||
klog.Errorf("run command failed, stderr: %s, error: %v", eMsg, err)
|
||||
res.Message = eMsg
|
||||
return errors.New("%s", eMsg)
|
||||
} else {
|
||||
klog.V(2).Infof("namespace: %s, name: %s, run command success", c.Namespace, c.ReleaseName)
|
||||
klog.V(8).Infof("namespace: %s, name: %s, run command success, stdout: %s", c.Namespace, c.ReleaseName, stdout)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// helm upgrade
|
||||
func (c *helmWrapper) Upgrade(chartName, chartData, values string) (res HelmRes, err error) {
|
||||
// TODO: check release status first
|
||||
if true {
|
||||
func (c *helmWrapper) Upgrade(chartName, chartData, values string) (err error) {
|
||||
sts, err := c.Status()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if sts.Info.Status == "deployed" {
|
||||
return c.install(chartName, chartData, values, true)
|
||||
} else {
|
||||
klog.V(3).Infof("release %s/%s not exists, cannot upgrade it, install a new one", c.Namespace, c.ReleaseName)
|
||||
return
|
||||
err = errors.New("cannot upgrade release %s/%s, current state is %s", c.Namespace, c.ReleaseName, sts.Info.Status)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// helm install
|
||||
func (c *helmWrapper) Install(chartName, chartData, values string) (res HelmRes, err error) {
|
||||
func (c *helmWrapper) Install(chartName, chartData, values string) (err error) {
|
||||
sts, err := c.Status()
|
||||
if err == nil {
|
||||
// helm release has been installed
|
||||
if sts.Info != nil && sts.Info.Status == "deployed" {
|
||||
return HelmRes{}, nil
|
||||
return nil
|
||||
}
|
||||
return HelmRes{}, errors.New(releaseExists)
|
||||
return errors.New(releaseExists)
|
||||
} else {
|
||||
if err.Error() == StatusNotFoundFormat {
|
||||
// continue to install
|
||||
return c.install(chartName, chartData, values, false)
|
||||
}
|
||||
return HelmRes{}, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (c *helmWrapper) install(chartName, chartData, values string, upgrade bool) (res HelmRes, err error) {
|
||||
func (c *helmWrapper) install(chartName, chartData, values string, upgrade bool) (err error) {
|
||||
if klog.V(2) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
@@ -493,7 +548,8 @@ func (c *helmWrapper) install(chartName, chartData, values string, upgrade bool)
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("namespace: %s, name: %s, run command: %s failed, stderr: %s, error: %v", c.Namespace, c.ReleaseName, cmd.String(), stderr, err)
|
||||
res.Message = stderr.String()
|
||||
// return the error of helm install
|
||||
return errors.New("%s", stderr.String())
|
||||
} else {
|
||||
klog.V(2).Infof("namespace: %s, name: %s, run command success", c.Namespace, c.ReleaseName)
|
||||
klog.V(8).Infof("namespace: %s, name: %s, run command success, stdout: %s", c.Namespace, c.ReleaseName, stdout)
|
||||
|
||||
@@ -29,12 +29,10 @@ func TestHelmInstall(t *testing.T) {
|
||||
SetAnnotations(map[string]string{constants.CreatorAnnotationKey: "1234"}),
|
||||
SetMock(true))
|
||||
|
||||
res, err := wr.install("dummy-chart", "", "dummy-value", false)
|
||||
err := wr.install("dummy-chart", "", "dummy-value", false)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
_ = res
|
||||
}
|
||||
|
||||
func TestHelperProcess(t *testing.T) {
|
||||
|
||||
@@ -17,6 +17,8 @@ limitations under the License.
|
||||
package openpitrix
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/s3"
|
||||
@@ -24,12 +26,22 @@ import (
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
S3Options *s3.Options `json:"s3,omitempty" yaml:"s3,omitempty" mapstructure:"s3"`
|
||||
S3Options *s3.Options `json:"s3,omitempty" yaml:"s3,omitempty" mapstructure:"s3"`
|
||||
ReleaseControllerOptions *ReleaseControllerOptions `json:"releaseControllerOptions,omitempty" yaml:"releaseControllerOptions,omitempty" mapstructure:"releaseControllerOptions"`
|
||||
}
|
||||
|
||||
type ReleaseControllerOptions struct {
|
||||
MaxConcurrent int `json:"maxConcurrent,omitempty" yaml:"maxConcurrent,omitempty" mapstructure:"maxConcurrent"`
|
||||
WaitTime time.Duration `json:"waitTime,omitempty" yaml:"waitTime,omitempty" mapstructure:"waitTime"`
|
||||
}
|
||||
|
||||
func NewOptions() *Options {
|
||||
return &Options{
|
||||
S3Options: &s3.Options{},
|
||||
ReleaseControllerOptions: &ReleaseControllerOptions{
|
||||
MaxConcurrent: 10,
|
||||
WaitTime: 30 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +61,10 @@ func (s *Options) ApplyTo(options *Options) {
|
||||
if s.S3Options != nil {
|
||||
reflectutils.Override(options, s)
|
||||
}
|
||||
|
||||
if s.ReleaseControllerOptions != nil {
|
||||
reflectutils.Override(options, s)
|
||||
}
|
||||
}
|
||||
|
||||
// AddFlags add options flags to command line flags,
|
||||
@@ -72,4 +88,7 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
|
||||
fs.BoolVar(&s.S3Options.DisableSSL, "openpitrix-s3-disable-SSL", c.S3Options.DisableSSL, "disable ssl")
|
||||
|
||||
fs.BoolVar(&s.S3Options.ForcePathStyle, "openpitrix-s3-force-path-style", c.S3Options.ForcePathStyle, "force path style")
|
||||
|
||||
fs.DurationVar(&s.ReleaseControllerOptions.WaitTime, "openpitrix-release-controller-options-wait-time", c.ReleaseControllerOptions.WaitTime, "wait time when check release is ready or not")
|
||||
fs.IntVar(&s.ReleaseControllerOptions.MaxConcurrent, "openpitrix-release-controller-options-max-concurrent", c.ReleaseControllerOptions.MaxConcurrent, "the maximum number of concurrent Reconciles which can be run for release controller")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user