Bump sigs.k8s.io/controller-runtime to v0.14.4 (#5507)

* Bump sigs.k8s.io/controller-runtime to v0.14.4

* Update gofmt
This commit is contained in:
hongming
2023-02-08 14:06:15 +08:00
committed by GitHub
parent 129e6fbec3
commit 1c49fcd57e
1404 changed files with 141422 additions and 47769 deletions

View File

@@ -40,7 +40,7 @@ func init() {
}
// StartCompactor starts a compactor in the background to compact old version of keys that's not needed.
// By default, we save the most recent 10 minutes data and compact versions > 10minutes ago.
// By default, we save the most recent 5 minutes data and compact versions > 5minutes ago.
// It should be enough for slow watchers and to tolerate burst.
// TODO: We might keep a longer history (12h) in the future once storage API can take advantage of past version of keys.
func StartCompactor(ctx context.Context, client *clientv3.Client, compactInterval time.Duration) {
@@ -84,7 +84,7 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat
// Technical details/insights:
//
// The protocol here is lease based. If one compactor CAS successfully, the others would know it when they fail in
// CAS later and would try again in 10 minutes. If an APIServer crashed, another one would "take over" the lease.
// CAS later and would try again in 5 minutes. If an APIServer crashed, another one would "take over" the lease.
//
// For example, in the following diagram, we have a compactor C1 doing compaction in t1, t2. Another compactor C2
// at t1' (t1 < t1' < t2) would CAS fail, set its known oldRev to rev at t1', and try again in t2' (t2' > t2).
@@ -100,14 +100,14 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat
// t0 t1 t2
//
// We have the guarantees:
// - in normal cases, the interval is 10 minutes.
// - in failover, the interval is >10m and <20m
// - in normal cases, the interval is 5 minutes.
// - in failover, the interval is >5m and <10m
//
// FAQ:
// - What if time is not accurate? We don't care as long as someone did the compaction. Atomicity is ensured using
// etcd API.
// - What happened under heavy load scenarios? Initially, each apiserver will do only one compaction
// every 10 minutes. This is very unlikely affecting or affected w.r.t. server load.
// every 5 minutes. This is very unlikely affecting or affected w.r.t. server load.
var compactTime int64
var rev int64

View File

@@ -57,8 +57,9 @@ var (
)
dbTotalSize = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_db_total_size_in_bytes",
Help: "Total size of the etcd database file physically allocated in bytes.",
Subsystem: "apiserver",
Name: "storage_db_total_size_in_bytes",
Help: "Total size of the storage database file physically allocated in bytes.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"endpoint"},

View File

@@ -27,6 +27,7 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/otel/attribute"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -37,14 +38,14 @@ import (
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
const (
@@ -98,19 +99,24 @@ func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object,
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
versioner := storage.APIObjectVersioner{}
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix := path.Join("/", prefix)
if !strings.HasSuffix(pathPrefix, "/") {
// Ensure the pathPrefix ends in "/" here to simplify key concatenation later.
pathPrefix += "/"
}
result := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
pathPrefix: pathPrefix,
groupResource: groupResource,
groupResourceString: groupResource.String(),
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
watcher: newWatcher(c, codec, groupResource, newFunc, versioner),
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
}
return result
@@ -123,10 +129,13 @@ func (s *store) Versioner() storage.Versioner {
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
key = path.Join(s.pathPrefix, key)
preparedKey, err := s.prepareKey(key)
if err != nil {
return err
}
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
getResp, err := s.client.KV.Get(ctx, preparedKey)
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
if err != nil {
return err
}
@@ -138,11 +147,11 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
if opts.IgnoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
return storage.NewKeyNotFoundError(preparedKey, 0)
}
kv := getResp.Kvs[0]
data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(key))
data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey))
if err != nil {
return storage.NewInternalError(err.Error())
}
@@ -152,58 +161,68 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
trace := utiltrace.New("Create etcd3",
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
utiltrace.Field{"key", key},
utiltrace.Field{"type", getTypeName(obj)},
preparedKey, err := s.prepareKey(key)
if err != nil {
return err
}
ctx, span := tracing.Start(ctx, "Create etcd3",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("type", getTypeName(obj)),
attribute.String("resource", s.groupResourceString),
)
defer trace.LogIfLong(500 * time.Millisecond)
defer span.End(500 * time.Millisecond)
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
trace.Step("About to Encode")
span.AddEvent("About to Encode")
data, err := runtime.Encode(s.codec, obj)
trace.Step("Encode finished", utiltrace.Field{"len", len(data)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("Encode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
return err
}
key = path.Join(s.pathPrefix, key)
span.AddEvent("Encode succeeded", attribute.Int("len", len(data)))
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
newData, err := s.transformer.TransformToStorage(ctx, data, authenticatedDataString(key))
trace.Step("TransformToStorage finished", utiltrace.Field{"err", err})
newData, err := s.transformer.TransformToStorage(ctx, data, authenticatedDataString(preparedKey))
if err != nil {
span.AddEvent("TransformToStorage failed", attribute.String("err", err.Error()))
return storage.NewInternalError(err.Error())
}
span.AddEvent("TransformToStorage succeeded")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
notFound(preparedKey),
).Then(
clientv3.OpPut(key, string(newData), opts...),
clientv3.OpPut(preparedKey, string(newData), opts...),
).Commit()
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
trace.Step("Txn call finished", utiltrace.Field{"err", err})
metrics.RecordEtcdRequestLatency("create", s.groupResourceString, startTime)
if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
return err
}
span.AddEvent("Txn call succeeded")
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
return storage.NewKeyExistsError(preparedKey, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
trace.Step("decode finished", utiltrace.Field{"len", len(data)}, utiltrace.Field{"err", err})
return err
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
return err
}
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
}
return nil
}
@@ -212,12 +231,15 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
func (s *store) Delete(
ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
preparedKey, err := s.prepareKey(key)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(out)
if err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
key = path.Join(s.pathPrefix, key)
return s.conditionalDelete(ctx, key, out, v, preconditions, validateDeletion, cachedExistingObject)
return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject)
}
func (s *store) conditionalDelete(
@@ -226,7 +248,7 @@ func (s *store) conditionalDelete(
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
if err != nil {
return nil, err
}
@@ -308,7 +330,7 @@ func (s *store) conditionalDelete(
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
metrics.RecordEtcdRequestLatency("delete", s.groupResourceString, startTime)
if err != nil {
return err
}
@@ -322,7 +344,15 @@ func (s *store) conditionalDelete(
origStateIsCurrent = true
continue
}
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil {
return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses))
}
deleteResp := txnResp.Responses[0].GetResponseDeleteRange()
if deleteResp.Header == nil {
return errors.New("invalid DeleteRange response - nil header")
}
return decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
}
}
@@ -330,26 +360,30 @@ func (s *store) conditionalDelete(
func (s *store) GuaranteedUpdate(
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
trace := utiltrace.New("GuaranteedUpdate etcd3",
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
utiltrace.Field{"key", key},
utiltrace.Field{"type", getTypeName(destination)})
defer trace.LogIfLong(500 * time.Millisecond)
preparedKey, err := s.prepareKey(key)
if err != nil {
return err
}
ctx, span := tracing.Start(ctx, "GuaranteedUpdate etcd3",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("type", getTypeName(destination)),
attribute.String("resource", s.groupResourceString))
defer span.End(500 * time.Millisecond)
v, err := conversion.EnforcePtr(destination)
if err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
key = path.Join(s.pathPrefix, key)
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(destination), startTime)
getResp, err := s.client.KV.Get(ctx, preparedKey)
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
if err != nil {
return nil, err
}
return s.getState(ctx, getResp, key, v, ignoreNotFound)
return s.getState(ctx, getResp, preparedKey, v, ignoreNotFound)
}
var origState *objState
@@ -363,11 +397,11 @@ func (s *store) GuaranteedUpdate(
if err != nil {
return err
}
trace.Step("initial value restored")
span.AddEvent("initial value restored")
transformContext := authenticatedDataString(key)
transformContext := authenticatedDataString(preparedKey)
for {
if err := preconditions.Check(key, origState.obj); err != nil {
if err := preconditions.Check(preparedKey, origState.obj); err != nil {
// If our data is already up to date, return the error
if origStateIsCurrent {
return err
@@ -412,12 +446,13 @@ func (s *store) GuaranteedUpdate(
continue
}
trace.Step("About to Encode")
span.AddEvent("About to Encode")
data, err := runtime.Encode(s.codec, ret)
trace.Step("Encode finished", utiltrace.Field{"len", len(data)}, utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("Encode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
return err
}
span.AddEvent("Encode succeeded", attribute.Int("len", len(data)))
if !origState.stale && bytes.Equal(data, origState.data) {
// if we skipped the original Get in this loop, we must refresh from
// etcd in order to be sure the data in the store is equivalent to
@@ -440,47 +475,53 @@ func (s *store) GuaranteedUpdate(
}
newData, err := s.transformer.TransformToStorage(ctx, data, transformContext)
trace.Step("TransformToStorage finished", utiltrace.Field{"err", err})
if err != nil {
span.AddEvent("TransformToStorage failed", attribute.String("err", err.Error()))
return storage.NewInternalError(err.Error())
}
span.AddEvent("TransformToStorage succeeded")
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
trace.Step("Transaction prepared")
span.AddEvent("Transaction prepared")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
clientv3.Compare(clientv3.ModRevision(preparedKey), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(newData), opts...),
clientv3.OpPut(preparedKey, string(newData), opts...),
).Else(
clientv3.OpGet(key),
clientv3.OpGet(preparedKey),
).Commit()
metrics.RecordEtcdRequestLatency("update", getTypeName(destination), startTime)
trace.Step("Txn call finished", utiltrace.Field{"err", err})
metrics.RecordEtcdRequestLatency("update", s.groupResourceString, startTime)
if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
return err
}
trace.Step("Transaction committed")
span.AddEvent("Txn call completed")
span.AddEvent("Transaction committed")
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(ctx, getResp, key, v, ignoreNotFound)
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey)
origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound)
if err != nil {
return err
}
trace.Step("Retry value restored")
span.AddEvent("Retry value restored")
origStateIsCurrent = true
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
trace.Step("decode finished", utiltrace.Field{"len", len(data)}, utiltrace.Field{"err", err})
return err
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
return err
}
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
return nil
}
}
@@ -502,18 +543,21 @@ func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Obje
}
func (s *store) Count(key string) (int64, error) {
key = path.Join(s.pathPrefix, key)
preparedKey, err := s.prepareKey(key)
if err != nil {
return 0, err
}
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(key, "/") {
key += "/"
if !strings.HasSuffix(preparedKey, "/") {
preparedKey += "/"
}
startTime := time.Now()
getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
metrics.RecordEtcdRequestLatency("listWithCount", key, startTime)
getResp, err := s.client.KV.Get(context.Background(), preparedKey, clientv3.WithRange(clientv3.GetPrefixRangeEnd(preparedKey)), clientv3.WithCountOnly())
metrics.RecordEtcdRequestLatency("listWithCount", preparedKey, startTime)
if err != nil {
return 0, err
}
@@ -522,18 +566,22 @@ func (s *store) Count(key string) (int64, error) {
// GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
preparedKey, err := s.prepareKey(key)
if err != nil {
return err
}
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate
trace := utiltrace.New(fmt.Sprintf("List(recursive=%v) etcd3", recursive),
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"resourceVersionMatch", match},
utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond)
ctx, span := tracing.Start(ctx, fmt.Sprintf("List(recursive=%v) etcd3", recursive),
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("resourceVersion", resourceVersion),
attribute.String("resourceVersionMatch", string(match)),
attribute.Int("limit", int(pred.Limit)),
attribute.String("continue", pred.Continue))
defer span.End(500 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
@@ -542,16 +590,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if err != nil || v.Kind() != reflect.Slice {
return fmt.Errorf("need ptr to slice: %v", err)
}
key = path.Join(s.pathPrefix, key)
// For recursive lists, we need to make sure the key ended with "/" so that we only
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
// "/a/b" which is the correct answer.
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
if recursive && !strings.HasSuffix(preparedKey, "/") {
preparedKey += "/"
}
keyPrefix := key
keyPrefix := preparedKey
// set the appropriate clientv3 options to filter the returned data set
var limitOption *clientv3.OpOption
@@ -590,7 +637,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
key = continueKey
preparedKey = continueKey
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
@@ -657,11 +704,11 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
}()
for {
startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, key, options...)
getResp, err = s.client.KV.Get(ctx, preparedKey, options...)
if recursive {
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
metrics.RecordEtcdRequestLatency("list", s.groupResourceString, startTime)
} else {
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
}
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
@@ -729,7 +776,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
}
*limitOption = clientv3.WithLimit(limit)
}
key = string(lastKey) + "\x00"
preparedKey = string(lastKey) + "\x00"
if withRev == 0 {
withRev = returnedRV
options = append(options, clientv3.WithRev(withRev))
@@ -794,12 +841,15 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
preparedKey, err := s.prepareKey(key)
if err != nil {
return nil, err
}
rev, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), opts.Recursive, opts.ProgressNotify, opts.Predicate)
return s.watcher.Watch(ctx, preparedKey, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate)
}
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
@@ -911,6 +961,30 @@ func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, ac
return nil
}
func (s *store) prepareKey(key string) (string, error) {
if key == ".." ||
strings.HasPrefix(key, "../") ||
strings.HasSuffix(key, "/..") ||
strings.Contains(key, "/../") {
return "", fmt.Errorf("invalid key: %q", key)
}
if key == "." ||
strings.HasPrefix(key, "./") ||
strings.HasSuffix(key, "/.") ||
strings.Contains(key, "/./") {
return "", fmt.Errorf("invalid key: %q", key)
}
if key == "" || key == "/" {
return "", fmt.Errorf("empty key: %q", key)
}
// We ensured that pathPrefix ends in '/' in construction, so skip any leading '/' in the key now.
startIndex := 0
if key[0] == '/' {
startIndex = 1
}
return s.pathPrefix + key[startIndex:], nil
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {

View File

@@ -18,7 +18,6 @@ package etcd3
import (
"context"
"errors"
"fmt"
"os"
"reflect"
@@ -28,6 +27,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
@@ -47,16 +47,6 @@ const (
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
var fatalOnDecodeError = false
// errTestingDecode is the only error that testingDeferOnDecodeError catches during a panic
var errTestingDecode = errors.New("sentinel error only used during testing to indicate watch decoding error")
// testingDeferOnDecodeError is used during testing to recover from a panic caused by errTestingDecode, all other values continue to panic
func testingDeferOnDecodeError() {
if r := recover(); r != nil && r != errTestingDecode {
panic(r)
}
}
func init() {
// check to see if we are running in a test environment
TestOnlySetFatalOnDecodeError(true)
@@ -69,17 +59,18 @@ func TestOnlySetFatalOnDecodeError(b bool) {
}
type watcher struct {
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
versioner storage.Versioner
transformer value.Transformer
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
groupResource schema.GroupResource
versioner storage.Versioner
}
// watchChan implements watch.Interface.
type watchChan struct {
watcher *watcher
transformer value.Transformer
key string
initialRev int64
recursive bool
@@ -92,13 +83,13 @@ type watchChan struct {
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner) *watcher {
res := &watcher{
client: client,
codec: codec,
newFunc: newFunc,
versioner: versioner,
transformer: transformer,
client: client,
codec: codec,
groupResource: groupResource,
newFunc: newFunc,
versioner: versioner,
}
if newFunc == nil {
res.objectType = "<unknown>"
@@ -115,11 +106,11 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() run
// If recursive is false, it watches on given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only if pred matches the change, it will be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) {
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, transformer, pred)
go wc.run()
// For etcd watch we don't have an easy way to answer whether the watch
@@ -132,9 +123,10 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p
return wc, nil
}
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan {
wc := &watchChan{
watcher: w,
transformer: transformer,
key: key,
initialRev: rev,
recursive: recursive,
@@ -259,7 +251,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
}
if wres.IsProgressNotify() {
wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
metrics.RecordEtcdBookmark(wc.watcher.objectType)
metrics.RecordEtcdBookmark(wc.watcher.groupResource.String())
continue
}
@@ -292,7 +284,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
continue
}
if len(wc.resultChan) == outgoingBufSize {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType)
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
@@ -411,7 +403,7 @@ func (wc *watchChan) sendError(err error) {
func (wc *watchChan) sendEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType)
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}
select {
case wc.incomingEventChan <- e:
@@ -426,7 +418,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
}
if !e.isDeleted {
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
}
@@ -441,7 +433,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
// we need the object only to compute whether it was filtered out
// before).
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
}
@@ -459,9 +451,6 @@ func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, re
obj, err := runtime.Decode(codec, []byte(data))
if err != nil {
if fatalOnDecodeError {
// catch watch decode error iff we caused it on
// purpose during a unit test
defer testingDeferOnDecodeError()
// we are running in a test environment and thus an
// error here is due to a coder mistake if the defer
// does not catch it