use istio client-go library instead of knative (#1661)

use istio client-go library instead of knative
bump kubernetes dependency version
change code coverage to codecov
This commit is contained in:
zryfish
2019-12-13 11:26:18 +08:00
committed by GitHub
parent f249a6e081
commit ea88c8803d
2071 changed files with 354531 additions and 108336 deletions

View File

@@ -17,13 +17,16 @@ limitations under the License.
package handlers
import (
"bytes"
"context"
"fmt"
"net/http"
"strings"
"time"
"unicode"
"unicode/utf8"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
@@ -37,13 +40,13 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utiltrace "k8s.io/apiserver/pkg/util/trace"
utiltrace "k8s.io/utils/trace"
)
func createHandler(r rest.NamedCreater, scope RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// For performance tracking purposes.
trace := utiltrace.New("Create " + req.URL.Path)
trace := utiltrace.New("Create", utiltrace.Field{"url", req.URL.Path})
defer trace.LogIfLong(500 * time.Millisecond)
if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
@@ -54,29 +57,38 @@ func createHandler(r rest.NamedCreater, scope RequestScope, admit admission.Inte
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.URL.Query().Get("timeout"))
var (
namespace, name string
err error
)
if includeName {
namespace, name, err = scope.Namer.Name(req)
} else {
namespace, name, err := scope.Namer.Name(req)
if err != nil {
if includeName {
// name was required, return
scope.err(err, w, req)
return
}
// otherwise attempt to look up the namespace
namespace, err = scope.Namer.Namespace(req)
if err != nil {
scope.err(err, w, req)
return
}
}
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
gv := scope.Kind.GroupVersion()
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
if err != nil {
scope.err(err, w, req)
return
}
decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
@@ -97,6 +109,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, admit admission.Inte
scope.err(err, w, req)
return
}
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))
defaultGVK := scope.Kind
original := r.New()
@@ -119,22 +132,41 @@ func createHandler(r rest.NamedCreater, scope RequestScope, admit admission.Inte
audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)
userInfo, _ := request.UserFrom(ctx)
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, dryrun.IsDryRun(options.DryRun), userInfo)
// On create, get name from new object if unset
if len(name) == 0 {
_, name, _ = scope.Namer.ObjectName(obj)
}
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
err = mutatingAdmission.Admit(admissionAttributes)
err = mutatingAdmission.Admit(ctx, admissionAttributes, scope)
if err != nil {
scope.err(err, w, req)
return
}
}
if scope.FieldManager != nil {
liveObj, err := scope.Creater.New(scope.Kind)
if err != nil {
scope.err(fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err), w, req)
return
}
obj, err = scope.FieldManager.Update(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
if err != nil {
scope.err(fmt.Errorf("failed to update object (Create for %v) managed fields: %v", scope.Kind, err), w, req)
return
}
}
trace.Step("About to store object in database")
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes),
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
})
@@ -144,41 +176,23 @@ func createHandler(r rest.NamedCreater, scope RequestScope, admit admission.Inte
}
trace.Step("Object stored in database")
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
}
if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
scope.err(err, w, req)
return
}
trace.Step("Self-link added")
// If the object is partially initialized, always indicate it via StatusAccepted
code := http.StatusCreated
if accessor, err := meta.Accessor(result); err == nil {
if accessor.GetInitializers() != nil {
code = http.StatusAccepted
}
}
status, ok := result.(*metav1.Status)
if ok && err == nil && status.Code == 0 {
status.Code = int32(code)
}
scope.Trace = trace
transformResponseObject(ctx, scope, req, w, code, result)
transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
}
}
// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope RequestScope, admission admission.Interface) http.HandlerFunc {
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}
// CreateResource returns a function that will handle a resource creation.
func CreateResource(r rest.Creater, scope RequestScope, admission admission.Interface) http.HandlerFunc {
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}
@@ -189,3 +203,32 @@ type namedCreaterAdapter struct {
func (c *namedCreaterAdapter) Create(ctx context.Context, name string, obj runtime.Object, createValidatingAdmission rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
return c.Creater.Create(ctx, obj, createValidatingAdmission, options)
}
// manager is assumed to be already a valid value, we need to make
// userAgent into a valid value too.
func managerOrUserAgent(manager, userAgent string) string {
if manager != "" {
return manager
}
return prefixFromUserAgent(userAgent)
}
// prefixFromUserAgent takes the characters preceding the first /, quote
// unprintable character and then trim what's beyond the
// FieldManagerMaxLength limit.
func prefixFromUserAgent(u string) string {
m := strings.Split(u, "/")[0]
buf := bytes.NewBuffer(nil)
for _, r := range m {
// Ignore non-printable characters
if !unicode.IsPrint(r) {
continue
}
// Only append if we have room for it
if buf.Len()+utf8.RuneLen(r) > validation.FieldManagerMaxLength {
break
}
buf.WriteRune(r)
}
return buf.String()
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package handlers
import (
"context"
"fmt"
"net/http"
"time"
@@ -35,15 +36,15 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utiltrace "k8s.io/apiserver/pkg/util/trace"
utiltrace "k8s.io/utils/trace"
)
// DeleteResource returns a function that will handle a resource deletion
// TODO admission here becomes solely validating admission
func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestScope, admit admission.Interface) http.HandlerFunc {
func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// For performance tracking purposes.
trace := utiltrace.New("Delete " + req.URL.Path)
trace := utiltrace.New("Delete", utiltrace.Field{"url", req.URL.Path})
defer trace.LogIfLong(500 * time.Millisecond)
if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
@@ -59,11 +60,18 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
scope.err(err, w, req)
return
}
ctx := req.Context()
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
ae := request.AuditEventFrom(ctx)
admit = admission.WithAudit(admit, ae)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
options := &metav1.DeleteOptions{}
if allowsOptions {
body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
@@ -107,29 +115,14 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
scope.err(err, w, req)
return
}
trace.Step("About to check admission control")
if admit != nil && admit.Handles(admission.Delete) {
userInfo, _ := request.UserFrom(ctx)
attrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, dryrun.IsDryRun(options.DryRun), userInfo)
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
if err := mutatingAdmission.Admit(attrs); err != nil {
scope.err(err, w, req)
return
}
}
if validatingAdmission, ok := admit.(admission.ValidationInterface); ok {
if err := validatingAdmission.Validate(attrs); err != nil {
scope.err(err, w, req)
return
}
}
}
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("DeleteOptions"))
trace.Step("About to delete object from database")
wasDeleted := true
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
result, err := finishRequest(timeout, func() (runtime.Object, error) {
obj, deleted, err := r.Delete(ctx, name, options)
obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options)
wasDeleted = deleted
return obj, err
})
@@ -160,30 +153,16 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
Kind: scope.Kind.Kind,
},
}
} else {
// when a non-status response is returned, set the self link
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
}
if _, ok := result.(*metav1.Status); !ok {
if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
scope.err(err, w, req)
return
}
}
}
scope.Trace = trace
transformResponseObject(ctx, scope, req, w, status, result)
transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)
}
}
// DeleteCollection returns a function that will handle a collection deletion
func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestScope, admit admission.Interface) http.HandlerFunc {
func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
trace := utiltrace.New("Delete " + req.URL.Path)
trace := utiltrace.New("Delete", utiltrace.Field{"url", req.URL.Path})
defer trace.LogIfLong(500 * time.Millisecond)
if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
@@ -200,10 +179,17 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
return
}
ctx := req.Context()
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
ae := request.AuditEventFrom(ctx)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
listOptions := metainternalversion.ListOptions{}
if err := metainternalversion.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &listOptions); err != nil {
err = errors.NewBadRequest(err.Error())
@@ -238,6 +224,8 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
scope.err(err, w, req)
return
}
// For backwards compatibility, we need to allow existing clients to submit per group DeleteOptions
// It is also allowed to pass a body with meta.k8s.io/v1.DeleteOptions
defaultGVK := scope.Kind.GroupVersion().WithKind("DeleteOptions")
obj, _, err := scope.Serializer.DecoderToVersion(s.Serializer, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, options)
if err != nil {
@@ -264,30 +252,13 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
scope.err(err, w, req)
return
}
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("DeleteOptions"))
admit = admission.WithAudit(admit, ae)
if admit != nil && admit.Handles(admission.Delete) {
userInfo, _ := request.UserFrom(ctx)
attrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, dryrun.IsDryRun(options.DryRun), userInfo)
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
err = mutatingAdmission.Admit(attrs)
if err != nil {
scope.err(err, w, req)
return
}
}
if validatingAdmission, ok := admit.(admission.ValidationInterface); ok {
err = validatingAdmission.Validate(attrs)
if err != nil {
scope.err(err, w, req)
return
}
}
}
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.DeleteCollection(ctx, options, &listOptions)
return r.DeleteCollection(ctx, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options, &listOptions)
})
if err != nil {
scope.err(err, w, req)
@@ -304,17 +275,8 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
Kind: scope.Kind.Kind,
},
}
} else {
// when a non-status response is returned, set the self link
if _, ok := result.(*metav1.Status); !ok {
if _, err := setListSelfLink(result, ctx, req, scope.Namer); err != nil {
scope.err(err, w, req)
return
}
}
}
scope.Trace = trace
transformResponseObject(ctx, scope, req, w, http.StatusOK, result)
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
}
}

View File

@@ -0,0 +1,5 @@
approvers:
- jennybuckley
- apelisse
reviewers:
- kwiesmueller

View File

@@ -0,0 +1,331 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"k8s.io/klog"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/fieldpath"
"sigs.k8s.io/structured-merge-diff/merge"
"sigs.k8s.io/yaml"
)
// FieldManager updates the managed fields and merge applied
// configurations.
type FieldManager struct {
typeConverter internal.TypeConverter
objectConverter runtime.ObjectConvertor
objectDefaulter runtime.ObjectDefaulter
groupVersion schema.GroupVersion
hubVersion schema.GroupVersion
updater merge.Updater
}
// NewFieldManager creates a new FieldManager that merges apply requests
// and update managed fields for other types of requests.
func NewFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion) (*FieldManager, error) {
typeConverter, err := internal.NewTypeConverter(models, false)
if err != nil {
return nil, err
}
return &FieldManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
}
// NewCRDFieldManager creates a new FieldManager specifically for
// CRDs. This allows for the possibility of fields which are not defined
// in models, as well as having no models defined at all.
func NewCRDFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion, preserveUnknownFields bool) (_ *FieldManager, err error) {
var typeConverter internal.TypeConverter = internal.DeducedTypeConverter{}
if models != nil {
typeConverter, err = internal.NewTypeConverter(models, preserveUnknownFields)
if err != nil {
return nil, err
}
}
return &FieldManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewCRDVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
}
// Update is used when the object has already been merged (non-apply
// use-case), and simply updates the managed fields in the output
// object.
func (f *FieldManager) Update(liveObj, newObj runtime.Object, manager string) (runtime.Object, error) {
// If the object doesn't have metadata, we should just return without trying to
// set the managedFields at all, so creates/updates/patches will work normally.
if _, err := meta.Accessor(newObj); err != nil {
return newObj, nil
}
// First try to decode the managed fields provided in the update,
// This is necessary to allow directly updating managed fields.
managed, err := internal.DecodeObjectManagedFields(newObj)
// If the managed field is empty or we failed to decode it,
// let's try the live object. This is to prevent clients who
// don't understand managedFields from deleting it accidentally.
if err != nil || len(managed.Fields) == 0 {
managed, err = internal.DecodeObjectManagedFields(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to decode managed fields: %v", err)
}
}
// if managed field is still empty, skip updating managed fields altogether
if len(managed.Fields) == 0 {
return newObj, nil
}
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
internal.RemoveObjectManagedFields(liveObjVersioned)
internal.RemoveObjectManagedFields(newObjVersioned)
newObjTyped, err := f.typeConverter.ObjectToTyped(newObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed new object: %v", err)
return newObj, nil
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed live object: %v", err)
return newObj, nil
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
// TODO(apelisse) use the first return value when unions are implemented
_, managed.Fields, err = f.updater.Update(liveObjTyped, newObjTyped, apiVersion, managed.Fields, manager)
if err != nil {
return nil, fmt.Errorf("failed to update ManagedFields: %v", err)
}
managed.Fields = f.stripFields(managed.Fields, manager)
// If the current operation took any fields from anything, it means the object changed,
// so update the timestamp of the managedFieldsEntry and merge with any previous updates from the same manager
if vs, ok := managed.Fields[manager]; ok {
delete(managed.Fields, manager)
// Build a manager identifier which will only match previous updates from the same manager
manager, err = f.buildManagerInfo(manager, metav1.ManagedFieldsOperationUpdate)
if err != nil {
return nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
managed.Times[manager] = &metav1.Time{Time: time.Now().UTC()}
if previous, ok := managed.Fields[manager]; ok {
managed.Fields[manager] = fieldpath.NewVersionedSet(vs.Set().Union(previous.Set()), vs.APIVersion(), vs.Applied())
} else {
managed.Fields[manager] = vs
}
}
if err := internal.EncodeObjectManagedFields(newObj, managed); err != nil {
return nil, fmt.Errorf("failed to encode managed fields: %v", err)
}
return newObj, nil
}
// Apply is used when server-side apply is called, as it merges the
// object and update the managed fields.
func (f *FieldManager) Apply(liveObj runtime.Object, patch []byte, fieldManager string, force bool) (runtime.Object, error) {
// If the object doesn't have metadata, apply isn't allowed.
accessor, err := meta.Accessor(liveObj)
if err != nil {
return nil, fmt.Errorf("couldn't get accessor: %v", err)
}
missingManagedFields := (len(accessor.GetManagedFields()) == 0)
managed, err := internal.DecodeObjectManagedFields(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to decode managed fields: %v", err)
}
// Check that the patch object has the same version as the live object
patchObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(patch, &patchObj.Object); err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error decoding YAML: %v", err))
}
if patchObj.GetManagedFields() != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("metadata.managedFields must be nil"))
}
if patchObj.GetAPIVersion() != f.groupVersion.String() {
return nil,
errors.NewBadRequest(
fmt.Sprintf("Incorrect version specified in apply patch. "+
"Specified patch version: %s, expected: %s",
patchObj.GetAPIVersion(), f.groupVersion.String()))
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
internal.RemoveObjectManagedFields(liveObjVersioned)
patchObjTyped, err := f.typeConverter.YAMLToTyped(patch)
if err != nil {
return nil, fmt.Errorf("failed to create typed patch object: %v", err)
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
return nil, fmt.Errorf("failed to create typed live object: %v", err)
}
manager, err := f.buildManagerInfo(fieldManager, metav1.ManagedFieldsOperationApply)
if err != nil {
return nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
// if managed field is missing, create a single entry for all the fields
if missingManagedFields {
unknownManager, err := internal.BuildManagerIdentifier(&metav1.ManagedFieldsEntry{
Manager: "before-first-apply",
Operation: metav1.ManagedFieldsOperationUpdate,
APIVersion: f.groupVersion.String(),
})
if err != nil {
return nil, fmt.Errorf("failed to create manager for existing fields: %v", err)
}
unknownFieldSet, err := liveObjTyped.ToFieldSet()
if err != nil {
return nil, fmt.Errorf("failed to create fieldset for existing fields: %v", err)
}
managed.Fields[unknownManager] = fieldpath.NewVersionedSet(unknownFieldSet, apiVersion, false)
f.stripFields(managed.Fields, unknownManager)
}
newObjTyped, managedFields, err := f.updater.Apply(liveObjTyped, patchObjTyped, apiVersion, managed.Fields, manager, force)
if err != nil {
if conflicts, ok := err.(merge.Conflicts); ok {
return nil, internal.NewConflictError(conflicts)
}
return nil, err
}
managed.Fields = f.stripFields(managedFields, manager)
// Update the time in the managedFieldsEntry for this operation
managed.Times[manager] = &metav1.Time{Time: time.Now().UTC()}
newObj, err := f.typeConverter.TypedToObject(newObjTyped)
if err != nil {
return nil, fmt.Errorf("failed to convert new typed object to object: %v", err)
}
if err := internal.EncodeObjectManagedFields(newObj, managed); err != nil {
return nil, fmt.Errorf("failed to encode managed fields: %v", err)
}
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
f.objectDefaulter.Default(newObjVersioned)
newObjUnversioned, err := f.toUnversioned(newObjVersioned)
if err != nil {
return nil, fmt.Errorf("failed to convert to unversioned: %v", err)
}
return newObjUnversioned, nil
}
func (f *FieldManager) toVersioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.groupVersion)
}
func (f *FieldManager) toUnversioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.hubVersion)
}
func (f *FieldManager) buildManagerInfo(prefix string, operation metav1.ManagedFieldsOperationType) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: prefix,
Operation: operation,
APIVersion: f.groupVersion.String(),
}
if managerInfo.Manager == "" {
managerInfo.Manager = "unknown"
}
return internal.BuildManagerIdentifier(&managerInfo)
}
// stripSet is the list of fields that should never be part of a mangedFields.
var stripSet = fieldpath.NewSet(
fieldpath.MakePathOrDie("apiVersion"),
fieldpath.MakePathOrDie("kind"),
fieldpath.MakePathOrDie("metadata"),
fieldpath.MakePathOrDie("metadata", "name"),
fieldpath.MakePathOrDie("metadata", "namespace"),
fieldpath.MakePathOrDie("metadata", "creationTimestamp"),
fieldpath.MakePathOrDie("metadata", "selfLink"),
fieldpath.MakePathOrDie("metadata", "uid"),
fieldpath.MakePathOrDie("metadata", "clusterName"),
fieldpath.MakePathOrDie("metadata", "generation"),
fieldpath.MakePathOrDie("metadata", "managedFields"),
fieldpath.MakePathOrDie("metadata", "resourceVersion"),
)
// stripFields removes a predefined set of paths found in typed from managed and returns the updated ManagedFields
func (f *FieldManager) stripFields(managed fieldpath.ManagedFields, manager string) fieldpath.ManagedFields {
vs, ok := managed[manager]
if ok {
if vs == nil {
panic(fmt.Sprintf("Found unexpected nil manager which should never happen: %s", manager))
}
newSet := vs.Set().Difference(stripSet)
if newSet.Empty() {
delete(managed, manager)
} else {
managed[manager] = fieldpath.NewVersionedSet(newSet, vs.APIVersion(), vs.Applied())
}
}
return managed
}

