feat: Adapt to oci-based helmchart repo (#6200)

* add oci client for registry

* add LoadRepoIndexFormOci

* feat: Adapt to oci-based helmchart repo

* Update the golang base image version in the dockerfile

* update oci_test.go

Signed-off-by: lingbo <lingbo@lingbohome.com>

* fix: Update oci_test.go

Signed-off-by: 凌波 <lingbo@lingbohome.com>

* Update go imports

---------

Signed-off-by: lingbo <lingbo@lingbohome.com>
Signed-off-by: 凌波 <lingbo@lingbohome.com>
Co-authored-by: hongming <coder.scala@gmail.com>
This commit is contained in:
凌波
2024-09-25 09:51:16 +08:00
committed by GitHub
parent 86bac734f9
commit 9b2c123bbb
12 changed files with 727 additions and 3 deletions

View File

@@ -247,7 +247,7 @@ func repoParseRequest(cli client.Client, versions helmrepo.ChartVersions, helmRe
FromRepo: true,
}
url := ver.URLs[0]
methodList := []string{"https://", "http://", "s3://"}
methodList := []string{"https://", "http://", "s3://", "oci://"}
needContact := true
for _, method := range methodList {
if strings.HasPrefix(url, method) {

View File

@@ -29,6 +29,7 @@ import (
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/kube"
"helm.sh/helm/v3/pkg/registry"
helmrelease "helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/storage"
"helm.sh/helm/v3/pkg/storage/driver"
@@ -340,6 +341,10 @@ func HelmPull(u string, cred appv2.RepoCredential) (*bytes.Buffer, error) {
}
func LoadRepoIndex(u string, cred appv2.RepoCredential) (idx helmrepo.IndexFile, err error) {
if registry.IsOCI(u) {
return LoadRepoIndexFromOci(u, cred)
}
if !strings.HasSuffix(u, "/") {
u = fmt.Sprintf("%s/index.yaml", u)
} else {

View File

@@ -0,0 +1,199 @@
package application
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"helm.sh/helm/v3/pkg/registry"
helmrepo "helm.sh/helm/v3/pkg/repo"
"k8s.io/klog/v2"
appv2 "kubesphere.io/api/application/v2"
"kubesphere.io/kubesphere/pkg/simple/client/oci"
)
func HelmPullFromOci(u string, cred appv2.RepoCredential) ([]byte, error) {
if !registry.IsOCI(u) {
return nil, fmt.Errorf("invalid oci URL format: %s", u)
}
_, err := url.Parse(u)
if err != nil {
klog.Errorf("invalid oci chart URL format: %s, err:%v", u, err)
return nil, err
}
client, err := newOCIRegistryClient(u, cred)
if err != nil {
return nil, err
}
pullRef := strings.TrimPrefix(u, fmt.Sprintf("%s://", registry.OCIScheme))
pullResult, err := client.Pull(pullRef)
if err != nil {
klog.Errorf("An error occurred to pull chart from repository: %s,err:%v", pullRef, err)
return nil, err
}
return pullResult.Chart.Data, nil
}
func LoadRepoIndexFromOci(u string, cred appv2.RepoCredential) (idx helmrepo.IndexFile, err error) {
if !registry.IsOCI(u) {
return idx, fmt.Errorf("invalid oci URL format: %s", u)
}
parsedURL, err := url.Parse(u)
if err != nil {
klog.Errorf("invalid repo URL format: %s, err:%v", u, err)
return idx, err
}
repoCharts, err := GetRepoChartsFromOci(parsedURL, cred)
if err != nil {
return idx, err
}
if len(repoCharts) == 0 {
return idx, nil
}
client, err := newOCIRegistryClient(u, cred)
if err != nil {
return idx, err
}
index := helmrepo.NewIndexFile()
for _, repoChart := range repoCharts {
tags, err := client.Tags(fmt.Sprintf("%s/%s", parsedURL.Host, repoChart))
if err != nil {
klog.Errorf("An error occurred to load tags from repository: %s/%s,err:%v", parsedURL.Host, repoChart, err)
continue
}
if len(tags) == 0 {
klog.Errorf("Unable to locate any tags in provided repository: %s/%s,err:%v", parsedURL.Host, repoChart, err)
continue
}
for _, tag := range tags {
pullRef := fmt.Sprintf("%s/%s:%s", parsedURL.Host, repoChart, tag)
pullResult, err := client.Pull(pullRef)
if err != nil {
klog.Errorf("An error occurred to pull chart from repository: %s,err:%v", pullRef, err)
continue
}
baseUrl := fmt.Sprintf("%s://%s", registry.OCIScheme, pullRef)
hash := strings.TrimPrefix(pullResult.Chart.Digest, "sha256:")
if err := index.MustAdd(pullResult.Chart.Meta, "", baseUrl, hash); err != nil {
klog.Errorf("failed adding chart metadata to index with repository: %s,err:%v", pullRef, err)
continue
}
}
}
index.SortEntries()
return *index, nil
}
func GetRepoChartsFromOci(parsedURL *url.URL, cred appv2.RepoCredential) ([]string, error) {
if parsedURL == nil {
return nil, errors.New("missing parsedURL")
}
skipTLS := true
if cred.InsecureSkipTLSVerify != nil && !*cred.InsecureSkipTLSVerify {
skipTLS = false
}
reg, err := oci.NewRegistry(parsedURL.Host,
oci.WithTimeout(5*time.Second),
oci.WithBasicAuth(cred.Username, cred.Password),
oci.WithInsecureSkipVerifyTLS(skipTLS))
if err != nil {
return nil, err
}
ctx := context.Background()
repoPath := strings.TrimSuffix(parsedURL.Path, "/")
repoPath = strings.TrimPrefix(repoPath, "/")
var repoCharts []string
err = reg.Repositories(ctx, "", func(repos []string) error {
cutPrefix := repoPath
if cutPrefix != "" {
cutPrefix = cutPrefix + "/"
}
for _, repo := range repos {
if subRepo, found := strings.CutPrefix(repo, cutPrefix); found && subRepo != "" {
if !strings.Contains(subRepo, "/") {
repoCharts = append(repoCharts, fmt.Sprintf("%s/%s", repoPath, subRepo))
}
}
}
return nil
})
if err != nil {
return nil, err
}
return repoCharts, nil
}
func newOCIRegistryClient(u string, cred appv2.RepoCredential) (*registry.Client, error) {
parsedURL, err := url.Parse(u)
if err != nil {
klog.Errorf("invalid oci repo URL format: %s, err:%v", u, err)
return nil, err
}
skipTLS := true
if cred.InsecureSkipTLSVerify != nil && !*cred.InsecureSkipTLSVerify {
skipTLS = false
}
reg, err := oci.NewRegistry(parsedURL.Host,
oci.WithTimeout(5*time.Second),
oci.WithBasicAuth(cred.Username, cred.Password),
oci.WithInsecureSkipVerifyTLS(skipTLS))
if err != nil {
return nil, err
}
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLS},
Proxy: http.ProxyFromEnvironment,
}
opts := []registry.ClientOption{registry.ClientOptHTTPClient(&http.Client{
Transport: transport,
Timeout: 5 * time.Second,
})}
if reg.PlainHTTP {
opts = append(opts, registry.ClientOptPlainHTTP())
}
client, err := registry.NewClient(opts...)
if err != nil {
return nil, err
}
if cred.Username != "" || cred.Password != "" {
err = client.Login(parsedURL.Host,
registry.LoginOptBasicAuth(cred.Username, cred.Password),
registry.LoginOptInsecure(reg.PlainHTTP))
if err != nil {
return nil, err
}
}
return client, nil
}

View File

@@ -0,0 +1,65 @@
package application
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
appv2 "kubesphere.io/api/application/v2"
)
func TestLoadRepoIndexFromOci(t *testing.T) {
testRepos := []string{"helmcharts/nginx", "helmcharts/test-api", "helmcharts/test-ui", "helmcharts/demo-app"}
testTags := []string{"1.0.0", "1.2.0", "1.0.3"}
testRepo := testRepos[1]
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet && (r.URL.Path == "/v2" || r.URL.Path == "/v2/") {
w.WriteHeader(http.StatusOK)
return
}
if r.Method == http.MethodGet && r.URL.Path == "/v2/_catalog" {
result := struct {
Repositories []string `json:"repositories"`
}{
Repositories: testRepos,
}
if err := json.NewEncoder(w).Encode(result); err != nil {
t.Errorf("failed to write response: %v", err)
}
return
}
if r.Method == http.MethodGet && r.URL.Path == fmt.Sprintf("/v2/%s/tags/list", testRepo) {
result := struct {
Tags []string `json:"tags"`
}{
Tags: testTags,
}
if err := json.NewEncoder(w).Encode(result); err != nil {
t.Errorf("failed to write response: %v", err)
}
return
}
t.Logf("unexpected access: %s %s", r.Method, r.URL)
w.WriteHeader(http.StatusNotFound)
}))
defer ts.Close()
uri, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("invalid test http server: %v", err)
}
url := fmt.Sprintf("oci://%s/helmcharts", uri.Host)
cred := appv2.RepoCredential{
Username: "",
Password: "",
}
index, err := LoadRepoIndexFromOci(url, cred)
if err != nil {
t.Errorf("LoadRepoIndexFromOci() error: %s", err)
}
t.Log(len(index.Entries))
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"helm.sh/helm/v3/pkg/registry"
"k8s.io/klog/v2"
"io"
@@ -145,6 +146,9 @@ func DownLoadChart(cli runtimeclient.Client, pullUrl, repoName string) (data []b
klog.Errorf("failed to get app repo, err: %v", err)
return data, err
}
if registry.IsOCI(pullUrl) {
return HelmPullFromOci(pullUrl, repo.Spec.Credential)
}
buf, err := HelmPull(pullUrl, repo.Spec.Credential)
if err != nil {
klog.Errorf("load chart failed, error: %s", err)

View File

@@ -0,0 +1,64 @@
package oci
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"unicode"
)
var maxErrorBytes int64 = 8 * 1024 // 8 KiB
// requestError contains a single error.
type requestError struct {
Code string `json:"code"`
Message string `json:"message"`
}
// Error returns a error string describing the error.
func (e requestError) Error() string {
code := strings.Map(func(r rune) rune {
if r == '_' {
return ' '
}
return unicode.ToLower(r)
}, e.Code)
if e.Message == "" {
return code
}
return fmt.Sprintf("%s: %s", code, e.Message)
}
// requestErrors is a bundle of requestError.
type requestErrors []requestError
// Error returns a error string describing the error.
func (errs requestErrors) Error() string {
switch len(errs) {
case 0:
return "<nil>"
case 1:
return errs[0].Error()
}
var errmsgs []string
for _, err := range errs {
errmsgs = append(errmsgs, err.Error())
}
return strings.Join(errmsgs, "; ")
}
func ParseErrorResponse(resp *http.Response) error {
var errmsg string
var body struct {
Errors requestErrors `json:"errors"`
}
lr := io.LimitReader(resp.Body, maxErrorBytes)
if err := json.NewDecoder(lr).Decode(&body); err == nil && len(body.Errors) > 0 {
errmsg = body.Errors.Error()
} else {
errmsg = http.StatusText(resp.StatusCode)
}
return fmt.Errorf("%s %q: unexpected status code %d: %s", resp.Request.Method, resp.Request.URL, resp.StatusCode, errmsg)
}

