From 5e700d46933d8a0840c9d9533936fabd5d256211 Mon Sep 17 00:00:00 2001 From: Yunkang Ren <33660223+renyunkang@users.noreply.github.com> Date: Mon, 7 Apr 2025 10:27:19 +0800 Subject: [PATCH] feat: move ks-core agent installer to job (#6473) Signed-off-by: renyunkang --- pkg/controller/cluster/cluster_controller.go | 191 +++++++++++++++++-- pkg/controller/cluster/helper.go | 139 ++++++-------- 2 files changed, 232 insertions(+), 98 deletions(-) diff --git a/pkg/controller/cluster/cluster_controller.go b/pkg/controller/cluster/cluster_controller.go index e8d613ef1..36d3ca1ca 100644 --- a/pkg/controller/cluster/cluster_controller.go +++ b/pkg/controller/cluster/cluster_controller.go @@ -15,9 +15,11 @@ import ( "sync" "time" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -36,10 +38,12 @@ import ( clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" iamv1beta1 "kubesphere.io/api/iam/v1beta1" tenantv1alpha1 "kubesphere.io/api/tenant/v1beta1" + "kubesphere.io/utils/helm" "kubesphere.io/kubesphere/pkg/constants" kscontroller "kubesphere.io/kubesphere/pkg/controller" clusterutils "kubesphere.io/kubesphere/pkg/controller/cluster/utils" + "kubesphere.io/kubesphere/pkg/controller/options" "kubesphere.io/kubesphere/pkg/utils/clusterclient" "kubesphere.io/kubesphere/pkg/version" ) @@ -53,11 +57,16 @@ import ( // Also check if all the clusters are ready by the spec.connection.kubeconfig every resync period const ( - controllerName = "cluster" + controllerName = "cluster" + installAction = "install" + upgradeAction = "upgrade" + reInstallAction = "reinstall" ) const ( - initializedAnnotation = "kubesphere.io/initialized" + initializedAnnotation = "kubesphere.io/initialized" + installJobAnnotation = "kubesphere.io/install-core-jobname" + ksCoreActionAnnotation = "kubesphere.io/ks-core-action" ) // Cluster template for reconcile host cluster if there is none. @@ -96,13 +105,14 @@ func (r *Reconciler) Enabled(clusterRole string) bool { type Reconciler struct { client.Client - hostConfig *rest.Config - hostClusterName string - resyncPeriod time.Duration - installLock *sync.Map - clusterClient clusterclient.Interface - clusterUID types.UID - tls bool + hostConfig *rest.Config + hostClusterName string + resyncPeriod time.Duration + installLock *sync.Map + clusterClient clusterclient.Interface + clusterUID types.UID + tls bool + HelmExecutorOptions *options.HelmExecutorOptions } // SetupWithManager setups the Reconciler with manager. @@ -118,6 +128,7 @@ func (r *Reconciler) SetupWithManager(mgr *kscontroller.Manager) error { r.clusterUID = kubeSystem.UID r.installLock = &sync.Map{} r.tls = mgr.Options.KubeSphereOptions.TLS + r.HelmExecutorOptions = mgr.Options.HelmExecutorOptions r.Client = mgr.GetClient() if err := mgr.Add(r); err != nil { return fmt.Errorf("unable to add cluster-controller to manager: %v", err) @@ -130,6 +141,7 @@ func (r *Reconciler) SetupWithManager(mgr *kscontroller.Manager) error { clusterChangedPredicate{ stateChangedAnnotations: []string{ "kubesphere.io/syncAt", + ksCoreActionAnnotation, }, }, ), @@ -372,10 +384,63 @@ func (r *Reconciler) syncClusterLabel(ctx context.Context, cluster *clusterv1alp return nil } +func (r *Reconciler) needInstall(ctx context.Context, member *clusterv1alpha1.Cluster) (bool, error) { + conditions := member.Status.Conditions + action := member.Annotations[ksCoreActionAnnotation] + switch action { + case "", installAction: + for _, condition := range conditions { + if condition.Type == clusterv1alpha1.ClusterKSCoreReady { + return false, nil + } + } + + case upgradeAction: + install := false + for _, condition := range conditions { + if condition.Type == clusterv1alpha1.ClusterKSCoreReady && condition.Status == corev1.ConditionTrue { + install = true + } + } + + clusters := &clusterv1alpha1.ClusterList{} + if err := r.List(ctx, clusters); err != nil { + return false, err + } + + host := &clusterv1alpha1.Cluster{} + for _, c := range clusters.Items { + if c.Status.UID == r.clusterUID { + host = &c + break + } + } + + if install && host.Status.KubeSphereVersion != "" && + host.Status.KubeSphereVersion != member.Status.KubeSphereVersion { + klog.Infof("host cluster ks core version: %s, member cluster ks core version: %s", + host.Status.KubeSphereVersion, member.Status.KubeSphereVersion) + return true, nil + } + + case reInstallAction: + return true, nil + + default: + klog.Warningf("unknown action %s", action) + } + + return false, nil +} + func (r *Reconciler) reconcileMemberCluster(ctx context.Context, cluster *clusterv1alpha1.Cluster, clusterClient *clusterclient.ClusterClient) error { // Install KS Core in member cluster - if !hasCondition(cluster.Status.Conditions, clusterv1alpha1.ClusterKSCoreReady) || - configChanged(cluster) { + need, err := r.needInstall(ctx, cluster) + if err != nil { + return fmt.Errorf("failed to check if need install ks core: %v", err) + } + + if need || configChanged(cluster) { // get the lock, make sure only one thread is executing the helm task if _, ok := r.installLock.Load(cluster.Name); ok { return nil @@ -384,25 +449,27 @@ func (r *Reconciler) reconcileMemberCluster(ctx context.Context, cluster *cluste defer r.installLock.Delete(cluster.Name) klog.Infof("Starting installing KS Core for the cluster %s", cluster.Name) defer klog.Infof("Finished installing KS Core for the cluster %s", cluster.Name) + hostConfig, err := getKubeSphereConfig(ctx, r.Client) if err != nil { return fmt.Errorf("failed to get KubeSphere config: %v", err) } - if err = installKSCoreInMemberCluster( - cluster.Spec.Connection.KubeConfig, - hostConfig.AuthenticationOptions.Issuer.JWTSecret, - hostConfig.MultiClusterOptions.ChartPath, - cluster.Spec.Config, - ); err != nil { - return fmt.Errorf("failed to install KS Core in cluster %s: %v", cluster.Name, err) + status := corev1.ConditionTrue + message := "KS Core is available now" + if err := r.installOrUpgradeKSCoreInMemberCluster(ctx, r.HelmExecutorOptions, cluster, + hostConfig.AuthenticationOptions.Issuer.JWTSecret, hostConfig.MultiClusterOptions.ChartPath); err != nil { + status = corev1.ConditionFalse + message = "KS Core installation failed" + klog.Errorf("failed to install KS Core in cluster %s: %v", cluster.Name, err) } + r.updateClusterCondition(cluster, clusterv1alpha1.ClusterCondition{ Type: clusterv1alpha1.ClusterKSCoreReady, - Status: corev1.ConditionTrue, + Status: status, LastUpdateTime: metav1.Now(), LastTransitionTime: metav1.Now(), Reason: clusterv1alpha1.ClusterKSCoreReady, - Message: "KS Core is available now", + Message: message, }) setConfigHash(cluster) if err = r.Update(ctx, cluster); err != nil { @@ -739,3 +806,87 @@ func (r *Reconciler) unbindWorkspaceTemplate(ctx context.Context, cluster *clust } return nil } + +func (r *Reconciler) installOrUpgradeKSCoreInMemberCluster(ctx context.Context, + opt *options.HelmExecutorOptions, cluster *clusterv1alpha1.Cluster, jwtSecret, chartPath string) error { + chartBytes, err := getChartBytes(chartPath) + if err != nil { + return fmt.Errorf("failed to read chart files: %v", err) + } + + valuesBytes, err := generateChartValueBytes(cluster.Spec.Config, jwtSecret) + if err != nil { + return fmt.Errorf("failed to generate chart values: %v", err) + } + + executorOptions := []helm.ExecutorOption{ + helm.SetExecutorLabels(map[string]string{ + constants.KubeSphereManagedLabel: "true", + }), + helm.SetExecutorOwner(&metav1.OwnerReference{ + APIVersion: clusterv1alpha1.SchemeGroupVersion.String(), + Kind: clusterv1alpha1.ResourceKindCluster, + Name: cluster.Name, + UID: cluster.UID, + }), + helm.SetExecutorImage(opt.Image), + helm.SetExecutorNamespace(constants.KubeSphereNamespace), + helm.SetExecutorBackoffLimit(0), + helm.SetTTLSecondsAfterFinished(opt.JobTTLAfterFinished), + helm.SetExecutorAffinity(opt.Affinity), + } + if opt.Resources != nil { + executorOptions = append(executorOptions, helm.SetExecutorResources(corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(opt.Resources.Limits[corev1.ResourceCPU]), + corev1.ResourceMemory: resource.MustParse(opt.Resources.Limits[corev1.ResourceMemory]), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(opt.Resources.Requests[corev1.ResourceCPU]), + corev1.ResourceMemory: resource.MustParse(opt.Resources.Requests[corev1.ResourceMemory]), + }, + })) + } + + executor, err := helm.NewExecutor(executorOptions...) + if err != nil { + return fmt.Errorf("failed to create executor: %v", err) + } + + jobName, err := executor.Upgrade(ctx, releaseName, releaseName, valuesBytes, + helm.SetKubeconfig(cluster.Spec.Connection.KubeConfig), + helm.SetNamespace(constants.KubeSphereNamespace), + helm.SetChartData(chartBytes), + helm.SetTimeout(5*time.Minute), + helm.SetInstall(true), + helm.SetCreateNamespace(true)) + if err != nil { + return fmt.Errorf("failed to create executor job: %v", err) + } + klog.Infof("Install/Upgrade job %s created", jobName) + + if cluster.Annotations == nil { + cluster.Annotations = make(map[string]string) + } + cluster.Annotations[installJobAnnotation] = jobName + delete(cluster.Annotations, ksCoreActionAnnotation) + cluster.Status.Conditions = []clusterv1alpha1.ClusterCondition{} + if err := r.Update(ctx, cluster); err != nil { + return fmt.Errorf("failed to update cluster %s: %v", cluster.Name, err) + } + + return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { + job := &batchv1.Job{} + if err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: constants.KubeSphereNamespace}, job); err != nil { + return false, err + } + if job.Status.Succeeded == 1 { + return true, nil + } + if job.Status.Failed > 0 { + return false, fmt.Errorf("job %s failed", jobName) + } + + return false, nil + }) +} diff --git a/pkg/controller/cluster/helper.go b/pkg/controller/cluster/helper.go index 6c7a7c44f..19dcf2d73 100644 --- a/pkg/controller/cluster/helper.go +++ b/pkg/controller/cluster/helper.go @@ -7,17 +7,17 @@ package cluster import ( "context" - "errors" - "time" + "fmt" + "os" + "path" - "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/chart/loader" - "helm.sh/helm/v3/pkg/storage/driver" + "helm.sh/helm/v3/pkg/chartutil" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" - "kubesphere.io/utils/helm" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" @@ -43,74 +43,6 @@ func setConfigHash(cluster *clusterv1alpha1.Cluster) { } } -func installKSCoreInMemberCluster(kubeConfig []byte, jwtSecret, chartPath string, chartConfig []byte) error { - helmConf, err := helm.InitHelmConf(kubeConfig, constants.KubeSphereNamespace) - if err != nil { - return err - } - - if chartPath == "" { - chartPath = "/var/helm-charts/ks-core" - } - chart, err := loader.Load(chartPath) // in-container chart path - if err != nil { - return err - } - - // values example: - // map[string]interface{}{ - // "nestedKey": map[string]interface{}{ - // "simpleKey": "simpleValue", - // }, - // } - values := make(map[string]interface{}) - if chartConfig != nil { - if err = yaml.Unmarshal(chartConfig, &values); err != nil { - return err - } - } - - // Override some necessary values - values["role"] = "member" - values["multicluster"] = map[string]string{"role": "member"} - // disable upgrade to prevent execution of kse-upgrade - values["upgrade"] = map[string]interface{}{ - "enabled": false, - } - if err = unstructured.SetNestedField(values, jwtSecret, "authentication", "issuer", "jwtSecret"); err != nil { - return err - } - - helmStatus := action.NewStatus(helmConf) - if _, err = helmStatus.Run(releaseName); err != nil { - if !errors.Is(err, driver.ErrReleaseNotFound) { - return err - } - - // the release not exists - install := action.NewInstall(helmConf) - install.Namespace = constants.KubeSphereNamespace - install.CreateNamespace = true - install.Wait = true - install.ReleaseName = releaseName - install.Timeout = time.Minute * 5 - if _, err = install.Run(chart, values); err != nil { - return err - } - return nil - } - - upgrade := action.NewUpgrade(helmConf) - upgrade.Namespace = constants.KubeSphereNamespace - upgrade.Install = true - upgrade.Wait = true - upgrade.Timeout = time.Minute * 5 - if _, err = upgrade.Run(releaseName, chart, values); err != nil { - return err - } - return nil -} - func getKubeSphereConfig(ctx context.Context, client runtimeclient.Client) (*config.Config, error) { cm := &corev1.ConfigMap{} if err := client.Get(ctx, types.NamespacedName{Name: constants.KubeSphereConfigName, Namespace: constants.KubeSphereNamespace}, cm); err != nil { @@ -123,11 +55,62 @@ func getKubeSphereConfig(ctx context.Context, client runtimeclient.Client) (*con return configData, nil } -func hasCondition(conditions []clusterv1alpha1.ClusterCondition, conditionsType clusterv1alpha1.ClusterConditionType) bool { - for _, condition := range conditions { - if condition.Type == conditionsType && condition.Status == corev1.ConditionTrue { - return true +// generateChartValueBytes generates the chart value bytes for the cluster +func generateChartValueBytes(chartConfig []byte, jwtSecret string) ([]byte, error) { + values := make(map[string]interface{}) + if chartConfig != nil { + if err := yaml.Unmarshal(chartConfig, &values); err != nil { + return nil, err } } - return false + + // Override some necessary values + values["role"] = "member" + values["multicluster"] = map[string]string{"role": "member"} + // disable upgrade to prevent execution of kse-upgrade + values["upgrade"] = map[string]interface{}{ + "enabled": false, + } + if err := unstructured.SetNestedField(values, jwtSecret, "authentication", "issuer", "jwtSecret"); err != nil { + return nil, err + } + + valuesBytes, err := yaml.Marshal(values) + if err != nil { + return nil, fmt.Errorf("failed to marshal values: %v", err) + } + return valuesBytes, nil +} + +func getChartBytes(chartPath string) ([]byte, error) { + prefix := "/var/helm-charts" + if chartPath == "" { + chartPath = path.Join(prefix, releaseName) + } + + tgzFile := path.Join(prefix, fmt.Sprintf("%s.tgz", releaseName)) + if _, err := os.Stat(tgzFile); os.IsNotExist(err) { + chart, err := loader.Load(chartPath) + if err != nil { + return nil, fmt.Errorf("failed to load chart: %v", err) + } + + saveFile, err := chartutil.Save(chart, prefix) + if err != nil { + return nil, fmt.Errorf("failed to save chart: %v", err) + } + + klog.Infof("saveFile %s, tgzFile %s", saveFile, tgzFile) + if saveFile != tgzFile { + if err := os.Rename(saveFile, tgzFile); err != nil { + return nil, fmt.Errorf("failed to rename chart file: %v", err) + } + } + } + + chartBytes, err := os.ReadFile(tgzFile) + if err != nil { + return nil, fmt.Errorf("failed to read chart files: %v", err) + } + return chartBytes, nil }