View File

@@ -0,0 +1,85 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"encoding/json"
"fmt"
"sort"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/structured-merge-diff/fieldpath"
"sigs.k8s.io/structured-merge-diff/merge"
)
// NewConflictError returns an error including details on the requests apply conflicts
func NewConflictError(conflicts merge.Conflicts) *errors.StatusError {
causes := []metav1.StatusCause{}
for _, conflict := range conflicts {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeFieldManagerConflict,
Message: fmt.Sprintf("conflict with %v", printManager(conflict.Manager)),
Field: conflict.Path.String(),
})
}
return errors.NewApplyConflict(causes, getConflictMessage(conflicts))
}
func getConflictMessage(conflicts merge.Conflicts) string {
if len(conflicts) == 1 {
return fmt.Sprintf("Apply failed with 1 conflict: conflict with %v: %v", printManager(conflicts[0].Manager), conflicts[0].Path)
}
m := map[string][]fieldpath.Path{}
for _, conflict := range conflicts {
m[conflict.Manager] = append(m[conflict.Manager], conflict.Path)
}
uniqueManagers := []string{}
for manager := range m {
uniqueManagers = append(uniqueManagers, manager)
}
// Print conflicts by sorted managers.
sort.Strings(uniqueManagers)
messages := []string{}
for _, manager := range uniqueManagers {
messages = append(messages, fmt.Sprintf("conflicts with %v:", printManager(manager)))
for _, path := range m[manager] {
messages = append(messages, fmt.Sprintf("- %v", path))
}
}
return fmt.Sprintf("Apply failed with %d conflicts: %s", len(conflicts), strings.Join(messages, "\n"))
}
func printManager(manager string) string {
encodedManager := &metav1.ManagedFieldsEntry{}
if err := json.Unmarshal([]byte(manager), encodedManager); err != nil {
return fmt.Sprintf("%q", manager)
}
if encodedManager.Operation == metav1.ManagedFieldsOperationUpdate {
if encodedManager.Time == nil {
return fmt.Sprintf("%q using %v", encodedManager.Manager, encodedManager.APIVersion)
}
return fmt.Sprintf("%q using %v at %v", encodedManager.Manager, encodedManager.APIVersion, encodedManager.Time.UTC().Format(time.RFC3339))
}
return fmt.Sprintf("%q", encodedManager.Manager)
}

View File

@@ -0,0 +1,47 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"bytes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
// EmptyFields represents a set with no paths
// It looks like metav1.Fields{Raw: []byte("{}")}
var EmptyFields metav1.FieldsV1 = func() metav1.FieldsV1 {
f, err := SetToFields(*fieldpath.NewSet())
if err != nil {
panic("should never happen")
}
return f
}()
// FieldsToSet creates a set paths from an input trie of fields
func FieldsToSet(f metav1.FieldsV1) (s fieldpath.Set, err error) {
err = s.FromJSON(bytes.NewReader(f.Raw))
return s, err
}
// SetToFields creates a trie of fields from an input set of paths
func SetToFields(s fieldpath.Set) (f metav1.FieldsV1, err error) {
f.Raw, err = s.ToJSON()
return f, err
}

View File

@@ -0,0 +1,121 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kube-openapi/pkg/schemaconv"
"k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/typed"
)
// groupVersionKindExtensionKey is the key used to lookup the
// GroupVersionKind value for an object definition from the
// definition's "extensions" map.
const groupVersionKindExtensionKey = "x-kubernetes-group-version-kind"
type gvkParser struct {
gvks map[schema.GroupVersionKind]string
parser typed.Parser
}
func (p *gvkParser) Type(gvk schema.GroupVersionKind) *typed.ParseableType {
typeName, ok := p.gvks[gvk]
if !ok {
return nil
}
t := p.parser.Type(typeName)
return &t
}
func newGVKParser(models proto.Models, preserveUnknownFields bool) (*gvkParser, error) {
typeSchema, err := schemaconv.ToSchemaWithPreserveUnknownFields(models, preserveUnknownFields)
if err != nil {
return nil, fmt.Errorf("failed to convert models to schema: %v", err)
}
parser := gvkParser{
gvks: map[schema.GroupVersionKind]string{},
}
parser.parser = typed.Parser{Schema: *typeSchema}
for _, modelName := range models.ListModels() {
model := models.LookupModel(modelName)
if model == nil {
panic(fmt.Sprintf("ListModels returns a model that can't be looked-up for: %v", modelName))
}
gvkList := parseGroupVersionKind(model)
for _, gvk := range gvkList {
if len(gvk.Kind) > 0 {
_, ok := parser.gvks[gvk]
if ok {
return nil, fmt.Errorf("duplicate entry for %v", gvk)
}
parser.gvks[gvk] = modelName
}
}
}
return &parser, nil
}
// Get and parse GroupVersionKind from the extension. Returns empty if it doesn't have one.
func parseGroupVersionKind(s proto.Schema) []schema.GroupVersionKind {
extensions := s.GetExtensions()
gvkListResult := []schema.GroupVersionKind{}
// Get the extensions
gvkExtension, ok := extensions[groupVersionKindExtensionKey]
if !ok {
return []schema.GroupVersionKind{}
}
// gvk extension must be a list of at least 1 element.
gvkList, ok := gvkExtension.([]interface{})
if !ok {
return []schema.GroupVersionKind{}
}
for _, gvk := range gvkList {
// gvk extension list must be a map with group, version, and
// kind fields
gvkMap, ok := gvk.(map[interface{}]interface{})
if !ok {
continue
}
group, ok := gvkMap["group"].(string)
if !ok {
continue
}
version, ok := gvkMap["version"].(string)
if !ok {
continue
}
kind, ok := gvkMap["kind"].(string)
if !ok {
continue
}
gvkListResult = append(gvkListResult, schema.GroupVersionKind{
Group: group,
Version: version,
Kind: kind,
})
}
return gvkListResult
}

View File

@@ -0,0 +1,205 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"encoding/json"
"fmt"
"sort"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
// Managed groups a fieldpath.ManagedFields together with the timestamps associated with each operation.
type Managed struct {
Fields fieldpath.ManagedFields
Times map[string]*metav1.Time
}
// RemoveObjectManagedFields removes the ManagedFields from the object
// before we merge so that it doesn't appear in the ManagedFields
// recursively.
func RemoveObjectManagedFields(obj runtime.Object) {
accessor, err := meta.Accessor(obj)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
}
accessor.SetManagedFields(nil)
}
// DecodeObjectManagedFields extracts and converts the objects ManagedFields into a fieldpath.ManagedFields.
func DecodeObjectManagedFields(from runtime.Object) (Managed, error) {
if from == nil {
return Managed{}, nil
}
accessor, err := meta.Accessor(from)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
}
managed, err := decodeManagedFields(accessor.GetManagedFields())
if err != nil {
return Managed{}, fmt.Errorf("failed to convert managed fields from API: %v", err)
}
return managed, err
}
// EncodeObjectManagedFields converts and stores the fieldpathManagedFields into the objects ManagedFields
func EncodeObjectManagedFields(obj runtime.Object, managed Managed) error {
accessor, err := meta.Accessor(obj)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
}
encodedManagedFields, err := encodeManagedFields(managed)
if err != nil {
return fmt.Errorf("failed to convert back managed fields to API: %v", err)
}
accessor.SetManagedFields(encodedManagedFields)
return nil
}
// decodeManagedFields converts ManagedFields from the wire format (api format)
// to the format used by sigs.k8s.io/structured-merge-diff
func decodeManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry) (managed Managed, err error) {
managed.Fields = make(fieldpath.ManagedFields, len(encodedManagedFields))
managed.Times = make(map[string]*metav1.Time, len(encodedManagedFields))
for _, encodedVersionedSet := range encodedManagedFields {
manager, err := BuildManagerIdentifier(&encodedVersionedSet)
if err != nil {
return Managed{}, fmt.Errorf("error decoding manager from %v: %v", encodedVersionedSet, err)
}
managed.Fields[manager], err = decodeVersionedSet(&encodedVersionedSet)
if err != nil {
return Managed{}, fmt.Errorf("error decoding versioned set from %v: %v", encodedVersionedSet, err)
}
managed.Times[manager] = encodedVersionedSet.Time
}
return managed, nil
}
// BuildManagerIdentifier creates a manager identifier string from a ManagedFieldsEntry
func BuildManagerIdentifier(encodedManager *metav1.ManagedFieldsEntry) (manager string, err error) {
encodedManagerCopy := *encodedManager
// Never include fields type in the manager identifier
encodedManagerCopy.FieldsType = ""
// Never include the fields in the manager identifier
encodedManagerCopy.FieldsV1 = nil
// Never include the time in the manager identifier
encodedManagerCopy.Time = nil
// For appliers, don't include the APIVersion in the manager identifier,
// so it will always have the same manager identifier each time it applied.
if encodedManager.Operation == metav1.ManagedFieldsOperationApply {
encodedManagerCopy.APIVersion = ""
}
// Use the remaining fields to build the manager identifier
b, err := json.Marshal(&encodedManagerCopy)
if err != nil {
return "", fmt.Errorf("error marshalling manager identifier: %v", err)
}
return string(b), nil
}
func decodeVersionedSet(encodedVersionedSet *metav1.ManagedFieldsEntry) (versionedSet fieldpath.VersionedSet, err error) {
fields := EmptyFields
if encodedVersionedSet.FieldsV1 != nil {
fields = *encodedVersionedSet.FieldsV1
}
set, err := FieldsToSet(fields)
if err != nil {
return nil, fmt.Errorf("error decoding set: %v", err)
}
return fieldpath.NewVersionedSet(&set, fieldpath.APIVersion(encodedVersionedSet.APIVersion), encodedVersionedSet.Operation == metav1.ManagedFieldsOperationApply), nil
}
// encodeManagedFields converts ManagedFields from the format used by
// sigs.k8s.io/structured-merge-diff to the wire format (api format)
func encodeManagedFields(managed Managed) (encodedManagedFields []metav1.ManagedFieldsEntry, err error) {
encodedManagedFields = []metav1.ManagedFieldsEntry{}
for manager := range managed.Fields {
versionedSet := managed.Fields[manager]
v, err := encodeManagerVersionedSet(manager, versionedSet)
if err != nil {
return nil, fmt.Errorf("error encoding versioned set for %v: %v", manager, err)
}
if t, ok := managed.Times[manager]; ok {
v.Time = t
}
encodedManagedFields = append(encodedManagedFields, *v)
}
return sortEncodedManagedFields(encodedManagedFields)
}
func sortEncodedManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry) (sortedManagedFields []metav1.ManagedFieldsEntry, err error) {
sort.Slice(encodedManagedFields, func(i, j int) bool {
p, q := encodedManagedFields[i], encodedManagedFields[j]
if p.Operation != q.Operation {
return p.Operation < q.Operation
}
ntime := &metav1.Time{Time: time.Time{}}
if p.Time == nil {
p.Time = ntime
}
if q.Time == nil {
q.Time = ntime
}
if !p.Time.Equal(q.Time) {
return p.Time.Before(q.Time)
}
return p.Manager < q.Manager
})
return encodedManagedFields, nil
}
func encodeManagerVersionedSet(manager string, versionedSet fieldpath.VersionedSet) (encodedVersionedSet *metav1.ManagedFieldsEntry, err error) {
encodedVersionedSet = &metav1.ManagedFieldsEntry{}
// Get as many fields as we can from the manager identifier
err = json.Unmarshal([]byte(manager), encodedVersionedSet)
if err != nil {
return nil, fmt.Errorf("error unmarshalling manager identifier %v: %v", manager, err)
}
// Get the APIVersion, Operation, and Fields from the VersionedSet
encodedVersionedSet.APIVersion = string(versionedSet.APIVersion())
if versionedSet.Applied() {
encodedVersionedSet.Operation = metav1.ManagedFieldsOperationApply
}
encodedVersionedSet.FieldsType = "FieldsV1"
fields, err := SetToFields(*versionedSet.Set())
if err != nil {
return nil, fmt.Errorf("error encoding set: %v", err)
}
encodedVersionedSet.FieldsV1 = &fields
return encodedVersionedSet, nil
}

