[kse-2112] add extension version controller (#6399)

Signed-off-by: ks-ci-bot <ks-ci-bot@kubesphere.io>
Co-authored-by: ks-ci-bot <ks-ci-bot@kubesphere.io>
This commit is contained in:
KubeSphere CI Bot
2025-03-11 10:13:47 +08:00
committed by GitHub
parent 7e2cafd15b
commit 0556934ecc
16 changed files with 669 additions and 506 deletions

View File

@@ -59,6 +59,7 @@ import (
func init() {
// core
runtime.Must(controller.Register(&core.ExtensionReconciler{}))
runtime.Must(controller.Register(&core.ExtensionVersionReconciler{}))
runtime.Must(controller.Register(&core.CategoryReconciler{}))
runtime.Must(controller.Register(&core.RepositoryReconciler{}))
runtime.Must(controller.Register(&core.InstallPlanReconciler{}))

View File

@@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: (devel)
creationTimestamp: null
controller-gen.kubebuilder.io/version: (unknown)
name: jsbundles.extensions.kubesphere.io
spec:
group: extensions.kubesphere.io
@@ -18,18 +17,24 @@ spec:
- name: v1alpha1
schema:
openAPIV3Schema:
description: JSBundle declares a js bundle that needs to be injected into
ks-console, the endpoint can be provided by a service or a static file.
description: |-
JSBundle declares a js bundle that needs to be injected into ks-console,
the endpoint can be provided by a service or a static file.
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
@@ -55,15 +60,19 @@ spec:
name:
type: string
service:
description: service is a reference to the service for this
endpoint. Either service or url must be specified. the
scheme is default to HTTPS.
description: |-
service is a reference to the service for this endpoint. Either
service or url must be specified.
the scheme is default to HTTPS.
properties:
name:
description: name is the name of the service. Required
description: |-
name is the name of the service.
Required
type: string
namespace:
description: namespace is the namespace of the service.
description: |-
namespace is the namespace of the service.
Required
type: string
path:
@@ -71,10 +80,10 @@ spec:
upstream will be contacted.
type: string
port:
description: port is an optional service port at which
the upstream will be contacted. `port` should be a
valid port number (1-65535, inclusive). Defaults to
443 for backward compatibility.
description: |-
port is an optional service port at which the upstream will be contacted.
`port` should be a valid port number (1-65535, inclusive).
Defaults to 443 for backward compatibility.
format: int32
type: integer
required:
@@ -82,9 +91,10 @@ spec:
- namespace
type: object
url:
description: '`url` gives the location of the upstream,
in standard URL form (`scheme://host:port/path`). Exactly
one of `url` or `service` must be specified.'
description: |-
`url` gives the location of the upstream, in standard URL form
(`scheme://host:port/path`). Exactly one of `url` or `service`
must be specified.
type: string
type: object
type: array
@@ -98,15 +108,19 @@ spec:
link:
type: string
service:
description: service is a reference to the service for this
endpoint. Either service or url must be specified. the scheme
is default to HTTPS.
description: |-
service is a reference to the service for this endpoint. Either
service or url must be specified.
the scheme is default to HTTPS.
properties:
name:
description: name is the name of the service. Required
description: |-
name is the name of the service.
Required
type: string
namespace:
description: namespace is the namespace of the service.
description: |-
namespace is the namespace of the service.
Required
type: string
path:
@@ -114,10 +128,10 @@ spec:
upstream will be contacted.
type: string
port:
description: port is an optional service port at which
the upstream will be contacted. `port` should be a valid
port number (1-65535, inclusive). Defaults to 443 for
backward compatibility.
description: |-
port is an optional service port at which the upstream will be contacted.
`port` should be a valid port number (1-65535, inclusive).
Defaults to 443 for backward compatibility.
format: int32
type: integer
required:
@@ -125,9 +139,10 @@ spec:
- namespace
type: object
url:
description: '`url` gives the location of the upstream, in
standard URL form (`scheme://host:port/path`). Exactly one
of `url` or `service` must be specified.'
description: |-
`url` gives the location of the upstream, in standard URL form
(`scheme://host:port/path`). Exactly one of `url` or `service`
must be specified.
type: string
type: object
type: object
@@ -146,8 +161,13 @@ spec:
description: The key to select.
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Add other useful fields. apiVersion, kind, uid?'
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
type: string
namespace:
type: string
@@ -170,8 +190,13 @@ spec:
a valid secret key.
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Add other useful fields. apiVersion, kind, uid?'
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
type: string
namespace:
type: string
@@ -185,25 +210,30 @@ spec:
type: object
x-kubernetes-map-type: atomic
service:
description: service is a reference to the service for this endpoint.
Either service or url must be specified. the scheme is default
to HTTPS.
description: |-
service is a reference to the service for this endpoint. Either
service or url must be specified.
the scheme is default to HTTPS.
properties:
name:
description: name is the name of the service. Required
description: |-
name is the name of the service.
Required
type: string
namespace:
description: namespace is the namespace of the service. Required
description: |-
namespace is the namespace of the service.
Required
type: string
path:
description: path is an optional URL path at which the upstream
will be contacted.
type: string
port:
description: port is an optional service port at which the
upstream will be contacted. `port` should be a valid port
number (1-65535, inclusive). Defaults to 443 for backward
compatibility.
description: |-
port is an optional service port at which the upstream will be contacted.
`port` should be a valid port number (1-65535, inclusive).
Defaults to 443 for backward compatibility.
format: int32
type: integer
required:
@@ -211,9 +241,10 @@ spec:
- namespace
type: object
url:
description: '`url` gives the location of the upstream, in standard
URL form (`scheme://host:port/path`). Exactly one of `url` or
`service` must be specified.'
description: |-
`url` gives the location of the upstream, in standard URL form
(`scheme://host:port/path`). Exactly one of `url` or `service`
must be specified.
type: string
type: object
type: object
@@ -221,43 +252,35 @@ spec:
properties:
conditions:
items:
description: "Condition contains details for one aspect of the current
state of this API Resource. --- This struct is intended for direct
use as an array at the field path .status.conditions. For example,
\n type FooStatus struct{ // Represents the observations of a
foo's current state. // Known .status.conditions.type are: \"Available\",
\"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge
// +listType=map // +listMapKey=type Conditions []metav1.Condition
`json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\"
protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }"
description: Condition contains details for one aspect of the current
state of this API Resource.
properties:
lastTransitionTime:
description: lastTransitionTime is the last time the condition
transitioned from one status to another. This should be when
the underlying condition changed. If that is not known, then
using the time when the API field changed is acceptable.
description: |-
lastTransitionTime is the last time the condition transitioned from one status to another.
This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
format: date-time
type: string
message:
description: message is a human readable message indicating
details about the transition. This may be an empty string.
description: |-
message is a human readable message indicating details about the transition.
This may be an empty string.
maxLength: 32768
type: string
observedGeneration:
description: observedGeneration represents the .metadata.generation
that the condition was set based upon. For instance, if .metadata.generation
is currently 12, but the .status.conditions[x].observedGeneration
is 9, the condition is out of date with respect to the current
state of the instance.
description: |-
observedGeneration represents the .metadata.generation that the condition was set based upon.
For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date
with respect to the current state of the instance.
format: int64
minimum: 0
type: integer
reason:
description: reason contains a programmatic identifier indicating
the reason for the condition's last transition. Producers
of specific condition types may define expected values and
meanings for this field, and whether the values are considered
a guaranteed API. The value should be a CamelCase string.
description: |-
reason contains a programmatic identifier indicating the reason for the condition's last transition.
Producers of specific condition types may define expected values and meanings for this field,
and whether the values are considered a guaranteed API.
The value should be a CamelCase string.
This field may not be empty.
maxLength: 1024
minLength: 1
@@ -272,10 +295,6 @@ spec:
type: string
type:
description: type of condition in CamelCase or in foo.example.com/CamelCase.
--- Many .condition.type values are consistent across resources
like Available, but because arbitrary conditions can be useful
(see .node.status.conditions), the ability to deconflict is
important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string

View File

@@ -54,15 +54,12 @@ spec:
to verify the helm server.
type: string
depth:
description: The number of synchronized versions of each extension.
0 means synchronized all versions, default is 3.
description: The maximum number of synchronized versions for each
extension. A value of 0 indicates that all versions will be synchronized.
The default is 3.
type: integer
description:
type: string
image:
description: 'DEPRECATED: the field will remove in future versions,
please use url.'
type: string
insecure:
description: --insecure-skip-tls-verify. default false
type: boolean
@@ -75,6 +72,10 @@ spec:
required:
- interval
type: object
timeout:
type: string
required:
- timeout
type: object
url:
type: string

View File

@@ -10,8 +10,6 @@ import (
"strconv"
"strings"
kscontroller "kubesphere.io/kubesphere/pkg/controller"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
@@ -24,6 +22,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
kscontroller "kubesphere.io/kubesphere/pkg/controller"
)
const (

View File

@@ -7,17 +7,19 @@ package core
import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"github.com/Masterminds/semver/v3"
"github.com/go-logr/logr"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
corev1alpha1 "kubesphere.io/api/core/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -28,8 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
kscontroller "kubesphere.io/kubesphere/pkg/controller"
)
@@ -90,14 +90,16 @@ func (r *ExtensionReconciler) SetupWithManager(mgr *kscontroller.Manager) error
}
func (r *ExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.logger.WithValues("extension", req.String())
logger.V(4).Info("reconciling extension")
ctx = klog.NewContext(ctx, logger)
extension := &corev1alpha1.Extension{}
if err := r.Client.Get(ctx, req.NamespacedName, extension); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
r.logger.V(4).Info("reconcile", "extension", extension.Name)
if extension.ObjectMeta.DeletionTimestamp != nil {
if extension.ObjectMeta.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, extension)
}
@@ -108,11 +110,10 @@ func (r *ExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
if err := r.syncExtensionStatus(ctx, extension); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to sync extension status: %s", err)
return ctrl.Result{}, errors.Wrap(err, "failed to sync extension status")
}
r.logger.V(4).Info("synced", "extension", extension.Name)
logger.V(4).Info("extension successfully reconciled")
return ctrl.Result{}, nil
}
@@ -125,13 +126,13 @@ func (r *ExtensionReconciler) reconcileDelete(ctx context.Context, extension *co
},
DeleteOptions: client.DeleteOptions{PropagationPolicy: &deletePolicy},
}); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to delete related ExtensionVersion: %s", err)
return ctrl.Result{}, errors.Wrap(err, "failed to delete extension versions")
}
// Remove the finalizer from the extension
controllerutil.RemoveFinalizer(extension, extensionProtection)
if err := r.Update(ctx, extension); err != nil {
return ctrl.Result{}, err
return ctrl.Result{}, errors.Wrap(err, "failed to remove finalizer from extension")
}
return ctrl.Result{}, nil
}
@@ -144,28 +145,32 @@ func (r *ExtensionReconciler) syncExtensionStatus(ctx context.Context, extension
return err
}
versions := make([]corev1alpha1.ExtensionVersionInfo, 0, len(versionList.Items))
for i := range versionList.Items {
if versionList.Items[i].DeletionTimestamp.IsZero() {
versions := make([]corev1alpha1.ExtensionVersionInfo, 0)
for _, version := range versionList.Items {
isValidVersion := len(isValidExtensionVersion(version.Spec.Version)) == 0
if version.DeletionTimestamp.IsZero() && isValidVersion {
versions = append(versions, corev1alpha1.ExtensionVersionInfo{
Version: versionList.Items[i].Spec.Version,
CreationTimestamp: versionList.Items[i].CreationTimestamp,
Version: version.Spec.Version,
CreationTimestamp: version.CreationTimestamp,
})
}
}
sort.Slice(versions, func(i, j int) bool {
return versions[i].Version < versions[j].Version
v1, _ := semver.NewVersion(versions[i].Version)
v2, _ := semver.NewVersion(versions[j].Version)
return v1.LessThan(v2)
})
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := r.Get(ctx, types.NamespacedName{Name: extension.Name}, extension); err != nil {
return err
return errors.Wrap(err, "failed to get extension")
}
expected := extension.DeepCopy()
if recommended, err := getRecommendedExtensionVersion(versionList.Items, r.k8sVersion); err == nil {
expected.Status.RecommendedVersion = recommended
} else {
r.logger.Error(err, "failed to get recommended extension version")
klog.FromContext(ctx).Error(err, "failed to get recommended extension version")
}
expected.Status.Versions = versions
if expected.Status.RecommendedVersion != extension.Status.RecommendedVersion ||
@@ -176,7 +181,7 @@ func (r *ExtensionReconciler) syncExtensionStatus(ctx context.Context, extension
})
if err != nil {
return fmt.Errorf("failed to update extension status: %s", err)
return errors.Wrap(err, "failed to update extension status")
}
return nil
}

View File

@@ -0,0 +1,152 @@
/*
* Please refer to the LICENSE file in the root directory of the project.
* https://github.com/kubesphere/kubesphere/blob/master/LICENSE
*/
package core
import (
"context"
"reflect"
"strings"
"github.com/Masterminds/semver/v3"
"github.com/go-logr/logr"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
corev1alpha1 "kubesphere.io/api/core/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
kscontroller "kubesphere.io/kubesphere/pkg/controller"
)
const (
extensionVersionController = "extensionVersion"
)
var _ kscontroller.Controller = &ExtensionVersionReconciler{}
var _ reconcile.Reconciler = &ExtensionVersionReconciler{}
func (r *ExtensionVersionReconciler) Name() string {
return extensionVersionController
}
func (r *ExtensionVersionReconciler) Enabled(clusterRole string) bool {
return strings.EqualFold(clusterRole, string(clusterv1alpha1.ClusterRoleHost))
}
type ExtensionVersionReconciler struct {
client.Client
logger logr.Logger
}
func (r *ExtensionVersionReconciler) SetupWithManager(mgr *kscontroller.Manager) error {
r.Client = mgr.GetClient()
r.logger = ctrl.Log.WithName("controllers").WithName(extensionVersionController)
return ctrl.NewControllerManagedBy(mgr).
Named(extensionVersionController).
For(&corev1alpha1.ExtensionVersion{}).
Complete(r)
}
func (r *ExtensionVersionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.logger.WithValues("extensionVersion", req.String())
logger.V(4).Info("reconciling extension version")
ctx = klog.NewContext(ctx, logger)
extensionVersion := &corev1alpha1.ExtensionVersion{}
if err := r.Client.Get(ctx, req.NamespacedName, extensionVersion); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if !extensionVersion.ObjectMeta.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}
if err := r.syncExtensionVersion(ctx, extensionVersion); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to sync extension version")
}
logger.V(4).Info("extension version successfully reconciled")
return ctrl.Result{}, nil
}
func (r *ExtensionVersionReconciler) syncExtensionVersion(ctx context.Context, extensionVersion *corev1alpha1.ExtensionVersion) error {
logger := klog.FromContext(ctx)
// automatically synchronized by the repository controller
if extensionVersion.Spec.Repository != "" {
return nil
}
if extensionVersion.Spec.Version != "" {
return nil
}
extensionVersionSpec, err := fetchExtensionVersionSpec(ctx, r.Client, extensionVersion)
if err != nil {
return errors.Wrap(err, "failed to fetch extension version spec")
}
if len(isValidExtensionName(extensionVersionSpec.Name)) > 0 {
logger.V(4).Info("invalid extension name found", "name", extensionVersionSpec.Name)
return nil
}
expected := extensionVersion.DeepCopy()
if expected.Labels == nil {
expected.Labels = map[string]string{}
}
expected.Labels[corev1alpha1.ExtensionReferenceLabel] = extensionVersionSpec.Name
expected.Labels[corev1alpha1.CategoryLabel] = extensionVersionSpec.Category
expected.Spec = extensionVersionSpec
if !reflect.DeepEqual(expected.Spec, extensionVersion.Spec) ||
!reflect.DeepEqual(expected.Labels, extensionVersion.Labels) {
if err := r.Update(ctx, expected); err != nil {
return errors.Wrap(err, "failed to update extension version")
}
logger.V(4).Info("extension version updated")
}
extension := &corev1alpha1.Extension{
ObjectMeta: metav1.ObjectMeta{Name: extensionVersionSpec.Name},
}
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, extension, func() error {
if !needUpdate(extensionVersionSpec.Version, extension.Status.Versions) {
return nil
}
if extension.Labels == nil {
extension.Labels = make(map[string]string)
}
if extensionVersion.Spec.Category != "" {
extension.Labels[corev1alpha1.CategoryLabel] = extensionVersion.Spec.Category
}
extension.Spec.ExtensionInfo = expected.Spec.ExtensionInfo
return nil
})
if err != nil {
return errors.Wrapf(err, "failed to update extension: %v", err)
}
logger.V(4).Info("extension successfully updated", "operation", op, "name", extension.Name)
return nil
}
func needUpdate(version string, versions []corev1alpha1.ExtensionVersionInfo) bool {
v1, _ := semver.NewVersion(version)
for _, v := range versions {
v2, _ := semver.NewVersion(v.Version)
if v2.GreaterThan(v1) {
return false
}
}
return true
}

View File

@@ -9,8 +9,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"path"
"reflect"
"sort"
@@ -21,8 +19,6 @@ import (
"golang.org/x/exp/slices"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/getter"
"helm.sh/helm/v3/pkg/registry"
helmrelease "helm.sh/helm/v3/pkg/release"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
@@ -413,90 +409,25 @@ func latestJobCondition(job *batchv1.Job) batchv1.JobCondition {
return condition
}
func (r *InstallPlanReconciler) loadChartData(ctx context.Context) ([]byte, string, error) {
func (r *InstallPlanReconciler) loadChartDataAndCABundle(ctx context.Context) ([]byte, string, error) {
extensionVersion, ok := ctx.Value(contextKeyExtensionVersion{}).(*corev1alpha1.ExtensionVersion)
if !ok {
return nil, "", fmt.Errorf("failed to get extension version from context")
}
// load chart data from
if extensionVersion.Spec.ChartDataRef != nil {
configMap := &corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Namespace: extensionVersion.Spec.ChartDataRef.Namespace, Name: extensionVersion.Spec.ChartDataRef.Name}, configMap); err != nil {
return nil, "", err
}
data := configMap.BinaryData[extensionVersion.Spec.ChartDataRef.Key]
if data != nil {
return data, "", nil
}
return nil, "", fmt.Errorf("binary data not found")
}
chartURL, err := url.Parse(extensionVersion.Spec.ChartURL)
if err != nil {
return nil, "", fmt.Errorf("failed to parse chart url: %v", err)
}
var caBundle string
repo := &corev1alpha1.Repository{}
if extensionVersion.Spec.Repository != "" {
if err := r.Get(ctx, types.NamespacedName{Name: extensionVersion.Spec.Repository}, repo); err != nil {
return nil, "", fmt.Errorf("failed to get repository: %v", err)
}
caBundle = repo.Spec.CABundle
}
var chartGetter getter.Getter
switch chartURL.Scheme {
case registry.OCIScheme:
opts := make([]getter.Option, 0)
if extensionVersion.Spec.Repository != "" {
opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure))
}
if repo.Spec.BasicAuth != nil {
opts = append(opts, getter.WithBasicAuth(repo.Spec.BasicAuth.Username, repo.Spec.BasicAuth.Password))
}
chartGetter, err = getter.NewOCIGetter(opts...)
data, err := fetchChartData(ctx, r.Client, extensionVersion)
if err != nil {
return nil, "", fmt.Errorf("failed to create chart getter: %v", err)
}
case "http", "https":
opts := make([]getter.Option, 0)
if chartURL.Scheme == "https" && extensionVersion.Spec.Repository != "" {
opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure))
}
if repo.Spec.CABundle != "" {
caFile, err := storeCAFile(repo.Spec.CABundle, repo.Name)
if err != nil {
return nil, "", fmt.Errorf("failed to store CABundle to local file: %s", err)
}
opts = append(opts, getter.WithTLSClientConfig("", "", caFile))
}
if chartURL.Scheme == "https" {
opts = append(opts, getter.WithInsecureSkipVerifyTLS(repo.Spec.Insecure))
}
if repo.Spec.BasicAuth != nil {
opts = append(opts, getter.WithBasicAuth(repo.Spec.BasicAuth.Username, repo.Spec.BasicAuth.Password))
}
chartGetter, err = getter.NewHTTPGetter(opts...)
if err != nil {
return nil, "", fmt.Errorf("failed to create chart getter: %v", err)
}
default:
return nil, "", fmt.Errorf("unsupported scheme: %s", chartURL.Scheme)
return nil, "", fmt.Errorf("failed to load chart data: %v", err)
}
buffer, err := chartGetter.Get(chartURL.String())
if err != nil {
return nil, "", fmt.Errorf("failed to get chart data: %v", err)
}
data, err := io.ReadAll(buffer)
if err != nil {
return nil, "", fmt.Errorf("failed to read chart data: %v", err)
}
return data, caBundle, nil
return data, repo.Spec.CABundle, nil
}
func updateState(status *corev1alpha1.InstallationStatus, state string, time time.Time) bool {
@@ -1174,7 +1105,7 @@ func (r *InstallPlanReconciler) installOrUpgradeExtension(ctx context.Context, p
return r.updateInstallPlan(ctx, plan)
}
chartData, caBundle, err := r.loadChartData(ctx)
chartData, caBundle, err := r.loadChartDataAndCABundle(ctx)
if err != nil {
return onFailed(err)
}
@@ -1366,7 +1297,7 @@ func (r *InstallPlanReconciler) installOrUpgradeClusterAgent(ctx context.Context
return r.updateInstallPlan(ctx, plan)
}
chartData, caBundle, err := r.loadChartData(ctx)
chartData, caBundle, err := r.loadChartDataAndCABundle(ctx)
if err != nil {
return onFailed(fmt.Errorf("failed to load chart data: %v", err))
}

View File

@@ -6,28 +6,18 @@
package core
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"mime"
"net/http"
"net/url"
"path"
"strings"
"time"
"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/chart/loader"
appsv1 "k8s.io/api/apps/v1"
"github.com/pkg/errors"
helmrepo "helm.sh/helm/v3/pkg/repo"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
@@ -40,7 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"kubesphere.io/kubesphere/pkg/constants"
kscontroller "kubesphere.io/kubesphere/pkg/controller"
)
@@ -48,14 +37,11 @@ const (
repositoryProtection = "kubesphere.io/repository-protection"
repositoryController = "repository"
minimumRegistryPollInterval = 15 * time.Minute
defaultRequeueInterval = 15 * time.Second
generateNameFormat = "repository-%s"
defaultRegistryPollTimeout = 2 * time.Minute
extensionFileName = "extension.yaml"
// caTemplate store repository.spec.caBound in local dir.
caTemplate = "{{ .TempDIR }}/repository/{{ .RepositoryName }}/ssl/ca.crt"
)
var extensionRepoConflict = fmt.Errorf("extension repo mismatch")
var extensionRepoConflict = errors.New("extension repo mismatch")
var _ kscontroller.Controller = &RepositoryReconciler{}
var _ reconcile.Reconciler = &RepositoryReconciler{}
@@ -86,7 +72,7 @@ func (r *RepositoryReconciler) SetupWithManager(mgr *kscontroller.Manager) error
func (r *RepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.logger.WithValues("repository", req.String())
logger.V(4).Info("sync repository")
logger.V(4).Info("reconciling extension repository")
ctx = klog.NewContext(ctx, logger)
repo := &corev1alpha1.Repository{}
@@ -103,6 +89,7 @@ func (r *RepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
controllerutil.AddFinalizer(expected, repositoryProtection)
return ctrl.Result{}, r.Patch(ctx, expected, client.MergeFrom(repo))
}
return r.reconcileRepository(ctx, repo)
}
@@ -124,10 +111,9 @@ func (r *RepositoryReconciler) createOrUpdateExtension(ctx context.Context, repo
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, extension, func() error {
originRepoName := extension.Labels[corev1alpha1.RepositoryReferenceLabel]
if originRepoName != "" && originRepoName != repo.Name {
logger.Error(extensionRepoConflict, "conflict", "extension", extensionName, "want", originRepoName, "got", repo.Name)
logger.Error(extensionRepoConflict, "extension repo mismatch", "name", extension.Name, "origin", originRepoName, "current", repo.Name)
return extensionRepoConflict
}
if extension.Labels == nil {
extension.Labels = make(map[string]string)
}
@@ -137,13 +123,13 @@ func (r *RepositoryReconciler) createOrUpdateExtension(ctx context.Context, repo
extension.Labels[corev1alpha1.RepositoryReferenceLabel] = repo.Name
extension.Spec.ExtensionInfo = extensionVersion.Spec.ExtensionInfo
if err := controllerutil.SetOwnerReference(repo, extension, r.Scheme()); err != nil {
return err
return errors.Wrapf(err, "failed to set owner reference")
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to update extension: %s", err)
return nil, errors.Wrapf(err, "failed to update extension")
}
logger.V(4).Info("extension successfully updated", "operation", op, "name", extension.Name)
@@ -160,6 +146,12 @@ func (r *RepositoryReconciler) createOrUpdateExtensionVersion(ctx context.Contex
for k, v := range extensionVersion.Labels {
version.Labels[k] = v
}
if version.Annotations == nil {
version.Annotations = make(map[string]string)
}
for k, v := range extensionVersion.Annotations {
version.Annotations[k] = v
}
version.Spec = extensionVersion.Spec
if err := controllerutil.SetOwnerReference(extension, version, r.Scheme()); err != nil {
return err
@@ -168,90 +160,87 @@ func (r *RepositoryReconciler) createOrUpdateExtensionVersion(ctx context.Contex
})
if err != nil {
return fmt.Errorf("failed to update extension version: %s", err)
return errors.Wrapf(err, "failed to update extension version")
}
logger.V(4).Info("extension version successfully updated", "operation", op, "name", extensionVersion.Name)
return nil
}
func (r *RepositoryReconciler) syncExtensionsFromURL(ctx context.Context, repo *corev1alpha1.Repository, repoURL string) error {
func (r *RepositoryReconciler) syncExtensionsFromURL(ctx context.Context, repo *corev1alpha1.Repository, timeout time.Duration) error {
logger := klog.FromContext(ctx)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
cred, err := newHelmCred(repo)
cred, err := createHelmCredential(repo)
if err != nil {
return err
return errors.Wrapf(err, "failed to create helm credential")
}
index, err := helm.LoadRepoIndex(ctx, repoURL, cred)
repoURL, err := url.Parse(repo.Spec.URL)
if err != nil {
return err
return errors.Wrapf(err, "failed to parse repo URL")
}
index, err := helm.LoadRepoIndex(ctx, repo.Spec.URL, cred)
if err != nil {
return errors.Wrapf(err, "failed to load repo index")
}
for extensionName, versions := range index.Entries {
// check extensionName
if errs := validation.IsDNS1123Subdomain(extensionName); len(errs) > 0 {
if errs := isValidExtensionName(extensionName); len(errs) > 0 {
logger.Info("invalid extension name", "extension", extensionName, "error", errs)
continue
}
extensionVersions := make([]corev1alpha1.ExtensionVersion, 0, len(versions))
for _, version := range versions {
if version.Metadata == nil {
logger.Info("version metadata is empty", "repo", repo.Name)
continue
}
if version.Name != extensionName {
logger.Info("invalid extension version found", "want", extensionName, "got", version.Name)
logger.V(4).Info("extension name mismatch", "extension", extensionName, "version", version.Version)
continue
}
var chartURL string
if len(version.URLs) > 0 {
versionURL := version.URLs[0]
u, err := url.Parse(versionURL)
if err != nil {
logger.Error(err, "failed to parse chart URL", "url", versionURL)
chartURL := resolveChartURL(version, repoURL)
if chartURL == nil {
logger.V(4).Info("failed to resolve chart URL", "extension", extensionName, "version", version.Version)
continue
}
if u.Host == "" {
chartURL = fmt.Sprintf("%s/%s", repoURL, versionURL)
} else {
chartURL = u.String()
}
}
extensionVersionSpec, err := r.loadExtensionVersionSpecFrom(ctx, chartURL, repo, cred)
if err != nil {
return fmt.Errorf("failed to load extension version spec: %s", err)
}
if extensionVersionSpec == nil {
logger.V(4).Info("extension version spec not found: %s", chartURL)
continue
}
extensionVersionSpec.Created = metav1.NewTime(version.Created)
extensionVersionSpec.Digest = version.Digest
extensionVersionSpec.Repository = repo.Name
extensionVersionSpec.ChartDataRef = nil
extensionVersionSpec.ChartURL = chartURL
extensionVersion := corev1alpha1.ExtensionVersion{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", extensionName, extensionVersionSpec.Version),
Name: fmt.Sprintf("%s-%s", extensionName, version.Version),
Labels: map[string]string{
corev1alpha1.RepositoryReferenceLabel: repo.Name,
corev1alpha1.ExtensionReferenceLabel: extensionName,
},
Annotations: version.Metadata.Annotations,
},
Spec: *extensionVersionSpec,
Spec: corev1alpha1.ExtensionVersionSpec{
ChartURL: chartURL.String(),
Repository: repo.Name,
},
}
extensionVersionSpec, err := r.fetchExtensionVersionSpec(ctx, &extensionVersion)
if err != nil {
return errors.Wrapf(err, "failed to load extension version spec")
}
if extensionVersionSpec.Name != extensionName {
logger.V(4).Info("extension version name mismatch", "extension", extensionName, "version", version.Version)
continue
}
extensionVersionSpec.ChartURL = chartURL.String()
extensionVersionSpec.Created = metav1.NewTime(version.Created)
extensionVersionSpec.Digest = version.Digest
extensionVersionSpec.Repository = repo.Name
if extensionVersionSpec.Category != "" {
extensionVersion.Labels[corev1alpha1.CategoryLabel] = extensionVersionSpec.Category
}
extensionVersion.Spec = extensionVersionSpec
extensionVersions = append(extensionVersions, extensionVersion)
}
@@ -265,29 +254,30 @@ func (r *RepositoryReconciler) syncExtensionsFromURL(ctx context.Context, repo *
if errors.Is(err, extensionRepoConflict) {
continue
}
return fmt.Errorf("failed to create or update extension: %s", err)
return errors.Wrapf(err, "failed to create or update extension")
}
// create extensionVersions of filteredVersions
for _, extensionVersion := range filteredVersions {
if err := r.createOrUpdateExtensionVersion(ctx, extension, &extensionVersion); err != nil {
return fmt.Errorf("failed to create or update extension version: %s", err)
return errors.Wrapf(err, "failed to create or update extension version")
}
}
// remove extensionVersions of existVersions
if err := r.removeSuspendedExtensionVersion(ctx, repo, extension, extensionVersions); err != nil {
return fmt.Errorf("failed to remove suspended extension version: %s", err)
if err := r.removeSuspendedExtensionVersion(ctx, repo.Name, extension.Name, extensionVersions); err != nil {
return errors.Wrapf(err, "failed to remove suspended extension version")
}
}
extensions := &corev1alpha1.ExtensionList{}
if err := r.List(ctx, extensions, client.MatchingLabels{corev1alpha1.RepositoryReferenceLabel: repo.Name}); err != nil {
return fmt.Errorf("failed to list extensions: %s", err)
return errors.Wrapf(err, "failed to list extensions")
}
for _, extension := range extensions.Items {
if _, ok := index.Entries[extension.Name]; !ok {
if err := r.removeSuspendedExtensionVersion(ctx, repo, &extension, []corev1alpha1.ExtensionVersion{}); err != nil {
return fmt.Errorf("failed to remove suspended extension version: %s", err)
// remove all the extensionVersions if the extension is not in the index
if err := r.removeSuspendedExtensionVersion(ctx, repo.Name, extension.Name, []corev1alpha1.ExtensionVersion{}); err != nil {
return errors.Wrapf(err, "failed to remove suspended extension version")
}
}
}
@@ -295,228 +285,81 @@ func (r *RepositoryReconciler) syncExtensionsFromURL(ctx context.Context, repo *
return nil
}
func resolveChartURL(version *helmrepo.ChartVersion, repoURL *url.URL) *url.URL {
if len(version.URLs) == 0 {
return nil
}
chartURL, err := url.Parse(version.URLs[0])
if err != nil {
return nil
}
if chartURL.Host == "" {
chartURL.Scheme = repoURL.Scheme
chartURL.Host = repoURL.Host
}
return chartURL
}
func (r *RepositoryReconciler) reconcileRepository(ctx context.Context, repo *corev1alpha1.Repository) (ctrl.Result, error) {
registryPollInterval := minimumRegistryPollInterval
if repo.Spec.UpdateStrategy != nil && repo.Spec.UpdateStrategy.Interval.Duration > minimumRegistryPollInterval {
registryPollInterval = repo.Spec.UpdateStrategy.Interval.Duration
}
var repoURL string
// URL and Image are immutable after creation
if repo.Spec.URL != "" {
repoURL = repo.Spec.URL
} else if repo.Spec.Image != "" {
var deployment appsv1.Deployment
if err := r.Get(ctx, types.NamespacedName{Namespace: constants.KubeSphereNamespace, Name: fmt.Sprintf(generateNameFormat, repo.Name)}, &deployment); err != nil {
if apierrors.IsNotFound(err) {
if err := r.deployRepository(ctx, repo); err != nil {
r.recorder.Event(repo, corev1.EventTypeWarning, "RepositoryDeployFailed", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to deploy repository: %s", err)
}
r.recorder.Event(repo, corev1.EventTypeNormal, "RepositoryDeployed", "")
return ctrl.Result{Requeue: true, RequeueAfter: defaultRequeueInterval}, nil
}
return ctrl.Result{}, fmt.Errorf("failed to fetch deployment: %s", err)
registryPollTimeout := defaultRegistryPollTimeout
if repo.Spec.UpdateStrategy != nil && repo.Spec.UpdateStrategy.Timeout.Duration > 0 {
registryPollTimeout = repo.Spec.UpdateStrategy.Timeout.Duration
}
restartAt, _ := time.Parse(time.RFC3339, deployment.Spec.Template.Annotations["kubesphere.io/restartedAt"])
if restartAt.IsZero() {
restartAt = deployment.ObjectMeta.CreationTimestamp.Time
}
// restart and pull the latest docker image
if time.Now().After(repo.Status.LastSyncTime.Add(registryPollInterval)) && time.Now().After(restartAt.Add(registryPollInterval)) {
rawData := []byte(fmt.Sprintf("{\"spec\":{\"template\":{\"metadata\":{\"annotations\":{\"kubesphere.io/restartedAt\":\"%s\"}}}}}", time.Now().Format(time.RFC3339)))
if err := r.Patch(ctx, &deployment, client.RawPatch(types.StrategicMergePatchType, rawData)); err != nil {
return ctrl.Result{}, err
}
r.recorder.Event(repo, corev1.EventTypeNormal, "RepositoryRestarted", "")
return ctrl.Result{Requeue: true, RequeueAfter: defaultRequeueInterval}, nil
repoURL := repo.Spec.URL
if repoURL == "" {
return ctrl.Result{}, nil
}
if deployment.Status.AvailableReplicas != deployment.Status.Replicas {
return ctrl.Result{Requeue: true, RequeueAfter: defaultRequeueInterval}, nil
}
// ready to sync
repoURL = fmt.Sprintf("http://%s.%s.svc", deployment.Name, constants.KubeSphereNamespace)
}
logger := klog.FromContext(ctx)
outOfSync := repo.Status.LastSyncTime == nil || time.Now().After(repo.Status.LastSyncTime.Add(registryPollInterval))
if repoURL != "" && outOfSync {
if err := r.syncExtensionsFromURL(ctx, repo, repoURL); err != nil {
if outOfSync {
if err := r.syncExtensionsFromURL(ctx, repo, registryPollTimeout); err != nil {
r.recorder.Eventf(repo, corev1.EventTypeWarning, kscontroller.SyncFailed, "failed to sync extensions from %s: %s", repoURL, err)
return ctrl.Result{}, fmt.Errorf("failed to sync extensions: %s", err)
return ctrl.Result{}, errors.Wrapf(err, "failed to sync extensions from %s", repoURL)
}
r.recorder.Eventf(repo, corev1.EventTypeNormal, kscontroller.Synced, "sync extensions from %s successfully", repoURL)
repo = repo.DeepCopy()
repo.Status.LastSyncTime = &metav1.Time{Time: time.Now()}
if err := r.Update(ctx, repo); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update repository: %s", err)
return ctrl.Result{}, errors.Wrapf(err, "failed to update repository status")
}
logger.V(4).Info("repository successfully synced", "name", repo.Name)
}
logger.V(4).Info("repository successfully reconciled", "name", repo.Name)
return ctrl.Result{Requeue: true, RequeueAfter: registryPollInterval}, nil
}
func (r *RepositoryReconciler) deployRepository(ctx context.Context, repo *corev1alpha1.Repository) error {
generateName := fmt.Sprintf(generateNameFormat, repo.Name)
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: generateName,
Namespace: constants.KubeSphereNamespace,
Labels: map[string]string{corev1alpha1.RepositoryReferenceLabel: repo.Name},
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{corev1alpha1.RepositoryReferenceLabel: repo.Name},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{corev1alpha1.RepositoryReferenceLabel: repo.Name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "repository",
Image: repo.Spec.Image,
ImagePullPolicy: corev1.PullAlways,
Env: []corev1.EnvVar{
{
Name: "CHART_URL",
Value: fmt.Sprintf("http://%s.%s.svc", generateName, constants.KubeSphereNamespace),
},
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/health",
Port: intstr.FromInt32(8080),
},
},
PeriodSeconds: 10,
InitialDelaySeconds: 5,
},
},
},
},
},
},
}
if err := controllerutil.SetOwnerReference(repo, deployment, r.Scheme()); err != nil {
return err
}
if err := r.Create(ctx, deployment); err != nil {
return err
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: generateName,
Namespace: constants.KubeSphereNamespace,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 80,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt32(8080),
},
},
Selector: map[string]string{
corev1alpha1.RepositoryReferenceLabel: repo.Name,
},
Type: corev1.ServiceTypeClusterIP,
},
}
if err := controllerutil.SetOwnerReference(repo, service, r.Scheme()); err != nil {
return err
}
if err := r.Create(ctx, service); err != nil {
return err
}
return nil
}
func (r *RepositoryReconciler) loadExtensionVersionSpecFrom(ctx context.Context, chartURL string, repo *corev1alpha1.Repository, cred helm.RepoCredential) (*corev1alpha1.ExtensionVersionSpec, error) {
logger := klog.FromContext(ctx)
var result *corev1alpha1.ExtensionVersionSpec
err := retry.OnError(retry.DefaultRetry, func(err error) bool {
func (r *RepositoryReconciler) fetchExtensionVersionSpec(ctx context.Context, extensionVersion *corev1alpha1.ExtensionVersion) (corev1alpha1.ExtensionVersionSpec, error) {
var extensionVersionSpec corev1alpha1.ExtensionVersionSpec
var err error
err = retry.OnError(retry.DefaultRetry, func(err error) bool {
return true
}, func() error {
data, err := helm.LoadData(ctx, chartURL, cred)
if err != nil {
return err
}
extensionVersionSpec, err = fetchExtensionVersionSpec(ctx, r.Client, extensionVersion)
files, err := loader.LoadArchiveFiles(data)
if err != nil {
return err
}
for _, file := range files {
if file.Name == extensionFileName {
extensionVersionSpec := &corev1alpha1.ExtensionVersionSpec{}
if err := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(file.Data), 1024).Decode(extensionVersionSpec); err != nil {
logger.V(4).Info("invalid extension version spec: %s", string(file.Data))
return nil
}
result = extensionVersionSpec
break
}
}
if result == nil {
logger.V(6).Info("extension.yaml not found", "chart", chartURL)
return nil
}
if strings.HasPrefix(result.Icon, "http://") ||
strings.HasPrefix(result.Icon, "https://") ||
strings.HasPrefix(result.Icon, "data:image") {
return nil
}
absPath := strings.TrimPrefix(result.Icon, "./")
var iconData []byte
for _, file := range files {
if file.Name == absPath {
iconData = file.Data
break
}
}
if iconData == nil {
logger.V(4).Info("invalid extension icon path: %s", absPath)
return nil
}
mimeType := mime.TypeByExtension(path.Ext(result.Icon))
if mimeType == "" {
mimeType = http.DetectContentType(iconData)
}
base64EncodedData := base64.StdEncoding.EncodeToString(iconData)
result.Icon = fmt.Sprintf("data:%s;base64,%s", mimeType, base64EncodedData)
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to fetch chart data from %s: %s", chartURL, err)
return extensionVersionSpec, errors.Wrapf(err, "failed to fetch extension version spec")
}
return result, nil
return extensionVersionSpec, nil
}
func (r *RepositoryReconciler) removeSuspendedExtensionVersion(ctx context.Context, repo *corev1alpha1.Repository, extension *corev1alpha1.Extension, versions []corev1alpha1.ExtensionVersion) error {
func (r *RepositoryReconciler) removeSuspendedExtensionVersion(ctx context.Context, repoName, extensionName string, versions []corev1alpha1.ExtensionVersion) error {
extensionVersions := &corev1alpha1.ExtensionVersionList{}
if err := r.List(ctx, extensionVersions, client.MatchingLabels{corev1alpha1.ExtensionReferenceLabel: extension.Name, corev1alpha1.RepositoryReferenceLabel: repo.Name}); err != nil {
return fmt.Errorf("failed to list extension versions: %s", err)
if err := r.List(ctx, extensionVersions, client.MatchingLabels{corev1alpha1.ExtensionReferenceLabel: extensionName, corev1alpha1.RepositoryReferenceLabel: repoName}); err != nil {
return errors.Wrapf(err, "failed to list extension versions")
}
for _, version := range extensionVersions.Items {
if checkIfSuspended(versions, version) {
@@ -525,7 +368,7 @@ func (r *RepositoryReconciler) removeSuspendedExtensionVersion(ctx context.Conte
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to delete extension version: %s", err)
return errors.Wrapf(err, "failed to delete extension version %s", version.Name)
}
}
}

View File

@@ -7,32 +7,46 @@ package core
import (
"bytes"
"context"
"encoding/base64"
goerrors "errors"
"fmt"
"io"
"os"
"path/filepath"
"mime"
"net"
"net/http"
"net/url"
"path"
"slices"
"sort"
"strings"
"text/template"
"time"
"github.com/Masterminds/semver/v3"
"github.com/pkg/errors"
yaml3 "gopkg.in/yaml.v3"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/getter"
"helm.sh/helm/v3/pkg/registry"
"helm.sh/helm/v3/pkg/storage/driver"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/klog/v2"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
corev1alpha1 "kubesphere.io/api/core/v1alpha1"
"kubesphere.io/utils/helm"
"sigs.k8s.io/controller-runtime/pkg/client"
"kubesphere.io/kubesphere/pkg/utils/hashutil"
"kubesphere.io/kubesphere/pkg/version"
)
const ExtensionVersionMaxLength = validation.LabelValueMaxLength
const ExtensionNameMaxLength = validation.LabelValueMaxLength
func getRecommendedExtensionVersion(versions []corev1alpha1.ExtensionVersion, k8sVersion *semver.Version) (string, error) {
if len(versions) == 0 {
return "", nil
@@ -40,11 +54,10 @@ func getRecommendedExtensionVersion(versions []corev1alpha1.ExtensionVersion, k8
ksVersion, err := semver.NewVersion(version.Get().GitVersion)
if err != nil {
return "", fmt.Errorf("parse KubeSphere version failed: %v", err)
return "", errors.Wrapf(err, "failed to parse KS version: %s", version.Get().GitVersion)
}
var matchedVersions []*semver.Version
for _, v := range versions {
kubeVersionMatched, ksVersionMatched := matchVersionConstraints(v, k8sVersion, ksVersion)
if kubeVersionMatched && ksVersionMatched {
@@ -231,7 +244,7 @@ func usesPermissions(mainChart *chart.Chart) (rbacv1.ClusterRole, rbacv1.Role) {
continue
}
// break the loop in case of EOF
if goerrors.Is(err, io.EOF) {
if errors.Is(err, io.EOF) {
break
}
if err != nil {
@@ -286,18 +299,11 @@ func configChanged(sub *corev1alpha1.InstallPlan, cluster string) bool {
return newConfigHash != oldConfigHash
}
// newHelmCred from Repository
func newHelmCred(repo *corev1alpha1.Repository) (helm.RepoCredential, error) {
// createHelmCredential from Repository
func createHelmCredential(repo *corev1alpha1.Repository) (helm.RepoCredential, error) {
cred := helm.RepoCredential{
InsecureSkipTLSVerify: repo.Spec.Insecure,
}
if repo.Spec.CABundle != "" {
caFile, err := storeCAFile(repo.Spec.CABundle, repo.Name)
if err != nil {
return cred, err
}
cred.CAFile = caFile
}
if repo.Spec.BasicAuth != nil {
cred.Username = repo.Spec.BasicAuth.Username
cred.Password = repo.Spec.BasicAuth.Password
@@ -305,38 +311,191 @@ func newHelmCred(repo *corev1alpha1.Repository) (helm.RepoCredential, error) {
return cred, nil
}
// storeCAFile in local file from caTemplate.
func storeCAFile(caBundle string, repoName string) (string, error) {
var buff = &bytes.Buffer{}
tmpl, err := template.New("repositoryCABundle").Parse(caTemplate)
func fetchExtensionVersionSpec(ctx context.Context, client client.Reader, extensionVersion *corev1alpha1.ExtensionVersion) (corev1alpha1.ExtensionVersionSpec, error) {
extensionVersionSpec := extensionVersion.Spec
logger := klog.FromContext(ctx)
data, err := fetchChartData(ctx, client, extensionVersion)
if err != nil {
return "", err
return extensionVersionSpec, errors.Wrapf(err, "failed to fetch chart data")
}
if err := tmpl.Execute(buff, map[string]string{
"TempDIR": os.TempDir(),
"RepositoryName": repoName,
}); err != nil {
return "", err
}
caFile := buff.String()
if _, err := os.Stat(filepath.Dir(caFile)); err != nil {
if !os.IsNotExist(err) {
return "", err
}
if err := os.MkdirAll(filepath.Dir(caFile), os.ModePerm); err != nil {
return "", err
}
}
data, err := base64.StdEncoding.DecodeString(caBundle)
helmChart, err := loader.LoadArchive(bytes.NewReader(data))
if err != nil {
return "", err
return extensionVersionSpec, errors.Wrapf(err, "failed to load chart archive")
}
errs := isValidExtensionVersion(helmChart.Metadata.Version)
if len(errs) > 0 {
logger.V(4).Info("invalid extension version", "errors", errs)
return extensionVersionSpec, nil
}
for _, file := range helmChart.Files {
if file.Name == extensionFileName {
if err := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(file.Data), 1024).Decode(&extensionVersionSpec); err != nil {
logger.V(4).Info("failed to decode extension.yaml", "error", err)
return extensionVersionSpec, nil
}
break
}
}
extensionVersionSpec.Name = helmChart.Name()
absPath := strings.TrimPrefix(extensionVersionSpec.Icon, "./")
var iconData []byte
for _, file := range helmChart.Files {
if file.Name == absPath {
iconData = file.Data
break
}
}
if err := os.WriteFile(caFile, data, os.ModePerm); err != nil {
return "", err
if iconData != nil {
mimeType := mime.TypeByExtension(path.Ext(extensionVersionSpec.Icon))
if mimeType == "" {
mimeType = http.DetectContentType(iconData)
}
base64EncodedData := base64.StdEncoding.EncodeToString(iconData)
extensionVersionSpec.Icon = fmt.Sprintf("data:%s;base64,%s", mimeType, base64EncodedData)
}
return caFile, nil
return extensionVersionSpec, nil
}
func fetchChartData(ctx context.Context, client client.Reader, extensionVersion *corev1alpha1.ExtensionVersion) ([]byte, error) {
if extensionVersion.Spec.ChartDataRef != nil {
return fetchChartDataFromConfigMap(ctx, client, extensionVersion.Spec.ChartDataRef)
}
chartURL, err := url.Parse(extensionVersion.Spec.ChartURL)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse chart URL: %s", extensionVersion.Spec.ChartURL)
}
repo, err := fetchRepository(ctx, client, extensionVersion.Spec.Repository)
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch repository: %s", extensionVersion.Spec.Repository)
}
repoURL, err := url.Parse(repo.Spec.URL)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse repo URL: %s", extensionVersion.Spec.ChartURL)
}
if chartURL.Host == "" {
chartURL.Scheme = repoURL.Scheme
chartURL.Host = repoURL.Host
}
transport, err := createTransport(repo, chartURL.Hostname())
if err != nil {
return nil, errors.Wrapf(err, "failed to create transport")
}
opts := createGetterOptions(repo, transport)
chartGetter, err := createChartGetter(chartURL.Scheme, opts)
if err != nil {
return nil, errors.Wrapf(err, "failed to create chart getter")
}
return getChartData(chartGetter, chartURL.String())
}
func fetchChartDataFromConfigMap(ctx context.Context, client client.Reader, ref *corev1alpha1.ConfigMapKeyRef) ([]byte, error) {
configMap := &corev1.ConfigMap{}
if err := client.Get(ctx, types.NamespacedName{Namespace: ref.Namespace, Name: ref.Name}, configMap); err != nil {
return nil, errors.Wrapf(err, "failed to get config map: %s", ref.Name)
}
data := configMap.BinaryData[ref.Key]
if data != nil {
return data, nil
}
return nil, errors.New("chart data not found in config map")
}
func fetchRepository(ctx context.Context, client client.Reader, repoName string) (*corev1alpha1.Repository, error) {
if repoName == "" {
return &corev1alpha1.Repository{}, nil
}
repo := &corev1alpha1.Repository{}
if err := client.Get(ctx, types.NamespacedName{Name: repoName}, repo); err != nil {
return nil, errors.Wrapf(err, "failed to get repository: %s", repoName)
}
return repo, nil
}
func createTransport(repo *corev1alpha1.Repository, serverName string) (*http.Transport, error) {
tlsConf, err := helm.NewTLSConfig(repo.Spec.CABundle, repo.Spec.Insecure)
if err != nil {
return nil, errors.Wrapf(err, "failed to create tls config")
}
tlsConf.ServerName = serverName
return &http.Transport{
DisableCompression: true,
DialContext: (&net.Dialer{Timeout: 5 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsConf,
}, nil
}
func createGetterOptions(repo *corev1alpha1.Repository, transport *http.Transport) []getter.Option {
opts := []getter.Option{getter.WithTransport(transport)}
if repo.Spec.BasicAuth != nil {
opts = append(opts, getter.WithBasicAuth(repo.Spec.BasicAuth.Username, repo.Spec.BasicAuth.Password))
}
return opts
}
func createChartGetter(scheme string, opts []getter.Option) (getter.Getter, error) {
switch scheme {
case registry.OCIScheme:
return getter.NewOCIGetter(opts...)
case "http", "https":
return getter.NewHTTPGetter(opts...)
default:
return nil, errors.Errorf("unsupported scheme: %s", scheme)
}
}
func getChartData(chartGetter getter.Getter, url string) ([]byte, error) {
buffer, err := chartGetter.Get(url)
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch chart data: %s", url)
}
data, err := io.ReadAll(buffer)
if err != nil {
return nil, errors.Wrapf(err, "failed to read chart data: %s", url)
}
return data, nil
}
func isValidExtensionVersion(version string) []string {
var errs []string
if len(version) > ExtensionVersionMaxLength {
errs = append(errs, fmt.Sprintf("extension version length exceeds %d", ExtensionVersionMaxLength))
}
if _, err := semver.NewVersion(version); err != nil {
errs = append(errs, fmt.Sprintf("invalid semver format: %s", err))
}
if len(validation.IsDNS1123Subdomain(version)) > 0 {
errs = append(errs, "invalid DNS-1123 subdomain")
}
return errs
}
func isValidExtensionName(name string) []string {
var errs []string
if name == "" {
errs = append(errs, "extension name should not be empty")
}
if len(name) > ExtensionNameMaxLength {
errs = append(errs, fmt.Sprintf("extension name length exceeds %d", ExtensionNameMaxLength))
}
if len(validation.IsDNS1123Subdomain(name)) > 0 {
errs = append(errs, "invalid DNS-1123 subdomain")
}
return errs
}

View File

@@ -44,6 +44,7 @@ type ExtensionSpec struct {
// ExtensionVersionSpec contains the details of a specific version extension.
type ExtensionVersionSpec struct {
ExtensionInfo `json:",inline"`
Name string `json:"-"`
Version string `json:"version,omitempty"`
Keywords []string `json:"keywords,omitempty"`
Sources []string `json:"sources,omitempty"`

View File

@@ -6,6 +6,7 @@ import (
type UpdateStrategy struct {
RegistryPoll `json:"registryPoll,omitempty"`
Timeout metav1.Duration `json:"timeout"`
}
type RegistryPoll struct {
@@ -18,8 +19,6 @@ type BasicAuth struct {
}
type RepositorySpec struct {
// DEPRECATED: the field will remove in future versions, please use url.
Image string `json:"image,omitempty"`
URL string `json:"url,omitempty"`
Description string `json:"description,omitempty"`
BasicAuth *BasicAuth `json:"basicAuth,omitempty"`

View File

@@ -856,6 +856,7 @@ func (in *ServiceAccountList) DeepCopyObject() runtime.Object {
func (in *UpdateStrategy) DeepCopyInto(out *UpdateStrategy) {
*out = *in
out.RegistryPoll = in.RegistryPoll
out.Timeout = in.Timeout
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpdateStrategy.

View File

@@ -1,5 +1,4 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
// Code generated by controller-gen. DO NOT EDIT.

View File

@@ -6,6 +6,7 @@ go 1.22.11
require (
github.com/aws/aws-sdk-go v1.55.5
github.com/pkg/errors v0.9.1
helm.sh/helm/v3 v3.16.2
k8s.io/api v0.31.2
k8s.io/apimachinery v0.31.2
@@ -106,7 +107,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect

View File

@@ -4,10 +4,13 @@ import (
"bytes"
"context"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/pkg/errors"
"helm.sh/helm/v3/pkg/getter"
helmrepo "helm.sh/helm/v3/pkg/repo"
"kubesphere.io/utils/s3"
@@ -17,7 +20,6 @@ import (
const IndexYaml = "index.yaml"
func LoadRepoIndex(ctx context.Context, u string, cred RepoCredential) (*helmrepo.IndexFile, error) {
if !strings.HasSuffix(u, "/") {
u = fmt.Sprintf("%s/%s", u, IndexYaml)
} else {
@@ -26,12 +28,12 @@ func LoadRepoIndex(ctx context.Context, u string, cred RepoCredential) (*helmrep
resp, err := LoadData(ctx, u, cred)
if err != nil {
return nil, err
return nil, errors.Errorf("can't load data from %s: %v", u, err)
}
indexFile, err := loadIndex(resp.Bytes())
if err != nil {
return nil, err
return nil, errors.Errorf("can't load index file: %v", err)
}
return indexFile, nil
@@ -52,10 +54,10 @@ func loadIndex(data []byte) (*helmrepo.IndexFile, error) {
return i, nil
}
func LoadData(ctx context.Context, u string, cred RepoCredential) (*bytes.Buffer, error) {
func LoadData(_ context.Context, u string, cred RepoCredential) (*bytes.Buffer, error) {
parsedURL, err := url.Parse(u)
if err != nil {
return nil, err
return nil, errors.Errorf("can't parse url: %v", err)
}
var resp *bytes.Buffer
if strings.HasPrefix(u, "s3://") {
@@ -71,22 +73,38 @@ func LoadData(ctx context.Context, u string, cred RepoCredential) (*bytes.Buffer
})
if err != nil {
return nil, err
return nil, errors.Errorf("can't create s3 client: %v", err)
}
data, err := client.Read(p)
if err != nil {
return nil, err
return nil, errors.Errorf("can't read data from s3: %v", err)
}
resp = bytes.NewBuffer(data)
} else {
tlsConf, err := NewTLSConfig(cred.CABundle, cred.InsecureSkipTLSVerify)
if err != nil {
return nil, errors.Errorf("can't create tls config: %v", err)
}
tlsConf.ServerName = parsedURL.Hostname()
transport := &http.Transport{
DisableCompression: true,
DialContext: (&net.Dialer{Timeout: 5 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsConf,
}
// TODO add user-agent
g, _ := getter.NewHTTPGetter()
resp, err = g.Get(parsedURL.String(),
getter.WithTimeout(5*time.Minute),
getter.WithInsecureSkipVerifyTLS(cred.InsecureSkipTLSVerify),
getter.WithTLSClientConfig(cred.CertFile, cred.KeyFile, cred.CAFile),
getter.WithTransport(transport),
getter.WithBasicAuth(cred.Username, cred.Password),
)
if err != nil {
@@ -121,12 +139,8 @@ type RepoCredential struct {
Username string `json:"username,omitempty"`
// chart repository password
Password string `json:"password,omitempty"`
// identify HTTPS client using this SSL certificate file
CertFile string `json:"certFile,omitempty"`
// identify HTTPS client using this SSL key file
KeyFile string `json:"keyFile,omitempty"`
// verify certificates of HTTPS-enabled servers using this CA bundle
CAFile string `json:"caFile,omitempty"`
CABundle string `json:"caBundle,omitempty"`
// skip tls certificate checks for the repository, default is ture
InsecureSkipTLSVerify bool `json:"insecureSkipTLSVerify,omitempty"`

View File

@@ -0,0 +1,38 @@
package helm
import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"github.com/pkg/errors"
)
func NewTLSConfig(caBundle string, insecureSkipTLSVerify bool) (*tls.Config, error) {
config := tls.Config{
InsecureSkipVerify: insecureSkipTLSVerify,
}
if caBundle != "" {
caCerts, err := base64.StdEncoding.DecodeString(caBundle)
if err != nil {
return nil, fmt.Errorf("failed to decode caBundle: %v", err)
}
cp, err := certPoolFromCABundle(caCerts)
if err != nil {
return nil, err
}
config.RootCAs = cp
}
return &config, nil
}
func certPoolFromCABundle(caCerts []byte) (*x509.CertPool, error) {
cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(caCerts) {
return nil, errors.Errorf("failed to append certificates from caBundle")
}
return cp, nil
}