1732 lines
63 KiB
Go
1732 lines
63 KiB
Go
/*
|
|
* Please refer to the LICENSE file in the root directory of the project.
|
|
* https://github.com/kubesphere/kubesphere/blob/master/LICENSE
|
|
*/
|
|
|
|
package core
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"path"
|
|
"reflect"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
"golang.org/x/exp/slices"
|
|
"helm.sh/helm/v3/pkg/chart"
|
|
"helm.sh/helm/v3/pkg/chart/loader"
|
|
"helm.sh/helm/v3/pkg/getter"
|
|
"helm.sh/helm/v3/pkg/registry"
|
|
helmrelease "helm.sh/helm/v3/pkg/release"
|
|
batchv1 "k8s.io/api/batch/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
rbacv1 "k8s.io/api/rbac/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/client-go/util/retry"
|
|
"k8s.io/klog/v2"
|
|
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
|
|
corev1alpha1 "kubesphere.io/api/core/v1alpha1"
|
|
extensionsv1alpha1 "kubesphere.io/api/extensions/v1alpha1"
|
|
tenantv1beta1 "kubesphere.io/api/tenant/v1beta1"
|
|
"kubesphere.io/utils/helm"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/builder"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
|
"sigs.k8s.io/controller-runtime/pkg/event"
|
|
"sigs.k8s.io/controller-runtime/pkg/handler"
|
|
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
|
|
|
rbachelper "kubesphere.io/kubesphere/pkg/componenthelper/auth/rbac"
|
|
"kubesphere.io/kubesphere/pkg/constants"
|
|
kscontroller "kubesphere.io/kubesphere/pkg/controller"
|
|
clusterpredicate "kubesphere.io/kubesphere/pkg/controller/cluster/predicate"
|
|
clusterutils "kubesphere.io/kubesphere/pkg/controller/cluster/utils"
|
|
"kubesphere.io/kubesphere/pkg/controller/options"
|
|
"kubesphere.io/kubesphere/pkg/utils/clusterclient"
|
|
"kubesphere.io/kubesphere/pkg/utils/hashutil"
|
|
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
|
|
)
|
|
|
|
const (
|
|
installPlanController = "installplan"
|
|
installPlanProtection = "kubesphere.io/installplan-protection"
|
|
systemWorkspace = "system-workspace"
|
|
agentReleaseFormat = "%s-agent"
|
|
defaultRoleFormat = "kubesphere:%s:helm-executor"
|
|
defaultRoleBindingFormat = defaultRoleFormat
|
|
defaultClusterRoleFormat = "kubesphere:%s:helm-executor"
|
|
permissionDefinitionFile = "permissions.yaml"
|
|
defaultClusterRoleBindingFormat = defaultClusterRoleFormat
|
|
tagAgent = "agent"
|
|
tagExtension = "extension"
|
|
|
|
upgradeSuccessful = "UpgradeSuccessful"
|
|
upgradeFailed = "UpgradeFailed"
|
|
installSuccessful = "InstallSuccessful"
|
|
installFailed = "InstallFailed"
|
|
initialized = "Initialized"
|
|
uninstallSuccessful = "UninstallSuccessful"
|
|
uninstallFailed = "UninstallFailed"
|
|
relatedResourceNotReady = "RelatedResourceNotReady"
|
|
relatedResourceReady = "RelatedResourceReady"
|
|
|
|
typeHelmRelease = "helm.sh/release.v1"
|
|
|
|
globalExtensionIngressClassName = "global.extension.ingress.ingressClassName"
|
|
globalExtensionIngressDomainSuffix = "global.extension.ingress.domainSuffix"
|
|
globalExtensionIngressHTTPPort = "global.extension.ingress.httpPort"
|
|
globalExtensionIngressHTTPSPort = "global.extension.ingress.httpsPort"
|
|
globalNodeSelector = "global.nodeSelector"
|
|
globalImageRegistry = "global.imageRegistry"
|
|
globalClusterName = "global.clusterInfo.name"
|
|
globalClusterRole = "global.clusterInfo.role"
|
|
globalPortalURL = "global.portal.url"
|
|
)
|
|
|
|
var _ kscontroller.Controller = &InstallPlanReconciler{}
|
|
var _ reconcile.Reconciler = &InstallPlanReconciler{}
|
|
|
|
func (r *InstallPlanReconciler) Name() string {
|
|
return installPlanController
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) Enabled(clusterRole string) bool {
|
|
return strings.EqualFold(clusterRole, string(clusterv1alpha1.ClusterRoleHost))
|
|
}
|
|
|
|
// InstallPlanReconciler reconciles a InstallPlan object.
|
|
type InstallPlanReconciler struct {
|
|
client.Client
|
|
recorder record.EventRecorder
|
|
logger logr.Logger
|
|
PortalURL string
|
|
HelmExecutorOptions *options.HelmExecutorOptions
|
|
ExtensionOptions *options.ExtensionOptions
|
|
hostResetConfig *rest.Config
|
|
clusterClientSet clusterclient.Interface
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) SetupWithManager(mgr *kscontroller.Manager) error {
|
|
if mgr.AuthenticationOptions != nil && mgr.Options.AuthenticationOptions.Issuer != nil {
|
|
r.PortalURL = mgr.Options.AuthenticationOptions.Issuer.URL
|
|
}
|
|
r.HelmExecutorOptions = mgr.HelmExecutorOptions
|
|
r.ExtensionOptions = mgr.ExtensionOptions
|
|
r.hostResetConfig = mgr.K8sClient.Config()
|
|
r.Client = mgr.GetClient()
|
|
r.logger = mgr.GetLogger().WithName(installPlanController)
|
|
r.recorder = mgr.GetEventRecorderFor(installPlanController)
|
|
r.clusterClientSet = mgr.ClusterClient
|
|
|
|
if r.HelmExecutorOptions == nil || r.HelmExecutorOptions.Image == "" {
|
|
return fmt.Errorf("helm executor image is not specified")
|
|
}
|
|
|
|
labelSelector, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
|
|
MatchExpressions: []metav1.LabelSelectorRequirement{{
|
|
Key: corev1alpha1.ExtensionReferenceLabel,
|
|
Operator: metav1.LabelSelectorOpExists,
|
|
}}})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create label selector predicate: %s", err)
|
|
}
|
|
|
|
return ctrl.NewControllerManagedBy(mgr).
|
|
Named(installPlanController).
|
|
For(&corev1alpha1.InstallPlan{}).
|
|
Watches(
|
|
&batchv1.Job{},
|
|
handler.EnqueueRequestsFromMapFunc(
|
|
func(ctx context.Context, h client.Object) []reconcile.Request {
|
|
return []reconcile.Request{{
|
|
NamespacedName: types.NamespacedName{
|
|
Name: h.GetLabels()[corev1alpha1.ExtensionReferenceLabel],
|
|
}}}
|
|
}),
|
|
builder.WithPredicates(predicate.And(labelSelector, predicate.Funcs{
|
|
UpdateFunc: func(e event.UpdateEvent) bool {
|
|
if e.ObjectNew.GetLabels()[corev1alpha1.ExtensionReferenceLabel] == "" {
|
|
return false
|
|
}
|
|
oldJob := e.ObjectOld.(*batchv1.Job)
|
|
newJob := e.ObjectNew.(*batchv1.Job)
|
|
return !reflect.DeepEqual(oldJob.Status, newJob.Status)
|
|
},
|
|
CreateFunc: func(e event.CreateEvent) bool {
|
|
return false
|
|
},
|
|
DeleteFunc: func(e event.DeleteEvent) bool {
|
|
return false
|
|
},
|
|
})),
|
|
).
|
|
Watches(
|
|
&corev1.Secret{},
|
|
handler.EnqueueRequestsFromMapFunc(
|
|
func(ctx context.Context, h client.Object) []reconcile.Request {
|
|
releaseName := h.GetLabels()["name"]
|
|
owner := h.GetLabels()["owner"]
|
|
|
|
var result []reconcile.Request
|
|
if releaseName != "" && owner == "helm" {
|
|
result = append(result, reconcile.Request{NamespacedName: types.NamespacedName{
|
|
Name: releaseName,
|
|
}})
|
|
}
|
|
return result
|
|
}),
|
|
builder.WithPredicates(predicate.Funcs{
|
|
UpdateFunc: func(e event.UpdateEvent) bool {
|
|
return e.ObjectNew.(*corev1.Secret).Type == typeHelmRelease
|
|
},
|
|
CreateFunc: func(e event.CreateEvent) bool {
|
|
return e.Object.(*corev1.Secret).Type == typeHelmRelease
|
|
},
|
|
DeleteFunc: func(e event.DeleteEvent) bool {
|
|
return e.Object.(*corev1.Secret).Type == typeHelmRelease
|
|
},
|
|
}),
|
|
).
|
|
Watches(
|
|
&corev1alpha1.ExtensionVersion{},
|
|
handler.EnqueueRequestsFromMapFunc(
|
|
func(ctx context.Context, h client.Object) []reconcile.Request {
|
|
return []reconcile.Request{{
|
|
NamespacedName: types.NamespacedName{
|
|
Name: h.GetLabels()[corev1alpha1.ExtensionReferenceLabel],
|
|
}}}
|
|
}),
|
|
builder.WithPredicates(predicate.Funcs{
|
|
UpdateFunc: func(e event.UpdateEvent) bool {
|
|
return false
|
|
},
|
|
CreateFunc: func(e event.CreateEvent) bool {
|
|
return false
|
|
},
|
|
DeleteFunc: func(e event.DeleteEvent) bool {
|
|
return true
|
|
},
|
|
}),
|
|
).
|
|
Watches(
|
|
&clusterv1alpha1.Cluster{},
|
|
handler.EnqueueRequestsFromMapFunc(r.mapper),
|
|
builder.WithPredicates(clusterpredicate.ClusterStatusChangedPredicate{}),
|
|
).
|
|
WithOptions(controller.Options{MaxConcurrentReconciles: 2}).
|
|
Complete(r)
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
|
logger := r.logger.WithValues("installplan", req.Name)
|
|
ctx = klog.NewContext(ctx, logger)
|
|
|
|
plan := &corev1alpha1.InstallPlan{}
|
|
if err := r.Get(ctx, req.NamespacedName, plan); err != nil {
|
|
return ctrl.Result{}, client.IgnoreNotFound(err)
|
|
}
|
|
|
|
executor, err := r.newExecutor(plan)
|
|
if err != nil {
|
|
logger.Error(err, "failed to create executor")
|
|
return ctrl.Result{}, fmt.Errorf("failed to create executor: %v", err)
|
|
}
|
|
|
|
ctx = context.WithValue(ctx, contextKeyExecutor{}, executor)
|
|
|
|
// fixed kubeconfig
|
|
if kubeConfig, err := clusterutils.BuildKubeconfigFromRestConfig(r.hostResetConfig); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("failed to build host cluster kubeconfig: %v", err)
|
|
} else {
|
|
ctx = context.WithValue(ctx, contextKeyHostKubeConfig{}, kubeConfig)
|
|
}
|
|
|
|
extensionVersion := &corev1alpha1.ExtensionVersion{}
|
|
extensionVersionName := fmt.Sprintf("%s-%s", plan.Spec.Extension.Name, plan.Spec.Extension.Version)
|
|
if err = r.Get(ctx, types.NamespacedName{Name: extensionVersionName}, extensionVersion); err != nil {
|
|
if errors.IsNotFound(err) {
|
|
logger.Info("extension version not found", "name", extensionVersionName)
|
|
return ctrl.Result{}, nil
|
|
}
|
|
return ctrl.Result{}, fmt.Errorf("failed to get extension version: %v", err)
|
|
}
|
|
ctx = context.WithValue(ctx, contextKeyExtensionVersion{}, extensionVersion)
|
|
|
|
if !plan.ObjectMeta.DeletionTimestamp.IsZero() {
|
|
return r.reconcileDelete(ctx, plan)
|
|
}
|
|
|
|
if !controllerutil.ContainsFinalizer(plan, installPlanProtection) {
|
|
expected := plan.DeepCopy()
|
|
controllerutil.AddFinalizer(expected, installPlanProtection)
|
|
return ctrl.Result{Requeue: true}, r.Patch(ctx, expected, client.MergeFrom(plan))
|
|
}
|
|
|
|
targetNamespace := extensionVersion.Spec.Namespace
|
|
if targetNamespace == "" {
|
|
targetNamespace = fmt.Sprintf("extension-%s", plan.Spec.Extension.Name)
|
|
}
|
|
|
|
if plan.Status.TargetNamespace != targetNamespace {
|
|
plan.Status.TargetNamespace = targetNamespace
|
|
return ctrl.Result{Requeue: true}, r.updateInstallPlan(ctx, plan)
|
|
}
|
|
|
|
if err := r.syncInstallPlanStatus(ctx, plan); err != nil {
|
|
logger.Error(err, "failed to sync installplan status")
|
|
return ctrl.Result{}, fmt.Errorf("failed to sync installplan status: %v", err)
|
|
}
|
|
|
|
// Multi-cluster installation
|
|
if plan.Spec.ClusterScheduling != nil {
|
|
if err := r.syncClusterSchedulingStatus(ctx, plan); err != nil {
|
|
logger.Error(err, "failed to sync scheduling status")
|
|
return ctrl.Result{}, fmt.Errorf("failed to sync scheduling status: %v", err)
|
|
}
|
|
}
|
|
|
|
logger.V(4).Info("Successfully synced")
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// reconcileDelete delete the helm release involved and remove finalizer from installplan.
|
|
func (r *InstallPlanReconciler) reconcileDelete(ctx context.Context, plan *corev1alpha1.InstallPlan) (ctrl.Result, error) {
|
|
// It has not been installed correctly.
|
|
if plan.Status.ReleaseName == "" {
|
|
if err := r.postRemove(ctx, plan); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("failed to remove finalizer: %v", err)
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
executor := ctx.Value(contextKeyExecutor{}).(helm.Executor)
|
|
|
|
if len(plan.Status.ClusterSchedulingStatuses) > 0 {
|
|
for clusterName := range plan.Status.ClusterSchedulingStatuses {
|
|
if err := r.uninstallClusterAgent(ctx, plan, clusterName); err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
hostKubeConfig := ctx.Value(contextKeyHostKubeConfig{}).([]byte)
|
|
|
|
opts := []helm.HelmOption{
|
|
helm.SetKubeconfig(hostKubeConfig),
|
|
helm.SetNamespace(plan.Status.TargetNamespace),
|
|
helm.SetTimeout(r.HelmExecutorOptions.Timeout),
|
|
}
|
|
|
|
if _, ok := plan.Annotations[corev1alpha1.ForceDeleteAnnotation]; ok {
|
|
if err := executor.ForceDelete(ctx, plan.Status.ReleaseName, opts...); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("failed to force delete helm release: %v", err)
|
|
}
|
|
if err := r.postRemove(ctx, plan); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("failed to remove finalizer: %v", err)
|
|
}
|
|
}
|
|
|
|
// Check if the target helm release exists.
|
|
// If it does, there is no need to execute the installation process again.
|
|
_, err := executor.Get(ctx, plan.Status.ReleaseName, opts...)
|
|
if err != nil {
|
|
if isReleaseNotFoundError(err) {
|
|
return ctrl.Result{}, r.postRemove(ctx, plan)
|
|
}
|
|
klog.FromContext(ctx).Error(err, "failed to get helm release status")
|
|
return ctrl.Result{}, fmt.Errorf("failed to get helm release status: %v", err)
|
|
}
|
|
|
|
if err := r.syncInstallationStatus(ctx, hostKubeConfig, plan.Status.TargetNamespace, plan.Spec.Extension.Name, &plan.Status.InstallationStatus); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("failed to sync installation status: %v", err)
|
|
}
|
|
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("failed to update install plan: %v", err)
|
|
}
|
|
|
|
installationStatus := plan.Status.InstallationStatus
|
|
if installationStatus.State != corev1alpha1.StateUninstalling &&
|
|
installationStatus.State != corev1alpha1.StateUninstallFailed &&
|
|
installationStatus.State != corev1alpha1.StateUninstalled {
|
|
|
|
extensionVersion, ok := ctx.Value(contextKeyExtensionVersion{}).(*corev1alpha1.ExtensionVersion)
|
|
if !ok {
|
|
return ctrl.Result{}, fmt.Errorf("failed to get extension version from context")
|
|
}
|
|
|
|
helmOptions := []helm.HelmOption{
|
|
helm.SetKubeconfig(hostKubeConfig),
|
|
helm.SetNamespace(plan.Status.TargetNamespace),
|
|
helm.SetTimeout(r.HelmExecutorOptions.Timeout),
|
|
helm.SetHookImage(r.getHookImageForUninstall(extensionVersion)),
|
|
}
|
|
|
|
jobName, err := executor.Uninstall(ctx, plan.Status.ReleaseName, helmOptions...)
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("failed to uninstall helm release: %v", err)
|
|
}
|
|
installationStatus.JobName = jobName
|
|
updateStateAndConditions(&installationStatus, corev1alpha1.StateUninstalling, "", time.Now())
|
|
plan.Status.InstallationStatus = installationStatus
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return reconcile.Result{}, err
|
|
}
|
|
}
|
|
return reconcile.Result{}, nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) getHookImageForUninstall(extensionVersion *corev1alpha1.ExtensionVersion) string {
|
|
hookImage := extensionVersion.Annotations[corev1alpha1.ExecutorHookImageAnnotation]
|
|
if extensionVersion.Annotations[corev1alpha1.ExecutorUninstallHookImageAnnotation] != "" {
|
|
hookImage = extensionVersion.Annotations[corev1alpha1.ExecutorUninstallHookImageAnnotation]
|
|
}
|
|
if r.ExtensionOptions.ImageRegistry != "" && hookImage != "" {
|
|
hookImage = path.Join(r.ExtensionOptions.ImageRegistry, hookImage)
|
|
}
|
|
return hookImage
|
|
}
|
|
|
|
func latestJobCondition(job *batchv1.Job) batchv1.JobCondition {
|
|
condition := batchv1.JobCondition{}
|
|
if job == nil {
|
|
return condition
|
|
}
|
|
|
|
jobConditions := job.Status.Conditions
|
|
sort.Slice(jobConditions, func(i, j int) bool {
|
|
return jobConditions[i].LastTransitionTime.After(jobConditions[j].LastTransitionTime.Time)
|
|
})
|
|
if len(job.Status.Conditions) > 0 {
|
|
return jobConditions[0]
|
|
}
|
|
return condition
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) loadChartData(ctx context.Context) ([]byte, string, error) {
|
|
extensionVersion, ok := ctx.Value(contextKeyExtensionVersion{}).(*corev1alpha1.ExtensionVersion)
|
|
if !ok {
|
|
return nil, "", fmt.Errorf("failed to get extension version from context")
|
|
}
|
|
|
|
// load chart data from
|
|
if extensionVersion.Spec.ChartDataRef != nil {
|
|
configMap := &corev1.ConfigMap{}
|
|
if err := r.Get(ctx, types.NamespacedName{Namespace: extensionVersion.Spec.ChartDataRef.Namespace, Name: extensionVersion.Spec.ChartDataRef.Name}, configMap); err != nil {
|
|
return nil, "", err
|
|
}
|
|
data := configMap.BinaryData[extensionVersion.Spec.ChartDataRef.Key]
|
|
if data != nil {
|
|
return data, "", nil
|
|
}
|
|
return nil, "", fmt.Errorf("binary data not found")
|
|
}
|
|
|
|
chartURL, err := url.Parse(extensionVersion.Spec.ChartURL)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("failed to parse chart url: %v", err)
|
|
}
|
|
|
|
var caBundle string
|
|
repo := &corev1alpha1.Repository{}
|
|
if extensionVersion.Spec.Repository != "" {
|
|
if err := r.Get(ctx, types.NamespacedName{Name: extensionVersion.Spec.Repository}, repo); err != nil {
|
|
return nil, "", fmt.Errorf("failed to get repository: %v", err)
|
|
}
|
|
caBundle = repo.Spec.CABundle
|
|
}
|
|
|
|
var chartGetter getter.Getter
|
|
switch chartURL.Scheme {
|
|
case registry.OCIScheme:
|
|
opts := make([]getter.Option, 0)
|
|
if extensionVersion.Spec.Repository != "" {
|
|
opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure))
|
|
}
|
|
if repo.Spec.BasicAuth != nil {
|
|
opts = append(opts, getter.WithBasicAuth(repo.Spec.BasicAuth.Username, repo.Spec.BasicAuth.Password))
|
|
}
|
|
chartGetter, err = getter.NewOCIGetter(opts...)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("failed to create chart getter: %v", err)
|
|
}
|
|
case "http", "https":
|
|
opts := make([]getter.Option, 0)
|
|
if chartURL.Scheme == "https" && extensionVersion.Spec.Repository != "" {
|
|
opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure))
|
|
}
|
|
if repo.Spec.CABundle != "" {
|
|
caFile, err := storeCAFile(repo.Spec.CABundle, repo.Name)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("failed to store CABundle to local file: %s", err)
|
|
}
|
|
opts = append(opts, getter.WithTLSClientConfig("", "", caFile))
|
|
}
|
|
if chartURL.Scheme == "https" {
|
|
opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure))
|
|
}
|
|
if repo.Spec.BasicAuth != nil {
|
|
opts = append(opts, getter.WithBasicAuth(repo.Spec.BasicAuth.Username, repo.Spec.BasicAuth.Password))
|
|
}
|
|
chartGetter, err = getter.NewHTTPGetter(opts...)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("failed to create chart getter: %v", err)
|
|
}
|
|
default:
|
|
return nil, "", fmt.Errorf("unsupported scheme: %s", chartURL.Scheme)
|
|
}
|
|
|
|
buffer, err := chartGetter.Get(chartURL.String())
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("failed to get chart data: %v", err)
|
|
}
|
|
|
|
data, err := io.ReadAll(buffer)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("failed to read chart data: %v", err)
|
|
}
|
|
|
|
return data, caBundle, nil
|
|
}
|
|
|
|
func updateState(status *corev1alpha1.InstallationStatus, state string, time time.Time) bool {
|
|
var lastState corev1alpha1.InstallPlanState
|
|
if len(status.StateHistory) > 0 {
|
|
lastState = status.StateHistory[len(status.StateHistory)-1]
|
|
}
|
|
|
|
if lastState.State == state {
|
|
return false
|
|
}
|
|
|
|
if time.Before(lastState.LastTransitionTime.Time) {
|
|
return false
|
|
}
|
|
|
|
status.State = state
|
|
|
|
newState := corev1alpha1.InstallPlanState{
|
|
LastTransitionTime: metav1.NewTime(time),
|
|
State: state,
|
|
}
|
|
|
|
if status.StateHistory == nil {
|
|
status.StateHistory = make([]corev1alpha1.InstallPlanState, 0)
|
|
}
|
|
|
|
status.StateHistory = append(status.StateHistory, newState)
|
|
|
|
sort.Slice(status.StateHistory, func(i, j int) bool {
|
|
return status.StateHistory[i].LastTransitionTime.Before(&status.StateHistory[j].LastTransitionTime)
|
|
})
|
|
|
|
if len(status.StateHistory) > corev1alpha1.MaxStateNum {
|
|
status.StateHistory = status.StateHistory[len(status.StateHistory)-corev1alpha1.MaxStateNum:]
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func updateCondition(status *corev1alpha1.InstallationStatus, conditionType, reason, message string, conditionStatus metav1.ConditionStatus, time time.Time) {
|
|
conditions := []metav1.Condition{
|
|
{
|
|
Type: conditionType,
|
|
Reason: reason,
|
|
Status: conditionStatus,
|
|
LastTransitionTime: metav1.NewTime(time),
|
|
Message: message,
|
|
},
|
|
}
|
|
|
|
if len(status.Conditions) == 0 {
|
|
status.Conditions = conditions
|
|
return
|
|
}
|
|
|
|
for _, c := range status.Conditions {
|
|
if c.Type != conditionType {
|
|
conditions = append(conditions, c)
|
|
}
|
|
}
|
|
|
|
status.Conditions = conditions
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) postRemove(ctx context.Context, plan *corev1alpha1.InstallPlan) error {
|
|
message := fmt.Sprintf("The extension %s has been successfully uninstalled.", plan.Spec.Extension.Name)
|
|
updateStateAndConditions(&plan.Status.InstallationStatus, corev1alpha1.StateUninstalled, message, time.Now())
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
// Remove the finalizer from the installplan and update it.
|
|
if controllerutil.RemoveFinalizer(plan, installPlanProtection) {
|
|
return r.Update(ctx, plan)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) syncExtendedAPIStatus(ctx context.Context, clusterClient client.Client, plan *corev1alpha1.InstallPlan) error {
|
|
if err := syncJSBundleStatus(ctx, clusterClient, plan); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := syncAPIServiceStatus(ctx, clusterClient, plan); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := syncReverseProxyStatus(ctx, clusterClient, plan); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := syncExtensionEntryStatus(ctx, clusterClient, plan); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) updateInstallPlan(ctx context.Context, plan *corev1alpha1.InstallPlan) error {
|
|
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
target := &corev1alpha1.InstallPlan{}
|
|
if err := r.Get(ctx, client.ObjectKey{Name: plan.Name}, target); err != nil {
|
|
return client.IgnoreNotFound(err)
|
|
}
|
|
if !reflect.DeepEqual(target.Labels, plan.Labels) ||
|
|
!reflect.DeepEqual(target.Annotations, plan.Annotations) ||
|
|
!reflect.DeepEqual(target.Status, plan.Status) {
|
|
if r.logger.V(4).Enabled() {
|
|
r.logger.Info("installplan status changed", "name", plan.Name, "status", plan.Status)
|
|
}
|
|
target.Labels = plan.Labels
|
|
target.Annotations = plan.Annotations
|
|
target.Status = plan.Status
|
|
if err := r.Update(ctx, target); err != nil {
|
|
return fmt.Errorf("failed to update install plan: %v", err)
|
|
}
|
|
target.DeepCopyInto(plan)
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return fmt.Errorf("failed to update install plan %s: %s", plan.Name, err)
|
|
}
|
|
if err := r.syncExtensionStatus(ctx, plan); err != nil {
|
|
return fmt.Errorf("failed to sync extension %s status: %s", plan.Spec.Extension.Name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func createNamespaceIfNotExists(ctx context.Context, client client.Client, namespace, extensionName string) error {
|
|
var ns corev1.Namespace
|
|
if err := client.Get(ctx, types.NamespacedName{Name: namespace}, &ns); err != nil {
|
|
if errors.IsNotFound(err) {
|
|
ns = corev1.Namespace{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: namespace,
|
|
Labels: map[string]string{
|
|
tenantv1beta1.WorkspaceLabel: systemWorkspace,
|
|
corev1alpha1.ExtensionReferenceLabel: extensionName,
|
|
constants.KubeSphereManagedLabel: "true",
|
|
},
|
|
},
|
|
}
|
|
if err := client.Create(ctx, &ns); err != nil {
|
|
return fmt.Errorf("failed to create namespace: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
return fmt.Errorf("failed to get namespace: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func createOrUpdateRole(ctx context.Context, client client.Client, namespace, extensionName string, rules []rbacv1.PolicyRule) error {
|
|
roleName := fmt.Sprintf(defaultRoleFormat, extensionName)
|
|
|
|
var defaultRules = []rbacv1.PolicyRule{
|
|
{
|
|
Verbs: []string{"*"},
|
|
APIGroups: []string{"", "apps", "batch", "policy", "networking.k8s.io", "autoscaling", "metrics.k8s.io"},
|
|
Resources: []string{"*"},
|
|
},
|
|
}
|
|
|
|
role := &rbacv1.Role{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: roleName}}
|
|
op, err := controllerutil.CreateOrUpdate(ctx, client, role, func() error {
|
|
role.Labels = map[string]string{corev1alpha1.ExtensionReferenceLabel: extensionName}
|
|
role.Rules = rules
|
|
_, uncoveredRules := rbachelper.Covers(role.Rules, defaultRules)
|
|
if len(uncoveredRules) > 0 {
|
|
role.Rules = append(role.Rules, uncoveredRules...)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
klog.V(4).Infof("role %s in namespace %s %s", role.Name, role.Namespace, op)
|
|
|
|
return nil
|
|
}
|
|
|
|
func createOrUpdateRoleBinding(ctx context.Context, client client.Client, namespace, extensionName string, sa rbacv1.Subject) error {
|
|
roleName := fmt.Sprintf(defaultRoleFormat, extensionName)
|
|
roleBindingName := fmt.Sprintf(defaultRoleBindingFormat, extensionName)
|
|
|
|
roleBinding := &rbacv1.RoleBinding{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: roleBindingName}}
|
|
|
|
op, err := controllerutil.CreateOrUpdate(ctx, client, roleBinding, func() error {
|
|
roleBinding.Labels = map[string]string{corev1alpha1.ExtensionReferenceLabel: extensionName}
|
|
roleBinding.RoleRef = rbacv1.RoleRef{APIGroup: rbacv1.GroupName, Kind: "Role", Name: roleName}
|
|
roleBinding.Subjects = []rbacv1.Subject{sa}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
klog.V(4).Infof("role binding %s in namespace %s %s", roleBinding.Name, roleBinding.Namespace, op)
|
|
|
|
return nil
|
|
}
|
|
|
|
func initTargetNamespace(ctx context.Context, client client.Client, namespace, extensionName string, clusterRole rbacv1.ClusterRole, role rbacv1.Role) error {
|
|
if err := createNamespaceIfNotExists(ctx, client, namespace, extensionName); err != nil {
|
|
return fmt.Errorf("failed to create namespace: %v", err)
|
|
}
|
|
sa := rbacv1.Subject{
|
|
Kind: rbacv1.ServiceAccountKind,
|
|
Name: fmt.Sprintf("helm-executor.%s", extensionName),
|
|
Namespace: namespace,
|
|
}
|
|
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
if err := createOrUpdateRole(ctx, client, namespace, extensionName, role.Rules); err != nil {
|
|
return err
|
|
}
|
|
if err := createOrUpdateRoleBinding(ctx, client, namespace, extensionName, sa); err != nil {
|
|
return err
|
|
}
|
|
if err := createOrUpdateClusterRole(ctx, client, extensionName, clusterRole.Rules); err != nil {
|
|
return err
|
|
}
|
|
if err := createOrUpdateClusterRoleBinding(ctx, client, extensionName, sa); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func createOrUpdateClusterRole(ctx context.Context, client client.Client, extensionName string, rules []rbacv1.PolicyRule) error {
|
|
clusterRoleName := fmt.Sprintf(defaultClusterRoleFormat, extensionName)
|
|
clusterRole := &rbacv1.ClusterRole{ObjectMeta: metav1.ObjectMeta{Name: clusterRoleName}}
|
|
|
|
op, err := controllerutil.CreateOrUpdate(ctx, client, clusterRole, func() error {
|
|
clusterRole.Labels = map[string]string{corev1alpha1.ExtensionReferenceLabel: extensionName}
|
|
clusterRole.Rules = rules
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
klog.V(4).Infof("cluster role %s %s", clusterRole.Name, op)
|
|
|
|
return nil
|
|
}
|
|
|
|
func createOrUpdateClusterRoleBinding(ctx context.Context, client client.Client, extensionName string, sa rbacv1.Subject) error {
|
|
clusterRoleName := fmt.Sprintf(defaultClusterRoleFormat, extensionName)
|
|
clusterRoleBindingName := fmt.Sprintf(defaultClusterRoleBindingFormat, extensionName)
|
|
clusterRoleBinding := &rbacv1.ClusterRoleBinding{ObjectMeta: metav1.ObjectMeta{Name: clusterRoleBindingName}}
|
|
|
|
op, err := controllerutil.CreateOrUpdate(ctx, client, clusterRoleBinding, func() error {
|
|
if clusterRoleBinding.RoleRef.Name != "" && clusterRoleBinding.RoleRef.Name != clusterRoleName {
|
|
return fmt.Errorf("conflict cluster role binding found: %s", clusterRoleBindingName)
|
|
}
|
|
clusterRoleBinding.Labels = map[string]string{corev1alpha1.ExtensionReferenceLabel: extensionName}
|
|
clusterRoleBinding.RoleRef = rbacv1.RoleRef{APIGroup: rbacv1.GroupName, Kind: "ClusterRole", Name: clusterRoleName}
|
|
clusterRoleBinding.Subjects = []rbacv1.Subject{sa}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
klog.V(4).Infof("cluster role binding %s %s", clusterRoleBinding.Name, op)
|
|
|
|
return nil
|
|
}
|
|
|
|
func syncJSBundleStatus(ctx context.Context, clusterClient client.Client, plan *corev1alpha1.InstallPlan) error {
|
|
jsBundles := &extensionsv1alpha1.JSBundleList{}
|
|
if err := clusterClient.List(ctx, jsBundles, client.MatchingLabels{corev1alpha1.ExtensionReferenceLabel: plan.Spec.Extension.Name}); err != nil {
|
|
return err
|
|
}
|
|
for _, jsBundle := range jsBundles.Items {
|
|
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
if err := clusterClient.Get(ctx, types.NamespacedName{Name: jsBundle.Name}, &jsBundle); err != nil {
|
|
return err
|
|
}
|
|
// TODO unavailable state should be considered
|
|
expected := jsBundle.DeepCopy()
|
|
if plan.Spec.Enabled {
|
|
expected.Status.State = extensionsv1alpha1.StateAvailable
|
|
} else {
|
|
expected.Status.State = extensionsv1alpha1.StateDisabled
|
|
}
|
|
if expected.Status.State != jsBundle.Status.State {
|
|
if err := clusterClient.Update(ctx, expected); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update js bundle status: %v", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func syncAPIServiceStatus(ctx context.Context, clusterClient client.Client, plan *corev1alpha1.InstallPlan) error {
|
|
apiServices := &extensionsv1alpha1.APIServiceList{}
|
|
if err := clusterClient.List(ctx, apiServices, client.MatchingLabels{corev1alpha1.ExtensionReferenceLabel: plan.Spec.Extension.Name}); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, apiService := range apiServices.Items {
|
|
|
|
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
if err := clusterClient.Get(ctx, types.NamespacedName{Name: apiService.Name}, &apiService); err != nil {
|
|
return err
|
|
}
|
|
// TODO unavailable state should be considered
|
|
expected := apiService.DeepCopy()
|
|
if plan.Spec.Enabled {
|
|
expected.Status.State = extensionsv1alpha1.StateAvailable
|
|
} else {
|
|
expected.Status.State = extensionsv1alpha1.StateDisabled
|
|
}
|
|
if expected.Status.State != apiService.Status.State {
|
|
if err := clusterClient.Update(ctx, expected); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update api service status: %v", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func syncReverseProxyStatus(ctx context.Context, clusterClient client.Client, plan *corev1alpha1.InstallPlan) error {
|
|
reverseProxies := &extensionsv1alpha1.ReverseProxyList{}
|
|
if err := clusterClient.List(ctx, reverseProxies, client.MatchingLabels{corev1alpha1.ExtensionReferenceLabel: plan.Spec.Extension.Name}); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, reverseProxy := range reverseProxies.Items {
|
|
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
if err := clusterClient.Get(ctx, types.NamespacedName{Name: reverseProxy.Name}, &reverseProxy); err != nil {
|
|
return err
|
|
}
|
|
expected := reverseProxy.DeepCopy()
|
|
if plan.Spec.Enabled {
|
|
expected.Status.State = extensionsv1alpha1.StateAvailable
|
|
} else {
|
|
expected.Status.State = extensionsv1alpha1.StateDisabled
|
|
}
|
|
if expected.Status.State != reverseProxy.Status.State {
|
|
if err := clusterClient.Update(ctx, expected); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update reverse proxy status: %v", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func syncExtensionEntryStatus(ctx context.Context, clusterClient client.Client, plan *corev1alpha1.InstallPlan) error {
|
|
extensionEntries := &extensionsv1alpha1.ExtensionEntryList{}
|
|
if err := clusterClient.List(ctx, extensionEntries, client.MatchingLabels{corev1alpha1.ExtensionReferenceLabel: plan.Spec.Extension.Name}); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, extensionEntry := range extensionEntries.Items {
|
|
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
if err := clusterClient.Get(ctx, types.NamespacedName{Name: extensionEntry.Name}, &extensionEntry); err != nil {
|
|
return err
|
|
}
|
|
expected := extensionEntry.DeepCopy()
|
|
if plan.Spec.Enabled {
|
|
expected.Status.State = extensionsv1alpha1.StateAvailable
|
|
} else {
|
|
expected.Status.State = extensionsv1alpha1.StateDisabled
|
|
}
|
|
if expected.Status.State != extensionEntry.Status.State {
|
|
if err := clusterClient.Update(ctx, expected); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update extension entry status: %v", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) syncExtensionStatus(ctx context.Context, plan *corev1alpha1.InstallPlan) error {
|
|
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
extension := &corev1alpha1.Extension{}
|
|
if err := r.Get(ctx, types.NamespacedName{Name: plan.Spec.Extension.Name}, extension); err != nil {
|
|
return client.IgnoreNotFound(err)
|
|
}
|
|
|
|
updated := extension.DeepCopy()
|
|
updated.Status.State = plan.Status.State
|
|
updated.Status.Enabled = plan.Status.Enabled
|
|
updated.Status.Conditions = plan.Status.Conditions
|
|
updated.Status.PlannedInstallVersion = plan.Spec.Extension.Version
|
|
updated.Status.InstalledVersion = plan.Status.Version
|
|
updated.Status.ClusterSchedulingStatuses = plan.Status.ClusterSchedulingStatuses
|
|
|
|
if plan.Status.State == corev1alpha1.StateUninstalled {
|
|
updated.Status.State = ""
|
|
updated.Status.Enabled = false
|
|
updated.Status.Conditions = nil
|
|
updated.Status.PlannedInstallVersion = ""
|
|
updated.Status.InstalledVersion = ""
|
|
updated.Status.ClusterSchedulingStatuses = nil
|
|
}
|
|
|
|
if !reflect.DeepEqual(extension.Status, updated.Status) {
|
|
if err := r.Update(ctx, updated); err != nil {
|
|
return err
|
|
}
|
|
if r.logger.V(4).Enabled() {
|
|
r.logger.Info("extension status changed", "name", extension.Name, "status", updated.Status)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update extension status: %s", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) syncClusterSchedulingStatus(ctx context.Context, plan *corev1alpha1.InstallPlan) error {
|
|
logger := klog.FromContext(ctx)
|
|
if plan.Status.State != corev1alpha1.StateDeployed {
|
|
return nil
|
|
}
|
|
// extension is already installed
|
|
var targetClusters []clusterv1alpha1.Cluster
|
|
if len(plan.Spec.ClusterScheduling.Placement.Clusters) > 0 {
|
|
for _, target := range plan.Spec.ClusterScheduling.Placement.Clusters {
|
|
var cluster clusterv1alpha1.Cluster
|
|
if err := r.Get(ctx, types.NamespacedName{Name: target}, &cluster); err != nil {
|
|
if errors.IsNotFound(err) {
|
|
logger.V(4).Info("cluster not found")
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
targetClusters = append(targetClusters, cluster)
|
|
}
|
|
} else if plan.Spec.ClusterScheduling.Placement.ClusterSelector != nil {
|
|
clusterList := &clusterv1alpha1.ClusterList{}
|
|
selector, err := metav1.LabelSelectorAsSelector(plan.Spec.ClusterScheduling.Placement.ClusterSelector)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := r.List(ctx, clusterList, client.MatchingLabelsSelector{Selector: selector}); err != nil {
|
|
return err
|
|
}
|
|
targetClusters = clusterList.Items
|
|
}
|
|
|
|
for _, cluster := range targetClusters {
|
|
if err := r.syncClusterAgentStatus(ctx, plan, &cluster); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for clusterName := range plan.Status.ClusterSchedulingStatuses {
|
|
if !hasCluster(targetClusters, clusterName) {
|
|
if err := r.uninstallClusterAgent(ctx, plan, clusterName); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) cleanupOutdatedJobsAndConfigMaps(ctx context.Context, plan *corev1alpha1.InstallPlan) error {
|
|
jobNames := make([]string, 0)
|
|
if plan.Status.JobName != "" {
|
|
jobNames = append(jobNames, plan.Status.JobName)
|
|
}
|
|
for _, s := range plan.Status.ClusterSchedulingStatuses {
|
|
if s.JobName != "" {
|
|
jobNames = append(jobNames, s.JobName)
|
|
}
|
|
}
|
|
if len(jobNames) == 0 {
|
|
return nil
|
|
}
|
|
|
|
selector, _ := labels.Parse(fmt.Sprintf(
|
|
"%s=%s,%s=%s,name notin (%s)",
|
|
constants.KubeSphereManagedLabel, "true",
|
|
corev1alpha1.ExtensionReferenceLabel, plan.Spec.Extension.Name,
|
|
strings.Join(jobNames, ","),
|
|
))
|
|
|
|
deletePolicy := metav1.DeletePropagationBackground
|
|
if err := r.DeleteAllOf(ctx, &batchv1.Job{}, client.InNamespace(plan.Status.TargetNamespace), &client.DeleteAllOfOptions{
|
|
ListOptions: client.ListOptions{LabelSelector: selector},
|
|
DeleteOptions: client.DeleteOptions{PropagationPolicy: &deletePolicy},
|
|
}); err != nil {
|
|
return fmt.Errorf("failed to delete related helm executor jobs: %s", err)
|
|
}
|
|
if err := r.DeleteAllOf(ctx, &corev1.ConfigMap{}, client.InNamespace(plan.Status.TargetNamespace), &client.DeleteAllOfOptions{
|
|
ListOptions: client.ListOptions{LabelSelector: selector},
|
|
}); err != nil {
|
|
return fmt.Errorf("failed to delete related helm executor configmaps: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// syncInstallPlanStatus syncs the installation status of an extension.
|
|
func (r *InstallPlanReconciler) syncInstallPlanStatus(ctx context.Context, plan *corev1alpha1.InstallPlan) error {
|
|
releaseName := plan.Spec.Extension.Name
|
|
targetNamespace := plan.Status.TargetNamespace
|
|
|
|
hostKubeConfig := ctx.Value(contextKeyHostKubeConfig{}).([]byte)
|
|
installationStatus := plan.Status.InstallationStatus
|
|
|
|
if err := r.syncInstallationStatus(ctx, hostKubeConfig, targetNamespace, releaseName, &installationStatus); err != nil {
|
|
return fmt.Errorf("failed to sync release status: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(plan.Status.InstallationStatus, installationStatus) {
|
|
plan.Status.InstallationStatus = installationStatus
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return fmt.Errorf("failed to sync extension status: %v", err)
|
|
}
|
|
}
|
|
|
|
switch plan.Status.State {
|
|
case "":
|
|
return r.installOrUpgradeExtension(ctx, plan, false)
|
|
case corev1alpha1.StateInstallFailed:
|
|
// upgrade after configuration changes
|
|
if configChanged(plan, "") || versionChanged(plan, "") {
|
|
return r.installOrUpgradeExtension(ctx, plan, false)
|
|
}
|
|
case corev1alpha1.StatePreparing, corev1alpha1.StateInstalling, corev1alpha1.StateUpgrading:
|
|
// waiting for the installation to complete
|
|
return nil
|
|
case corev1alpha1.StateDeployed, corev1alpha1.StateUpgradeFailed:
|
|
// upgrade after configuration changes
|
|
if configChanged(plan, "") || versionChanged(plan, "") {
|
|
return r.installOrUpgradeExtension(ctx, plan, true)
|
|
}
|
|
}
|
|
|
|
if plan.Status.State == corev1alpha1.StateDeployed {
|
|
if err := r.syncExtendedAPIStatus(ctx, r.Client, plan); err != nil {
|
|
return fmt.Errorf("failed to sync extended api status: %v", err)
|
|
}
|
|
|
|
if plan.Status.Enabled != plan.Spec.Enabled {
|
|
plan.Status.Enabled = plan.Spec.Enabled
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return fmt.Errorf("failed to sync extension status: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) syncClusterAgentStatus(ctx context.Context,
|
|
plan *corev1alpha1.InstallPlan, cluster *clusterv1alpha1.Cluster) error {
|
|
if !clusterutils.IsClusterSchedulable(cluster) {
|
|
klog.V(4).Infof("cluster %s is not schedulable", cluster.Name)
|
|
return nil
|
|
}
|
|
if plan.Status.ClusterSchedulingStatuses == nil {
|
|
plan.Status.ClusterSchedulingStatuses = make(map[string]corev1alpha1.InstallationStatus)
|
|
}
|
|
|
|
releaseName := fmt.Sprintf(agentReleaseFormat, plan.Spec.Extension.Name)
|
|
targetNamespace := plan.Status.TargetNamespace
|
|
kubeConfig := cluster.Spec.Connection.KubeConfig
|
|
installationStatus := plan.Status.ClusterSchedulingStatuses[cluster.Name]
|
|
|
|
if err := r.syncInstallationStatus(ctx, kubeConfig, targetNamespace, releaseName, &installationStatus); err != nil {
|
|
return fmt.Errorf("failed to sync cluster agent release status: %v", err)
|
|
}
|
|
|
|
plan.Status.ClusterSchedulingStatuses[cluster.Name] = installationStatus
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return fmt.Errorf("failed to sync cluster agent status: %v", err)
|
|
}
|
|
|
|
switch plan.Status.ClusterSchedulingStatuses[cluster.Name].State {
|
|
case "":
|
|
return r.installOrUpgradeClusterAgent(ctx, plan, cluster, false)
|
|
case corev1alpha1.StateInstallFailed:
|
|
// upgrade after configuration changes
|
|
if configChanged(plan, cluster.Name) || versionChanged(plan, cluster.Name) {
|
|
return r.installOrUpgradeClusterAgent(ctx, plan, cluster, false)
|
|
}
|
|
case corev1alpha1.StatePreparing, corev1alpha1.StateInstalling, corev1alpha1.StateUpgrading:
|
|
// waiting for the installation to complete
|
|
return nil
|
|
case corev1alpha1.StateDeployed, corev1alpha1.StateUpgradeFailed:
|
|
// upgrade after configuration changes
|
|
if configChanged(plan, cluster.Name) || versionChanged(plan, cluster.Name) {
|
|
return r.installOrUpgradeClusterAgent(ctx, plan, cluster, true)
|
|
}
|
|
}
|
|
|
|
if plan.Status.ClusterSchedulingStatuses[cluster.Name].State == corev1alpha1.StateDeployed {
|
|
clusterClient, err := r.clusterClientSet.GetRuntimeClient(cluster.Name)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get cluster client: %v", err)
|
|
}
|
|
|
|
if err := r.syncExtendedAPIStatus(ctx, clusterClient, plan); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) installOrUpgradeExtension(ctx context.Context, plan *corev1alpha1.InstallPlan, upgrade bool) error {
|
|
updateStateAndConditions(&plan.Status.InstallationStatus, corev1alpha1.StatePreparing, "", time.Now())
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
var onFailed = func(err error) error {
|
|
if upgrade {
|
|
klog.FromContext(ctx).Error(err, "failed to upgrade extension")
|
|
message := fmt.Sprintf("Failed to upgrade extension: %s", err)
|
|
updateStateAndConditions(&plan.Status.InstallationStatus, corev1alpha1.StateUpgradeFailed, message, time.Now())
|
|
} else {
|
|
klog.FromContext(ctx).Error(err, "failed to install extension")
|
|
message := fmt.Sprintf("Failed to install extension: %s", err)
|
|
updateStateAndConditions(&plan.Status.InstallationStatus, corev1alpha1.StateInstallFailed, message, time.Now())
|
|
}
|
|
return r.updateInstallPlan(ctx, plan)
|
|
}
|
|
|
|
chartData, caBundle, err := r.loadChartData(ctx)
|
|
if err != nil {
|
|
return onFailed(err)
|
|
}
|
|
|
|
mainChart, err := loader.LoadArchive(bytes.NewReader(chartData))
|
|
if err != nil {
|
|
return onFailed(fmt.Errorf("failed to load chart data: %v", err))
|
|
}
|
|
|
|
releaseName := plan.Spec.Extension.Name
|
|
clusterRole, role := usesPermissions(mainChart)
|
|
if err = initTargetNamespace(ctx, r.Client, plan.Status.TargetNamespace, plan.Spec.Extension.Name, clusterRole, role); err != nil {
|
|
return onFailed(fmt.Errorf("failed to init target namespace: %v", err))
|
|
}
|
|
|
|
if !upgrade {
|
|
message := fmt.Sprintf("The extension %s has been successfully initialized.", plan.Spec.Extension.Name)
|
|
updateCondition(&plan.Status.InstallationStatus, corev1alpha1.ConditionTypeInitialized, initialized, message, metav1.ConditionTrue, time.Now())
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
hostKubeConfig := ctx.Value(contextKeyHostKubeConfig{}).([]byte)
|
|
|
|
extensionVersion, ok := ctx.Value(contextKeyExtensionVersion{}).(*corev1alpha1.ExtensionVersion)
|
|
if !ok {
|
|
return fmt.Errorf("failed to get extension version from context")
|
|
}
|
|
|
|
helmOptions := []helm.HelmOption{
|
|
helm.SetKubeconfig(hostKubeConfig),
|
|
helm.SetNamespace(plan.Status.TargetNamespace),
|
|
helm.SetInstall(!upgrade),
|
|
helm.SetTimeout(r.HelmExecutorOptions.Timeout),
|
|
helm.SetKubeAsUser(fmt.Sprintf("system:serviceaccount:%s:helm-executor.%s", plan.Status.TargetNamespace, plan.Spec.Extension.Name)),
|
|
helm.SetLabels(map[string]string{corev1alpha1.ExtensionReferenceLabel: plan.Spec.Extension.Name}),
|
|
helm.SetOverrides(r.getOverrides(mainChart, tagExtension, nil)),
|
|
helm.SetCABundle(caBundle),
|
|
helm.SetHistoryMax(r.HelmExecutorOptions.HistoryMax),
|
|
helm.SetHookImage(r.getHookImageForInstall(extensionVersion, upgrade)),
|
|
}
|
|
|
|
executor, ok := ctx.Value(contextKeyExecutor{}).(helm.Executor)
|
|
if !ok {
|
|
return fmt.Errorf("failed to get executor from context")
|
|
}
|
|
|
|
chartURL, helmOptions := fixedOptions(extensionVersion.Spec.ChartURL, chartData, helmOptions)
|
|
values := clusterConfig(plan, "")
|
|
jobName, err := executor.Upgrade(ctx, releaseName, chartURL, values, helmOptions...)
|
|
if err != nil {
|
|
return onFailed(fmt.Errorf("failed to create executor job: %v", err))
|
|
}
|
|
|
|
plan.Status.ConfigHash = hashutil.FNVString(values)
|
|
plan.Status.Version = extensionVersion.Spec.Version
|
|
plan.Status.ReleaseName = releaseName
|
|
plan.Status.JobName = jobName
|
|
if upgrade {
|
|
updateStateAndConditions(&plan.Status.InstallationStatus, corev1alpha1.StateUpgrading, "", time.Now())
|
|
} else {
|
|
updateStateAndConditions(&plan.Status.InstallationStatus, corev1alpha1.StateInstalling, "", time.Now())
|
|
}
|
|
|
|
if err = r.updateInstallPlan(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = r.cleanupOutdatedJobsAndConfigMaps(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) getHookImageForInstall(extensionVersion *corev1alpha1.ExtensionVersion, upgrade bool) string {
|
|
hookImage := extensionVersion.Annotations[corev1alpha1.ExecutorHookImageAnnotation]
|
|
if !upgrade && extensionVersion.Annotations[corev1alpha1.ExecutorInstallHookImageAnnotation] != "" {
|
|
hookImage = extensionVersion.Annotations[corev1alpha1.ExecutorInstallHookImageAnnotation]
|
|
}
|
|
if upgrade && extensionVersion.Annotations[corev1alpha1.ExecutorUpgradeHookImageAnnotation] != "" {
|
|
hookImage = extensionVersion.Annotations[corev1alpha1.ExecutorUpgradeHookImageAnnotation]
|
|
}
|
|
if r.ExtensionOptions.ImageRegistry != "" && hookImage != "" {
|
|
hookImage = path.Join(r.ExtensionOptions.ImageRegistry, hookImage)
|
|
}
|
|
return hookImage
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) getOverrides(mainChart *chart.Chart, tag string, cluster *clusterv1alpha1.Cluster) (overrides []string) {
|
|
disableSubchartsOverrides := func(mainChart *chart.Chart, tag string) (overrides []string) {
|
|
for _, condition := range conditions(mainChart, tag) {
|
|
overrides = append(overrides, fmt.Sprintf("%s=%s", condition, "false"))
|
|
}
|
|
return overrides
|
|
}
|
|
|
|
switch tag {
|
|
case tagExtension:
|
|
overrides = append(overrides,
|
|
fmt.Sprintf("tags.%s=%s", tagExtension, "true"),
|
|
fmt.Sprintf("tags.%s=%s", tagAgent, "false"),
|
|
)
|
|
|
|
if r.PortalURL != "" {
|
|
overrides = append(overrides, fmt.Sprintf("%s=%s", globalPortalURL, r.PortalURL))
|
|
}
|
|
|
|
overrides = append(overrides, disableSubchartsOverrides(mainChart, tagAgent)...)
|
|
case tagAgent:
|
|
overrides = append(overrides,
|
|
fmt.Sprintf("tags.%s=%s", tagAgent, "true"),
|
|
fmt.Sprintf("tags.%s=%s", tagExtension, "false"),
|
|
)
|
|
overrides = append(overrides, disableSubchartsOverrides(mainChart, tagExtension)...)
|
|
}
|
|
|
|
if cluster != nil {
|
|
clusterRole := clusterv1alpha1.ClusterRoleMember
|
|
if clusterutils.IsHostCluster(cluster) {
|
|
clusterRole = clusterv1alpha1.ClusterRoleHost
|
|
}
|
|
overrides = append(overrides,
|
|
fmt.Sprintf("%s=%s", globalClusterName, cluster.Name),
|
|
fmt.Sprintf("%s=%s", globalClusterRole, clusterRole),
|
|
)
|
|
}
|
|
|
|
if r.ExtensionOptions != nil {
|
|
if r.ExtensionOptions.ImageRegistry != "" {
|
|
overrides = append(overrides, fmt.Sprintf("%s=%s", globalImageRegistry, r.ExtensionOptions.ImageRegistry))
|
|
}
|
|
if r.ExtensionOptions.NodeSelector != nil {
|
|
for k, v := range r.ExtensionOptions.NodeSelector {
|
|
k = strings.ReplaceAll(k, ".", "\\.")
|
|
overrides = append(overrides, fmt.Sprintf("%s.%s=%s", globalNodeSelector, k, v))
|
|
}
|
|
}
|
|
if r.ExtensionOptions.Ingress != nil {
|
|
overrides = append(overrides, fmt.Sprintf("%s=%s", globalExtensionIngressClassName, r.ExtensionOptions.Ingress.IngressClassName))
|
|
overrides = append(overrides, fmt.Sprintf("%s=%s", globalExtensionIngressDomainSuffix, r.ExtensionOptions.Ingress.DomainSuffix))
|
|
overrides = append(overrides, fmt.Sprintf("%s=%d", globalExtensionIngressHTTPPort, r.ExtensionOptions.Ingress.HTTPPort))
|
|
overrides = append(overrides, fmt.Sprintf("%s=%d", globalExtensionIngressHTTPSPort, r.ExtensionOptions.Ingress.HTTPSPort))
|
|
}
|
|
}
|
|
|
|
return overrides
|
|
}
|
|
|
|
func conditions(mainChart *chart.Chart, tag string) []string {
|
|
var conditions []string
|
|
for _, dependency := range mainChart.Metadata.Dependencies {
|
|
if dependency.Condition != "" && sliceutil.HasString(dependency.Tags, tag) {
|
|
conditions = append(conditions, strings.Split(dependency.Condition, ",")...)
|
|
}
|
|
}
|
|
return conditions
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) installOrUpgradeClusterAgent(ctx context.Context, plan *corev1alpha1.InstallPlan, cluster *clusterv1alpha1.Cluster, upgrade bool) error {
|
|
clusterName := cluster.Name
|
|
targetNamespace := plan.Status.TargetNamespace
|
|
releaseName := fmt.Sprintf(agentReleaseFormat, plan.Spec.Extension.Name)
|
|
kubeConfig := cluster.Spec.Connection.KubeConfig
|
|
clusterClient, err := r.clusterClientSet.GetRuntimeClient(cluster.Name)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get cluster client: %v", err)
|
|
}
|
|
|
|
installationStatus := plan.Status.ClusterSchedulingStatuses[cluster.Name]
|
|
updateStateAndConditions(&installationStatus, corev1alpha1.StatePreparing, "", time.Now())
|
|
plan.Status.ClusterSchedulingStatuses[cluster.Name] = installationStatus
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return fmt.Errorf("failed to sync cluster agent status: %v", err)
|
|
}
|
|
|
|
var onFailed = func(err error) error {
|
|
if upgrade {
|
|
klog.FromContext(ctx).Error(err, "failed to upgrade extension")
|
|
message := fmt.Sprintf("failed to upgrade extension: %s", err)
|
|
updateStateAndConditions(&installationStatus, corev1alpha1.StateUpgradeFailed, message, time.Now())
|
|
} else {
|
|
klog.FromContext(ctx).Error(err, "failed to install extension")
|
|
message := fmt.Sprintf("Failed to install extension: %s", err)
|
|
updateStateAndConditions(&installationStatus, corev1alpha1.StateInstallFailed, message, time.Now())
|
|
}
|
|
plan.Status.ClusterSchedulingStatuses[clusterName] = installationStatus
|
|
return r.updateInstallPlan(ctx, plan)
|
|
}
|
|
|
|
chartData, caBundle, err := r.loadChartData(ctx)
|
|
if err != nil {
|
|
return onFailed(fmt.Errorf("failed to load chart data: %v", err))
|
|
}
|
|
|
|
mainChart, err := loader.LoadArchive(bytes.NewReader(chartData))
|
|
if err != nil {
|
|
return onFailed(fmt.Errorf("failed to load chart data: %v", err))
|
|
}
|
|
|
|
clusterRole, role := usesPermissions(mainChart)
|
|
if err = initTargetNamespace(ctx, clusterClient, targetNamespace, plan.Spec.Extension.Name, clusterRole, role); err != nil {
|
|
return onFailed(fmt.Errorf("failed to init target namespace: %v", err))
|
|
}
|
|
|
|
if !upgrade {
|
|
updateCondition(&installationStatus, corev1alpha1.ConditionTypeInitialized,
|
|
initialized, "The extension agent has been successfully initialized.", metav1.ConditionTrue, time.Now())
|
|
plan.Status.ClusterSchedulingStatuses[clusterName] = installationStatus
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
extensionVersion, ok := ctx.Value(contextKeyExtensionVersion{}).(*corev1alpha1.ExtensionVersion)
|
|
if !ok {
|
|
return fmt.Errorf("failed to get extension version from context")
|
|
}
|
|
|
|
executor, ok := ctx.Value(contextKeyExecutor{}).(helm.Executor)
|
|
if !ok {
|
|
return fmt.Errorf("failed to get executor from context")
|
|
}
|
|
|
|
clusterRoleName := clusterv1alpha1.ClusterRoleMember
|
|
if clusterutils.IsHostCluster(cluster) {
|
|
clusterRoleName = clusterv1alpha1.ClusterRoleHost
|
|
}
|
|
helmOptions := []helm.HelmOption{
|
|
helm.SetKubeconfig(kubeConfig),
|
|
helm.SetNamespace(targetNamespace),
|
|
helm.SetInstall(!upgrade),
|
|
helm.SetTimeout(r.HelmExecutorOptions.Timeout),
|
|
helm.SetKubeAsUser(fmt.Sprintf("system:serviceaccount:%s:helm-executor.%s", targetNamespace, plan.Spec.Extension.Name)),
|
|
helm.SetOverrides(r.getOverrides(mainChart, tagAgent, cluster)),
|
|
helm.SetLabels(map[string]string{corev1alpha1.ExtensionReferenceLabel: plan.Spec.Extension.Name}),
|
|
helm.SetCABundle(caBundle),
|
|
helm.SetHistoryMax(r.HelmExecutorOptions.HistoryMax),
|
|
helm.SetHookImage(r.getHookImageForInstall(extensionVersion, upgrade)),
|
|
helm.SetClusterRole(string(clusterRoleName)),
|
|
helm.SetClusterName(clusterName),
|
|
}
|
|
|
|
chartURL, helmOptions := fixedOptions(extensionVersion.Spec.ChartURL, chartData, helmOptions)
|
|
values := clusterConfig(plan, cluster.Name)
|
|
jobName, err := executor.Upgrade(ctx, releaseName, chartURL, values, helmOptions...)
|
|
if err != nil {
|
|
return onFailed(fmt.Errorf("failed to create executor job: %v", err))
|
|
}
|
|
|
|
installationStatus.ConfigHash = hashutil.FNVString(values)
|
|
installationStatus.ReleaseName = releaseName
|
|
installationStatus.Version = extensionVersion.Spec.Version
|
|
installationStatus.TargetNamespace = targetNamespace
|
|
installationStatus.JobName = jobName
|
|
if upgrade {
|
|
updateStateAndConditions(&installationStatus, corev1alpha1.StateUpgrading, "", time.Now())
|
|
} else {
|
|
updateStateAndConditions(&installationStatus, corev1alpha1.StateInstalling, "", time.Now())
|
|
}
|
|
|
|
plan.Status.ClusterSchedulingStatuses[clusterName] = installationStatus
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = r.cleanupOutdatedJobsAndConfigMaps(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func fixedOptions(chartURL string, chartData []byte, helmOptions []helm.HelmOption) (string, []helm.HelmOption) {
|
|
if chartURL == "" {
|
|
helmOptions = append(helmOptions, helm.SetChartData(chartData))
|
|
} else {
|
|
if strings.HasPrefix(chartURL, "oci://") {
|
|
parts := strings.Split(chartURL, ":")
|
|
if len(parts) > 1 && !strings.Contains(parts[len(parts)-1], "/") {
|
|
tag := parts[len(parts)-1]
|
|
if tag != "" {
|
|
chartURL = strings.TrimSuffix(chartURL, ":"+tag)
|
|
helmOptions = append(helmOptions, helm.SetVersion(tag))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return chartURL, helmOptions
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) uninstallClusterAgent(ctx context.Context, plan *corev1alpha1.InstallPlan, clusterName string) error {
|
|
logger := klog.FromContext(ctx).WithValues("cluster", clusterName)
|
|
installationStatus := plan.Status.ClusterSchedulingStatuses[clusterName]
|
|
releaseName := installationStatus.ReleaseName
|
|
targetNamespace := installationStatus.TargetNamespace
|
|
|
|
if releaseName == "" {
|
|
delete(plan.Status.ClusterSchedulingStatuses, clusterName)
|
|
return r.updateInstallPlan(ctx, plan)
|
|
}
|
|
|
|
var cluster clusterv1alpha1.Cluster
|
|
if err := r.Get(ctx, types.NamespacedName{Name: clusterName}, &cluster); err != nil {
|
|
if errors.IsNotFound(err) {
|
|
delete(plan.Status.ClusterSchedulingStatuses, clusterName)
|
|
return r.updateInstallPlan(ctx, plan)
|
|
}
|
|
return err
|
|
}
|
|
|
|
kubeConfig := cluster.Spec.Connection.KubeConfig
|
|
|
|
executor, ok := ctx.Value(contextKeyExecutor{}).(helm.Executor)
|
|
if !ok {
|
|
return fmt.Errorf("failed to get executor from context")
|
|
}
|
|
|
|
// Check if the target helm release exists.
|
|
// If it does, there is no need to execute the installation process again.
|
|
_, err := executor.Get(ctx, releaseName, helm.SetKubeconfig(kubeConfig), helm.SetNamespace(targetNamespace))
|
|
if err != nil {
|
|
if isReleaseNotFoundError(err) {
|
|
delete(plan.Status.ClusterSchedulingStatuses, clusterName)
|
|
return r.updateInstallPlan(ctx, plan)
|
|
}
|
|
klog.FromContext(ctx).Error(err, "failed to get helm release status")
|
|
return fmt.Errorf("failed to get helm release status: %v", err)
|
|
}
|
|
|
|
if err := r.syncInstallationStatus(ctx, kubeConfig, targetNamespace, releaseName, &installationStatus); err != nil {
|
|
return fmt.Errorf("failed to sync cluster agent release status: %v", err)
|
|
}
|
|
|
|
plan.Status.ClusterSchedulingStatuses[cluster.Name] = installationStatus
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return fmt.Errorf("failed to sync cluster agent status: %v", err)
|
|
}
|
|
|
|
if installationStatus.State != corev1alpha1.StateUninstalling &&
|
|
installationStatus.State != corev1alpha1.StateUninstallFailed &&
|
|
installationStatus.State != corev1alpha1.StateUninstalled {
|
|
|
|
extensionVersion, ok := ctx.Value(contextKeyExtensionVersion{}).(*corev1alpha1.ExtensionVersion)
|
|
if !ok {
|
|
return fmt.Errorf("failed to get extension version from context")
|
|
}
|
|
|
|
clusterRoleName := clusterv1alpha1.ClusterRoleMember
|
|
if clusterutils.IsHostCluster(&cluster) {
|
|
clusterRoleName = clusterv1alpha1.ClusterRoleHost
|
|
}
|
|
helmOptions := []helm.HelmOption{
|
|
helm.SetKubeconfig(kubeConfig),
|
|
helm.SetNamespace(targetNamespace),
|
|
helm.SetTimeout(r.HelmExecutorOptions.Timeout),
|
|
helm.SetHookImage(r.getHookImageForUninstall(extensionVersion)),
|
|
helm.SetClusterRole(string(clusterRoleName)),
|
|
helm.SetClusterName(clusterName),
|
|
}
|
|
|
|
jobName, err := executor.Uninstall(ctx, releaseName, helmOptions...)
|
|
if err != nil {
|
|
logger.Error(err, "failed to uninstall helm release")
|
|
return err
|
|
}
|
|
installationStatus.JobName = jobName
|
|
updateStateAndConditions(&installationStatus, corev1alpha1.StateUninstalling, "", time.Now())
|
|
plan.Status.ClusterSchedulingStatuses[clusterName] = installationStatus
|
|
if err := r.updateInstallPlan(ctx, plan); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) newExecutor(plan *corev1alpha1.InstallPlan) (helm.Executor, error) {
|
|
executorOptions := []helm.ExecutorOption{
|
|
helm.SetExecutorLabels(map[string]string{
|
|
constants.KubeSphereManagedLabel: "true",
|
|
corev1alpha1.ExtensionReferenceLabel: plan.Spec.Extension.Name,
|
|
}),
|
|
helm.SetExecutorOwner(&metav1.OwnerReference{
|
|
APIVersion: corev1alpha1.SchemeGroupVersion.String(),
|
|
Kind: corev1alpha1.ResourceKindInstallPlan,
|
|
Name: plan.Name,
|
|
UID: plan.UID,
|
|
}),
|
|
helm.SetExecutorImage(r.HelmExecutorOptions.Image),
|
|
helm.SetExecutorNamespace(plan.Status.TargetNamespace),
|
|
helm.SetExecutorBackoffLimit(0),
|
|
helm.SetTTLSecondsAfterFinished(r.HelmExecutorOptions.JobTTLAfterFinished),
|
|
helm.SetExecutorAffinity(r.HelmExecutorOptions.Affinity),
|
|
}
|
|
if r.HelmExecutorOptions.Resources != nil {
|
|
executorOptions = append(executorOptions, helm.SetExecutorResources(corev1.ResourceRequirements{
|
|
Limits: corev1.ResourceList{
|
|
corev1.ResourceCPU: resource.MustParse(r.HelmExecutorOptions.Resources.Limits[corev1.ResourceCPU]),
|
|
corev1.ResourceMemory: resource.MustParse(r.HelmExecutorOptions.Resources.Limits[corev1.ResourceMemory]),
|
|
},
|
|
Requests: corev1.ResourceList{
|
|
corev1.ResourceCPU: resource.MustParse(r.HelmExecutorOptions.Resources.Requests[corev1.ResourceCPU]),
|
|
corev1.ResourceMemory: resource.MustParse(r.HelmExecutorOptions.Resources.Requests[corev1.ResourceMemory]),
|
|
},
|
|
}))
|
|
}
|
|
return helm.NewExecutor(executorOptions...)
|
|
}
|
|
|
|
func updateStateAndConditions(installationStatus *corev1alpha1.InstallationStatus, state, message string, lastTransitionTime time.Time) {
|
|
lastTransitionTime = lastTransitionTime.Round(time.Second)
|
|
fixedState := state
|
|
if state == corev1alpha1.StateInstalled || state == corev1alpha1.StateUpgraded {
|
|
fixedState = corev1alpha1.StateDeployed
|
|
}
|
|
|
|
if updateState(installationStatus, fixedState, lastTransitionTime) {
|
|
switch state {
|
|
case corev1alpha1.StateInstalled:
|
|
updateCondition(installationStatus, corev1alpha1.ConditionTypeInstalled, installSuccessful, message, metav1.ConditionTrue, lastTransitionTime)
|
|
case corev1alpha1.StateInstallFailed:
|
|
updateCondition(installationStatus, corev1alpha1.ConditionTypeInstalled, installFailed, message, metav1.ConditionFalse, lastTransitionTime)
|
|
case corev1alpha1.StateUpgraded:
|
|
updateCondition(installationStatus, corev1alpha1.ConditionTypeUpgraded, upgradeSuccessful, message, metav1.ConditionTrue, lastTransitionTime)
|
|
case corev1alpha1.StateUpgradeFailed:
|
|
updateCondition(installationStatus, corev1alpha1.ConditionTypeUpgraded, upgradeFailed, message, metav1.ConditionFalse, lastTransitionTime)
|
|
case corev1alpha1.StateUninstallFailed:
|
|
updateCondition(installationStatus, corev1alpha1.ConditionTypeUninstalled, uninstallFailed, message, metav1.ConditionFalse, lastTransitionTime)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) syncInstallationStatus(ctx context.Context, kubeConfig []byte, namespace string, releaseName string, installationStatus *corev1alpha1.InstallationStatus) error {
|
|
var job *batchv1.Job
|
|
if installationStatus.JobName != "" {
|
|
job = &batchv1.Job{}
|
|
if err := r.Get(ctx, client.ObjectKey{Namespace: namespace, Name: installationStatus.JobName}, job); err != nil {
|
|
if errors.IsNotFound(err) {
|
|
job = nil
|
|
klog.FromContext(ctx).Info("related job not found", "namespace", installationStatus.TargetNamespace, "job", installationStatus.JobName)
|
|
} else {
|
|
return fmt.Errorf("failed to get job: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
executor, ok := ctx.Value(contextKeyExecutor{}).(helm.Executor)
|
|
if !ok {
|
|
return fmt.Errorf("failed to get executor from context")
|
|
}
|
|
|
|
// Check if the target helm release exists.
|
|
// If it does, there is no need to execute the installation process again.
|
|
release, err := executor.Get(ctx, releaseName, helm.SetKubeconfig(kubeConfig), helm.SetNamespace(namespace))
|
|
if err != nil && !isReleaseNotFoundError(err) {
|
|
klog.FromContext(ctx).Error(err, "failed to get helm release status")
|
|
return fmt.Errorf("failed to get helm release status: %v", err)
|
|
}
|
|
|
|
if job != nil {
|
|
action := job.Annotations[helm.ExecutorJobActionAnnotation]
|
|
active, completed, failed := jobStatus(job)
|
|
condition := latestJobCondition(job)
|
|
lastTransitionTime := condition.LastTransitionTime.Time
|
|
|
|
if failed {
|
|
switch action {
|
|
case helm.ActionInstall:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateInstallFailed, condition.Message, lastTransitionTime)
|
|
case helm.ActionUpgrade:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUpgradeFailed, condition.Message, lastTransitionTime)
|
|
case helm.ActionUninstall:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUninstallFailed, condition.Message, lastTransitionTime)
|
|
}
|
|
}
|
|
|
|
if completed && action == helm.ActionUninstall && release == nil {
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUninstalled, condition.Message, lastTransitionTime)
|
|
}
|
|
|
|
if active {
|
|
lastTransitionTime = job.CreationTimestamp.Time
|
|
switch action {
|
|
case helm.ActionInstall:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateInstalling, "", lastTransitionTime)
|
|
case helm.ActionUpgrade:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUpgrading, "", lastTransitionTime)
|
|
case helm.ActionUninstall:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUninstalling, "", lastTransitionTime)
|
|
}
|
|
}
|
|
}
|
|
|
|
if release != nil {
|
|
switch release.Info.Status {
|
|
case helmrelease.StatusFailed:
|
|
if release.Version > 1 {
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUpgradeFailed, release.Info.Description, release.Info.LastDeployed.Time)
|
|
} else {
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateInstallFailed, release.Info.Description, release.Info.LastDeployed.Time)
|
|
}
|
|
case helmrelease.StatusDeployed:
|
|
installationStatus.Version = release.Chart.Metadata.Version
|
|
installationStatus.ReleaseName = release.Name
|
|
if release.Version > 1 {
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUpgraded, release.Info.Description, release.Info.LastDeployed.Time)
|
|
} else {
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateInstalled, release.Info.Description, release.Info.LastDeployed.Time)
|
|
}
|
|
case helmrelease.StatusPendingInstall:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateInstalling, release.Info.Description, release.Info.LastDeployed.Time)
|
|
case helmrelease.StatusPendingRollback, helmrelease.StatusPendingUpgrade:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUpgrading, release.Info.Description, release.Info.LastDeployed.Time)
|
|
case helmrelease.StatusUninstalling:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUninstalling, release.Info.Description, release.Info.LastDeployed.Time)
|
|
case helmrelease.StatusUninstalled:
|
|
updateStateAndConditions(installationStatus, corev1alpha1.StateUninstalled, release.Info.Description, release.Info.LastDeployed.Time)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *InstallPlanReconciler) mapper(ctx context.Context, object client.Object) []reconcile.Request {
|
|
var requests []reconcile.Request
|
|
if cluster, ok := object.(*clusterv1alpha1.Cluster); ok {
|
|
installPlans := &corev1alpha1.InstallPlanList{}
|
|
if err := r.List(ctx, installPlans); err != nil {
|
|
klog.Warningf("failed to list install plans: %v", err)
|
|
return requests
|
|
}
|
|
for _, plan := range installPlans.Items {
|
|
if plan.Spec.ClusterScheduling == nil {
|
|
continue
|
|
}
|
|
if slices.Contains(plan.Spec.ClusterScheduling.Placement.Clusters, cluster.Name) {
|
|
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: plan.Name}})
|
|
} else if plan.Spec.ClusterScheduling.Placement.ClusterSelector != nil {
|
|
selector, err := metav1.LabelSelectorAsSelector(plan.Spec.ClusterScheduling.Placement.ClusterSelector)
|
|
if err != nil {
|
|
klog.Warningf("failed to parse cluster selector: %v", err)
|
|
continue
|
|
}
|
|
if selector.Matches(labels.Set(cluster.Labels)) {
|
|
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: plan.Name}})
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
return requests
|
|
}
|
|
|
|
func jobStatus(job *batchv1.Job) (active, completed, failed bool) {
|
|
if job == nil {
|
|
return
|
|
}
|
|
completed = job.Spec.Completions != nil && job.Status.Succeeded >= *job.Spec.Completions
|
|
failed = job.Spec.BackoffLimit != nil && job.Status.Failed > *job.Spec.BackoffLimit
|
|
active = !completed && !failed
|
|
return
|
|
}
|
|
|
|
type contextKeyExtensionVersion struct{}
|
|
type contextKeyExecutor struct{}
|
|
type contextKeyHostKubeConfig struct{}
|