View File

@@ -0,0 +1,140 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sigs.k8s.io/structured-merge-diff/fieldpath"
"sigs.k8s.io/structured-merge-diff/value"
)
const (
// Field indicates that the content of this path element is a field's name
Field = "f"
// Value indicates that the content of this path element is a field's value
Value = "v"
// Index indicates that the content of this path element is an index in an array
Index = "i"
// Key indicates that the content of this path element is a key value map
Key = "k"
// Separator separates the type of a path element from the contents
Separator = ":"
)
// NewPathElement parses a serialized path element
func NewPathElement(s string) (fieldpath.PathElement, error) {
split := strings.SplitN(s, Separator, 2)
if len(split) < 2 {
return fieldpath.PathElement{}, fmt.Errorf("missing colon: %v", s)
}
switch split[0] {
case Field:
return fieldpath.PathElement{
FieldName: &split[1],
}, nil
case Value:
val, err := value.FromJSON([]byte(split[1]))
if err != nil {
return fieldpath.PathElement{}, err
}
return fieldpath.PathElement{
Value: &val,
}, nil
case Index:
i, err := strconv.Atoi(split[1])
if err != nil {
return fieldpath.PathElement{}, err
}
return fieldpath.PathElement{
Index: &i,
}, nil
case Key:
kv := map[string]json.RawMessage{}
err := json.Unmarshal([]byte(split[1]), &kv)
if err != nil {
return fieldpath.PathElement{}, err
}
fields := []value.Field{}
for k, v := range kv {
b, err := json.Marshal(v)
if err != nil {
return fieldpath.PathElement{}, err
}
val, err := value.FromJSON(b)
if err != nil {
return fieldpath.PathElement{}, err
}
fields = append(fields, value.Field{
Name: k,
Value: val,
})
}
return fieldpath.PathElement{
Key: &value.Map{Items: fields},
}, nil
default:
// Ignore unknown key types
return fieldpath.PathElement{}, nil
}
}
// PathElementString serializes a path element
func PathElementString(pe fieldpath.PathElement) (string, error) {
switch {
case pe.FieldName != nil:
return Field + Separator + *pe.FieldName, nil
case pe.Key != nil:
kv := map[string]json.RawMessage{}
for _, k := range pe.Key.Items {
b, err := k.Value.ToJSON()
if err != nil {
return "", err
}
m := json.RawMessage{}
err = json.Unmarshal(b, &m)
if err != nil {
return "", err
}
kv[k.Name] = m
}
b, err := json.Marshal(kv)
if err != nil {
return "", err
}
return Key + ":" + string(b), nil
case pe.Value != nil:
b, err := pe.Value.ToJSON()
if err != nil {
return "", err
}
return Value + ":" + string(b), nil
case pe.Index != nil:
return Index + ":" + strconv.Itoa(*pe.Index), nil
default:
return "", errors.New("Invalid type of path element")
}
}

View File

@@ -0,0 +1,148 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/typed"
"sigs.k8s.io/structured-merge-diff/value"
"sigs.k8s.io/yaml"
)
// TypeConverter allows you to convert from runtime.Object to
// typed.TypedValue and the other way around.
type TypeConverter interface {
ObjectToTyped(runtime.Object) (*typed.TypedValue, error)
YAMLToTyped([]byte) (*typed.TypedValue, error)
TypedToObject(*typed.TypedValue) (runtime.Object, error)
}
// DeducedTypeConverter is a TypeConverter for CRDs that don't have a
// schema. It does implement the same interface though (and create the
// same types of objects), so that everything can still work the same.
// CRDs are merged with all their fields being "atomic" (lists
// included).
//
// Note that this is not going to be sufficient for converting to/from
// CRDs that have a schema defined (we don't support that schema yet).
// TODO(jennybuckley): Use the schema provided by a CRD if it exists.
type DeducedTypeConverter struct{}
var _ TypeConverter = DeducedTypeConverter{}
// ObjectToTyped converts an object into a TypedValue with a "deduced type".
func (DeducedTypeConverter) ObjectToTyped(obj runtime.Object) (*typed.TypedValue, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, err
}
return typed.DeducedParseableType.FromUnstructured(u)
}
// YAMLToTyped parses a yaml object into a TypedValue with a "deduced type".
func (DeducedTypeConverter) YAMLToTyped(from []byte) (*typed.TypedValue, error) {
return typed.DeducedParseableType.FromYAML(typed.YAMLObject(from))
}
// TypedToObject transforms the typed value into a runtime.Object. That
// is not specific to deduced type.
func (DeducedTypeConverter) TypedToObject(value *typed.TypedValue) (runtime.Object, error) {
return valueToObject(value.AsValue())
}
type typeConverter struct {
parser *gvkParser
}
var _ TypeConverter = &typeConverter{}
// NewTypeConverter builds a TypeConverter from a proto.Models. This
// will automatically find the proper version of the object, and the
// corresponding schema information.
func NewTypeConverter(models proto.Models, preserveUnknownFields bool) (TypeConverter, error) {
parser, err := newGVKParser(models, preserveUnknownFields)
if err != nil {
return nil, err
}
return &typeConverter{parser: parser}, nil
}
func (c *typeConverter) ObjectToTyped(obj runtime.Object) (*typed.TypedValue, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, err
}
gvk := obj.GetObjectKind().GroupVersionKind()
t := c.parser.Type(gvk)
if t == nil {
return nil, newNoCorrespondingTypeError(gvk)
}
return t.FromUnstructured(u)
}
func (c *typeConverter) YAMLToTyped(from []byte) (*typed.TypedValue, error) {
unstructured := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(from, &unstructured.Object); err != nil {
return nil, fmt.Errorf("error decoding YAML: %v", err)
}
gvk := unstructured.GetObjectKind().GroupVersionKind()
t := c.parser.Type(gvk)
if t == nil {
return nil, newNoCorrespondingTypeError(gvk)
}
return t.FromYAML(typed.YAMLObject(string(from)))
}
func (c *typeConverter) TypedToObject(value *typed.TypedValue) (runtime.Object, error) {
return valueToObject(value.AsValue())
}
func valueToObject(value *value.Value) (runtime.Object, error) {
vu := value.ToUnstructured(false)
u, ok := vu.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("failed to convert typed to unstructured: want map, got %T", vu)
}
return &unstructured.Unstructured{Object: u}, nil
}
type noCorrespondingTypeErr struct {
gvk schema.GroupVersionKind
}
func newNoCorrespondingTypeError(gvk schema.GroupVersionKind) error {
return &noCorrespondingTypeErr{gvk: gvk}
}
func (k *noCorrespondingTypeErr) Error() string {
return fmt.Sprintf("no corresponding type for %v", k.gvk)
}
func isNoCorrespondingTypeError(err error) bool {
if err == nil {
return false
}
_, ok := err.(*noCorrespondingTypeErr)
return ok
}

View File

@@ -0,0 +1,101 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/structured-merge-diff/fieldpath"
"sigs.k8s.io/structured-merge-diff/merge"
"sigs.k8s.io/structured-merge-diff/typed"
)
// versionConverter is an implementation of
// sigs.k8s.io/structured-merge-diff/merge.Converter
type versionConverter struct {
typeConverter TypeConverter
objectConvertor runtime.ObjectConvertor
hubGetter func(from schema.GroupVersion) schema.GroupVersion
}
var _ merge.Converter = &versionConverter{}
// NewVersionConverter builds a VersionConverter from a TypeConverter and an ObjectConvertor.
func NewVersionConverter(t TypeConverter, o runtime.ObjectConvertor, h schema.GroupVersion) merge.Converter {
return &versionConverter{
typeConverter: t,
objectConvertor: o,
hubGetter: func(from schema.GroupVersion) schema.GroupVersion {
return schema.GroupVersion{
Group: from.Group,
Version: h.Version,
}
},
}
}
// NewCRDVersionConverter builds a VersionConverter for CRDs from a TypeConverter and an ObjectConvertor.
func NewCRDVersionConverter(t TypeConverter, o runtime.ObjectConvertor, h schema.GroupVersion) merge.Converter {
return &versionConverter{
typeConverter: t,
objectConvertor: o,
hubGetter: func(from schema.GroupVersion) schema.GroupVersion {
return h
},
}
}
// Convert implements sigs.k8s.io/structured-merge-diff/merge.Converter
func (v *versionConverter) Convert(object *typed.TypedValue, version fieldpath.APIVersion) (*typed.TypedValue, error) {
// Convert the smd typed value to a kubernetes object.
objectToConvert, err := v.typeConverter.TypedToObject(object)
if err != nil {
return object, err
}
// Parse the target groupVersion.
groupVersion, err := schema.ParseGroupVersion(string(version))
if err != nil {
return object, err
}
// If attempting to convert to the same version as we already have, just return it.
fromVersion := objectToConvert.GetObjectKind().GroupVersionKind().GroupVersion()
if fromVersion == groupVersion {
return object, nil
}
// Convert to internal
internalObject, err := v.objectConvertor.ConvertToVersion(objectToConvert, v.hubGetter(fromVersion))
if err != nil {
return object, err
}
// Convert the object into the target version
convertedObject, err := v.objectConvertor.ConvertToVersion(internalObject, groupVersion)
if err != nil {
return object, err
}
// Convert the object back to a smd typed value and return it.
return v.typeConverter.ObjectToTyped(convertedObject)
}
// IsMissingVersionError
func (v *versionConverter) IsMissingVersionError(err error) bool {
return runtime.IsNotRegisteredError(err) || isNoCorrespondingTypeError(err)
}

View File

@@ -33,10 +33,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
utiltrace "k8s.io/apiserver/pkg/util/trace"
utiltrace "k8s.io/utils/trace"
)
// getterFunc performs a get request with the given context and object name. The request
@@ -45,9 +46,9 @@ type getterFunc func(ctx context.Context, name string, req *http.Request, trace
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope RequestScope, getter getterFunc) http.HandlerFunc {
func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
trace := utiltrace.New("Get " + req.URL.Path)
trace := utiltrace.New("Get", utiltrace.Field{"url", req.URL.Path})
defer trace.LogIfLong(500 * time.Millisecond)
namespace, name, err := scope.Namer.Name(req)
@@ -58,30 +59,26 @@ func getResourceHandler(scope RequestScope, getter getterFunc) http.HandlerFunc
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
result, err := getter(ctx, name, req, trace)
if err != nil {
scope.err(err, w, req)
return
}
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
}
if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
scope.err(err, w, req)
return
}
trace.Step("About to write a response")
scope.Trace = trace
transformResponseObject(ctx, scope, req, w, http.StatusOK, result)
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
trace.Step("Transformed response object")
}
}
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) http.HandlerFunc {
func GetResource(r rest.Getter, e rest.Exporter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
// check for export
@@ -111,7 +108,7 @@ func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) http.Handle
}
// GetResourceWithOptions returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, isSubresource bool) http.HandlerFunc {
func GetResourceWithOptions(r rest.GetterWithOptions, scope *RequestScope, isSubresource bool) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
opts, subpath, subpathKey := r.NewGetOptions()
@@ -128,7 +125,7 @@ func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, isSubr
}
// getRequestOptions parses out options and can include path information. The path information shouldn't include the subresource.
func getRequestOptions(req *http.Request, scope RequestScope, into runtime.Object, subpath bool, subpathKey string, isSubresource bool) error {
func getRequestOptions(req *http.Request, scope *RequestScope, into runtime.Object, subpath bool, subpathKey string, isSubresource bool) error {
if into == nil {
return nil
}
@@ -165,10 +162,10 @@ func getRequestOptions(req *http.Request, scope RequestScope, into runtime.Objec
return scope.ParameterCodec.DecodeParameters(query, scope.Kind.GroupVersion(), into)
}
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// For performance tracking purposes.
trace := utiltrace.New("List " + req.URL.Path)
trace := utiltrace.New("List", utiltrace.Field{"url", req.URL.Path})
namespace, err := scope.Namer.Namespace(req)
if err != nil {
@@ -187,6 +184,12 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
opts := metainternalversion.ListOptions{}
if err := metainternalversion.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
err = errors.NewBadRequest(err.Error())
@@ -245,15 +248,16 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
}
klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
watcher, err := rw.Watch(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, func() {
serveWatch(watcher, scope, req, w, timeout)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}
@@ -267,22 +271,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
return
}
trace.Step("Listing from storage done")
numberOfItems, err := setListSelfLink(result, ctx, req, scope.Namer)
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("Self-linking done")
// Ensure empty lists return a non-nil items slice
if numberOfItems == 0 && meta.IsListType(result) {
if err := meta.SetList(result, []runtime.Object{}); err != nil {
scope.err(err, w, req)
return
}
}
scope.Trace = trace
transformResponseObject(ctx, scope, req, w, http.StatusOK, result)
trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems))
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
trace.Step("Writing http response done", utiltrace.Field{"count", meta.LenList(result)})
}
}