View File

@@ -0,0 +1,96 @@
package oci
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"oras.land/oras-go/pkg/registry"
)
func TestRegistry_Api(t *testing.T) {
testRepos := []string{"helmcharts/nginx", "helmcharts/test-api", "helmcharts/test-ui", "helmcharts/demo-app"}
testTags := []string{"1.0.0", "1.2.0", "1.0.3"}
testRepo := testRepos[1]
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet && (r.URL.Path == "/v2" || r.URL.Path == "/v2/") {
w.WriteHeader(http.StatusOK)
return
}
if r.Method == http.MethodGet && r.URL.Path == "/v2/_catalog" {
result := struct {
Repositories []string `json:"repositories"`
}{
Repositories: testRepos,
}
if err := json.NewEncoder(w).Encode(result); err != nil {
t.Errorf("failed to write response: %v", err)
}
return
}
if r.Method == http.MethodGet && r.URL.Path == fmt.Sprintf("/v2/%s/tags/list", testRepo) {
result := struct {
Tags []string `json:"tags"`
}{
Tags: testTags,
}
if err := json.NewEncoder(w).Encode(result); err != nil {
t.Errorf("failed to write response: %v", err)
}
return
}
t.Errorf("unexpected access: %s %s", r.Method, r.URL)
w.WriteHeader(http.StatusNotFound)
}))
defer ts.Close()
uri, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("invalid test http server: %v", err)
}
reg, err := NewRegistry(uri.Host,
WithTimeout(5*time.Second),
WithBasicAuth("", ""),
WithInsecureSkipVerifyTLS(true))
if err != nil {
t.Fatalf("NewRegistry() error = %v", err)
}
ctx := context.Background()
err = reg.Ping(ctx)
if err != nil {
t.Fatalf("Registry.Ping() error = %v", err)
}
var registryTags []string
repo, err := reg.Repository(ctx, testRepo)
if err != nil {
t.Fatalf("Registry.Repository() error = %v", err)
}
registryTags, err = registry.Tags(ctx, repo)
if err != nil {
t.Fatalf("Registry.Repository().Tags() error = %v", err)
}
t.Log(len(registryTags))
err = reg.Repositories(ctx, "", func(repos []string) error {
for _, repo := range repos {
if subRepo, found := strings.CutPrefix(repo, ""); found {
t.Log(subRepo)
}
}
return nil
})
if err != nil {
t.Fatalf("Registry.Repositories() error = %v", err)
}
}

