diff --git a/build/ks-apiserver/Dockerfile b/build/ks-apiserver/Dockerfile index 1dbc8a9c4..d0268b8f2 100644 --- a/build/ks-apiserver/Dockerfile +++ b/build/ks-apiserver/Dockerfile @@ -1,5 +1,5 @@ # Build -FROM golang:1.20.7 AS build_context +FROM golang:1.21.5 AS build_context ENV OUTDIR=/out RUN mkdir -p ${OUTDIR}/usr/local/bin/ diff --git a/build/ks-controller-manager/Dockerfile b/build/ks-controller-manager/Dockerfile index 941b9828b..108598aad 100644 --- a/build/ks-controller-manager/Dockerfile +++ b/build/ks-controller-manager/Dockerfile @@ -16,7 +16,7 @@ RUN curl -LO https://github.com/kubesphere/telemetry/releases/download/v1.0.0/te COPY config/ks-core ${OUTDIR}/var/helm-charts/ks-core # Build -FROM golang:1.20.7 AS build_context +FROM golang:1.21.5 AS build_context ENV OUTDIR=/out RUN mkdir -p ${OUTDIR}/usr/local/bin/ diff --git a/pkg/controller/application/helm_repo_controller.go b/pkg/controller/application/helm_repo_controller.go index df666406c..30f6edb67 100644 --- a/pkg/controller/application/helm_repo_controller.go +++ b/pkg/controller/application/helm_repo_controller.go @@ -247,7 +247,7 @@ func repoParseRequest(cli client.Client, versions helmrepo.ChartVersions, helmRe FromRepo: true, } url := ver.URLs[0] - methodList := []string{"https://", "http://", "s3://"} + methodList := []string{"https://", "http://", "s3://", "oci://"} needContact := true for _, method := range methodList { if strings.HasPrefix(url, method) { diff --git a/pkg/simple/client/application/helper.go b/pkg/simple/client/application/helper.go index c97e97db1..893a096f0 100644 --- a/pkg/simple/client/application/helper.go +++ b/pkg/simple/client/application/helper.go @@ -29,6 +29,7 @@ import ( "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/kube" + "helm.sh/helm/v3/pkg/registry" helmrelease "helm.sh/helm/v3/pkg/release" "helm.sh/helm/v3/pkg/storage" "helm.sh/helm/v3/pkg/storage/driver" @@ -340,6 +341,10 @@ func HelmPull(u string, cred appv2.RepoCredential) (*bytes.Buffer, error) { } func LoadRepoIndex(u string, cred appv2.RepoCredential) (idx helmrepo.IndexFile, err error) { + if registry.IsOCI(u) { + return LoadRepoIndexFromOci(u, cred) + } + if !strings.HasSuffix(u, "/") { u = fmt.Sprintf("%s/index.yaml", u) } else { diff --git a/pkg/simple/client/application/oci.go b/pkg/simple/client/application/oci.go new file mode 100644 index 000000000..55288b7b0 --- /dev/null +++ b/pkg/simple/client/application/oci.go @@ -0,0 +1,199 @@ +package application + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "helm.sh/helm/v3/pkg/registry" + helmrepo "helm.sh/helm/v3/pkg/repo" + "k8s.io/klog/v2" + appv2 "kubesphere.io/api/application/v2" + + "kubesphere.io/kubesphere/pkg/simple/client/oci" +) + +func HelmPullFromOci(u string, cred appv2.RepoCredential) ([]byte, error) { + if !registry.IsOCI(u) { + return nil, fmt.Errorf("invalid oci URL format: %s", u) + } + _, err := url.Parse(u) + if err != nil { + klog.Errorf("invalid oci chart URL format: %s, err:%v", u, err) + return nil, err + } + + client, err := newOCIRegistryClient(u, cred) + if err != nil { + return nil, err + } + + pullRef := strings.TrimPrefix(u, fmt.Sprintf("%s://", registry.OCIScheme)) + pullResult, err := client.Pull(pullRef) + if err != nil { + klog.Errorf("An error occurred to pull chart from repository: %s,err:%v", pullRef, err) + return nil, err + } + + return pullResult.Chart.Data, nil +} + +func LoadRepoIndexFromOci(u string, cred appv2.RepoCredential) (idx helmrepo.IndexFile, err error) { + if !registry.IsOCI(u) { + return idx, fmt.Errorf("invalid oci URL format: %s", u) + } + + parsedURL, err := url.Parse(u) + if err != nil { + klog.Errorf("invalid repo URL format: %s, err:%v", u, err) + return idx, err + } + + repoCharts, err := GetRepoChartsFromOci(parsedURL, cred) + if err != nil { + return idx, err + } + if len(repoCharts) == 0 { + return idx, nil + } + + client, err := newOCIRegistryClient(u, cred) + if err != nil { + return idx, err + } + + index := helmrepo.NewIndexFile() + for _, repoChart := range repoCharts { + tags, err := client.Tags(fmt.Sprintf("%s/%s", parsedURL.Host, repoChart)) + if err != nil { + klog.Errorf("An error occurred to load tags from repository: %s/%s,err:%v", parsedURL.Host, repoChart, err) + continue + } + if len(tags) == 0 { + klog.Errorf("Unable to locate any tags in provided repository: %s/%s,err:%v", parsedURL.Host, repoChart, err) + continue + } + + for _, tag := range tags { + pullRef := fmt.Sprintf("%s/%s:%s", parsedURL.Host, repoChart, tag) + pullResult, err := client.Pull(pullRef) + if err != nil { + klog.Errorf("An error occurred to pull chart from repository: %s,err:%v", pullRef, err) + continue + } + + baseUrl := fmt.Sprintf("%s://%s", registry.OCIScheme, pullRef) + hash := strings.TrimPrefix(pullResult.Chart.Digest, "sha256:") + if err := index.MustAdd(pullResult.Chart.Meta, "", baseUrl, hash); err != nil { + klog.Errorf("failed adding chart metadata to index with repository: %s,err:%v", pullRef, err) + continue + } + } + } + + index.SortEntries() + + return *index, nil +} + +func GetRepoChartsFromOci(parsedURL *url.URL, cred appv2.RepoCredential) ([]string, error) { + if parsedURL == nil { + return nil, errors.New("missing parsedURL") + } + + skipTLS := true + if cred.InsecureSkipTLSVerify != nil && !*cred.InsecureSkipTLSVerify { + skipTLS = false + } + + reg, err := oci.NewRegistry(parsedURL.Host, + oci.WithTimeout(5*time.Second), + oci.WithBasicAuth(cred.Username, cred.Password), + oci.WithInsecureSkipVerifyTLS(skipTLS)) + if err != nil { + return nil, err + } + + ctx := context.Background() + + repoPath := strings.TrimSuffix(parsedURL.Path, "/") + repoPath = strings.TrimPrefix(repoPath, "/") + var repoCharts []string + err = reg.Repositories(ctx, "", func(repos []string) error { + cutPrefix := repoPath + if cutPrefix != "" { + cutPrefix = cutPrefix + "/" + } + for _, repo := range repos { + if subRepo, found := strings.CutPrefix(repo, cutPrefix); found && subRepo != "" { + if !strings.Contains(subRepo, "/") { + repoCharts = append(repoCharts, fmt.Sprintf("%s/%s", repoPath, subRepo)) + } + } + } + return nil + }) + + if err != nil { + return nil, err + } + + return repoCharts, nil +} + +func newOCIRegistryClient(u string, cred appv2.RepoCredential) (*registry.Client, error) { + parsedURL, err := url.Parse(u) + if err != nil { + klog.Errorf("invalid oci repo URL format: %s, err:%v", u, err) + return nil, err + } + + skipTLS := true + if cred.InsecureSkipTLSVerify != nil && !*cred.InsecureSkipTLSVerify { + skipTLS = false + } + + reg, err := oci.NewRegistry(parsedURL.Host, + oci.WithTimeout(5*time.Second), + oci.WithBasicAuth(cred.Username, cred.Password), + oci.WithInsecureSkipVerifyTLS(skipTLS)) + if err != nil { + return nil, err + } + + transport := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLS}, + Proxy: http.ProxyFromEnvironment, + } + + opts := []registry.ClientOption{registry.ClientOptHTTPClient(&http.Client{ + Transport: transport, + Timeout: 5 * time.Second, + })} + + if reg.PlainHTTP { + opts = append(opts, registry.ClientOptPlainHTTP()) + } + + client, err := registry.NewClient(opts...) + + if err != nil { + return nil, err + } + + if cred.Username != "" || cred.Password != "" { + err = client.Login(parsedURL.Host, + registry.LoginOptBasicAuth(cred.Username, cred.Password), + registry.LoginOptInsecure(reg.PlainHTTP)) + + if err != nil { + return nil, err + } + } + return client, nil +} diff --git a/pkg/simple/client/application/oci_test.go b/pkg/simple/client/application/oci_test.go new file mode 100644 index 000000000..43bed63b9 --- /dev/null +++ b/pkg/simple/client/application/oci_test.go @@ -0,0 +1,65 @@ +package application + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + appv2 "kubesphere.io/api/application/v2" +) + +func TestLoadRepoIndexFromOci(t *testing.T) { + testRepos := []string{"helmcharts/nginx", "helmcharts/test-api", "helmcharts/test-ui", "helmcharts/demo-app"} + testTags := []string{"1.0.0", "1.2.0", "1.0.3"} + testRepo := testRepos[1] + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet && (r.URL.Path == "/v2" || r.URL.Path == "/v2/") { + w.WriteHeader(http.StatusOK) + return + } + if r.Method == http.MethodGet && r.URL.Path == "/v2/_catalog" { + result := struct { + Repositories []string `json:"repositories"` + }{ + Repositories: testRepos, + } + if err := json.NewEncoder(w).Encode(result); err != nil { + t.Errorf("failed to write response: %v", err) + } + return + } + if r.Method == http.MethodGet && r.URL.Path == fmt.Sprintf("/v2/%s/tags/list", testRepo) { + result := struct { + Tags []string `json:"tags"` + }{ + Tags: testTags, + } + if err := json.NewEncoder(w).Encode(result); err != nil { + t.Errorf("failed to write response: %v", err) + } + return + } + + t.Logf("unexpected access: %s %s", r.Method, r.URL) + w.WriteHeader(http.StatusNotFound) + })) + defer ts.Close() + uri, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("invalid test http server: %v", err) + } + + url := fmt.Sprintf("oci://%s/helmcharts", uri.Host) + cred := appv2.RepoCredential{ + Username: "", + Password: "", + } + index, err := LoadRepoIndexFromOci(url, cred) + if err != nil { + t.Errorf("LoadRepoIndexFromOci() error: %s", err) + } + t.Log(len(index.Entries)) +} diff --git a/pkg/simple/client/application/store.go b/pkg/simple/client/application/store.go index 5c6e8d461..c8f29feb6 100644 --- a/pkg/simple/client/application/store.go +++ b/pkg/simple/client/application/store.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "helm.sh/helm/v3/pkg/registry" "k8s.io/klog/v2" "io" @@ -145,6 +146,9 @@ func DownLoadChart(cli runtimeclient.Client, pullUrl, repoName string) (data []b klog.Errorf("failed to get app repo, err: %v", err) return data, err } + if registry.IsOCI(pullUrl) { + return HelmPullFromOci(pullUrl, repo.Spec.Credential) + } buf, err := HelmPull(pullUrl, repo.Spec.Credential) if err != nil { klog.Errorf("load chart failed, error: %s", err) diff --git a/pkg/simple/client/oci/errors.go b/pkg/simple/client/oci/errors.go new file mode 100644 index 000000000..f3cac0f19 --- /dev/null +++ b/pkg/simple/client/oci/errors.go @@ -0,0 +1,64 @@ +package oci + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "unicode" +) + +var maxErrorBytes int64 = 8 * 1024 // 8 KiB + +// requestError contains a single error. +type requestError struct { + Code string `json:"code"` + Message string `json:"message"` +} + +// Error returns a error string describing the error. +func (e requestError) Error() string { + code := strings.Map(func(r rune) rune { + if r == '_' { + return ' ' + } + return unicode.ToLower(r) + }, e.Code) + if e.Message == "" { + return code + } + return fmt.Sprintf("%s: %s", code, e.Message) +} + +// requestErrors is a bundle of requestError. +type requestErrors []requestError + +// Error returns a error string describing the error. +func (errs requestErrors) Error() string { + switch len(errs) { + case 0: + return "" + case 1: + return errs[0].Error() + } + var errmsgs []string + for _, err := range errs { + errmsgs = append(errmsgs, err.Error()) + } + return strings.Join(errmsgs, "; ") +} + +func ParseErrorResponse(resp *http.Response) error { + var errmsg string + var body struct { + Errors requestErrors `json:"errors"` + } + lr := io.LimitReader(resp.Body, maxErrorBytes) + if err := json.NewDecoder(lr).Decode(&body); err == nil && len(body.Errors) > 0 { + errmsg = body.Errors.Error() + } else { + errmsg = http.StatusText(resp.StatusCode) + } + return fmt.Errorf("%s %q: unexpected status code %d: %s", resp.Request.Method, resp.Request.URL, resp.StatusCode, errmsg) +} diff --git a/pkg/simple/client/oci/oci_test.go b/pkg/simple/client/oci/oci_test.go new file mode 100644 index 000000000..b70117af5 --- /dev/null +++ b/pkg/simple/client/oci/oci_test.go @@ -0,0 +1,96 @@ +package oci + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "oras.land/oras-go/pkg/registry" +) + +func TestRegistry_Api(t *testing.T) { + testRepos := []string{"helmcharts/nginx", "helmcharts/test-api", "helmcharts/test-ui", "helmcharts/demo-app"} + testTags := []string{"1.0.0", "1.2.0", "1.0.3"} + testRepo := testRepos[1] + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet && (r.URL.Path == "/v2" || r.URL.Path == "/v2/") { + w.WriteHeader(http.StatusOK) + return + } + if r.Method == http.MethodGet && r.URL.Path == "/v2/_catalog" { + result := struct { + Repositories []string `json:"repositories"` + }{ + Repositories: testRepos, + } + if err := json.NewEncoder(w).Encode(result); err != nil { + t.Errorf("failed to write response: %v", err) + } + return + } + if r.Method == http.MethodGet && r.URL.Path == fmt.Sprintf("/v2/%s/tags/list", testRepo) { + result := struct { + Tags []string `json:"tags"` + }{ + Tags: testTags, + } + if err := json.NewEncoder(w).Encode(result); err != nil { + t.Errorf("failed to write response: %v", err) + } + return + } + + t.Errorf("unexpected access: %s %s", r.Method, r.URL) + w.WriteHeader(http.StatusNotFound) + })) + defer ts.Close() + uri, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("invalid test http server: %v", err) + } + + reg, err := NewRegistry(uri.Host, + WithTimeout(5*time.Second), + WithBasicAuth("", ""), + WithInsecureSkipVerifyTLS(true)) + if err != nil { + t.Fatalf("NewRegistry() error = %v", err) + } + + ctx := context.Background() + err = reg.Ping(ctx) + if err != nil { + t.Fatalf("Registry.Ping() error = %v", err) + } + + var registryTags []string + + repo, err := reg.Repository(ctx, testRepo) + if err != nil { + t.Fatalf("Registry.Repository() error = %v", err) + } + registryTags, err = registry.Tags(ctx, repo) + if err != nil { + t.Fatalf("Registry.Repository().Tags() error = %v", err) + } + t.Log(len(registryTags)) + + err = reg.Repositories(ctx, "", func(repos []string) error { + for _, repo := range repos { + if subRepo, found := strings.CutPrefix(repo, ""); found { + t.Log(subRepo) + } + } + return nil + }) + if err != nil { + t.Fatalf("Registry.Repositories() error = %v", err) + } + +} diff --git a/pkg/simple/client/oci/registry.go b/pkg/simple/client/oci/registry.go new file mode 100644 index 000000000..3d61715e0 --- /dev/null +++ b/pkg/simple/client/oci/registry.go @@ -0,0 +1,221 @@ +package oci + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "net/http" + "slices" + "strconv" + "time" + + "oras.land/oras-go/pkg/registry" + "oras.land/oras-go/pkg/registry/remote" + "oras.land/oras-go/pkg/registry/remote/auth" +) + +type RepositoryOptions remote.Repository +type RegistryOption func(*Registry) + +// Registry is an HTTP client to a remote registry by oras-go 2.x. +// Registry with authentication requires an administrator account. +type Registry struct { + RepositoryOptions + + RepositoryListPageSize int + + username string + password string + timeout time.Duration + insecureSkipVerifyTLS bool +} + +func NewRegistry(name string, options ...RegistryOption) (*Registry, error) { + ref := registry.Reference{ + Registry: name, + } + if err := ref.ValidateRegistry(); err != nil { + return nil, err + } + + reg := &Registry{RepositoryOptions: RepositoryOptions{ + Reference: ref, + }} + for _, option := range options { + option(reg) + } + + headers := http.Header{} + headers.Set("User-Agent", "kubesphere.io") + reg.Client = &auth.Client{ + Client: &http.Client{ + Timeout: reg.timeout, + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: reg.insecureSkipVerifyTLS}}, + }, + Header: headers, + Credential: func(_ context.Context, _ string) (auth.Credential, error) { + if reg.username == "" && reg.password == "" { + return auth.EmptyCredential, nil + } + + return auth.Credential{ + Username: reg.username, + Password: reg.password, + }, nil + }, + } + + _, err := reg.IsPlainHttp() + if err != nil { + return nil, err + } + + return reg, nil +} + +func WithBasicAuth(username, password string) RegistryOption { + return func(reg *Registry) { + reg.username = username + reg.password = password + } +} + +func WithTimeout(timeout time.Duration) RegistryOption { + return func(reg *Registry) { + reg.timeout = timeout + } +} + +func WithInsecureSkipVerifyTLS(insecureSkipVerifyTLS bool) RegistryOption { + return func(reg *Registry) { + reg.insecureSkipVerifyTLS = insecureSkipVerifyTLS + } +} + +func (r *Registry) client() remote.Client { + if r.Client == nil { + return auth.DefaultClient + } + return r.Client +} + +func (r *Registry) do(req *http.Request) (*http.Response, error) { + return r.client().Do(req) +} + +func (r *Registry) IsPlainHttp() (bool, error) { + schemaProbeList := []bool{false, true} + + var err error + for _, probe := range schemaProbeList { + r.PlainHTTP = probe + err = r.Ping(context.Background()) + if err == nil { + return probe, nil + } + } + + return r.PlainHTTP, err +} + +func (r *Registry) Ping(ctx context.Context) error { + url := buildRegistryBaseURL(r.PlainHTTP, r.Reference) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return err + } + + resp, err := r.do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK: + return nil + case http.StatusNotFound: + return errors.New("not found") + default: + return ParseErrorResponse(resp) + } +} + +func (r *Registry) Repositories(ctx context.Context, last string, fn func(repos []string) error) error { + url := buildRegistryCatalogURL(r.PlainHTTP, r.Reference) + var err error + for err == nil { + url, err = r.repositories(ctx, last, fn, url) + // clear `last` for subsequent pages + last = "" + } + if err != errNoLink { + return err + } + return nil +} + +func (r *Registry) repositories(ctx context.Context, last string, fn func(repos []string) error, url string) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return "", err + } + if r.RepositoryListPageSize > 0 || last != "" { + q := req.URL.Query() + if r.RepositoryListPageSize > 0 { + q.Set("n", strconv.Itoa(r.RepositoryListPageSize)) + } + if last != "" { + q.Set("last", last) + } + req.URL.RawQuery = q.Encode() + } + resp, err := r.do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", ParseErrorResponse(resp) + } + var page struct { + Repositories []string `json:"repositories"` + } + lr := limitReader(resp.Body, r.MaxMetadataBytes) + if err := json.NewDecoder(lr).Decode(&page); err != nil { + return "", fmt.Errorf("%s %q: failed to decode response: %w", resp.Request.Method, resp.Request.URL, err) + } + if err := fn(page.Repositories); err != nil { + return "", err + } + + return parseLink(resp) +} + +func (r *Registry) Repository(ctx context.Context, name string) (registry.Repository, error) { + ref := registry.Reference{ + Registry: r.Reference.Registry, + Repository: name, + } + if err := ref.ValidateRepository(); err != nil { + return nil, err + } + repo := r.repository((*remote.Repository)(&r.RepositoryOptions)) + repo.Reference = ref + return repo, nil +} + +func (r *Registry) repository(repo *remote.Repository) *remote.Repository { + return &remote.Repository{ + Client: repo.Client, + Reference: repo.Reference, + PlainHTTP: repo.PlainHTTP, + ManifestMediaTypes: slices.Clone(repo.ManifestMediaTypes), + TagListPageSize: repo.TagListPageSize, + ReferrerListPageSize: repo.ReferrerListPageSize, + MaxMetadataBytes: repo.MaxMetadataBytes, + } +} diff --git a/pkg/simple/client/oci/url.go b/pkg/simple/client/oci/url.go new file mode 100644 index 000000000..0b7225149 --- /dev/null +++ b/pkg/simple/client/oci/url.go @@ -0,0 +1,22 @@ +package oci + +import ( + "fmt" + + "oras.land/oras-go/pkg/registry" +) + +func buildScheme(plainHTTP bool) string { + if plainHTTP { + return "http" + } + return "https" +} + +func buildRegistryBaseURL(plainHTTP bool, ref registry.Reference) string { + return fmt.Sprintf("%s://%s/v2/", buildScheme(plainHTTP), ref.Host()) +} + +func buildRegistryCatalogURL(plainHTTP bool, ref registry.Reference) string { + return fmt.Sprintf("%s://%s/v2/_catalog", buildScheme(plainHTTP), ref.Host()) +} diff --git a/pkg/simple/client/oci/utils.go b/pkg/simple/client/oci/utils.go new file mode 100644 index 000000000..1b39e2253 --- /dev/null +++ b/pkg/simple/client/oci/utils.go @@ -0,0 +1,48 @@ +package oci + +import ( + "errors" + "fmt" + "io" + "net/http" + "strings" +) + +// defaultMaxMetadataBytes specifies the default limit on how many response +// bytes are allowed in the server's response to the metadata APIs. +// See also: Repository.MaxMetadataBytes +var defaultMaxMetadataBytes int64 = 4 * 1024 * 1024 // 4 MiB + +// errNoLink is returned by parseLink() when no Link header is present. +var errNoLink = errors.New("no Link header in response") + +// parseLink returns the URL of the response's "Link" header, if present. +func parseLink(resp *http.Response) (string, error) { + link := resp.Header.Get("Link") + if link == "" { + return "", errNoLink + } + if link[0] != '<' { + return "", fmt.Errorf("invalid next link %q: missing '<'", link) + } + if i := strings.IndexByte(link, '>'); i == -1 { + return "", fmt.Errorf("invalid next link %q: missing '>'", link) + } else { + link = link[1:i] + } + + linkURL, err := resp.Request.URL.Parse(link) + if err != nil { + return "", err + } + return linkURL.String(), nil +} + +// limitReader returns a Reader that reads from r but stops with EOF after n +// bytes. If n is zero, defaultMaxMetadataBytes is used. +func limitReader(r io.Reader, n int64) io.Reader { + if n == 0 { + n = defaultMaxMetadataBytes + } + return io.LimitReader(r, n) +}