View File

@@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"net/url"
"strings"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@@ -86,6 +87,21 @@ func (n ContextBasedNaming) Name(req *http.Request) (namespace, name string, err
return ns, requestInfo.Name, nil
}
// fastURLPathEncode encodes the provided path as a URL path
func fastURLPathEncode(path string) string {
for _, r := range []byte(path) {
switch {
case r >= '-' && r <= '9', r >= 'A' && r <= 'Z', r >= 'a' && r <= 'z':
// characters within this range do not require escaping
default:
var u url.URL
u.Path = path
return u.EscapedPath()
}
}
return path
}
func (n ContextBasedNaming) GenerateLink(requestInfo *request.RequestInfo, obj runtime.Object) (uri string, err error) {
namespace, name, err := n.ObjectName(obj)
if err == errEmptyName && len(requestInfo.Name) > 0 {
@@ -101,19 +117,23 @@ func (n ContextBasedNaming) GenerateLink(requestInfo *request.RequestInfo, obj r
return n.SelfLinkPathPrefix + url.QueryEscape(name) + n.SelfLinkPathSuffix, nil
}
return n.SelfLinkPathPrefix +
url.QueryEscape(namespace) +
"/" + url.QueryEscape(requestInfo.Resource) + "/" +
url.QueryEscape(name) +
n.SelfLinkPathSuffix,
nil
builder := strings.Builder{}
builder.Grow(len(n.SelfLinkPathPrefix) + len(namespace) + len(requestInfo.Resource) + len(name) + len(n.SelfLinkPathSuffix) + 8)
builder.WriteString(n.SelfLinkPathPrefix)
builder.WriteString(namespace)
builder.WriteByte('/')
builder.WriteString(requestInfo.Resource)
builder.WriteByte('/')
builder.WriteString(name)
builder.WriteString(n.SelfLinkPathSuffix)
return fastURLPathEncode(builder.String()), nil
}
func (n ContextBasedNaming) GenerateListLink(req *http.Request) (uri string, err error) {
if len(req.URL.RawPath) > 0 {
return req.URL.RawPath, nil
}
return req.URL.EscapedPath(), nil
return fastURLPathEncode(req.URL.Path), nil
}
func (n ContextBasedNaming) ObjectName(obj runtime.Object) (namespace, name string, err error) {

View File

@@ -29,6 +29,7 @@ type errNotAcceptable struct {
accepted []string
}
// NewNotAcceptableError returns an error of NotAcceptable which contains specified string
func NewNotAcceptableError(accepted []string) error {
return errNotAcceptable{accepted}
}
@@ -46,11 +47,40 @@ func (e errNotAcceptable) Status() metav1.Status {
}
}
// errNotAcceptableConversion indicates Accept negotiation has failed specifically
// for a conversion to a known type.
type errNotAcceptableConversion struct {
target string
accepted []string
}
// NewNotAcceptableConversionError returns an error indicating that the desired
// API transformation to the target group version kind string is not accepted and
// only the listed mime types are allowed. This is temporary while Table does not
// yet support protobuf encoding.
func NewNotAcceptableConversionError(target string, accepted []string) error {
return errNotAcceptableConversion{target, accepted}
}
func (e errNotAcceptableConversion) Error() string {
return fmt.Sprintf("only the following media types are accepted when converting to %s: %v", e.target, strings.Join(e.accepted, ", "))
}
func (e errNotAcceptableConversion) Status() metav1.Status {
return metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusNotAcceptable,
Reason: metav1.StatusReasonNotAcceptable,
Message: e.Error(),
}
}
// errUnsupportedMediaType indicates Content-Type is not recognized
type errUnsupportedMediaType struct {
accepted []string
}
// NewUnsupportedMediaTypeError returns an error of UnsupportedMediaType which contains specified string
func NewUnsupportedMediaTypeError(accepted []string) error {
return errUnsupportedMediaType{accepted}
}

View File