View File

@@ -0,0 +1,221 @@
package oci
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/http"
"slices"
"strconv"
"time"
"oras.land/oras-go/pkg/registry"
"oras.land/oras-go/pkg/registry/remote"
"oras.land/oras-go/pkg/registry/remote/auth"
)
type RepositoryOptions remote.Repository
type RegistryOption func(*Registry)
// Registry is an HTTP client to a remote registry by oras-go 2.x.
// Registry with authentication requires an administrator account.
type Registry struct {
RepositoryOptions
RepositoryListPageSize int
username string
password string
timeout time.Duration
insecureSkipVerifyTLS bool
}
func NewRegistry(name string, options ...RegistryOption) (*Registry, error) {
ref := registry.Reference{
Registry: name,
}
if err := ref.ValidateRegistry(); err != nil {
return nil, err
}
reg := &Registry{RepositoryOptions: RepositoryOptions{
Reference: ref,
}}
for _, option := range options {
option(reg)
}
headers := http.Header{}
headers.Set("User-Agent", "kubesphere.io")
reg.Client = &auth.Client{
Client: &http.Client{
Timeout: reg.timeout,
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: reg.insecureSkipVerifyTLS}},
},
Header: headers,
Credential: func(_ context.Context, _ string) (auth.Credential, error) {
if reg.username == "" && reg.password == "" {
return auth.EmptyCredential, nil
}
return auth.Credential{
Username: reg.username,
Password: reg.password,
}, nil
},
}
_, err := reg.IsPlainHttp()
if err != nil {
return nil, err
}
return reg, nil
}
func WithBasicAuth(username, password string) RegistryOption {
return func(reg *Registry) {
reg.username = username
reg.password = password
}
}
func WithTimeout(timeout time.Duration) RegistryOption {
return func(reg *Registry) {
reg.timeout = timeout
}
}
func WithInsecureSkipVerifyTLS(insecureSkipVerifyTLS bool) RegistryOption {
return func(reg *Registry) {
reg.insecureSkipVerifyTLS = insecureSkipVerifyTLS
}
}
func (r *Registry) client() remote.Client {
if r.Client == nil {
return auth.DefaultClient
}
return r.Client
}
func (r *Registry) do(req *http.Request) (*http.Response, error) {
return r.client().Do(req)
}
func (r *Registry) IsPlainHttp() (bool, error) {
schemaProbeList := []bool{false, true}
var err error
for _, probe := range schemaProbeList {
r.PlainHTTP = probe
err = r.Ping(context.Background())
if err == nil {
return probe, nil
}
}
return r.PlainHTTP, err
}
func (r *Registry) Ping(ctx context.Context) error {
url := buildRegistryBaseURL(r.PlainHTTP, r.Reference)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return err
}
resp, err := r.do(req)
if err != nil {
return err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
return nil
case http.StatusNotFound:
return errors.New("not found")
default:
return ParseErrorResponse(resp)
}
}
func (r *Registry) Repositories(ctx context.Context, last string, fn func(repos []string) error) error {
url := buildRegistryCatalogURL(r.PlainHTTP, r.Reference)
var err error
for err == nil {
url, err = r.repositories(ctx, last, fn, url)
// clear `last` for subsequent pages
last = ""
}
if err != errNoLink {
return err
}
return nil
}
func (r *Registry) repositories(ctx context.Context, last string, fn func(repos []string) error, url string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", err
}
if r.RepositoryListPageSize > 0 || last != "" {
q := req.URL.Query()
if r.RepositoryListPageSize > 0 {
q.Set("n", strconv.Itoa(r.RepositoryListPageSize))
}
if last != "" {
q.Set("last", last)
}
req.URL.RawQuery = q.Encode()
}
resp, err := r.do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", ParseErrorResponse(resp)
}
var page struct {
Repositories []string `json:"repositories"`
}
lr := limitReader(resp.Body, r.MaxMetadataBytes)
if err := json.NewDecoder(lr).Decode(&page); err != nil {
return "", fmt.Errorf("%s %q: failed to decode response: %w", resp.Request.Method, resp.Request.URL, err)
}
if err := fn(page.Repositories); err != nil {
return "", err
}
return parseLink(resp)
}
func (r *Registry) Repository(ctx context.Context, name string) (registry.Repository, error) {
ref := registry.Reference{
Registry: r.Reference.Registry,
Repository: name,
}
if err := ref.ValidateRepository(); err != nil {
return nil, err
}
repo := r.repository((*remote.Repository)(&r.RepositoryOptions))
repo.Reference = ref
return repo, nil
}
func (r *Registry) repository(repo *remote.Repository) *remote.Repository {
return &remote.Repository{
Client: repo.Client,
Reference: repo.Reference,
PlainHTTP: repo.PlainHTTP,
ManifestMediaTypes: slices.Clone(repo.ManifestMediaTypes),
TagListPageSize: repo.TagListPageSize,
ReferrerListPageSize: repo.ReferrerListPageSize,
MaxMetadataBytes: repo.MaxMetadataBytes,
}
}

