diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index 66f150373..8d5a198ca 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -59,6 +59,7 @@ import ( func init() { // core runtime.Must(controller.Register(&core.ExtensionReconciler{})) + runtime.Must(controller.Register(&core.ExtensionVersionReconciler{})) runtime.Must(controller.Register(&core.CategoryReconciler{})) runtime.Must(controller.Register(&core.RepositoryReconciler{})) runtime.Must(controller.Register(&core.InstallPlanReconciler{})) diff --git a/config/ks-core/charts/ks-crds/crds/kubesphere.io_repositories.yaml b/config/ks-core/charts/ks-crds/crds/kubesphere.io_repositories.yaml index b2c43fd25..d79d573c0 100644 --- a/config/ks-core/charts/ks-crds/crds/kubesphere.io_repositories.yaml +++ b/config/ks-core/charts/ks-crds/crds/kubesphere.io_repositories.yaml @@ -54,15 +54,12 @@ spec: to verify the helm server. type: string depth: - description: The number of synchronized versions of each extension. - 0 means synchronized all versions, default is 3. + description: The maximum number of synchronized versions for each + extension. A value of 0 indicates that all versions will be synchronized. + The default is 3. type: integer description: type: string - image: - description: 'DEPRECATED: the field will remove in future versions, - please use url.' - type: string insecure: description: --insecure-skip-tls-verify. default false type: boolean @@ -75,6 +72,10 @@ spec: required: - interval type: object + timeout: + type: string + required: + - timeout type: object url: type: string diff --git a/pkg/controller/core/category_controller.go b/pkg/controller/core/category_controller.go index 40c0d611b..b791def33 100644 --- a/pkg/controller/core/category_controller.go +++ b/pkg/controller/core/category_controller.go @@ -10,8 +10,6 @@ import ( "strconv" "strings" - kscontroller "kubesphere.io/kubesphere/pkg/controller" - "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -24,6 +22,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kscontroller "kubesphere.io/kubesphere/pkg/controller" ) const ( diff --git a/pkg/controller/core/extension_controller.go b/pkg/controller/core/extension_controller.go index 5befea992..4c5385af9 100644 --- a/pkg/controller/core/extension_controller.go +++ b/pkg/controller/core/extension_controller.go @@ -7,17 +7,19 @@ package core import ( "context" - "fmt" "reflect" "sort" "strings" "github.com/Masterminds/semver/v3" "github.com/go-logr/logr" + "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" corev1alpha1 "kubesphere.io/api/core/v1alpha1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -28,8 +30,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" - kscontroller "kubesphere.io/kubesphere/pkg/controller" ) @@ -90,14 +90,16 @@ func (r *ExtensionReconciler) SetupWithManager(mgr *kscontroller.Manager) error } func (r *ExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := r.logger.WithValues("extension", req.String()) + logger.V(4).Info("reconciling extension") + ctx = klog.NewContext(ctx, logger) + extension := &corev1alpha1.Extension{} if err := r.Client.Get(ctx, req.NamespacedName, extension); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } - r.logger.V(4).Info("reconcile", "extension", extension.Name) - - if extension.ObjectMeta.DeletionTimestamp != nil { + if extension.ObjectMeta.DeletionTimestamp.IsZero() { return r.reconcileDelete(ctx, extension) } @@ -108,11 +110,10 @@ func (r *ExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } if err := r.syncExtensionStatus(ctx, extension); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to sync extension status: %s", err) + return ctrl.Result{}, errors.Wrap(err, "failed to sync extension status") } - r.logger.V(4).Info("synced", "extension", extension.Name) - + logger.V(4).Info("extension successfully reconciled") return ctrl.Result{}, nil } @@ -125,13 +126,13 @@ func (r *ExtensionReconciler) reconcileDelete(ctx context.Context, extension *co }, DeleteOptions: client.DeleteOptions{PropagationPolicy: &deletePolicy}, }); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to delete related ExtensionVersion: %s", err) + return ctrl.Result{}, errors.Wrap(err, "failed to delete extension versions") } // Remove the finalizer from the extension controllerutil.RemoveFinalizer(extension, extensionProtection) if err := r.Update(ctx, extension); err != nil { - return ctrl.Result{}, err + return ctrl.Result{}, errors.Wrap(err, "failed to remove finalizer from extension") } return ctrl.Result{}, nil } @@ -144,28 +145,32 @@ func (r *ExtensionReconciler) syncExtensionStatus(ctx context.Context, extension return err } - versions := make([]corev1alpha1.ExtensionVersionInfo, 0, len(versionList.Items)) - for i := range versionList.Items { - if versionList.Items[i].DeletionTimestamp.IsZero() { + versions := make([]corev1alpha1.ExtensionVersionInfo, 0) + for _, version := range versionList.Items { + isValidVersion := len(isValidExtensionVersion(version.Spec.Version)) == 0 + if version.DeletionTimestamp.IsZero() && isValidVersion { versions = append(versions, corev1alpha1.ExtensionVersionInfo{ - Version: versionList.Items[i].Spec.Version, - CreationTimestamp: versionList.Items[i].CreationTimestamp, + Version: version.Spec.Version, + CreationTimestamp: version.CreationTimestamp, }) } } + sort.Slice(versions, func(i, j int) bool { - return versions[i].Version < versions[j].Version + v1, _ := semver.NewVersion(versions[i].Version) + v2, _ := semver.NewVersion(versions[j].Version) + return v1.LessThan(v2) }) err := retry.RetryOnConflict(retry.DefaultRetry, func() error { if err := r.Get(ctx, types.NamespacedName{Name: extension.Name}, extension); err != nil { - return err + return errors.Wrap(err, "failed to get extension") } expected := extension.DeepCopy() if recommended, err := getRecommendedExtensionVersion(versionList.Items, r.k8sVersion); err == nil { expected.Status.RecommendedVersion = recommended } else { - r.logger.Error(err, "failed to get recommended extension version") + klog.FromContext(ctx).Error(err, "failed to get recommended extension version") } expected.Status.Versions = versions if expected.Status.RecommendedVersion != extension.Status.RecommendedVersion || @@ -176,7 +181,7 @@ func (r *ExtensionReconciler) syncExtensionStatus(ctx context.Context, extension }) if err != nil { - return fmt.Errorf("failed to update extension status: %s", err) + return errors.Wrap(err, "failed to update extension status") } return nil } diff --git a/pkg/controller/core/extensionversion_controller.go b/pkg/controller/core/extensionversion_controller.go new file mode 100644 index 000000000..49d0b7dff --- /dev/null +++ b/pkg/controller/core/extensionversion_controller.go @@ -0,0 +1,152 @@ +/* + * Please refer to the LICENSE file in the root directory of the project. + * https://github.com/kubesphere/kubesphere/blob/master/LICENSE + */ + +package core + +import ( + "context" + "reflect" + "strings" + + "github.com/Masterminds/semver/v3" + "github.com/go-logr/logr" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" + corev1alpha1 "kubesphere.io/api/core/v1alpha1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kscontroller "kubesphere.io/kubesphere/pkg/controller" +) + +const ( + extensionVersionController = "extensionVersion" +) + +var _ kscontroller.Controller = &ExtensionVersionReconciler{} +var _ reconcile.Reconciler = &ExtensionVersionReconciler{} + +func (r *ExtensionVersionReconciler) Name() string { + return extensionVersionController +} + +func (r *ExtensionVersionReconciler) Enabled(clusterRole string) bool { + return strings.EqualFold(clusterRole, string(clusterv1alpha1.ClusterRoleHost)) +} + +type ExtensionVersionReconciler struct { + client.Client + logger logr.Logger +} + +func (r *ExtensionVersionReconciler) SetupWithManager(mgr *kscontroller.Manager) error { + r.Client = mgr.GetClient() + r.logger = ctrl.Log.WithName("controllers").WithName(extensionVersionController) + return ctrl.NewControllerManagedBy(mgr). + Named(extensionVersionController). + For(&corev1alpha1.ExtensionVersion{}). + Complete(r) +} + +func (r *ExtensionVersionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := r.logger.WithValues("extensionVersion", req.String()) + logger.V(4).Info("reconciling extension version") + ctx = klog.NewContext(ctx, logger) + + extensionVersion := &corev1alpha1.ExtensionVersion{} + if err := r.Client.Get(ctx, req.NamespacedName, extensionVersion); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + if !extensionVersion.ObjectMeta.DeletionTimestamp.IsZero() { + return ctrl.Result{}, nil + } + + if err := r.syncExtensionVersion(ctx, extensionVersion); err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to sync extension version") + } + + logger.V(4).Info("extension version successfully reconciled") + return ctrl.Result{}, nil +} + +func (r *ExtensionVersionReconciler) syncExtensionVersion(ctx context.Context, extensionVersion *corev1alpha1.ExtensionVersion) error { + logger := klog.FromContext(ctx) + + // automatically synchronized by the repository controller + if extensionVersion.Spec.Repository != "" { + return nil + } + + if extensionVersion.Spec.Version != "" { + return nil + } + + extensionVersionSpec, err := fetchExtensionVersionSpec(ctx, r.Client, extensionVersion) + if err != nil { + return errors.Wrap(err, "failed to fetch extension version spec") + } + + if len(isValidExtensionName(extensionVersionSpec.Name)) > 0 { + logger.V(4).Info("invalid extension name found", "name", extensionVersionSpec.Name) + return nil + } + + expected := extensionVersion.DeepCopy() + if expected.Labels == nil { + expected.Labels = map[string]string{} + } + expected.Labels[corev1alpha1.ExtensionReferenceLabel] = extensionVersionSpec.Name + expected.Labels[corev1alpha1.CategoryLabel] = extensionVersionSpec.Category + expected.Spec = extensionVersionSpec + + if !reflect.DeepEqual(expected.Spec, extensionVersion.Spec) || + !reflect.DeepEqual(expected.Labels, extensionVersion.Labels) { + if err := r.Update(ctx, expected); err != nil { + return errors.Wrap(err, "failed to update extension version") + } + logger.V(4).Info("extension version updated") + } + + extension := &corev1alpha1.Extension{ + ObjectMeta: metav1.ObjectMeta{Name: extensionVersionSpec.Name}, + } + + op, err := controllerutil.CreateOrUpdate(ctx, r.Client, extension, func() error { + if !needUpdate(extensionVersionSpec.Version, extension.Status.Versions) { + return nil + } + if extension.Labels == nil { + extension.Labels = make(map[string]string) + } + if extensionVersion.Spec.Category != "" { + extension.Labels[corev1alpha1.CategoryLabel] = extensionVersion.Spec.Category + } + extension.Spec.ExtensionInfo = expected.Spec.ExtensionInfo + return nil + }) + + if err != nil { + return errors.Wrapf(err, "failed to update extension: %v", err) + } + + logger.V(4).Info("extension successfully updated", "operation", op, "name", extension.Name) + return nil +} + +func needUpdate(version string, versions []corev1alpha1.ExtensionVersionInfo) bool { + v1, _ := semver.NewVersion(version) + for _, v := range versions { + v2, _ := semver.NewVersion(v.Version) + if v2.GreaterThan(v1) { + return false + } + } + return true +} diff --git a/pkg/controller/core/installplan_controller.go b/pkg/controller/core/installplan_controller.go index 33cdada1c..24180056a 100644 --- a/pkg/controller/core/installplan_controller.go +++ b/pkg/controller/core/installplan_controller.go @@ -9,8 +9,6 @@ import ( "bytes" "context" "fmt" - "io" - "net/url" "path" "reflect" "sort" @@ -21,8 +19,6 @@ import ( "golang.org/x/exp/slices" "helm.sh/helm/v3/pkg/chart" "helm.sh/helm/v3/pkg/chart/loader" - "helm.sh/helm/v3/pkg/getter" - "helm.sh/helm/v3/pkg/registry" helmrelease "helm.sh/helm/v3/pkg/release" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -413,90 +409,25 @@ func latestJobCondition(job *batchv1.Job) batchv1.JobCondition { return condition } -func (r *InstallPlanReconciler) loadChartData(ctx context.Context) ([]byte, string, error) { +func (r *InstallPlanReconciler) loadChartDataAndCABundle(ctx context.Context) ([]byte, string, error) { extensionVersion, ok := ctx.Value(contextKeyExtensionVersion{}).(*corev1alpha1.ExtensionVersion) if !ok { return nil, "", fmt.Errorf("failed to get extension version from context") } - // load chart data from - if extensionVersion.Spec.ChartDataRef != nil { - configMap := &corev1.ConfigMap{} - if err := r.Get(ctx, types.NamespacedName{Namespace: extensionVersion.Spec.ChartDataRef.Namespace, Name: extensionVersion.Spec.ChartDataRef.Name}, configMap); err != nil { - return nil, "", err - } - data := configMap.BinaryData[extensionVersion.Spec.ChartDataRef.Key] - if data != nil { - return data, "", nil - } - return nil, "", fmt.Errorf("binary data not found") - } - - chartURL, err := url.Parse(extensionVersion.Spec.ChartURL) - if err != nil { - return nil, "", fmt.Errorf("failed to parse chart url: %v", err) - } - - var caBundle string repo := &corev1alpha1.Repository{} if extensionVersion.Spec.Repository != "" { if err := r.Get(ctx, types.NamespacedName{Name: extensionVersion.Spec.Repository}, repo); err != nil { return nil, "", fmt.Errorf("failed to get repository: %v", err) } - caBundle = repo.Spec.CABundle } - var chartGetter getter.Getter - switch chartURL.Scheme { - case registry.OCIScheme: - opts := make([]getter.Option, 0) - if extensionVersion.Spec.Repository != "" { - opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure)) - } - if repo.Spec.BasicAuth != nil { - opts = append(opts, getter.WithBasicAuth(repo.Spec.BasicAuth.Username, repo.Spec.BasicAuth.Password)) - } - chartGetter, err = getter.NewOCIGetter(opts...) - if err != nil { - return nil, "", fmt.Errorf("failed to create chart getter: %v", err) - } - case "http", "https": - opts := make([]getter.Option, 0) - if chartURL.Scheme == "https" && extensionVersion.Spec.Repository != "" { - opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure)) - } - if repo.Spec.CABundle != "" { - caFile, err := storeCAFile(repo.Spec.CABundle, repo.Name) - if err != nil { - return nil, "", fmt.Errorf("failed to store CABundle to local file: %s", err) - } - opts = append(opts, getter.WithTLSClientConfig("", "", caFile)) - } - if chartURL.Scheme == "https" { - opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure)) - } - if repo.Spec.BasicAuth != nil { - opts = append(opts, getter.WithBasicAuth(repo.Spec.BasicAuth.Username, repo.Spec.BasicAuth.Password)) - } - chartGetter, err = getter.NewHTTPGetter(opts...) - if err != nil { - return nil, "", fmt.Errorf("failed to create chart getter: %v", err) - } - default: - return nil, "", fmt.Errorf("unsupported scheme: %s", chartURL.Scheme) - } - - buffer, err := chartGetter.Get(chartURL.String()) + data, err := fetchChartData(ctx, r.Client, extensionVersion) if err != nil { - return nil, "", fmt.Errorf("failed to get chart data: %v", err) + return nil, "", fmt.Errorf("failed to load chart data: %v", err) } - data, err := io.ReadAll(buffer) - if err != nil { - return nil, "", fmt.Errorf("failed to read chart data: %v", err) - } - - return data, caBundle, nil + return data, repo.Spec.CABundle, nil } func updateState(status *corev1alpha1.InstallationStatus, state string, time time.Time) bool { @@ -1174,7 +1105,7 @@ func (r *InstallPlanReconciler) installOrUpgradeExtension(ctx context.Context, p return r.updateInstallPlan(ctx, plan) } - chartData, caBundle, err := r.loadChartData(ctx) + chartData, caBundle, err := r.loadChartDataAndCABundle(ctx) if err != nil { return onFailed(err) } @@ -1366,7 +1297,7 @@ func (r *InstallPlanReconciler) installOrUpgradeClusterAgent(ctx context.Context return r.updateInstallPlan(ctx, plan) } - chartData, caBundle, err := r.loadChartData(ctx) + chartData, caBundle, err := r.loadChartDataAndCABundle(ctx) if err != nil { return onFailed(fmt.Errorf("failed to load chart data: %v", err)) } diff --git a/pkg/controller/core/repository_controller.go b/pkg/controller/core/repository_controller.go index d2a1bed3b..a3f803444 100644 --- a/pkg/controller/core/repository_controller.go +++ b/pkg/controller/core/repository_controller.go @@ -6,28 +6,18 @@ package core import ( - "bytes" "context" - "encoding/base64" - "errors" "fmt" - "mime" - "net/http" "net/url" - "path" "strings" "time" "github.com/go-logr/logr" - "helm.sh/helm/v3/pkg/chart/loader" - appsv1 "k8s.io/api/apps/v1" + "github.com/pkg/errors" + helmrepo "helm.sh/helm/v3/pkg/repo" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/validation" - "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -40,7 +30,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "kubesphere.io/kubesphere/pkg/constants" kscontroller "kubesphere.io/kubesphere/pkg/controller" ) @@ -48,14 +37,11 @@ const ( repositoryProtection = "kubesphere.io/repository-protection" repositoryController = "repository" minimumRegistryPollInterval = 15 * time.Minute - defaultRequeueInterval = 15 * time.Second - generateNameFormat = "repository-%s" + defaultRegistryPollTimeout = 2 * time.Minute extensionFileName = "extension.yaml" - // caTemplate store repository.spec.caBound in local dir. - caTemplate = "{{ .TempDIR }}/repository/{{ .RepositoryName }}/ssl/ca.crt" ) -var extensionRepoConflict = fmt.Errorf("extension repo mismatch") +var extensionRepoConflict = errors.New("extension repo mismatch") var _ kscontroller.Controller = &RepositoryReconciler{} var _ reconcile.Reconciler = &RepositoryReconciler{} @@ -86,7 +72,7 @@ func (r *RepositoryReconciler) SetupWithManager(mgr *kscontroller.Manager) error func (r *RepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := r.logger.WithValues("repository", req.String()) - logger.V(4).Info("sync repository") + logger.V(4).Info("reconciling extension repository") ctx = klog.NewContext(ctx, logger) repo := &corev1alpha1.Repository{} @@ -103,6 +89,7 @@ func (r *RepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) controllerutil.AddFinalizer(expected, repositoryProtection) return ctrl.Result{}, r.Patch(ctx, expected, client.MergeFrom(repo)) } + return r.reconcileRepository(ctx, repo) } @@ -124,10 +111,9 @@ func (r *RepositoryReconciler) createOrUpdateExtension(ctx context.Context, repo op, err := controllerutil.CreateOrUpdate(ctx, r.Client, extension, func() error { originRepoName := extension.Labels[corev1alpha1.RepositoryReferenceLabel] if originRepoName != "" && originRepoName != repo.Name { - logger.Error(extensionRepoConflict, "conflict", "extension", extensionName, "want", originRepoName, "got", repo.Name) + logger.Error(extensionRepoConflict, "extension repo mismatch", "name", extension.Name, "origin", originRepoName, "current", repo.Name) return extensionRepoConflict } - if extension.Labels == nil { extension.Labels = make(map[string]string) } @@ -137,13 +123,13 @@ func (r *RepositoryReconciler) createOrUpdateExtension(ctx context.Context, repo extension.Labels[corev1alpha1.RepositoryReferenceLabel] = repo.Name extension.Spec.ExtensionInfo = extensionVersion.Spec.ExtensionInfo if err := controllerutil.SetOwnerReference(repo, extension, r.Scheme()); err != nil { - return err + return errors.Wrapf(err, "failed to set owner reference") } return nil }) if err != nil { - return nil, fmt.Errorf("failed to update extension: %s", err) + return nil, errors.Wrapf(err, "failed to update extension") } logger.V(4).Info("extension successfully updated", "operation", op, "name", extension.Name) @@ -160,6 +146,12 @@ func (r *RepositoryReconciler) createOrUpdateExtensionVersion(ctx context.Contex for k, v := range extensionVersion.Labels { version.Labels[k] = v } + if version.Annotations == nil { + version.Annotations = make(map[string]string) + } + for k, v := range extensionVersion.Annotations { + version.Annotations[k] = v + } version.Spec = extensionVersion.Spec if err := controllerutil.SetOwnerReference(extension, version, r.Scheme()); err != nil { return err @@ -168,90 +160,87 @@ func (r *RepositoryReconciler) createOrUpdateExtensionVersion(ctx context.Contex }) if err != nil { - return fmt.Errorf("failed to update extension version: %s", err) + return errors.Wrapf(err, "failed to update extension version") } logger.V(4).Info("extension version successfully updated", "operation", op, "name", extensionVersion.Name) return nil } -func (r *RepositoryReconciler) syncExtensionsFromURL(ctx context.Context, repo *corev1alpha1.Repository, repoURL string) error { +func (r *RepositoryReconciler) syncExtensionsFromURL(ctx context.Context, repo *corev1alpha1.Repository, timeout time.Duration) error { logger := klog.FromContext(ctx) - ctx, cancel := context.WithTimeout(ctx, 15*time.Second) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - cred, err := newHelmCred(repo) + cred, err := createHelmCredential(repo) if err != nil { - return err + return errors.Wrapf(err, "failed to create helm credential") } - index, err := helm.LoadRepoIndex(ctx, repoURL, cred) + + repoURL, err := url.Parse(repo.Spec.URL) if err != nil { - return err + return errors.Wrapf(err, "failed to parse repo URL") + } + + index, err := helm.LoadRepoIndex(ctx, repo.Spec.URL, cred) + if err != nil { + return errors.Wrapf(err, "failed to load repo index") } for extensionName, versions := range index.Entries { // check extensionName - if errs := validation.IsDNS1123Subdomain(extensionName); len(errs) > 0 { + if errs := isValidExtensionName(extensionName); len(errs) > 0 { logger.Info("invalid extension name", "extension", extensionName, "error", errs) continue } extensionVersions := make([]corev1alpha1.ExtensionVersion, 0, len(versions)) for _, version := range versions { - if version.Metadata == nil { - logger.Info("version metadata is empty", "repo", repo.Name) - continue - } - if version.Name != extensionName { - logger.Info("invalid extension version found", "want", extensionName, "got", version.Name) + logger.V(4).Info("extension name mismatch", "extension", extensionName, "version", version.Version) continue } - var chartURL string - if len(version.URLs) > 0 { - versionURL := version.URLs[0] - u, err := url.Parse(versionURL) - if err != nil { - logger.Error(err, "failed to parse chart URL", "url", versionURL) - continue - } - if u.Host == "" { - chartURL = fmt.Sprintf("%s/%s", repoURL, versionURL) - } else { - chartURL = u.String() - } - } - - extensionVersionSpec, err := r.loadExtensionVersionSpecFrom(ctx, chartURL, repo, cred) - if err != nil { - return fmt.Errorf("failed to load extension version spec: %s", err) - } - - if extensionVersionSpec == nil { - logger.V(4).Info("extension version spec not found: %s", chartURL) + chartURL := resolveChartURL(version, repoURL) + if chartURL == nil { + logger.V(4).Info("failed to resolve chart URL", "extension", extensionName, "version", version.Version) continue } - extensionVersionSpec.Created = metav1.NewTime(version.Created) - extensionVersionSpec.Digest = version.Digest - extensionVersionSpec.Repository = repo.Name - extensionVersionSpec.ChartDataRef = nil - extensionVersionSpec.ChartURL = chartURL extensionVersion := corev1alpha1.ExtensionVersion{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", extensionName, extensionVersionSpec.Version), + Name: fmt.Sprintf("%s-%s", extensionName, version.Version), Labels: map[string]string{ corev1alpha1.RepositoryReferenceLabel: repo.Name, corev1alpha1.ExtensionReferenceLabel: extensionName, }, Annotations: version.Metadata.Annotations, }, - Spec: *extensionVersionSpec, + Spec: corev1alpha1.ExtensionVersionSpec{ + ChartURL: chartURL.String(), + Repository: repo.Name, + }, } + + extensionVersionSpec, err := r.fetchExtensionVersionSpec(ctx, &extensionVersion) + if err != nil { + return errors.Wrapf(err, "failed to load extension version spec") + } + + if extensionVersionSpec.Name != extensionName { + logger.V(4).Info("extension version name mismatch", "extension", extensionName, "version", version.Version) + continue + } + + extensionVersionSpec.ChartURL = chartURL.String() + extensionVersionSpec.Created = metav1.NewTime(version.Created) + extensionVersionSpec.Digest = version.Digest + extensionVersionSpec.Repository = repo.Name if extensionVersionSpec.Category != "" { extensionVersion.Labels[corev1alpha1.CategoryLabel] = extensionVersionSpec.Category } + + extensionVersion.Spec = extensionVersionSpec extensionVersions = append(extensionVersions, extensionVersion) } @@ -265,29 +254,30 @@ func (r *RepositoryReconciler) syncExtensionsFromURL(ctx context.Context, repo * if errors.Is(err, extensionRepoConflict) { continue } - return fmt.Errorf("failed to create or update extension: %s", err) + return errors.Wrapf(err, "failed to create or update extension") } // create extensionVersions of filteredVersions for _, extensionVersion := range filteredVersions { if err := r.createOrUpdateExtensionVersion(ctx, extension, &extensionVersion); err != nil { - return fmt.Errorf("failed to create or update extension version: %s", err) + return errors.Wrapf(err, "failed to create or update extension version") } } // remove extensionVersions of existVersions - if err := r.removeSuspendedExtensionVersion(ctx, repo, extension, extensionVersions); err != nil { - return fmt.Errorf("failed to remove suspended extension version: %s", err) + if err := r.removeSuspendedExtensionVersion(ctx, repo.Name, extension.Name, extensionVersions); err != nil { + return errors.Wrapf(err, "failed to remove suspended extension version") } } extensions := &corev1alpha1.ExtensionList{} if err := r.List(ctx, extensions, client.MatchingLabels{corev1alpha1.RepositoryReferenceLabel: repo.Name}); err != nil { - return fmt.Errorf("failed to list extensions: %s", err) + return errors.Wrapf(err, "failed to list extensions") } for _, extension := range extensions.Items { if _, ok := index.Entries[extension.Name]; !ok { - if err := r.removeSuspendedExtensionVersion(ctx, repo, &extension, []corev1alpha1.ExtensionVersion{}); err != nil { - return fmt.Errorf("failed to remove suspended extension version: %s", err) + // remove all the extensionVersions if the extension is not in the index + if err := r.removeSuspendedExtensionVersion(ctx, repo.Name, extension.Name, []corev1alpha1.ExtensionVersion{}); err != nil { + return errors.Wrapf(err, "failed to remove suspended extension version") } } } @@ -295,228 +285,81 @@ func (r *RepositoryReconciler) syncExtensionsFromURL(ctx context.Context, repo * return nil } +func resolveChartURL(version *helmrepo.ChartVersion, repoURL *url.URL) *url.URL { + if len(version.URLs) == 0 { + return nil + } + + chartURL, err := url.Parse(version.URLs[0]) + if err != nil { + return nil + } + + if chartURL.Host == "" { + chartURL.Scheme = repoURL.Scheme + chartURL.Host = repoURL.Host + } + + return chartURL +} + func (r *RepositoryReconciler) reconcileRepository(ctx context.Context, repo *corev1alpha1.Repository) (ctrl.Result, error) { registryPollInterval := minimumRegistryPollInterval if repo.Spec.UpdateStrategy != nil && repo.Spec.UpdateStrategy.Interval.Duration > minimumRegistryPollInterval { registryPollInterval = repo.Spec.UpdateStrategy.Interval.Duration } - - var repoURL string - // URL and Image are immutable after creation - if repo.Spec.URL != "" { - repoURL = repo.Spec.URL - } else if repo.Spec.Image != "" { - var deployment appsv1.Deployment - if err := r.Get(ctx, types.NamespacedName{Namespace: constants.KubeSphereNamespace, Name: fmt.Sprintf(generateNameFormat, repo.Name)}, &deployment); err != nil { - if apierrors.IsNotFound(err) { - if err := r.deployRepository(ctx, repo); err != nil { - r.recorder.Event(repo, corev1.EventTypeWarning, "RepositoryDeployFailed", err.Error()) - return ctrl.Result{}, fmt.Errorf("failed to deploy repository: %s", err) - } - r.recorder.Event(repo, corev1.EventTypeNormal, "RepositoryDeployed", "") - return ctrl.Result{Requeue: true, RequeueAfter: defaultRequeueInterval}, nil - } - return ctrl.Result{}, fmt.Errorf("failed to fetch deployment: %s", err) - } - - restartAt, _ := time.Parse(time.RFC3339, deployment.Spec.Template.Annotations["kubesphere.io/restartedAt"]) - if restartAt.IsZero() { - restartAt = deployment.ObjectMeta.CreationTimestamp.Time - } - // restart and pull the latest docker image - if time.Now().After(repo.Status.LastSyncTime.Add(registryPollInterval)) && time.Now().After(restartAt.Add(registryPollInterval)) { - rawData := []byte(fmt.Sprintf("{\"spec\":{\"template\":{\"metadata\":{\"annotations\":{\"kubesphere.io/restartedAt\":\"%s\"}}}}}", time.Now().Format(time.RFC3339))) - if err := r.Patch(ctx, &deployment, client.RawPatch(types.StrategicMergePatchType, rawData)); err != nil { - return ctrl.Result{}, err - } - r.recorder.Event(repo, corev1.EventTypeNormal, "RepositoryRestarted", "") - return ctrl.Result{Requeue: true, RequeueAfter: defaultRequeueInterval}, nil - } - - if deployment.Status.AvailableReplicas != deployment.Status.Replicas { - return ctrl.Result{Requeue: true, RequeueAfter: defaultRequeueInterval}, nil - } - - // ready to sync - repoURL = fmt.Sprintf("http://%s.%s.svc", deployment.Name, constants.KubeSphereNamespace) + registryPollTimeout := defaultRegistryPollTimeout + if repo.Spec.UpdateStrategy != nil && repo.Spec.UpdateStrategy.Timeout.Duration > 0 { + registryPollTimeout = repo.Spec.UpdateStrategy.Timeout.Duration } + repoURL := repo.Spec.URL + if repoURL == "" { + return ctrl.Result{}, nil + } + + logger := klog.FromContext(ctx) + outOfSync := repo.Status.LastSyncTime == nil || time.Now().After(repo.Status.LastSyncTime.Add(registryPollInterval)) - if repoURL != "" && outOfSync { - if err := r.syncExtensionsFromURL(ctx, repo, repoURL); err != nil { + if outOfSync { + if err := r.syncExtensionsFromURL(ctx, repo, registryPollTimeout); err != nil { r.recorder.Eventf(repo, corev1.EventTypeWarning, kscontroller.SyncFailed, "failed to sync extensions from %s: %s", repoURL, err) - return ctrl.Result{}, fmt.Errorf("failed to sync extensions: %s", err) + return ctrl.Result{}, errors.Wrapf(err, "failed to sync extensions from %s", repoURL) } r.recorder.Eventf(repo, corev1.EventTypeNormal, kscontroller.Synced, "sync extensions from %s successfully", repoURL) repo = repo.DeepCopy() repo.Status.LastSyncTime = &metav1.Time{Time: time.Now()} if err := r.Update(ctx, repo); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to update repository: %s", err) + return ctrl.Result{}, errors.Wrapf(err, "failed to update repository status") } + logger.V(4).Info("repository successfully synced", "name", repo.Name) } + logger.V(4).Info("repository successfully reconciled", "name", repo.Name) return ctrl.Result{Requeue: true, RequeueAfter: registryPollInterval}, nil } -func (r *RepositoryReconciler) deployRepository(ctx context.Context, repo *corev1alpha1.Repository) error { - generateName := fmt.Sprintf(generateNameFormat, repo.Name) - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: generateName, - Namespace: constants.KubeSphereNamespace, - Labels: map[string]string{corev1alpha1.RepositoryReferenceLabel: repo.Name}, - }, - - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{corev1alpha1.RepositoryReferenceLabel: repo.Name}, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{corev1alpha1.RepositoryReferenceLabel: repo.Name}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "repository", - Image: repo.Spec.Image, - ImagePullPolicy: corev1.PullAlways, - Env: []corev1.EnvVar{ - { - Name: "CHART_URL", - Value: fmt.Sprintf("http://%s.%s.svc", generateName, constants.KubeSphereNamespace), - }, - }, - LivenessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromInt32(8080), - }, - }, - PeriodSeconds: 10, - InitialDelaySeconds: 5, - }, - }, - }, - }, - }, - }, - } - - if err := controllerutil.SetOwnerReference(repo, deployment, r.Scheme()); err != nil { - return err - } - if err := r.Create(ctx, deployment); err != nil { - return err - } - - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: generateName, - Namespace: constants.KubeSphereNamespace, - }, - Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{ - { - Port: 80, - Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt32(8080), - }, - }, - Selector: map[string]string{ - corev1alpha1.RepositoryReferenceLabel: repo.Name, - }, - Type: corev1.ServiceTypeClusterIP, - }, - } - - if err := controllerutil.SetOwnerReference(repo, service, r.Scheme()); err != nil { - return err - } - - if err := r.Create(ctx, service); err != nil { - return err - } - - return nil -} - -func (r *RepositoryReconciler) loadExtensionVersionSpecFrom(ctx context.Context, chartURL string, repo *corev1alpha1.Repository, cred helm.RepoCredential) (*corev1alpha1.ExtensionVersionSpec, error) { - logger := klog.FromContext(ctx) - var result *corev1alpha1.ExtensionVersionSpec - - err := retry.OnError(retry.DefaultRetry, func(err error) bool { +func (r *RepositoryReconciler) fetchExtensionVersionSpec(ctx context.Context, extensionVersion *corev1alpha1.ExtensionVersion) (corev1alpha1.ExtensionVersionSpec, error) { + var extensionVersionSpec corev1alpha1.ExtensionVersionSpec + var err error + err = retry.OnError(retry.DefaultRetry, func(err error) bool { return true }, func() error { - data, err := helm.LoadData(ctx, chartURL, cred) - if err != nil { - return err - } + extensionVersionSpec, err = fetchExtensionVersionSpec(ctx, r.Client, extensionVersion) - files, err := loader.LoadArchiveFiles(data) - if err != nil { - return err - } - - for _, file := range files { - if file.Name == extensionFileName { - extensionVersionSpec := &corev1alpha1.ExtensionVersionSpec{} - if err := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(file.Data), 1024).Decode(extensionVersionSpec); err != nil { - logger.V(4).Info("invalid extension version spec: %s", string(file.Data)) - return nil - } - result = extensionVersionSpec - break - } - } - - if result == nil { - logger.V(6).Info("extension.yaml not found", "chart", chartURL) - return nil - } - - if strings.HasPrefix(result.Icon, "http://") || - strings.HasPrefix(result.Icon, "https://") || - strings.HasPrefix(result.Icon, "data:image") { - return nil - } - - absPath := strings.TrimPrefix(result.Icon, "./") - var iconData []byte - for _, file := range files { - if file.Name == absPath { - iconData = file.Data - break - } - } - - if iconData == nil { - logger.V(4).Info("invalid extension icon path: %s", absPath) - return nil - } - - mimeType := mime.TypeByExtension(path.Ext(result.Icon)) - if mimeType == "" { - mimeType = http.DetectContentType(iconData) - } - - base64EncodedData := base64.StdEncoding.EncodeToString(iconData) - result.Icon = fmt.Sprintf("data:%s;base64,%s", mimeType, base64EncodedData) return nil }) - if err != nil { - return nil, fmt.Errorf("failed to fetch chart data from %s: %s", chartURL, err) + return extensionVersionSpec, errors.Wrapf(err, "failed to fetch extension version spec") } - return result, nil + return extensionVersionSpec, nil } -func (r *RepositoryReconciler) removeSuspendedExtensionVersion(ctx context.Context, repo *corev1alpha1.Repository, extension *corev1alpha1.Extension, versions []corev1alpha1.ExtensionVersion) error { +func (r *RepositoryReconciler) removeSuspendedExtensionVersion(ctx context.Context, repoName, extensionName string, versions []corev1alpha1.ExtensionVersion) error { extensionVersions := &corev1alpha1.ExtensionVersionList{} - if err := r.List(ctx, extensionVersions, client.MatchingLabels{corev1alpha1.ExtensionReferenceLabel: extension.Name, corev1alpha1.RepositoryReferenceLabel: repo.Name}); err != nil { - return fmt.Errorf("failed to list extension versions: %s", err) + if err := r.List(ctx, extensionVersions, client.MatchingLabels{corev1alpha1.ExtensionReferenceLabel: extensionName, corev1alpha1.RepositoryReferenceLabel: repoName}); err != nil { + return errors.Wrapf(err, "failed to list extension versions") } for _, version := range extensionVersions.Items { if checkIfSuspended(versions, version) { @@ -525,7 +368,7 @@ func (r *RepositoryReconciler) removeSuspendedExtensionVersion(ctx context.Conte if apierrors.IsNotFound(err) { return nil } - return fmt.Errorf("failed to delete extension version: %s", err) + return errors.Wrapf(err, "failed to delete extension version %s", version.Name) } } } diff --git a/pkg/controller/core/util.go b/pkg/controller/core/util.go index 9107c31bd..bc295b7b6 100644 --- a/pkg/controller/core/util.go +++ b/pkg/controller/core/util.go @@ -7,32 +7,46 @@ package core import ( "bytes" + "context" "encoding/base64" - goerrors "errors" "fmt" "io" - "os" - "path/filepath" + "mime" + "net" + "net/http" + "net/url" + "path" "slices" "sort" "strings" - "text/template" + "time" "github.com/Masterminds/semver/v3" + "github.com/pkg/errors" yaml3 "gopkg.in/yaml.v3" "helm.sh/helm/v3/pkg/chart" + "helm.sh/helm/v3/pkg/chart/loader" + "helm.sh/helm/v3/pkg/getter" + "helm.sh/helm/v3/pkg/registry" "helm.sh/helm/v3/pkg/storage/driver" + corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/klog/v2" clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1" corev1alpha1 "kubesphere.io/api/core/v1alpha1" "kubesphere.io/utils/helm" + "sigs.k8s.io/controller-runtime/pkg/client" "kubesphere.io/kubesphere/pkg/utils/hashutil" "kubesphere.io/kubesphere/pkg/version" ) +const ExtensionVersionMaxLength = validation.LabelValueMaxLength +const ExtensionNameMaxLength = validation.LabelValueMaxLength + func getRecommendedExtensionVersion(versions []corev1alpha1.ExtensionVersion, k8sVersion *semver.Version) (string, error) { if len(versions) == 0 { return "", nil @@ -40,11 +54,10 @@ func getRecommendedExtensionVersion(versions []corev1alpha1.ExtensionVersion, k8 ksVersion, err := semver.NewVersion(version.Get().GitVersion) if err != nil { - return "", fmt.Errorf("parse KubeSphere version failed: %v", err) + return "", errors.Wrapf(err, "failed to parse KS version: %s", version.Get().GitVersion) } var matchedVersions []*semver.Version - for _, v := range versions { kubeVersionMatched, ksVersionMatched := matchVersionConstraints(v, k8sVersion, ksVersion) if kubeVersionMatched && ksVersionMatched { @@ -231,7 +244,7 @@ func usesPermissions(mainChart *chart.Chart) (rbacv1.ClusterRole, rbacv1.Role) { continue } // break the loop in case of EOF - if goerrors.Is(err, io.EOF) { + if errors.Is(err, io.EOF) { break } if err != nil { @@ -286,18 +299,11 @@ func configChanged(sub *corev1alpha1.InstallPlan, cluster string) bool { return newConfigHash != oldConfigHash } -// newHelmCred from Repository -func newHelmCred(repo *corev1alpha1.Repository) (helm.RepoCredential, error) { +// createHelmCredential from Repository +func createHelmCredential(repo *corev1alpha1.Repository) (helm.RepoCredential, error) { cred := helm.RepoCredential{ InsecureSkipTLSVerify: repo.Spec.Insecure, } - if repo.Spec.CABundle != "" { - caFile, err := storeCAFile(repo.Spec.CABundle, repo.Name) - if err != nil { - return cred, err - } - cred.CAFile = caFile - } if repo.Spec.BasicAuth != nil { cred.Username = repo.Spec.BasicAuth.Username cred.Password = repo.Spec.BasicAuth.Password @@ -305,38 +311,191 @@ func newHelmCred(repo *corev1alpha1.Repository) (helm.RepoCredential, error) { return cred, nil } -// storeCAFile in local file from caTemplate. -func storeCAFile(caBundle string, repoName string) (string, error) { - var buff = &bytes.Buffer{} - tmpl, err := template.New("repositoryCABundle").Parse(caTemplate) +func fetchExtensionVersionSpec(ctx context.Context, client client.Reader, extensionVersion *corev1alpha1.ExtensionVersion) (corev1alpha1.ExtensionVersionSpec, error) { + extensionVersionSpec := extensionVersion.Spec + logger := klog.FromContext(ctx) + data, err := fetchChartData(ctx, client, extensionVersion) if err != nil { - return "", err + return extensionVersionSpec, errors.Wrapf(err, "failed to fetch chart data") } - if err := tmpl.Execute(buff, map[string]string{ - "TempDIR": os.TempDir(), - "RepositoryName": repoName, - }); err != nil { - return "", err + helmChart, err := loader.LoadArchive(bytes.NewReader(data)) + if err != nil { + return extensionVersionSpec, errors.Wrapf(err, "failed to load chart archive") } - caFile := buff.String() - if _, err := os.Stat(filepath.Dir(caFile)); err != nil { - if !os.IsNotExist(err) { - return "", err + errs := isValidExtensionVersion(helmChart.Metadata.Version) + if len(errs) > 0 { + logger.V(4).Info("invalid extension version", "errors", errs) + return extensionVersionSpec, nil + } + for _, file := range helmChart.Files { + if file.Name == extensionFileName { + if err := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(file.Data), 1024).Decode(&extensionVersionSpec); err != nil { + logger.V(4).Info("failed to decode extension.yaml", "error", err) + return extensionVersionSpec, nil + } + break } - - if err := os.MkdirAll(filepath.Dir(caFile), os.ModePerm); err != nil { - return "", err + } + extensionVersionSpec.Name = helmChart.Name() + absPath := strings.TrimPrefix(extensionVersionSpec.Icon, "./") + var iconData []byte + for _, file := range helmChart.Files { + if file.Name == absPath { + iconData = file.Data + break } } - data, err := base64.StdEncoding.DecodeString(caBundle) - if err != nil { - return "", err + if iconData != nil { + mimeType := mime.TypeByExtension(path.Ext(extensionVersionSpec.Icon)) + if mimeType == "" { + mimeType = http.DetectContentType(iconData) + } + base64EncodedData := base64.StdEncoding.EncodeToString(iconData) + extensionVersionSpec.Icon = fmt.Sprintf("data:%s;base64,%s", mimeType, base64EncodedData) } - if err := os.WriteFile(caFile, data, os.ModePerm); err != nil { - return "", err - } - - return caFile, nil + return extensionVersionSpec, nil +} + +func fetchChartData(ctx context.Context, client client.Reader, extensionVersion *corev1alpha1.ExtensionVersion) ([]byte, error) { + if extensionVersion.Spec.ChartDataRef != nil { + return fetchChartDataFromConfigMap(ctx, client, extensionVersion.Spec.ChartDataRef) + } + + chartURL, err := url.Parse(extensionVersion.Spec.ChartURL) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse chart URL: %s", extensionVersion.Spec.ChartURL) + } + + repo, err := fetchRepository(ctx, client, extensionVersion.Spec.Repository) + if err != nil { + return nil, errors.Wrapf(err, "failed to fetch repository: %s", extensionVersion.Spec.Repository) + } + + repoURL, err := url.Parse(repo.Spec.URL) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse repo URL: %s", extensionVersion.Spec.ChartURL) + } + + if chartURL.Host == "" { + chartURL.Scheme = repoURL.Scheme + chartURL.Host = repoURL.Host + } + + transport, err := createTransport(repo, chartURL.Hostname()) + if err != nil { + return nil, errors.Wrapf(err, "failed to create transport") + } + + opts := createGetterOptions(repo, transport) + chartGetter, err := createChartGetter(chartURL.Scheme, opts) + if err != nil { + return nil, errors.Wrapf(err, "failed to create chart getter") + } + + return getChartData(chartGetter, chartURL.String()) +} + +func fetchChartDataFromConfigMap(ctx context.Context, client client.Reader, ref *corev1alpha1.ConfigMapKeyRef) ([]byte, error) { + configMap := &corev1.ConfigMap{} + if err := client.Get(ctx, types.NamespacedName{Namespace: ref.Namespace, Name: ref.Name}, configMap); err != nil { + return nil, errors.Wrapf(err, "failed to get config map: %s", ref.Name) + } + data := configMap.BinaryData[ref.Key] + if data != nil { + return data, nil + } + return nil, errors.New("chart data not found in config map") +} + +func fetchRepository(ctx context.Context, client client.Reader, repoName string) (*corev1alpha1.Repository, error) { + if repoName == "" { + return &corev1alpha1.Repository{}, nil + } + repo := &corev1alpha1.Repository{} + if err := client.Get(ctx, types.NamespacedName{Name: repoName}, repo); err != nil { + return nil, errors.Wrapf(err, "failed to get repository: %s", repoName) + } + return repo, nil +} + +func createTransport(repo *corev1alpha1.Repository, serverName string) (*http.Transport, error) { + tlsConf, err := helm.NewTLSConfig(repo.Spec.CABundle, repo.Spec.Insecure) + if err != nil { + return nil, errors.Wrapf(err, "failed to create tls config") + } + tlsConf.ServerName = serverName + + return &http.Transport{ + DisableCompression: true, + DialContext: (&net.Dialer{Timeout: 5 * time.Second, KeepAlive: 30 * time.Second}).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsConf, + }, nil +} + +func createGetterOptions(repo *corev1alpha1.Repository, transport *http.Transport) []getter.Option { + opts := []getter.Option{getter.WithTransport(transport)} + if repo.Spec.BasicAuth != nil { + opts = append(opts, getter.WithBasicAuth(repo.Spec.BasicAuth.Username, repo.Spec.BasicAuth.Password)) + } + return opts +} + +func createChartGetter(scheme string, opts []getter.Option) (getter.Getter, error) { + switch scheme { + case registry.OCIScheme: + return getter.NewOCIGetter(opts...) + case "http", "https": + return getter.NewHTTPGetter(opts...) + default: + return nil, errors.Errorf("unsupported scheme: %s", scheme) + } +} + +func getChartData(chartGetter getter.Getter, url string) ([]byte, error) { + buffer, err := chartGetter.Get(url) + if err != nil { + return nil, errors.Wrapf(err, "failed to fetch chart data: %s", url) + } + + data, err := io.ReadAll(buffer) + if err != nil { + return nil, errors.Wrapf(err, "failed to read chart data: %s", url) + } + return data, nil +} + +func isValidExtensionVersion(version string) []string { + var errs []string + if len(version) > ExtensionVersionMaxLength { + errs = append(errs, fmt.Sprintf("extension version length exceeds %d", ExtensionVersionMaxLength)) + } + if _, err := semver.NewVersion(version); err != nil { + errs = append(errs, fmt.Sprintf("invalid semver format: %s", err)) + } + if len(validation.IsDNS1123Subdomain(version)) > 0 { + errs = append(errs, "invalid DNS-1123 subdomain") + } + return errs +} + +func isValidExtensionName(name string) []string { + var errs []string + if name == "" { + errs = append(errs, "extension name should not be empty") + } + if len(name) > ExtensionNameMaxLength { + errs = append(errs, fmt.Sprintf("extension name length exceeds %d", ExtensionNameMaxLength)) + } + if len(validation.IsDNS1123Subdomain(name)) > 0 { + errs = append(errs, "invalid DNS-1123 subdomain") + } + return errs } diff --git a/staging/src/kubesphere.io/api/core/v1alpha1/extension_types.go b/staging/src/kubesphere.io/api/core/v1alpha1/extension_types.go index 03cc652e9..5bb378d40 100644 --- a/staging/src/kubesphere.io/api/core/v1alpha1/extension_types.go +++ b/staging/src/kubesphere.io/api/core/v1alpha1/extension_types.go @@ -44,6 +44,7 @@ type ExtensionSpec struct { // ExtensionVersionSpec contains the details of a specific version extension. type ExtensionVersionSpec struct { ExtensionInfo `json:",inline"` + Name string `json:"-"` Version string `json:"version,omitempty"` Keywords []string `json:"keywords,omitempty"` Sources []string `json:"sources,omitempty"` diff --git a/staging/src/kubesphere.io/api/core/v1alpha1/repository_types.go b/staging/src/kubesphere.io/api/core/v1alpha1/repository_types.go index 96fc6dfd5..56d954717 100644 --- a/staging/src/kubesphere.io/api/core/v1alpha1/repository_types.go +++ b/staging/src/kubesphere.io/api/core/v1alpha1/repository_types.go @@ -6,6 +6,7 @@ import ( type UpdateStrategy struct { RegistryPoll `json:"registryPoll,omitempty"` + Timeout metav1.Duration `json:"timeout"` } type RegistryPoll struct { @@ -18,8 +19,6 @@ type BasicAuth struct { } type RepositorySpec struct { - // DEPRECATED: the field will remove in future versions, please use url. - Image string `json:"image,omitempty"` URL string `json:"url,omitempty"` Description string `json:"description,omitempty"` BasicAuth *BasicAuth `json:"basicAuth,omitempty"` diff --git a/staging/src/kubesphere.io/api/core/v1alpha1/zz_generated.deepcopy.go b/staging/src/kubesphere.io/api/core/v1alpha1/zz_generated.deepcopy.go index 73c4fc616..0e96e67e3 100644 --- a/staging/src/kubesphere.io/api/core/v1alpha1/zz_generated.deepcopy.go +++ b/staging/src/kubesphere.io/api/core/v1alpha1/zz_generated.deepcopy.go @@ -856,6 +856,7 @@ func (in *ServiceAccountList) DeepCopyObject() runtime.Object { func (in *UpdateStrategy) DeepCopyInto(out *UpdateStrategy) { *out = *in out.RegistryPoll = in.RegistryPoll + out.Timeout = in.Timeout } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpdateStrategy. diff --git a/staging/src/kubesphere.io/utils/go.mod b/staging/src/kubesphere.io/utils/go.mod index cb40a7bbc..9bf36bd16 100644 --- a/staging/src/kubesphere.io/utils/go.mod +++ b/staging/src/kubesphere.io/utils/go.mod @@ -6,6 +6,7 @@ go 1.22.11 require ( github.com/aws/aws-sdk-go v1.55.5 + github.com/pkg/errors v0.9.1 helm.sh/helm/v3 v3.16.2 k8s.io/api v0.31.2 k8s.io/apimachinery v0.31.2 @@ -106,7 +107,6 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.20.5 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect diff --git a/staging/src/kubesphere.io/utils/helm/repo_index.go b/staging/src/kubesphere.io/utils/helm/repo_index.go index eb83d5ebd..e22848e90 100644 --- a/staging/src/kubesphere.io/utils/helm/repo_index.go +++ b/staging/src/kubesphere.io/utils/helm/repo_index.go @@ -4,10 +4,13 @@ import ( "bytes" "context" "fmt" + "net" + "net/http" "net/url" "strings" "time" + "github.com/pkg/errors" "helm.sh/helm/v3/pkg/getter" helmrepo "helm.sh/helm/v3/pkg/repo" "kubesphere.io/utils/s3" @@ -17,7 +20,6 @@ import ( const IndexYaml = "index.yaml" func LoadRepoIndex(ctx context.Context, u string, cred RepoCredential) (*helmrepo.IndexFile, error) { - if !strings.HasSuffix(u, "/") { u = fmt.Sprintf("%s/%s", u, IndexYaml) } else { @@ -26,12 +28,12 @@ func LoadRepoIndex(ctx context.Context, u string, cred RepoCredential) (*helmrep resp, err := LoadData(ctx, u, cred) if err != nil { - return nil, err + return nil, errors.Errorf("can't load data from %s: %v", u, err) } indexFile, err := loadIndex(resp.Bytes()) if err != nil { - return nil, err + return nil, errors.Errorf("can't load index file: %v", err) } return indexFile, nil @@ -52,10 +54,10 @@ func loadIndex(data []byte) (*helmrepo.IndexFile, error) { return i, nil } -func LoadData(ctx context.Context, u string, cred RepoCredential) (*bytes.Buffer, error) { +func LoadData(_ context.Context, u string, cred RepoCredential) (*bytes.Buffer, error) { parsedURL, err := url.Parse(u) if err != nil { - return nil, err + return nil, errors.Errorf("can't parse url: %v", err) } var resp *bytes.Buffer if strings.HasPrefix(u, "s3://") { @@ -71,22 +73,38 @@ func LoadData(ctx context.Context, u string, cred RepoCredential) (*bytes.Buffer }) if err != nil { - return nil, err + return nil, errors.Errorf("can't create s3 client: %v", err) } data, err := client.Read(p) if err != nil { - return nil, err + return nil, errors.Errorf("can't read data from s3: %v", err) } resp = bytes.NewBuffer(data) } else { + tlsConf, err := NewTLSConfig(cred.CABundle, cred.InsecureSkipTLSVerify) + if err != nil { + return nil, errors.Errorf("can't create tls config: %v", err) + } + tlsConf.ServerName = parsedURL.Hostname() + transport := &http.Transport{ + DisableCompression: true, + DialContext: (&net.Dialer{Timeout: 5 * time.Second, KeepAlive: 30 * time.Second}).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsConf, + } + // TODO add user-agent g, _ := getter.NewHTTPGetter() resp, err = g.Get(parsedURL.String(), getter.WithTimeout(5*time.Minute), - getter.WithInsecureSkipVerifyTLS(cred.InsecureSkipTLSVerify), - getter.WithTLSClientConfig(cred.CertFile, cred.KeyFile, cred.CAFile), + getter.WithTransport(transport), getter.WithBasicAuth(cred.Username, cred.Password), ) if err != nil { @@ -121,12 +139,8 @@ type RepoCredential struct { Username string `json:"username,omitempty"` // chart repository password Password string `json:"password,omitempty"` - // identify HTTPS client using this SSL certificate file - CertFile string `json:"certFile,omitempty"` - // identify HTTPS client using this SSL key file - KeyFile string `json:"keyFile,omitempty"` // verify certificates of HTTPS-enabled servers using this CA bundle - CAFile string `json:"caFile,omitempty"` + CABundle string `json:"caBundle,omitempty"` // skip tls certificate checks for the repository, default is ture InsecureSkipTLSVerify bool `json:"insecureSkipTLSVerify,omitempty"` diff --git a/staging/src/kubesphere.io/utils/helm/tls_util.go b/staging/src/kubesphere.io/utils/helm/tls_util.go new file mode 100644 index 000000000..2256db3fe --- /dev/null +++ b/staging/src/kubesphere.io/utils/helm/tls_util.go @@ -0,0 +1,38 @@ +package helm + +import ( + "crypto/tls" + "crypto/x509" + "encoding/base64" + "fmt" + + "github.com/pkg/errors" +) + +func NewTLSConfig(caBundle string, insecureSkipTLSVerify bool) (*tls.Config, error) { + config := tls.Config{ + InsecureSkipVerify: insecureSkipTLSVerify, + } + + if caBundle != "" { + caCerts, err := base64.StdEncoding.DecodeString(caBundle) + if err != nil { + return nil, fmt.Errorf("failed to decode caBundle: %v", err) + } + cp, err := certPoolFromCABundle(caCerts) + if err != nil { + return nil, err + } + config.RootCAs = cp + } + + return &config, nil +} + +func certPoolFromCABundle(caCerts []byte) (*x509.CertPool, error) { + cp := x509.NewCertPool() + if !cp.AppendCertsFromPEM(caCerts) { + return nil, errors.Errorf("failed to append certificates from caBundle") + } + return cp, nil +}