@@ -22,7 +22,7 @@ import (
"strconv"
"strings"
"bitbucket.org/ww/goautoneg"
"github.com/munnerz/goautoneg"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -43,33 +43,27 @@ func MediaTypesForSerializer(ns runtime.NegotiatedSerializer) (mediaTypes, strea
// NegotiateOutputMediaType negotiates the output structured media type and a serializer, or
// returns an error.
func NegotiateOutputMediaType(req *http.Request, ns runtime.NegotiatedSerializer, restrictions EndpointRestrictions) (MediaTypeOptions, runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), AcceptedMediaTypesForEndpoint(ns), restrictions)
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), ns.SupportedMediaTypes(), restrictions)
if !ok {
supported, _ := MediaTypesForSerializer(ns)
return mediaType, runtime.SerializerInfo{}, NewNotAcceptableError(supported)
}
// TODO: move into resthandler
info := mediaType.Accepted.Serializer
info := mediaType.Accepted
if (mediaType.Pretty || isPrettyPrint(req)) && info.PrettySerializer != nil {
info.Serializer = info.PrettySerializer
}
return mediaType, info, nil
}
// NegotiateOutputSerializer returns a serializer for the output.
func NegotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
_, info, err := NegotiateOutputMediaType(req, ns, DefaultEndpointRestrictions)
return info, err
}
// NegotiateOutputStreamSerializer returns a stream serializer for the given request.
func NegotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), AcceptedMediaTypesForEndpoint(ns), DefaultEndpointRestrictions)
if !ok || mediaType.Accepted.Serializer.StreamSerializer == nil {
// NegotiateOutputMediaTypeStream returns a stream serializer for the given request.
func NegotiateOutputMediaTypeStream(req *http.Request, ns runtime.NegotiatedSerializer, restrictions EndpointRestrictions) (runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), ns.SupportedMediaTypes(), restrictions)
if !ok || mediaType.Accepted.StreamSerializer == nil {
_, supported := MediaTypesForSerializer(ns)
return runtime.SerializerInfo{}, NewNotAcceptableError(supported)
}
return mediaType.Accepted.Serializer, nil
return mediaType.Accepted, nil
}
// NegotiateInputSerializer returns the input serializer for the provided request.
@@ -85,10 +79,7 @@ func NegotiateInputSerializerForMediaType(mediaType string, streaming bool, ns r
mediaType = mediaTypes[0].MediaType
}
if mediaType, _, err := mime.ParseMediaType(mediaType); err == nil {
for _, info := range mediaTypes {
if info.MediaType != mediaType {
continue
}
if info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType); ok {
return info, nil
}
}
@@ -105,10 +96,13 @@ func NegotiateInputSerializerForMediaType(mediaType string, streaming bool, ns r
func isPrettyPrint(req *http.Request) bool {
// DEPRECATED: should be part of the content type
if req.URL != nil {
pp := req.URL.Query().Get("pretty")
if len(pp) > 0 {
pretty, _ := strconv.ParseBool(pp)
return pretty
// avoid an allocation caused by parsing the URL query
if strings.Contains(req.URL.RawQuery, "pretty") {
pp := req.URL.Query().Get("pretty")
if len(pp) > 0 {
pretty, _ := strconv.ParseBool(pp)
return pretty
}
}
}
userAgent := req.UserAgent()
@@ -122,9 +116,10 @@ func isPrettyPrint(req *http.Request) bool {
// EndpointRestrictions is an interface that allows content-type negotiation
// to verify server support for specific options
type EndpointRestrictions interface {
// AllowsConversion should return true if the specified group version kind
// is an allowed target object.
AllowsConversion(schema.GroupVersionKind) bool
// AllowsMediaTypeTransform returns true if the endpoint allows either the requested mime type
// or the requested transformation. If false, the caller should ignore this mime type. If the
// target is nil, the client is not requesting a transformation.
AllowsMediaTypeTransform(mimeType, mimeSubType string, target *schema.GroupVersionKind) bool
// AllowsServerVersion should return true if the specified version is valid
// for the server group.
AllowsServerVersion(version string) bool
@@ -133,24 +128,17 @@ type EndpointRestrictions interface {
AllowsStreamSchema(schema string) bool
}
// DefaultEndpointRestrictions is the default EndpointRestrictions which allows
// content-type negotiation to verify server support for specific options
var DefaultEndpointRestrictions = emptyEndpointRestrictions{}
type emptyEndpointRestrictions struct{}
func (emptyEndpointRestrictions) AllowsConversion(schema.GroupVersionKind) bool { return false }
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
// AcceptedMediaType contains information about a valid media type that the
// server can serialize.
type AcceptedMediaType struct {
// Type is the first part of the media type ("application")
Type string
// SubType is the second part of the media type ("json")
SubType string
// Serializer is the serialization info this object accepts
Serializer runtime.SerializerInfo
func (emptyEndpointRestrictions) AllowsMediaTypeTransform(mimeType string, mimeSubType string, gvk *schema.GroupVersionKind) bool {
return gvk == nil
}
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
// MediaTypeOptions describes information for a given media type that may alter
// the server response
@@ -178,13 +166,13 @@ type MediaTypeOptions struct {
Unrecognized []string
// the accepted media type from the client
Accepted *AcceptedMediaType
Accepted runtime.SerializerInfo
}
// acceptMediaTypeOptions returns an options object that matches the provided media type params. If
// it returns false, the provided options are not allowed and the media type must be skipped. These
// parameters are unversioned and may not be changed.
func acceptMediaTypeOptions(params map[string]string, accepts *AcceptedMediaType, endpoint EndpointRestrictions) (MediaTypeOptions, bool) {
func acceptMediaTypeOptions(params map[string]string, accepts *runtime.SerializerInfo, endpoint EndpointRestrictions) (MediaTypeOptions, bool) {
var options MediaTypeOptions
// extract all known parameters
@@ -210,7 +198,7 @@ func acceptMediaTypeOptions(params map[string]string, accepts *AcceptedMediaType
// controls the streaming schema
case "stream":
if len(v) > 0 && (accepts.Serializer.StreamSerializer == nil || !endpoint.AllowsStreamSchema(v)) {
if len(v) > 0 && (accepts.StreamSerializer == nil || !endpoint.AllowsStreamSchema(v)) {
return MediaTypeOptions{}, false
}
options.Stream = v
@@ -238,16 +226,16 @@ func acceptMediaTypeOptions(params map[string]string, accepts *AcceptedMediaType
}
}
if options.Convert != nil && !endpoint.AllowsConversion(*options.Convert) {
if !endpoint.AllowsMediaTypeTransform(accepts.MediaTypeType, accepts.MediaTypeSubType, options.Convert) {
return MediaTypeOptions{}, false
}
options.Accepted = accepts
options.Accepted = *accepts
return options, true
}
type candidateMediaType struct {
accepted *AcceptedMediaType
accepted *runtime.SerializerInfo
clauses goautoneg.Accept
}
@@ -255,10 +243,10 @@ type candidateMediaTypeSlice []candidateMediaType
// NegotiateMediaTypeOptions returns the most appropriate content type given the accept header and
// a list of alternatives along with the accepted media type parameters.
func NegotiateMediaTypeOptions(header string, accepted []AcceptedMediaType, endpoint EndpointRestrictions) (MediaTypeOptions, bool) {
func NegotiateMediaTypeOptions(header string, accepted []runtime.SerializerInfo, endpoint EndpointRestrictions) (MediaTypeOptions, bool) {
if len(header) == 0 && len(accepted) > 0 {
return MediaTypeOptions{
Accepted: &accepted[0],
Accepted: accepted[0],
}, true
}
@@ -268,8 +256,8 @@ func NegotiateMediaTypeOptions(header string, accepted []AcceptedMediaType, endp
for i := range accepted {
accepts := &accepted[i]
switch {
case clause.Type == accepts.Type && clause.SubType == accepts.SubType,
clause.Type == accepts.Type && clause.SubType == "*",
case clause.Type == accepts.MediaTypeType && clause.SubType == accepts.MediaTypeSubType,
clause.Type == accepts.MediaTypeType && clause.SubType == "*",
clause.Type == "*" && clause.SubType == "*":
candidates = append(candidates, candidateMediaType{accepted: accepts, clauses: clause})
}
@@ -284,22 +272,3 @@ func NegotiateMediaTypeOptions(header string, accepted []AcceptedMediaType, endp
return MediaTypeOptions{}, false
}
// AcceptedMediaTypesForEndpoint returns an array of structs that are used to efficiently check which
// allowed media types the server exposes.
func AcceptedMediaTypesForEndpoint(ns runtime.NegotiatedSerializer) []AcceptedMediaType {
var acceptedMediaTypes []AcceptedMediaType
for _, info := range ns.SupportedMediaTypes() {
segments := strings.SplitN(info.MediaType, "/", 2)
if len(segments) == 1 {
segments = append(segments, "*")
}
t := AcceptedMediaType{
Type: segments[0],
SubType: segments[1],
Serializer: info,
}
acceptedMediaTypes = append(acceptedMediaTypes, t)
}
return acceptedMediaTypes
}

View File

@@ -23,9 +23,9 @@ import (
"strings"
"time"
"github.com/evanphx/json-patch"
jsonpatch "github.com/evanphx/json-patch"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
@@ -36,15 +36,18 @@ import (
"k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utiltrace "k8s.io/apiserver/pkg/util/trace"
utiltrace "k8s.io/utils/trace"
)
const (
@@ -53,10 +56,10 @@ const (
)
// PatchResource returns a function that will handle a resource patch.
func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface, patchTypes []string) http.HandlerFunc {
func PatchResource(r rest.Patcher, scope *RequestScope, admit admission.Interface, patchTypes []string) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// For performance tracking purposes.
trace := utiltrace.New("Patch " + req.URL.Path)
trace := utiltrace.New("Patch", utiltrace.Field{"url", req.URL.Path})
defer trace.LogIfLong(500 * time.Millisecond)
if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
@@ -90,36 +93,48 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
return
}
ctx := req.Context()
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
patchJS, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
options := &metav1.UpdateOptions{}
patchBytes, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
if err != nil {
scope.err(err, w, req)
return
}
options := &metav1.PatchOptions{}
if err := metainternalversion.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, options); err != nil {
err = errors.NewBadRequest(err.Error())
scope.err(err, w, req)
return
}
if errs := validation.ValidateUpdateOptions(options); len(errs) > 0 {
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "UpdateOptions"}, "", errs)
if errs := validation.ValidatePatchOptions(options, patchType); len(errs) > 0 {
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "PatchOptions"}, "", errs)
scope.err(err, w, req)
return
}
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("PatchOptions"))
ae := request.AuditEventFrom(ctx)
admit = admission.WithAudit(admit, ae)
audit.LogRequestPatch(ae, patchJS)
audit.LogRequestPatch(ae, patchBytes)
trace.Step("Recorded the audit event")
s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), runtime.ContentTypeJSON)
baseContentType := runtime.ContentTypeJSON
if patchType == types.ApplyPatchType {
baseContentType = runtime.ContentTypeYAML
}
s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), baseContentType)
if !ok {
scope.err(fmt.Errorf("no serializer defined for JSON"), w, req)
scope.err(fmt.Errorf("no serializer defined for %v", baseContentType), w, req)
return
}
gv := scope.Kind.GroupVersion()
@@ -130,7 +145,19 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
)
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttributes := admission.NewAttributesRecord(
staticCreateAttributes := admission.NewAttributesRecord(
nil,
nil,
scope.Kind,
namespace,
name,
scope.Resource,
scope.Subresource,
admission.Create,
patchToCreateOptions(options),
dryrun.IsDryRun(options.DryRun),
userInfo)
staticUpdateAttributes := admission.NewAttributesRecord(
nil,
nil,
scope.Kind,
@@ -139,41 +166,43 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
scope.Resource,
scope.Subresource,
admission.Update,
patchToUpdateOptions(options),
dryrun.IsDryRun(options.DryRun),
userInfo,
)
admissionCheck := func(updatedObject runtime.Object, currentObject runtime.Object) error {
// if we allow create-on-patch, we have this TODO: call the mutating admission chain with the CREATE verb instead of UPDATE
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && admit.Handles(admission.Update) {
return mutatingAdmission.Admit(admission.NewAttributesRecord(
updatedObject,
currentObject,
scope.Kind,
namespace,
name,
scope.Resource,
scope.Subresource,
admission.Update,
dryrun.IsDryRun(options.DryRun),
userInfo,
))
}
return nil
mutatingAdmission, _ := admit.(admission.MutationInterface)
createAuthorizerAttributes := authorizer.AttributesRecord{
User: userInfo,
ResourceRequest: true,
Path: req.URL.Path,
Verb: "create",
APIGroup: scope.Resource.Group,
APIVersion: scope.Resource.Version,
Resource: scope.Resource.Resource,
Subresource: scope.Subresource,
Namespace: namespace,
Name: name,
}
p := patcher{
namer: scope.Namer,
creater: scope.Creater,
defaulter: scope.Defaulter,
typer: scope.Typer,
unsafeConvertor: scope.UnsafeConvertor,
kind: scope.Kind,
resource: scope.Resource,
subresource: scope.Subresource,
dryRun: dryrun.IsDryRun(options.DryRun),
objectInterfaces: scope,
hubGroupVersion: scope.HubGroupVersion,
createValidation: rest.AdmissionToValidateObjectFunc(admit, staticAdmissionAttributes),
updateValidation: rest.AdmissionToValidateObjectUpdateFunc(admit, staticAdmissionAttributes),
admissionCheck: admissionCheck,
createValidation: withAuthorization(rest.AdmissionToValidateObjectFunc(admit, staticCreateAttributes, scope), scope.Authorizer, createAuthorizerAttributes),
updateValidation: rest.AdmissionToValidateObjectUpdateFunc(admit, staticUpdateAttributes, scope),
admissionCheck: mutatingAdmission,
codec: codec,
@@ -183,35 +212,34 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface
restPatcher: r,
name: name,
patchType: patchType,
patchJS: patchJS,
patchBytes: patchBytes,
userAgent: req.UserAgent(),
trace: trace,
}
result, err := p.patchResource(ctx)
result, wasCreated, err := p.patchResource(ctx, scope)
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("Object stored in database")
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
}
if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
if err := setObjectSelfLink(ctx, result, req, scope.Namer); err != nil {
scope.err(err, w, req)
return
}
trace.Step("Self-link added")
scope.Trace = trace
transformResponseObject(ctx, scope, req, w, http.StatusOK, result)
status := http.StatusOK
if wasCreated {
status = http.StatusCreated
}
transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)
}
}
type mutateObjectUpdateFunc func(obj, old runtime.Object) error
type mutateObjectUpdateFunc func(ctx context.Context, obj, old runtime.Object) error
// patcher breaks the process of patch application and retries into smaller
// pieces of functionality.
@@ -223,27 +251,33 @@ type patcher struct {
namer ScopeNamer
creater runtime.ObjectCreater
defaulter runtime.ObjectDefaulter
typer runtime.ObjectTyper
unsafeConvertor runtime.ObjectConvertor
resource schema.GroupVersionResource
kind schema.GroupVersionKind
subresource string
dryRun bool
objectInterfaces admission.ObjectInterfaces
hubGroupVersion schema.GroupVersion
// Validation functions
createValidation rest.ValidateObjectFunc
updateValidation rest.ValidateObjectUpdateFunc
admissionCheck mutateObjectUpdateFunc
admissionCheck admission.MutationInterface
codec runtime.Codec
timeout time.Duration
options *metav1.UpdateOptions
options *metav1.PatchOptions
// Operation information
restPatcher rest.Patcher
name string
patchType types.PatchType
patchJS []byte
patchBytes []byte
userAgent string
trace *utiltrace.Trace
@@ -251,14 +285,18 @@ type patcher struct {
namespace string
updatedObjectInfo rest.UpdatedObjectInfo
mechanism patchMechanism
forceAllowCreate bool
}
type patchMechanism interface {
applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error)
createNewObject() (runtime.Object, error)
}
type jsonPatcher struct {
*patcher
fieldManager *fieldmanager.FieldManager
}
func (p *jsonPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) {
@@ -277,18 +315,38 @@ func (p *jsonPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (r
// Construct the resulting typed, unversioned object.
objToUpdate := p.restPatcher.New()
if err := runtime.DecodeInto(p.codec, patchedObjJS, objToUpdate); err != nil {
return nil, err
return nil, errors.NewInvalid(schema.GroupKind{}, "", field.ErrorList{
field.Invalid(field.NewPath("patch"), string(patchedObjJS), err.Error()),
})
}
if p.fieldManager != nil {
if objToUpdate, err = p.fieldManager.Update(currentObject, objToUpdate, managerOrUserAgent(p.options.FieldManager, p.userAgent)); err != nil {
return nil, fmt.Errorf("failed to update object (json PATCH for %v) managed fields: %v", p.kind, err)
}
}
return objToUpdate, nil
}
// patchJS applies the patch. Input and output objects must both have
func (p *jsonPatcher) createNewObject() (runtime.Object, error) {
return nil, errors.NewNotFound(p.resource.GroupResource(), p.name)
}
// applyJSPatch applies the patch. Input and output objects must both have
// the external version, since that is what the patch must have been constructed against.
func (p *jsonPatcher) applyJSPatch(versionedJS []byte) (patchedJS []byte, retErr error) {
switch p.patchType {
case types.JSONPatchType:
patchObj, err := jsonpatch.DecodePatch(p.patchJS)
// sanity check potentially abusive patches
// TODO(liggitt): drop this once golang json parser limits stack depth (https://github.com/golang/go/issues/31789)
if len(p.patchBytes) > 1024*1024 {
v := []interface{}{}
if err := json.Unmarshal(p.patchBytes, &v); err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error decoding patch: %v", err))
}
}
patchObj, err := jsonpatch.DecodePatch(p.patchBytes)
if err != nil {
return nil, errors.NewBadRequest(err.Error())
}
@@ -303,7 +361,16 @@ func (p *jsonPatcher) applyJSPatch(versionedJS []byte) (patchedJS []byte, retErr
}
return patchedJS, nil
case types.MergePatchType:
return jsonpatch.MergePatch(versionedJS, p.patchJS)
// sanity check potentially abusive patches
// TODO(liggitt): drop this once golang json parser limits stack depth (https://github.com/golang/go/issues/31789)
if len(p.patchBytes) > 1024*1024 {
v := map[string]interface{}{}
if err := json.Unmarshal(p.patchBytes, &v); err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error decoding patch: %v", err))
}
}
return jsonpatch.MergePatch(versionedJS, p.patchBytes)
default:
// only here as a safety net - go-restful filters content-type
return nil, fmt.Errorf("unknown Content-Type header for patch: %v", p.patchType)
@@ -315,6 +382,7 @@ type smpPatcher struct {
// Schema
schemaReferenceObj runtime.Object
fieldManager *fieldmanager.FieldManager
}
func (p *smpPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) {
@@ -328,22 +396,63 @@ func (p *smpPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (ru
if err != nil {
return nil, err
}
if err := strategicPatchObject(p.defaulter, currentVersionedObject, p.patchJS, versionedObjToUpdate, p.schemaReferenceObj); err != nil {
if err := strategicPatchObject(p.defaulter, currentVersionedObject, p.patchBytes, versionedObjToUpdate, p.schemaReferenceObj); err != nil {
return nil, err
}
// Convert the object back to the hub version
return p.unsafeConvertor.ConvertToVersion(versionedObjToUpdate, p.hubGroupVersion)
newObj, err := p.unsafeConvertor.ConvertToVersion(versionedObjToUpdate, p.hubGroupVersion)
if err != nil {
return nil, err
}
if p.fieldManager != nil {
if newObj, err = p.fieldManager.Update(currentObject, newObj, managerOrUserAgent(p.options.FieldManager, p.userAgent)); err != nil {
return nil, fmt.Errorf("failed to update object (smp PATCH for %v) managed fields: %v", p.kind, err)
}
}
return newObj, nil
}
// strategicPatchObject applies a strategic merge patch of <patchJS> to
func (p *smpPatcher) createNewObject() (runtime.Object, error) {
return nil, errors.NewNotFound(p.resource.GroupResource(), p.name)
}
type applyPatcher struct {
patch []byte
options *metav1.PatchOptions
creater runtime.ObjectCreater
kind schema.GroupVersionKind
fieldManager *fieldmanager.FieldManager
}
func (p *applyPatcher) applyPatchToCurrentObject(obj runtime.Object) (runtime.Object, error) {
force := false
if p.options.Force != nil {
force = *p.options.Force
}
if p.fieldManager == nil {
panic("FieldManager must be installed to run apply")
}
return p.fieldManager.Apply(obj, p.patch, p.options.FieldManager, force)
}
func (p *applyPatcher) createNewObject() (runtime.Object, error) {
obj, err := p.creater.New(p.kind)
if err != nil {
return nil, fmt.Errorf("failed to create new object: %v", err)
}
return p.applyPatchToCurrentObject(obj)
}
// strategicPatchObject applies a strategic merge patch of <patchBytes> to
// <originalObject> and stores the result in <objToUpdate>.
// It additionally returns the map[string]interface{} representation of the
// <originalObject> and <patchJS>.
// <originalObject> and <patchBytes>.
// NOTE: Both <originalObject> and <objToUpdate> are supposed to be versioned.
func strategicPatchObject(
defaulter runtime.ObjectDefaulter,
originalObject runtime.Object,
patchJS []byte,
patchBytes []byte,
objToUpdate runtime.Object,
schemaReferenceObj runtime.Object,
) error {
@@ -353,7 +462,7 @@ func strategicPatchObject(
}
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
if err := json.Unmarshal(patchBytes, &patchMap); err != nil {
return errors.NewBadRequest(err.Error())
}
@@ -365,52 +474,113 @@ func strategicPatchObject(
// applyPatch is called every time GuaranteedUpdate asks for the updated object,
// and is given the currently persisted object as input.
func (p *patcher) applyPatch(_ context.Context, _, currentObject runtime.Object) (runtime.Object, error) {
// TODO: rename this function because the name implies it is related to applyPatcher
func (p *patcher) applyPatch(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
// Make sure we actually have a persisted currentObject
p.trace.Step("About to apply patch")
if hasUID, err := hasUID(currentObject); err != nil {
currentObjectHasUID, err := hasUID(currentObject)
if err != nil {
return nil, err
} else if !hasUID {
return nil, errors.NewNotFound(p.resource.GroupResource(), p.name)
} else if !currentObjectHasUID {
objToUpdate, patchErr = p.mechanism.createNewObject()
} else {
objToUpdate, patchErr = p.mechanism.applyPatchToCurrentObject(currentObject)
}
objToUpdate, err := p.mechanism.applyPatchToCurrentObject(currentObject)
if patchErr != nil {
return nil, patchErr
}
objToUpdateHasUID, err := hasUID(objToUpdate)
if err != nil {
return nil, err
}
if objToUpdateHasUID && !currentObjectHasUID {
accessor, err := meta.Accessor(objToUpdate)
if err != nil {
return nil, err
}
return nil, errors.NewConflict(p.resource.GroupResource(), p.name, fmt.Errorf("uid mismatch: the provided object specified uid %s, and no existing object was found", accessor.GetUID()))
}
if err := checkName(objToUpdate, p.name, p.namespace, p.namer); err != nil {
return nil, err
}
return objToUpdate, nil
}
func (p *patcher) admissionAttributes(ctx context.Context, updatedObject runtime.Object, currentObject runtime.Object, operation admission.Operation, operationOptions runtime.Object) admission.Attributes {
userInfo, _ := request.UserFrom(ctx)
return admission.NewAttributesRecord(updatedObject, currentObject, p.kind, p.namespace, p.name, p.resource, p.subresource, operation, operationOptions, p.dryRun, userInfo)
}
// applyAdmission is called every time GuaranteedUpdate asks for the updated object,
// and is given the currently persisted object and the patched object as input.
// TODO: rename this function because the name implies it is related to applyPatcher
func (p *patcher) applyAdmission(ctx context.Context, patchedObject runtime.Object, currentObject runtime.Object) (runtime.Object, error) {
p.trace.Step("About to check admission control")
return patchedObject, p.admissionCheck(patchedObject, currentObject)
var operation admission.Operation
var options runtime.Object
if hasUID, err := hasUID(currentObject); err != nil {
return nil, err
} else if !hasUID {
operation = admission.Create
currentObject = nil
options = patchToCreateOptions(p.options)
} else {
operation = admission.Update
options = patchToUpdateOptions(p.options)
}
if p.admissionCheck != nil && p.admissionCheck.Handles(operation) {
attributes := p.admissionAttributes(ctx, patchedObject, currentObject, operation, options)
return patchedObject, p.admissionCheck.Admit(ctx, attributes, p.objectInterfaces)
}
return patchedObject, nil
}
// patchResource divides PatchResource for easier unit testing
func (p *patcher) patchResource(ctx context.Context) (runtime.Object, error) {
func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runtime.Object, bool, error) {
p.namespace = request.NamespaceValue(ctx)
switch p.patchType {
case types.JSONPatchType, types.MergePatchType:
p.mechanism = &jsonPatcher{patcher: p}
p.mechanism = &jsonPatcher{
patcher: p,
fieldManager: scope.FieldManager,
}
case types.StrategicMergePatchType:
schemaReferenceObj, err := p.unsafeConvertor.ConvertToVersion(p.restPatcher.New(), p.kind.GroupVersion())
if err != nil {
return nil, err
return nil, false, err
}
p.mechanism = &smpPatcher{patcher: p, schemaReferenceObj: schemaReferenceObj}
p.mechanism = &smpPatcher{
patcher: p,
schemaReferenceObj: schemaReferenceObj,
fieldManager: scope.FieldManager,
}
// this case is unreachable if ServerSideApply is not enabled because we will have already rejected the content type
case types.ApplyPatchType:
p.mechanism = &applyPatcher{
fieldManager: scope.FieldManager,
patch: p.patchBytes,
options: p.options,
creater: p.creater,
kind: p.kind,
}
p.forceAllowCreate = true
default:
return nil, fmt.Errorf("%v: unimplemented patch type", p.patchType)
return nil, false, fmt.Errorf("%v: unimplemented patch type", p.patchType)
}
wasCreated := false
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)
return finishRequest(p.timeout, func() (runtime.Object, error) {
updateObject, _, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation, false, p.options)
result, err := finishRequest(p.timeout, func() (runtime.Object, error) {
// Pass in UpdateOptions to override UpdateStrategy.AllowUpdateOnCreate
options := patchToUpdateOptions(p.options)
updateObject, created, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation, p.forceAllowCreate, options)
wasCreated = created
return updateObject, updateErr
})
return result, wasCreated, err
}
// applyPatchToObject applies a strategic merge patch of <patchMap> to
@@ -430,7 +600,9 @@ func applyPatchToObject(
// Rather than serialize the patched map to JSON, then decode it to an object, we go directly from a map to an object
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(patchedObjMap, objToUpdate); err != nil {
return err
return errors.NewInvalid(schema.GroupKind{}, "", field.ErrorList{
field.Invalid(field.NewPath("patch"), fmt.Sprintf("%+v", patchMap), err.Error()),
})
}
// Decoding from JSON to a versioned object would apply defaults, so we do the same here
defaulter.Default(objToUpdate)
@@ -449,3 +621,29 @@ func interpretStrategicMergePatchError(err error) error {
return err
}
}
// patchToUpdateOptions creates an UpdateOptions with the same field values as the provided PatchOptions.
func patchToUpdateOptions(po *metav1.PatchOptions) *metav1.UpdateOptions {
if po == nil {
return nil
}
uo := &metav1.UpdateOptions{
DryRun: po.DryRun,
FieldManager: po.FieldManager,
}
uo.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("UpdateOptions"))
return uo
}
// patchToCreateOptions creates an CreateOptions with the same field values as the provided PatchOptions.
func patchToCreateOptions(po *metav1.PatchOptions) *metav1.CreateOptions {
if po == nil {
return nil
}
co := &metav1.CreateOptions{
DryRun: po.DryRun,
FieldManager: po.FieldManager,
}
co.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))
return co
}

View File

@@ -26,161 +26,97 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
utiltrace "k8s.io/utils/trace"
)
// transformObject takes the object as returned by storage and ensures it is in
// the client's desired form, as well as ensuring any API level fields like self-link
// are properly set.
func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope *RequestScope, req *http.Request) (runtime.Object, error) {
if _, ok := obj.(*metav1.Status); ok {
return obj, nil
}
if err := setObjectSelfLink(ctx, obj, req, scope.Namer); err != nil {
return nil, err
}
switch target := mediaType.Convert; {
case target == nil:
return obj, nil
case target.Kind == "PartialObjectMetadata":
return asPartialObjectMetadata(obj, target.GroupVersion())
case target.Kind == "PartialObjectMetadataList":
return asPartialObjectMetadataList(obj, target.GroupVersion())
case target.Kind == "Table":
options, ok := opts.(*metav1beta1.TableOptions)
if !ok {
return nil, fmt.Errorf("unexpected TableOptions, got %T", opts)
}
return asTable(ctx, obj, options, scope, target.GroupVersion())
default:
accepted, _ := negotiation.MediaTypesForSerializer(metainternalversion.Codecs)
err := negotiation.NewNotAcceptableError(accepted)
return nil, err
}
}
// optionsForTransform will load and validate any additional query parameter options for
// a conversion or return an error.
func optionsForTransform(mediaType negotiation.MediaTypeOptions, req *http.Request) (interface{}, error) {
switch target := mediaType.Convert; {
case target == nil:
case target.Kind == "Table" && (target.GroupVersion() == metav1beta1.SchemeGroupVersion || target.GroupVersion() == metav1.SchemeGroupVersion):
opts := &metav1beta1.TableOptions{}
if err := metav1beta1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1beta1.SchemeGroupVersion, opts); err != nil {
return nil, err
}
switch errs := validation.ValidateTableOptions(opts); len(errs) {
case 0:
return opts, nil
case 1:
return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs[0].Error()))
default:
return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs))
}
}
return nil, nil
}
// targetEncodingForTransform returns the appropriate serializer for the input media type
func targetEncodingForTransform(scope *RequestScope, mediaType negotiation.MediaTypeOptions, req *http.Request) (schema.GroupVersionKind, runtime.NegotiatedSerializer, bool) {
switch target := mediaType.Convert; {
case target == nil:
case (target.Kind == "PartialObjectMetadata" || target.Kind == "PartialObjectMetadataList" || target.Kind == "Table") &&
(target.GroupVersion() == metav1beta1.SchemeGroupVersion || target.GroupVersion() == metav1.SchemeGroupVersion):
return *target, metainternalversion.Codecs, true
}
return scope.Kind, scope.Serializer, false
}
// transformResponseObject takes an object loaded from storage and performs any necessary transformations.
// Will write the complete response object.
func transformResponseObject(ctx context.Context, scope RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, result runtime.Object) {
// TODO: fetch the media type much earlier in request processing and pass it into this method.
trace := scope.Trace
mediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, &scope)
func transformResponseObject(ctx context.Context, scope *RequestScope, trace *utiltrace.Trace, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
options, err := optionsForTransform(mediaType, req)
if err != nil {
status := responsewriters.ErrorToAPIStatus(err)
trace.Step("Writing raw JSON response")
responsewriters.WriteRawJSON(int(status.Code), status, w)
scope.err(err, w, req)
return
}
// If conversion was allowed by the scope, perform it before writing the response
if target := mediaType.Convert; target != nil {
switch {
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
if meta.IsListType(result) {
// TODO: this should be calculated earlier
err = newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadata, but the requested object is a list (%T)", result))
scope.err(err, w, req)
return
}
m, err := meta.Accessor(result)
if err != nil {
scope.err(err, w, req)
return
}
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(metav1beta1.SchemeGroupVersion.WithKind("PartialObjectMetadata"))
// renegotiate under the internal version
_, info, err := negotiation.NegotiateOutputMediaType(req, metainternalversion.Codecs, &scope)
if err != nil {
scope.err(err, w, req)
return
}
encoder := metainternalversion.Codecs.EncoderForVersion(info.Serializer, metav1beta1.SchemeGroupVersion)
trace.Step(fmt.Sprintf("Serializing response as type %s", info.MediaType))
responsewriters.SerializeObject(info.MediaType, encoder, w, req, statusCode, partial)
return
case target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
if !meta.IsListType(result) {
// TODO: this should be calculated earlier
err = newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadataList, but the requested object is not a list (%T)", result))
scope.err(err, w, req)
return
}
list := &metav1beta1.PartialObjectMetadataList{}
trace.Step("Processing list items")
err := meta.EachListItem(result, func(obj runtime.Object) error {
m, err := meta.Accessor(obj)
if err != nil {
return err
}
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(metav1beta1.SchemeGroupVersion.WithKind("PartialObjectMetadata"))
list.Items = append(list.Items, partial)
return nil
})
if err != nil {
scope.err(err, w, req)
return
}
// renegotiate under the internal version
_, info, err := negotiation.NegotiateOutputMediaType(req, metainternalversion.Codecs, &scope)
if err != nil {
scope.err(err, w, req)
return
}
encoder := metainternalversion.Codecs.EncoderForVersion(info.Serializer, metav1beta1.SchemeGroupVersion)
trace.Step(fmt.Sprintf("Serializing response as type %s", info.MediaType))
responsewriters.SerializeObject(info.MediaType, encoder, w, req, statusCode, list)
return
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
// TODO: relax the version abstraction
// TODO: skip if this is a status response (delete without body)?
opts := &metav1beta1.TableOptions{}
trace.Step("Decoding parameters")
if err := metav1beta1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1beta1.SchemeGroupVersion, opts); err != nil {
scope.err(err, w, req)
return
}
trace.Step("Converting to table")
table, err := scope.TableConvertor.ConvertToTable(ctx, result, opts)
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("Processing rows")
for i := range table.Rows {
item := &table.Rows[i]
switch opts.IncludeObject {
case metav1beta1.IncludeObject:
item.Object.Object, err = scope.Convertor.ConvertToVersion(item.Object.Object, scope.Kind.GroupVersion())
if err != nil {
scope.err(err, w, req)
return
}
// TODO: rely on defaulting for the value here?
case metav1beta1.IncludeMetadata, "":
m, err := meta.Accessor(item.Object.Object)
if err != nil {
scope.err(err, w, req)
return
}
// TODO: turn this into an internal type and do conversion in order to get object kind automatically set?
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(metav1beta1.SchemeGroupVersion.WithKind("PartialObjectMetadata"))
item.Object.Object = partial
case metav1beta1.IncludeNone:
item.Object.Object = nil
default:
// TODO: move this to validation on the table options?
err = errors.NewBadRequest(fmt.Sprintf("unrecognized includeObject value: %q", opts.IncludeObject))
scope.err(err, w, req)
}
}
// renegotiate under the internal version
_, info, err := negotiation.NegotiateOutputMediaType(req, metainternalversion.Codecs, &scope)
if err != nil {
scope.err(err, w, req)
return
}
encoder := metainternalversion.Codecs.EncoderForVersion(info.Serializer, metav1beta1.SchemeGroupVersion)
trace.Step(fmt.Sprintf("Serializing response as type %s", info.MediaType))
responsewriters.SerializeObject(info.MediaType, encoder, w, req, statusCode, table)
return
default:
// this block should only be hit if scope AllowsConversion is incorrect
accepted, _ := negotiation.MediaTypesForSerializer(metainternalversion.Codecs)
err := negotiation.NewNotAcceptableError(accepted)
status := responsewriters.ErrorToAPIStatus(err)
trace.Step("Writing raw JSON response")
responsewriters.WriteRawJSON(int(status.Code), status, w)
return
}
obj, err := transformObject(ctx, result, options, mediaType, scope, req)
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("Writing response")
responsewriters.WriteObject(statusCode, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
kind, serializer, _ := targetEncodingForTransform(scope, mediaType, req)
responsewriters.WriteObjectNegotiated(serializer, scope, kind.GroupVersion(), w, req, statusCode, obj)
}
// errNotAcceptable indicates Accept negotiation has failed
@@ -204,3 +140,118 @@ func (e errNotAcceptable) Status() metav1.Status {
Message: e.Error(),
}
}
func asTable(ctx context.Context, result runtime.Object, opts *metav1beta1.TableOptions, scope *RequestScope, groupVersion schema.GroupVersion) (runtime.Object, error) {
switch groupVersion {
case metav1beta1.SchemeGroupVersion, metav1.SchemeGroupVersion:
default:
return nil, newNotAcceptableError(fmt.Sprintf("no Table exists in group version %s", groupVersion))
}
obj, err := scope.TableConvertor.ConvertToTable(ctx, result, opts)
if err != nil {
return nil, err
}
table := (*metav1.Table)(obj)
for i := range table.Rows {
item := &table.Rows[i]
switch opts.IncludeObject {
case metav1.IncludeObject:
item.Object.Object, err = scope.Convertor.ConvertToVersion(item.Object.Object, scope.Kind.GroupVersion())
if err != nil {
return nil, err
}
// TODO: rely on defaulting for the value here?
case metav1.IncludeMetadata, "":
m, err := meta.Accessor(item.Object.Object)
if err != nil {
return nil, err
}
// TODO: turn this into an internal type and do conversion in order to get object kind automatically set?
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(groupVersion.WithKind("PartialObjectMetadata"))
item.Object.Object = partial
case metav1.IncludeNone:
item.Object.Object = nil
default:
err = errors.NewBadRequest(fmt.Sprintf("unrecognized includeObject value: %q", opts.IncludeObject))
return nil, err
}
}
return table, nil
}
func asPartialObjectMetadata(result runtime.Object, groupVersion schema.GroupVersion) (runtime.Object, error) {
if meta.IsListType(result) {
err := newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadata, but the requested object is a list (%T)", result))
return nil, err
}
switch groupVersion {
case metav1beta1.SchemeGroupVersion, metav1.SchemeGroupVersion:
default:
return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion))
}
m, err := meta.Accessor(result)
if err != nil {
return nil, err
}
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(groupVersion.WithKind("PartialObjectMetadata"))
return partial, nil
}
func asPartialObjectMetadataList(result runtime.Object, groupVersion schema.GroupVersion) (runtime.Object, error) {
li, ok := result.(metav1.ListInterface)
if !ok {
return nil, newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadataList, but the requested object is not a list (%T)", result))
}
gvk := groupVersion.WithKind("PartialObjectMetadata")
switch {
case groupVersion == metav1beta1.SchemeGroupVersion:
list := &metav1beta1.PartialObjectMetadataList{}
err := meta.EachListItem(result, func(obj runtime.Object) error {
m, err := meta.Accessor(obj)
if err != nil {
return err
}
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(gvk)
list.Items = append(list.Items, *partial)
return nil
})
if err != nil {
return nil, err
}
list.SelfLink = li.GetSelfLink()
list.ResourceVersion = li.GetResourceVersion()
list.Continue = li.GetContinue()
return list, nil
case groupVersion == metav1.SchemeGroupVersion:
list := &metav1.PartialObjectMetadataList{}
err := meta.EachListItem(result, func(obj runtime.Object) error {
m, err := meta.Accessor(obj)
if err != nil {
return err
}
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(gvk)
list.Items = append(list.Items, *partial)
return nil
})
if err != nil {
return nil, err
}
list.SelfLink = li.GetSelfLink()
list.ResourceVersion = li.GetResourceVersion()
list.Continue = li.GetContinue()
return list, nil
default:
return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion))
}
}