View File

@@ -0,0 +1,22 @@
package oci
import (
"fmt"
"oras.land/oras-go/pkg/registry"
)
func buildScheme(plainHTTP bool) string {
if plainHTTP {
return "http"
}
return "https"
}
func buildRegistryBaseURL(plainHTTP bool, ref registry.Reference) string {
return fmt.Sprintf("%s://%s/v2/", buildScheme(plainHTTP), ref.Host())
}
func buildRegistryCatalogURL(plainHTTP bool, ref registry.Reference) string {
return fmt.Sprintf("%s://%s/v2/_catalog", buildScheme(plainHTTP), ref.Host())
}

View File

@@ -0,0 +1,48 @@
package oci
import (
"errors"
"fmt"
"io"
"net/http"
"strings"
)
// defaultMaxMetadataBytes specifies the default limit on how many response
// bytes are allowed in the server's response to the metadata APIs.
// See also: Repository.MaxMetadataBytes
var defaultMaxMetadataBytes int64 = 4 * 1024 * 1024 // 4 MiB
// errNoLink is returned by parseLink() when no Link header is present.
var errNoLink = errors.New("no Link header in response")
// parseLink returns the URL of the response's "Link" header, if present.
func parseLink(resp *http.Response) (string, error) {
link := resp.Header.Get("Link")
if link == "" {
return "", errNoLink
}
if link[0] != '<' {
return "", fmt.Errorf("invalid next link %q: missing '<'", link)
}
if i := strings.IndexByte(link, '>'); i == -1 {
return "", fmt.Errorf("invalid next link %q: missing '>'", link)
} else {
link = link[1:i]
}
linkURL, err := resp.Request.URL.Parse(link)
if err != nil {
return "", err
}
return linkURL.String(), nil
}
// limitReader returns a Reader that reads from r but stops with EOF after n
// bytes. If n is zero, defaultMaxMetadataBytes is used.
func limitReader(r io.Reader, n int64) io.Reader {
if n == 0 {
n = defaultMaxMetadataBytes
}
return io.LimitReader(r, n)
}