View File

@@ -38,11 +38,18 @@ func ErrorToAPIStatus(err error) *metav1.Status {
if len(status.Status) == 0 {
status.Status = metav1.StatusFailure
}
if status.Code == 0 {
switch status.Status {
case metav1.StatusSuccess:
switch status.Status {
case metav1.StatusSuccess:
if status.Code == 0 {
status.Code = http.StatusOK
case metav1.StatusFailure:
}
case metav1.StatusFailure:
if status.Code == 0 {
status.Code = http.StatusInternalServerError
}
default:
runtime.HandleError(fmt.Errorf("apiserver received an error with wrong status field : %#+v", err))
if status.Code == 0 {
status.Code = http.StatusInternalServerError
}
}

View File

@@ -17,11 +17,17 @@ limitations under the License.
package responsewriters
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
"k8s.io/apiserver/pkg/features"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -31,47 +37,11 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/flushwriter"
"k8s.io/apiserver/pkg/util/wsstream"
)
// httpResponseWriterWithInit wraps http.ResponseWriter, and implements the io.Writer interface to be used
// with encoding. The purpose is to allow for encoding to a stream, while accommodating a custom HTTP status code
// if encoding fails, and meeting the encoder's io.Writer interface requirement.
type httpResponseWriterWithInit struct {
hasWritten bool
mediaType string
statusCode int
innerW http.ResponseWriter
}
func (w httpResponseWriterWithInit) Write(b []byte) (n int, err error) {
if !w.hasWritten {
w.innerW.Header().Set("Content-Type", w.mediaType)
w.innerW.WriteHeader(w.statusCode)
w.hasWritten = true
}
return w.innerW.Write(b)
}
// WriteObject renders a returned runtime.Object to the response as a stream or an encoded object. If the object
// returned by the response implements rest.ResourceStreamer that interface will be used to render the
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, func() {
StreamObject(statusCode, gv, s, stream, w, req)
})
return
}
WriteObjectNegotiated(s, gv, w, req, statusCode, object)
}
// StreamObject performs input stream negotiation from a ResourceStreamer and writes that to the response.
// If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many
// browser clients cannot easily handle binary streaming protocols).
@@ -101,6 +71,10 @@ func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSe
}
w.Header().Set("Content-Type", contentType)
w.WriteHeader(statusCode)
// Flush headers, if possible
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
writer := w.(io.Writer)
if flush {
writer = flushwriter.Wrap(w)
@@ -109,19 +83,154 @@ func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSe
}
// SerializeObject renders an object in the content type negotiated by the client using the provided encoder.
// The context is optional and can be nil.
func SerializeObject(mediaType string, encoder runtime.Encoder, innerW http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
w := httpResponseWriterWithInit{mediaType: mediaType, innerW: innerW, statusCode: statusCode}
if err := encoder.Encode(object, w); err != nil {
errSerializationFatal(err, encoder, w)
// The context is optional and can be nil. This method will perform optional content compression if requested by
// a client and the feature gate for APIResponseCompression is enabled.
func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
w := &deferredResponseWriter{
mediaType: mediaType,
statusCode: statusCode,
contentEncoding: negotiateContentEncoding(req),
hw: hw,
}
err := encoder.Encode(object, w)
if err == nil {
err = w.Close()
if err == nil {
return
}
}
// make a best effort to write the object if a failure is detected
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
status := ErrorToAPIStatus(err)
candidateStatusCode := int(status.Code)
// if the current status code is successful, allow the error's status code to overwrite it
if statusCode >= http.StatusOK && statusCode < http.StatusBadRequest {
w.statusCode = candidateStatusCode
}
output, err := runtime.Encode(encoder, status)
if err != nil {
w.mediaType = "text/plain"
output = []byte(fmt.Sprintf("%s: %s", status.Reason, status.Message))
}
if _, err := w.Write(output); err != nil {
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a fallback JSON response: %v", err))
}
w.Close()
}
var gzipPool = &sync.Pool{
New: func() interface{} {
gw, err := gzip.NewWriterLevel(nil, defaultGzipContentEncodingLevel)
if err != nil {
panic(err)
}
return gw
},
}
const (
// defaultGzipContentEncodingLevel is set to 4 which uses less CPU than the default level
defaultGzipContentEncodingLevel = 4
// defaultGzipThresholdBytes is compared to the size of the first write from the stream
// (usually the entire object), and if the size is smaller no gzipping will be performed
// if the client requests it.
defaultGzipThresholdBytes = 128 * 1024
)
// negotiateContentEncoding returns a supported client-requested content encoding for the
// provided request. It will return the empty string if no supported content encoding was
// found or if response compression is disabled.
func negotiateContentEncoding(req *http.Request) string {
encoding := req.Header.Get("Accept-Encoding")
if len(encoding) == 0 {
return ""
}
if !utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression) {
return ""
}
for len(encoding) > 0 {
var token string
if next := strings.Index(encoding, ","); next != -1 {
token = encoding[:next]
encoding = encoding[next+1:]
} else {
token = encoding
encoding = ""
}
switch strings.TrimSpace(token) {
case "gzip":
return "gzip"
}
}
return ""
}
type deferredResponseWriter struct {
mediaType string
statusCode int
contentEncoding string
hasWritten bool
hw http.ResponseWriter
w io.Writer
}
func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
if w.hasWritten {
return w.w.Write(p)
}
w.hasWritten = true
hw := w.hw
header := hw.Header()
switch {
case w.contentEncoding == "gzip" && len(p) > defaultGzipThresholdBytes:
header.Set("Content-Encoding", "gzip")
header.Add("Vary", "Accept-Encoding")
gw := gzipPool.Get().(*gzip.Writer)
gw.Reset(hw)
w.w = gw
default:
w.w = hw
}
header.Set("Content-Type", w.mediaType)
hw.WriteHeader(w.statusCode)
return w.w.Write(p)
}
func (w *deferredResponseWriter) Close() error {
if !w.hasWritten {
return nil
}
var err error
switch t := w.w.(type) {
case *gzip.Writer:
err = t.Close()
t.Reset(nil)
gzipPool.Put(t)
}
return err
}
var nopCloser = ioutil.NopCloser(nil)
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
// The context is optional and can be nil.
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, err := negotiation.NegotiateOutputSerializer(req, s)
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
StreamObject(statusCode, gv, s, stream, w, req)
})
return
}
_, serializer, err := negotiation.NegotiateOutputMediaType(req, s, restrictions)
if err != nil {
// if original statusCode was not successful we need to return the original error
// we cannot hide it behind negotiation problems
@@ -158,29 +267,10 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
return code
}
WriteObjectNegotiated(s, gv, w, req, code, status)
WriteObjectNegotiated(s, negotiation.DefaultEndpointRestrictions, gv, w, req, code, status)
return code
}
// errSerializationFatal renders an error to the response, and if codec fails will render plaintext.
// Returns the HTTP status code of the error.
func errSerializationFatal(err error, codec runtime.Encoder, w httpResponseWriterWithInit) {
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
status := ErrorToAPIStatus(err)
candidateStatusCode := int(status.Code)
// If original statusCode was not successful, we need to return the original error.
// We cannot hide it behind serialization problems
if w.statusCode >= http.StatusOK && w.statusCode < http.StatusBadRequest {
w.statusCode = candidateStatusCode
}
output, err := runtime.Encode(codec, status)
if err != nil {
w.mediaType = "text/plain"
output = []byte(fmt.Sprintf("%s: %s", status.Reason, status.Message))
}
w.Write(output)
}
// WriteRawJSON writes a non-API object in JSON.
func WriteRawJSON(statusCode int, object interface{}, w http.ResponseWriter) {
output, err := json.MarshalIndent(object, "", " ")

View File

@@ -25,11 +25,8 @@ import (
"net/http"
"net/url"
goruntime "runtime"
"strings"
"time"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -38,12 +35,14 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
utiltrace "k8s.io/apiserver/pkg/util/trace"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
)
// RequestScope encapsulates common fields across all RESTful handler methods.
@@ -53,16 +52,22 @@ type RequestScope struct {
Serializer runtime.NegotiatedSerializer
runtime.ParameterCodec
// StandardSerializers, if set, restricts which serializers can be used when
// we aren't transforming the output (into Table or PartialObjectMetadata).
// Used only by CRDs which do not yet support Protobuf.
StandardSerializers []runtime.SerializerInfo
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Defaulter runtime.ObjectDefaulter
Typer runtime.ObjectTyper
UnsafeConvertor runtime.ObjectConvertor
Authorizer authorizer.Authorizer
Trace *utiltrace.Trace
EquivalentResourceMapper runtime.EquivalentResourceMapper
TableConvertor rest.TableConvertor
OpenAPIModels openapiproto.Models
FieldManager *fieldmanager.FieldManager
Resource schema.GroupVersionResource
Kind schema.GroupVersionKind
@@ -80,12 +85,28 @@ func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Reque
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
}
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind) bool {
func (scope *RequestScope) AllowsMediaTypeTransform(mimeType, mimeSubType string, gvk *schema.GroupVersionKind) bool {
// some handlers like CRDs can't serve all the mime types that PartialObjectMetadata or Table can - if
// gvk is nil (no conversion) allow StandardSerializers to further restrict the set of mime types.
if gvk == nil {
if len(scope.StandardSerializers) == 0 {
return true
}
for _, info := range scope.StandardSerializers {
if info.MediaTypeType == mimeType && info.MediaTypeSubType == mimeSubType {
return true
}
}
return false
}
// TODO: this is temporary, replace with an abstraction calculated at endpoint installation time
if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion {
if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion || gvk.GroupVersion() == metav1.SchemeGroupVersion {
switch gvk.Kind {
case "Table":
return scope.TableConvertor != nil
return scope.TableConvertor != nil &&
mimeType == "application" &&
(mimeSubType == "json" || mimeSubType == "yaml")
case "PartialObjectMetadata", "PartialObjectMetadataList":
// TODO: should delineate between lists and non-list endpoints
return true
@@ -104,8 +125,18 @@ func (scope *RequestScope) AllowsStreamSchema(s string) bool {
return s == "watch"
}
var _ admission.ObjectInterfaces = &RequestScope{}
func (r *RequestScope) GetObjectCreater() runtime.ObjectCreater { return r.Creater }
func (r *RequestScope) GetObjectTyper() runtime.ObjectTyper { return r.Typer }
func (r *RequestScope) GetObjectDefaulter() runtime.ObjectDefaulter { return r.Defaulter }
func (r *RequestScope) GetObjectConvertor() runtime.ObjectConvertor { return r.Convertor }
func (r *RequestScope) GetEquivalentResourceMapper() runtime.EquivalentResourceMapper {
return r.EquivalentResourceMapper
}
// ConnectResource returns a function that handles a connect request on a rest.Storage object.
func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc {
func ConnectResource(connecter rest.Connecter, scope *RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
if isDryRun(req.URL) {
scope.err(errors.NewBadRequest("dryRun is not supported"), w, req)
@@ -132,14 +163,14 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
userInfo, _ := request.UserFrom(ctx)
// TODO: remove the mutating admission here as soon as we have ported all plugin that handle CONNECT
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
err = mutatingAdmission.Admit(admission.NewAttributesRecord(opts, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, false, userInfo))
err = mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(opts, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, nil, false, userInfo), scope)
if err != nil {
scope.err(err, w, req)
return
}
}
if validatingAdmission, ok := admit.(admission.ValidationInterface); ok {
err = validatingAdmission.Validate(admission.NewAttributesRecord(opts, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, false, userInfo))
err = validatingAdmission.Validate(ctx, admission.NewAttributesRecord(opts, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, nil, false, userInfo), scope)
if err != nil {
scope.err(err, w, req)
return
@@ -147,7 +178,7 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
}
}
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, func() {
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
if err != nil {
scope.err(err, w, req)
@@ -160,13 +191,13 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
// responder implements rest.Responder for assisting a connector in writing objects or errors.
type responder struct {
scope RequestScope
scope *RequestScope
req *http.Request
w http.ResponseWriter
}
func (r *responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
responsewriters.WriteObjectNegotiated(r.scope.Serializer, r.scope, r.scope.Kind.GroupVersion(), r.w, r.req, statusCode, obj)
}
func (r *responder) Error(err error) {
@@ -189,10 +220,15 @@ func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object,
defer func() {
panicReason := recover()
if panicReason != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:goruntime.Stack(buf, false)]
panicReason = strings.TrimSuffix(fmt.Sprintf("%v\n%s", panicReason, string(buf)), "\n")
// do not wrap the sentinel ErrAbortHandler panic value
if panicReason != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:goruntime.Stack(buf, false)]
panicReason = fmt.Sprintf("%v\n%s", panicReason, buf)
}
// Propagate to parent goroutine
panicCh <- panicReason
}
@@ -222,11 +258,11 @@ func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object,
}
}
// transformDecodeError adds additional information when a decode fails.
// transformDecodeError adds additional information into a bad-request api error when a decode fails.
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *schema.GroupVersionKind, body []byte) error {
objGVKs, _, err := typer.ObjectKinds(into)
if err != nil {
return err
return errors.NewBadRequest(err.Error())
}
objGVK := objGVKs[0]
if gvk != nil && len(gvk.Kind) > 0 {
@@ -283,23 +319,35 @@ func checkName(obj runtime.Object, name, namespace string, namer ScopeNamer) err
return nil
}
// setListSelfLink sets the self link of a list to the base URL, then sets the self links
// on all child objects returned. Returns the number of items in the list.
func setListSelfLink(obj runtime.Object, ctx context.Context, req *http.Request, namer ScopeNamer) (int, error) {
// setObjectSelfLink sets the self link of an object as needed.
// TODO: remove the need for the namer LinkSetters by requiring objects implement either Object or List
// interfaces
func setObjectSelfLink(ctx context.Context, obj runtime.Object, req *http.Request, namer ScopeNamer) error {
if utilfeature.DefaultFeatureGate.Enabled(features.RemoveSelfLink) {
return nil
}
// We only generate list links on objects that implement ListInterface - historically we duck typed this
// check via reflection, but as we move away from reflection we require that you not only carry Items but
// ListMeta into order to be identified as a list.
if !meta.IsListType(obj) {
return 0, nil
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return fmt.Errorf("missing requestInfo")
}
return setSelfLink(obj, requestInfo, namer)
}
uri, err := namer.GenerateListLink(req)
if err != nil {
return 0, err
return err
}
if err := namer.SetSelfLink(obj, uri); err != nil {
klog.V(4).Infof("Unable to set self link on object: %v", err)
}
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return 0, fmt.Errorf("missing requestInfo")
return fmt.Errorf("missing requestInfo")
}
count := 0
@@ -307,7 +355,14 @@ func setListSelfLink(obj runtime.Object, ctx context.Context, req *http.Request,
count++
return setSelfLink(obj, requestInfo, namer)
})
return count, err
if count == 0 {
if err := meta.SetList(obj, []runtime.Object{}); err != nil {
return err
}
}
return err
}
func summarizeData(data []byte, maxLength int) string {

View File

@@ -38,14 +38,14 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utiltrace "k8s.io/apiserver/pkg/util/trace"
utiltrace "k8s.io/utils/trace"
)
// UpdateResource returns a function that will handle a resource update
func UpdateResource(r rest.Updater, scope RequestScope, admit admission.Interface) http.HandlerFunc {
func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interface) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// For performance tracking purposes.
trace := utiltrace.New("Update " + req.URL.Path)
trace := utiltrace.New("Update", utiltrace.Field{"url", req.URL.Path})
defer trace.LogIfLong(500 * time.Millisecond)
if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
@@ -61,9 +61,16 @@ func UpdateResource(r rest.Updater, scope RequestScope, admit admission.Interfac
scope.err(err, w, req)
return
}
ctx := req.Context()
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
if err != nil {
scope.err(err, w, req)
@@ -81,6 +88,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, admit admission.Interfac
scope.err(err, w, req)
return
}
options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("UpdateOptions"))
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
if err != nil {
@@ -115,7 +123,16 @@ func UpdateResource(r rest.Updater, scope RequestScope, admit admission.Interfac
}
userInfo, _ := request.UserFrom(ctx)
var transformers []rest.TransformFunc
transformers := []rest.TransformFunc{}
if scope.FieldManager != nil {
transformers = append(transformers, func(_ context.Context, newObj, liveObj runtime.Object) (runtime.Object, error) {
obj, err := scope.FieldManager.Update(liveObj, newObj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
if err != nil {
return nil, fmt.Errorf("failed to update object (Update for %v) managed fields: %v", scope.Kind, err)
}
return obj, nil
})
}
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
transformers = append(transformers, func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {
isNotZeroObject, err := hasUID(oldObj)
@@ -123,11 +140,11 @@ func UpdateResource(r rest.Updater, scope RequestScope, admit admission.Interfac
return nil, fmt.Errorf("unexpected error when extracting UID from oldObj: %v", err.Error())
} else if !isNotZeroObject {
if mutatingAdmission.Handles(admission.Create) {
return newObj, mutatingAdmission.Admit(admission.NewAttributesRecord(newObj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, dryrun.IsDryRun(options.DryRun), userInfo))
return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, updateToCreateOptions(options), dryrun.IsDryRun(options.DryRun), userInfo), scope)
}
} else {
if mutatingAdmission.Handles(admission.Update) {
return newObj, mutatingAdmission.Admit(admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, dryrun.IsDryRun(options.DryRun), userInfo))
return newObj, mutatingAdmission.Admit(ctx, admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope)
}
}
return newObj, nil
@@ -157,11 +174,11 @@ func UpdateResource(r rest.Updater, scope RequestScope, admit admission.Interfac
rest.DefaultUpdatedObjectInfo(obj, transformers...),
withAuthorization(rest.AdmissionToValidateObjectFunc(
admit,
admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, dryrun.IsDryRun(options.DryRun), userInfo)),
admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, updateToCreateOptions(options), dryrun.IsDryRun(options.DryRun), userInfo), scope),
scope.Authorizer, createAuthorizerAttributes),
rest.AdmissionToValidateObjectUpdateFunc(
admit,
admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, dryrun.IsDryRun(options.DryRun), userInfo)),
admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, options, dryrun.IsDryRun(options.DryRun), userInfo), scope),
false,
options,
)
@@ -174,24 +191,12 @@ func UpdateResource(r rest.Updater, scope RequestScope, admit admission.Interfac
}
trace.Step("Object stored in database")
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
}
if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
scope.err(err, w, req)
return
}
trace.Step("Self-link added")
status := http.StatusOK
if wasCreated {
status = http.StatusCreated
}
scope.Trace = trace
transformResponseObject(ctx, scope, req, w, status, result)
transformResponseObject(ctx, scope, trace, req, w, status, outputMediaType, result)
}
}
@@ -200,7 +205,7 @@ func withAuthorization(validate rest.ValidateObjectFunc, a authorizer.Authorizer
var authorizerDecision authorizer.Decision
var authorizerReason string
var authorizerErr error
return func(obj runtime.Object) error {
return func(ctx context.Context, obj runtime.Object) error {
if a == nil {
return errors.NewInternalError(fmt.Errorf("no authorizer provided, unable to authorize a create on update"))
}
@@ -210,7 +215,7 @@ func withAuthorization(validate rest.ValidateObjectFunc, a authorizer.Authorizer
// an authorizer like RBAC could encounter evaluation errors and still allow the request, so authorizer decision is checked before error here.
if authorizerDecision == authorizer.DecisionAllow {
// Continue to validating admission
return validate(obj)
return validate(ctx, obj)
}
if authorizerErr != nil {
return errors.NewInternalError(authorizerErr)
@@ -226,3 +231,16 @@ func withAuthorization(validate rest.ValidateObjectFunc, a authorizer.Authorizer
return errors.NewForbidden(gr, name, err)
}
}
// updateToCreateOptions creates a CreateOptions with the same field values as the provided UpdateOptions.
func updateToCreateOptions(uo *metav1.UpdateOptions) *metav1.CreateOptions {
if uo == nil {
return nil
}
co := &metav1.CreateOptions{
DryRun: uo.DryRun,
FieldManager: uo.FieldManager,
}
co.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))
return co
}

View File

@@ -25,13 +25,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/wsstream"
@@ -61,42 +61,51 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
// serveWatch handles serving requests to the server
// serveWatch will serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
// negotiate for the stream serializer
serializer, err := negotiation.NegotiateOutputStreamSerializer(req, scope.Serializer)
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
options, err := optionsForTransform(mediaTypeOptions, req)
if err != nil {
scope.err(err, w, req)
return
}
// negotiate for the stream serializer from the scope's serializer
serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
framer := serializer.StreamSerializer.Framer
streamSerializer := serializer.StreamSerializer.Serializer
embedded := serializer.Serializer
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
if framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), w, req)
return
}
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
// find the embedded serializer matching the media type
embeddedEncoder := scope.Serializer.EncoderForVersion(embedded, scope.Kind.GroupVersion())
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer.MediaType
if mediaType != runtime.ContentTypeJSON {
mediaType += ";stream=watch"
}
ctx := req.Context()
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
// locate the appropriate embedded encoder based on the transform
var embeddedEncoder runtime.Encoder
contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req)
if transform {
info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType)
if !ok {
scope.err(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer), w, req)
return
}
embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion())
} else {
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}
ctx := req.Context()
server := &WatchServer{
Watching: watcher,
Scope: scope,
@@ -106,10 +115,20 @@ func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request,
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) {
if err := setSelfLink(obj, requestInfo, scope.Namer); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
Fixup: func(obj runtime.Object) runtime.Object {
result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
return obj
}
// When we are transformed to a table, use the table options as the state for whether we
// should print headers - on watch, we only want to print table headers on the first object
// and omit them on subsequent events.
if tableOptions, ok := options.(*metav1beta1.TableOptions); ok {
tableOptions.NoHeaders = true
}
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
@@ -121,7 +140,7 @@ func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request,
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
type WatchServer struct {
Watching watch.Interface
Scope RequestScope
Scope *RequestScope
// true if websocket messages should use text framing (as opposed to binary framing)
UseTextFraming bool
@@ -133,7 +152,8 @@ type WatchServer struct {
Encoder runtime.Encoder
// used to encode the nested object in the watch stream
EmbeddedEncoder runtime.Encoder
Fixup func(runtime.Object)
// used to correct the object before we send it to the serializer
Fixup func(runtime.Object) runtime.Object
TimeoutFactory TimeoutFactory
}
@@ -145,7 +165,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
defer metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()
w = httplog.Unlogged(w)
w = httplog.Unlogged(req, w)
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", s.MediaType)
@@ -191,6 +211,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var unknown runtime.Unknown
internalEvent := &metav1.InternalEvent{}
outEvent := &metav1.WatchEvent{}
buf := &bytes.Buffer{}
ch := s.Watching.ResultChan()
for {
@@ -204,9 +225,9 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// End of results.
return
}
metrics.WatchEvents.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
obj := event.Object
s.Fixup(obj)
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
@@ -217,11 +238,13 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// type
unknown.Raw = buf.Bytes()
event.Object = &unknown
metrics.WatchEventsSizes.WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
*outEvent = metav1.WatchEvent{}
// create the external type directly and encode it. Clients will only recognize the serialization we provide.
// The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
// and we get the benefit of using conversion functions which already have to stay in sync
outEvent := &metav1.WatchEvent{}
*internalEvent = metav1.InternalEvent(event)
err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
if err != nil {
@@ -262,18 +285,19 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
buf := &bytes.Buffer{}
streamBuf := &bytes.Buffer{}
ch := s.Watching.ResultChan()
defer s.Watching.Stop()
for {
select {
case <-done:
s.Watching.Stop()
return
case event, ok := <-ch:
if !ok {
// End of results.
return
}
obj := event.Object
s.Fixup(obj)
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
@@ -295,25 +319,21 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
// client disconnect.
s.Watching.Stop()
return
}
if err := s.Encoder.Encode(outEvent, streamBuf); err != nil {
// encoding error
utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
s.Watching.Stop()
return
}
if s.UseTextFraming {
if err := websocket.Message.Send(ws, streamBuf.String()); err != nil {
// Client disconnect.
s.Watching.Stop()
return
}
} else {
if err := websocket.Message.Send(ws, streamBuf.Bytes()); err != nil {
// Client disconnect.
s.Watching.Stop()
return
}
}