update dependencies (#6267)

Signed-off-by: hongming <coder.scala@gmail.com>
This commit is contained in:
hongming
2024-11-06 10:27:06 +08:00
committed by GitHub
parent faf255a084
commit cfebd96a1f
4263 changed files with 341374 additions and 132036 deletions

View File

@@ -21,10 +21,12 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc/metadata"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -41,16 +43,18 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
)
var (
emptyFunc = func(bool) {}
emptyFunc = func(bool) {}
coreNamespaceResource = schema.GroupResource{Group: "", Resource: "namespaces"}
)
const (
@@ -397,10 +401,18 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
// so that future reuse does not get a spurious timeout.
<-cacher.timer.C
}
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
var contextMetadata metadata.MD
if utilfeature.DefaultFeatureGate.Enabled(features.SeparateCacheWatchRPC) {
// Add grpc context metadata to watch and progress notify requests done by cacher to:
// * Prevent starvation of watch opened by cacher, by moving it to separate Watch RPC than watch request that bypass cacher.
// * Ensure that progress notification requests are executed on the same Watch RPC as their watch, which is required for it to work.
contextMetadata = metadata.New(map[string]string{"source": "cache"})
}
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
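The metadata added above is ordinary gRPC request metadata. A minimal, self-contained sketch of the mechanism, assuming nothing beyond the grpc-go metadata package (the "source: cache" key matches the diff; the rest is illustrative):

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

func main() {
	// Same metadata the cacher attaches when SeparateCacheWatchRPC is enabled.
	md := metadata.New(map[string]string{"source": "cache"})
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	// Any gRPC call issued on ctx (here: the etcd Watch and progress-notify RPCs
	// made by the storage layer) carries this metadata, so cacher-originated
	// traffic can be routed onto its own Watch RPC.
	if out, ok := metadata.FromOutgoingContext(ctx); ok {
		fmt.Println(out.Get("source")) // [cache]
	}
}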
@@ -413,7 +425,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
reflector.MaxInternalErrorRetryDuration = time.Second * 30
// since the watch-list is provided by the watch cache instruct
// the reflector to issue a regular LIST against the store
reflector.UseWatchList = false
reflector.UseWatchList = ptr.To(false)
cacher.watchCache = watchCache
cacher.reflector = reflector
@@ -513,7 +525,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil
}
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
// TODO: we should eventually get rid of this legacy case
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
@@ -521,9 +534,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return nil, err
}
readyGeneration, err := c.ready.waitAndReadGeneration(ctx)
if err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
var readyGeneration int
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
var ok bool
readyGeneration, ok = c.ready.checkAndReadGeneration()
if !ok {
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
}
} else {
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
if err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}
}
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
@@ -539,12 +561,19 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
scope.name = selectorName
}
// for requests like '/api/v1/watch/namespaces/*', set scope.namespace to empty.
// namespaces don't populate metadata.namespace in ObjFields.
if c.groupResource == coreNamespaceResource && len(scope.namespace) > 0 && scope.namespace == scope.name {
scope.namespace = ""
}
triggerValue, triggerSupported := "", false
if c.indexedTrigger != nil {
for _, field := range pred.IndexFields {
if field == c.indexedTrigger.indexName {
if value, ok := pred.Field.RequiresExactMatch(field); ok {
triggerValue, triggerSupported = value, true
break
}
}
}
@@ -557,14 +586,20 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// watchers on our watcher having a processing hiccup
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// Determine a function that computes the bookmarkAfterResourceVersion
bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts)
// client-go is going to fall back to a standard LIST on any error
// returned for watch-list requests
if isListWatchRequest(opts) && !etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) {
return newErrWatcher(fmt.Errorf("a watch stream was requested by the client but the required storage feature %s is disabled", storage.RequestWatchProgress)), nil
}
// Determine the ResourceVersion to which the watch cache must be synchronized
requiredResourceVersion, err := c.getWatchCacheResourceVersion(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
// Determine a function that computes the watchRV we should start from
startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts)
// Determine a function that computes the bookmarkAfterResourceVersion
bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(requestedWatchRV, requiredResourceVersion, opts)
if err != nil {
return newErrWatcher(err), nil
}
@@ -580,7 +615,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// to compute watcher.forget function (which has to happen under lock).
watcher := newCacheWatcher(
chanSize,
filterWithAttrsFunction(key, pred),
filterWithAttrsAndPrefixFunction(key, pred),
emptyFunc,
c.versioner,
deadline,
@@ -596,7 +631,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
// it is safe to release the lock after the method finishes because we don't require
// any atomicity between the call to the method and further calls that actually get the events.
forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
err = c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requiredResourceVersion, opts)
if err != nil {
return newErrWatcher(err), nil
}
@@ -609,13 +644,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watchCache.RLock()
defer c.watchCache.RUnlock()
startWatchRV := startWatchResourceVersionFn()
var cacheInterval *watchCacheInterval
if forceAllEvents {
cacheInterval, err = c.watchCache.getIntervalFromStoreLocked()
} else {
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV)
}
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, key, opts)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@@ -657,7 +687,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newImmediateCloseWatcher(), nil
}
go watcher.processInterval(ctx, cacheInterval, startWatchRV)
go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion)
return watcher, nil
}
@@ -669,6 +699,14 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return c.storage.Get(ctx, key, opts, objPtr)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
// If Cache is not initialized, delegate Get requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.Get(ctx, key, opts, objPtr)
}
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
@@ -677,16 +715,18 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return err
}
if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.Get(ctx, key, opts, objPtr)
}
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.Get(ctx, key, opts, objPtr)
}
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
}
objVal, err := conversion.EnforcePtr(objPtr)
@@ -722,17 +762,40 @@ func shouldDelegateList(opts storage.ListOptions) bool {
pred := opts.Predicate
match := opts.ResourceVersionMatch
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
// Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(pred.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
hasLimit := pred.Limit > 0 && resourceVersion != "0"
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
return consistentReadFromStorage || hasContinuation || unsupportedMatch
}
// computeListLimit determines whether the cacher should
// apply a limit to an incoming LIST request and returns its value.
//
// note that this function doesn't check RVM nor the Continuation token.
// these parameters are validated by the shouldDelegateList function.
//
// as of today, the limit is ignored for requests that set RV == 0
func computeListLimit(opts storage.ListOptions) int64 {
if opts.Predicate.Limit <= 0 || opts.ResourceVersion == "0" {
return 0
}
return opts.Predicate.Limit
}
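A quick illustration of the rule stated in the comment above. This is a simplified restatement for readability, not the package-private function itself:

package main

import "fmt"

// computeListLimitSketch: the limit is ignored when it is non-positive or when the
// request pins resourceVersion to "0" (such lists are served whole from the cache).
func computeListLimitSketch(limit int64, resourceVersion string) int64 {
	if limit <= 0 || resourceVersion == "0" {
		return 0
	}
	return limit
}

func main() {
	fmt.Println(computeListLimitSketch(500, "0"))  // 0: RV=0 lists are never truncated
	fmt.Println(computeListLimitSketch(500, ""))   // 500: consistent read, limit applies
	fmt.Println(computeListLimitSketch(0, "1234")) // 0: no limit requested
}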
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
pred := opts.Predicate
noLabelSelector := pred.Label == nil || pred.Label.Empty()
noFieldSelector := pred.Field == nil || pred.Field.Empty()
hasLimit := pred.Limit > 0
return noLabelSelector && noFieldSelector && hasLimit
}
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
@@ -746,7 +809,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred
}
return nil, readResourceVersion, "", nil
}
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex())
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
}
// GetList implements storage.Interface
@@ -762,12 +825,31 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if err != nil {
return err
}
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
// If Cacher is not initialized, delegate List requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.GetList(ctx, key, opts, listObj)
}
} else {
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
}
if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
// For recursive lists, we need to make sure the key ended with "/" so that we only
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
// "/a/b" which is the correct answer.
preparedKey := key
if opts.Recursive && !strings.HasSuffix(key, "/") {
preparedKey += "/"
}
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
if consistentRead {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err
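The trailing-slash rule in the comment above is easy to verify with plain string prefixes; a tiny sketch with made-up keys:

package main

import (
	"fmt"
	"strings"
)

func main() {
	keys := []string{"/a", "/a/b", "/ab"}
	for _, prefix := range []string{"/a", "/a/"} {
		var matched []string
		for _, k := range keys {
			if strings.HasPrefix(k, prefix) {
				matched = append(matched, k)
			}
		}
		fmt.Printf("%-4s -> %v\n", prefix, matched)
	}
	// /a   -> [/a /a/b /ab]   (the sibling key /ab leaks in)
	// /a/  -> [/a/b]          (only true children are matched)
}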
@@ -779,8 +861,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
attribute.Stringer("type", c.groupResource))
defer span.End(500 * time.Millisecond)
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
// If Cacher is not initialized, reject List requests
// as described in https://kep.k8s.io/4568
return errors.NewTooManyRequests("storage is (re)initializing", 1)
}
} else {
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
}
span.AddEvent("Ready")
@@ -796,25 +886,47 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, key, pred, recursive)
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
success := "true"
fallback := "false"
if err != nil {
if consistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
success = "false"
}
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
return err
}
if consistentRead {
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
// Store pointers to the eligible objects rather than the objects themselves.
// Why not put the objects directly into the items of listObj?
// The elements of listObj are struct values, so building that slice eagerly
// would cause excessive memory consumption; we therefore delay it as long as possible.
var selectedObjects []runtime.Object
for _, obj := range objs {
var lastSelectedObjectKey string
var hasMoreListItems bool
limit := computeListLimit(opts)
for i, obj := range objs {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Labels, elem.Fields) {
if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object)
lastSelectedObjectKey = elem.Key
}
if limit > 0 && int64(len(selectedObjects)) >= limit {
hasMoreListItems = i < len(objs)-1
break
}
}
if len(selectedObjects) == 0 {
@@ -830,7 +942,12 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts)
if err != nil {
return err
}
if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil {
return err
}
}
@@ -861,6 +978,14 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
// ReadinessCheck implements storage.Interface.
func (c *Cacher) ReadinessCheck() error {
if !c.ready.check() {
return storage.ErrStorageNotReady
}
return nil
}
// baseObjectThreadUnsafe omits locking for cachingObject.
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
if co, ok := object.(*cachingObject); ok {
@@ -899,7 +1024,23 @@ func (c *Cacher) dispatchEvents() {
bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
defer bookmarkTimer.Stop()
// The internal informer populates the RV as soon as it conducts
// the first successful sync with the underlying store.
// The cache must wait until this first sync is completed to be deemed ready.
// Since we cannot send a bookmark when the lastProcessedResourceVersion is 0,
// we poll aggressively for the first list RV before entering the dispatch loop.
lastProcessedResourceVersion := uint64(0)
if err := wait.PollUntilContextCancel(wait.ContextForChannel(c.stopCh), 10*time.Millisecond, true, func(_ context.Context) (bool, error) {
if rv := c.watchCache.getListResourceVersion(); rv != 0 {
lastProcessedResourceVersion = rv
return true, nil
}
return false, nil
}); err != nil {
// given that the function above never returns an error,
// a non-nil error here means that the stopCh was closed
return
}
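The startup polling above uses the generic wait helper from apimachinery. A stripped-down sketch of the same pattern; the condition, interval, and atomic counter here are illustrative stand-ins for the watch cache's list resource version:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	var listRV atomic.Uint64
	// Simulate the reflector completing its first LIST a little later.
	go func() {
		time.Sleep(50 * time.Millisecond)
		listRV.Store(100)
	}()

	var rv uint64
	// Poll immediately, then every 10ms, until the condition reports done or the
	// context is cancelled - mirroring the dispatchEvents startup loop above.
	err := wait.PollUntilContextCancel(context.Background(), 10*time.Millisecond, true,
		func(_ context.Context) (bool, error) {
			if v := listRV.Load(); v != 0 {
				rv = v
				return true, nil
			}
			return false, nil
		})
	fmt.Println(rv, err) // 100 <nil>
}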
for {
select {
case event, ok := <-c.incoming:
@@ -923,29 +1064,6 @@ func (c *Cacher) dispatchEvents() {
metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
case <-bookmarkTimer.C():
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
//
// Just pop closed watchers and requeue others if needed.
//
// TODO(#115478): rework the following logic
// in a way that would allow more
// efficient cleanup of closed watchers
if lastProcessedResourceVersion == 0 {
func() {
c.Lock()
defer c.Unlock()
for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() {
for _, watcher := range watchers {
if watcher.stopped {
continue
}
c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
}
}
}()
continue
}
bookmarkEvent := &watchCacheEvent{
Type: watch.Bookmark,
Object: c.newFunc(),
@@ -1225,7 +1343,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
}
}
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
if !hasPathPrefix(objKey, key) {
return false
@@ -1249,59 +1367,76 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
// spits a ResourceVersion after which the bookmark event will be delivered.
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || !*opts.SendInitialEvents || !opts.Predicate.AllowWatchBookmarks {
func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion, requiredResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if !isListWatchRequest(opts) {
return func() uint64 { return 0 }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts)
}
// getStartResourceVersionForWatchLockedFunc returns a function that
// spits a ResourceVersion the watch will be started from.
// Depending on the input parameters the semantics of the returned ResourceVersion are:
// - start at Exact (return parsedWatchResourceVersion)
// - start at Most Recent (return an RV from etcd)
// - start at Any (return the current watchCache's RV)
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents {
return func() uint64 { return parsedWatchResourceVersion }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts)
}
// getCommonResourceVersionLockedFunc a helper that simply computes a ResourceVersion
// based on the input parameters. Please examine callers of this method to get more context.
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
switch {
case len(opts.ResourceVersion) == 0:
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return nil, err
}
return func() uint64 { return rv }, nil
case parsedWatchResourceVersion == 0:
return func() uint64 { return requiredResourceVersion }, nil
case parsedResourceVersion == 0:
// here we assume that watchCache locked is already held
return func() uint64 { return c.watchCache.resourceVersion }, nil
default:
return func() uint64 { return parsedWatchResourceVersion }, nil
return func() uint64 { return parsedResourceVersion }, nil
}
}
func isListWatchRequest(opts storage.ListOptions) bool {
return opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks
}
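For reference, this is what a watch-list request looks like from the storage layer's point of view; a small sketch mirroring the predicate above (the field values are illustrative):

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/storage"
	"k8s.io/utils/ptr"
)

// isListWatchRequestSketch mirrors isListWatchRequest above: a watch-list request
// asks for initial events and allows bookmarks, so the cacher can emit the
// initial-events-end bookmark once the initial state has been sent.
func isListWatchRequestSketch(opts storage.ListOptions) bool {
	return opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks
}

func main() {
	opts := storage.ListOptions{
		ResourceVersion:   "0",
		SendInitialEvents: ptr.To(true),
		Predicate:         storage.SelectionPredicate{AllowWatchBookmarks: true},
	}
	fmt.Println(isListWatchRequestSketch(opts)) // true
}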
// getWatchCacheResourceVersion returns a ResourceVersion to which the watch cache must be synchronized
//
// Depending on the input parameters, the semantics of the returned ResourceVersion are:
// - must be at Exact RV (when parsedWatchResourceVersion > 0)
// - can be at Any RV (when parsedWatchResourceVersion = 0)
// - must be at Most Recent RV (return an RV from etcd)
//
// note that the above semantic is enforced by the API validation (defined elsewhere):
//
// if SendInitialEvents != nil => ResourceVersionMatch = NotOlderThan
// if ResourceVersionMatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil
func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) {
if len(opts.ResourceVersion) != 0 {
return parsedWatchResourceVersion, nil
}
// legacy case
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return 0, nil
}
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
return rv, err
}
// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least
// as fresh as given requestedWatchRV if sendInitialEvents was requested.
// Additionally, it instructs the caller whether it should ask for
// all events from the cache (full state) or not.
func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) {
// otherwise, we allow for establishing the connection because the clients
// can wait for events without unnecessary blocking.
func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) error {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
// Here be dragons:
// Since the etcd feature checker needs to check all members
// to determine whether a given feature is supported,
// we may receive a positive response even if the feature is not supported.
//
// In this very rare scenario, the worst case will be that this
// request will wait for 3 seconds before it fails.
if etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) && c.watchCache.notFresh(requestedWatchRV) {
c.watchCache.waitingUntilFresh.Add()
defer c.watchCache.waitingUntilFresh.Remove()
}
err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
defer c.watchCache.RUnlock()
return err == nil, err
return err
}
return false, nil
return nil
}
// Wait blocks until the cacher is Ready or Stopped, it returns an error if Stopped.
func (c *Cacher) Wait(ctx context.Context) error {
return c.ready.wait(ctx)
}
// errWatcher implements watch.Interface to return a single error

View File

@@ -19,6 +19,8 @@ package cacher
import (
"context"
"google.golang.org/grpc/metadata"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@@ -30,17 +32,19 @@ import (
// listerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type listerWatcher struct {
storage storage.Interface
resourcePrefix string
newListFunc func() runtime.Object
storage storage.Interface
resourcePrefix string
newListFunc func() runtime.Object
contextMetadata metadata.MD
}
// NewListerWatcher returns a storage.Interface backed ListerWatcher.
func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object, contextMetadata metadata.MD) cache.ListerWatcher {
return &listerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
storage: storage,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
contextMetadata: contextMetadata,
}
}
@@ -59,7 +63,11 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error
Predicate: pred,
Recursive: true,
}
if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
ctx := context.Background()
if lw.contextMetadata != nil {
ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
}
if err := lw.storage.GetList(ctx, lw.resourcePrefix, storageOpts, list); err != nil {
return nil, err
}
return list, nil
@@ -73,5 +81,9 @@ func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, err
Recursive: true,
ProgressNotify: true,
}
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
ctx := context.Background()
if lw.contextMetadata != nil {
ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)
}
return lw.storage.Watch(ctx, lw.resourcePrefix, opts)
}

View File

@@ -106,6 +106,17 @@ var (
[]string{"resource"},
)
watchCacheResourceVersion = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "resource_version",
Help: "Current resource version of watch cache broken by resource type.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
watchCacheCapacityIncreaseTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Subsystem: subsystem,
@@ -146,6 +157,25 @@ var (
},
[]string{"resource"},
)
WatchCacheReadWait = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_wait_seconds",
Help: "Histogram of time spent waiting for a watch cache to become fresh.",
StabilityLevel: compbasemetrics.ALPHA,
Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3},
}, []string{"resource"})
ConsistentReadTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "consistent_read_total",
Help: "Counter for consistent reads from cache.",
StabilityLevel: compbasemetrics.ALPHA,
}, []string{"resource", "success", "fallback"})
)
var registerMetrics sync.Once
@@ -161,10 +191,13 @@ func Register() {
legacyregistry.MustRegister(EventsReceivedCounter)
legacyregistry.MustRegister(EventsCounter)
legacyregistry.MustRegister(TerminatedWatchersCounter)
legacyregistry.MustRegister(watchCacheResourceVersion)
legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal)
legacyregistry.MustRegister(watchCacheCapacityDecreaseTotal)
legacyregistry.MustRegister(WatchCacheCapacity)
legacyregistry.MustRegister(WatchCacheInitializations)
legacyregistry.MustRegister(WatchCacheReadWait)
legacyregistry.MustRegister(ConsistentReadTotal)
})
}
@@ -175,6 +208,11 @@ func RecordListCacheMetrics(resourcePrefix, indexName string, numFetched, numRet
listCacheNumReturned.WithLabelValues(resourcePrefix).Add(float64(numReturned))
}
// RecordResourceVersion sets the current resource version for a given resource type.
func RecordResourceVersion(resourcePrefix string, resourceVersion uint64) {
watchCacheResourceVersion.WithLabelValues(resourcePrefix).Set(float64(resourceVersion))
}
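A minimal sketch of the same component-base metrics pattern used by the gauge and the helper above; the namespace, metric name, and label value here are made up for illustration:

package main

import (
	compbasemetrics "k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

// exampleResourceVersion is declared the same way as watchCacheResourceVersion above,
// but under a hypothetical "example" namespace.
var exampleResourceVersion = compbasemetrics.NewGaugeVec(
	&compbasemetrics.GaugeOpts{
		Namespace:      "example",
		Subsystem:      "watch_cache",
		Name:           "resource_version",
		Help:           "Current resource version of the watch cache, per resource.",
		StabilityLevel: compbasemetrics.ALPHA,
	},
	[]string{"resource"},
)

func main() {
	legacyregistry.MustRegister(exampleResourceVersion)
	// Equivalent of RecordResourceVersion("pods", 12345) above.
	exampleResourceVersion.WithLabelValues("pods").Set(float64(12345))
}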
// RecordsWatchCacheCapacityChange records watchCache capacity resize (increase or decrease) operations.
func RecordsWatchCacheCapacityChange(objType string, old, new int) {
WatchCacheCapacity.WithLabelValues(objType).Set(float64(new))

View File

@@ -44,17 +44,3 @@ func hasPathPrefix(s, pathPrefix string) bool {
}
return false
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
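These local helpers are removable because Go 1.21 and later provide min and max as built-in generic functions, which is the likely reason for this deletion even though the commit does not say so. A one-line illustration:

package main

import "fmt"

func main() {
	// Built-in generic min/max (Go 1.21+) replace the int-only helpers removed above.
	fmt.Println(min(3, 5), max(3, 5)) // 3 5
}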

View File

@@ -33,6 +33,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
@@ -311,25 +312,26 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
RecordTime: w.clock.Now(),
}
// We can call w.store.Get() outside of a critical section,
// because the w.store itself is thread-safe and the only
// place where w.store is modified is below (via updateFunc)
// and these calls are serialized because reflector is processing
// events one-by-one.
previous, exists, err := w.store.Get(elem)
if err != nil {
return err
}
if exists {
previousElem := previous.(*storeElement)
wcEvent.PrevObject = previousElem.Object
wcEvent.PrevObjLabels = previousElem.Labels
wcEvent.PrevObjFields = previousElem.Fields
}
if err := func() error {
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace()
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()
previous, exists, err := w.store.Get(elem)
if err != nil {
return err
}
if exists {
previousElem := previous.(*storeElement)
wcEvent.PrevObject = previousElem.Object
wcEvent.PrevObjLabels = previousElem.Labels
wcEvent.PrevObjFields = previousElem.Fields
}
w.updateCache(wcEvent)
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()
@@ -346,6 +348,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
if w.eventHandler != nil {
w.eventHandler(wcEvent)
}
metrics.RecordResourceVersion(w.groupResource.String(), resourceVersion)
return nil
}
@@ -428,6 +431,7 @@ func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
}
w.eventHandler(wcEvent)
}
metrics.RecordResourceVersion(w.groupResource.String(), rv)
}
// List returns list of pointers to <storeElement> objects.
@@ -440,6 +444,11 @@ func (w *watchCache) List() []interface{} {
// You HAVE TO explicitly call w.RUnlock() after this function.
func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion uint64) error {
startTime := w.clock.Now()
defer func() {
if resourceVersion > 0 {
metrics.WatchCacheReadWait.WithContext(ctx).WithLabelValues(w.groupResource.String()).Observe(w.clock.Since(startTime).Seconds())
}
}()
// In case resourceVersion is 0, we accept arbitrarily stale result.
// As a result, the condition in the below for loop will never be
@@ -492,21 +501,44 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
var err error
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) {
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
items, rv, index, err := w.waitUntilFreshAndListItems(ctx, resourceVersion, key, matchValues)
if err != nil {
return nil, 0, "", err
}
var result []interface{}
for _, item := range items {
elem, ok := item.(*storeElement)
if !ok {
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
}
if !hasPathPrefix(elem.Key, key) {
continue
}
result = append(result, item)
}
sort.Sort(sortableStoreElements(result))
return result, rv, index, nil
}
func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}
defer w.RUnlock()
if err != nil {
return nil, 0, "", err
return result, rv, index, err
}
result, rv, index, err := func() ([]interface{}, uint64, string, error) {
result, rv, index, err = func() ([]interface{}, uint64, string, error) {
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
// want - they will be filtered out later. The fact that we return fewer items is only a further performance improvement.
@@ -519,7 +551,6 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
return w.store.List(), w.resourceVersion, "", nil
}()
sort.Sort(sortableStoreElements(result))
return result, rv, index, err
}
@@ -531,7 +562,14 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(ctx, resourceVersion)
var err error
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}
defer w.RUnlock()
if err != nil {
return nil, false, 0, err
@@ -614,7 +652,9 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
w.onReplace()
}
w.cond.Broadcast()
klog.V(3).Infof("Replace watchCache (rev: %v) ", resourceVersion)
metrics.RecordResourceVersion(w.groupResource.String(), version)
klog.V(3).Infof("Replaced watchCache (rev: %v) ", resourceVersion)
return nil
}
@@ -629,6 +669,12 @@ func (w *watchCache) Resync() error {
return nil
}
func (w *watchCache) getListResourceVersion() uint64 {
w.RLock()
defer w.RUnlock()
return w.listResourceVersion
}
func (w *watchCache) currentCapacity() int {
w.RLock()
defer w.RUnlock()
@@ -691,7 +737,12 @@ func (w *watchCache) isIndexValidLocked(index int) bool {
// getAllEventsSinceLocked returns a watchCacheInterval that can be used to
// retrieve events since a certain resourceVersion. This function assumes to
// be called under the watchCache lock.
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) {
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string, opts storage.ListOptions) (*watchCacheInterval, error) {
_, matchesSingle := opts.Predicate.MatchesSingle()
if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
return w.getIntervalFromStoreLocked(key, matchesSingle)
}
size := w.endIndex - w.startIndex
var oldest uint64
switch {
@@ -711,13 +762,19 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
}
if resourceVersion == 0 {
// resourceVersion = 0 means that we don't require any specific starting point
// and we would like to start watching from ~now.
// However, to keep backward compatibility, we additionally need to return the
// current state and only then start watching from that point.
//
// TODO: In v2 api, we should stop returning the current state - #13969.
return w.getIntervalFromStoreLocked()
if opts.SendInitialEvents == nil {
// resourceVersion = 0 means that we don't require any specific starting point
// and we would like to start watching from ~now.
// However, to keep backward compatibility, we additionally need to return the
// current state and only then start watching from that point.
//
// TODO: In v2 api, we should stop returning the current state - #13969.
return w.getIntervalFromStoreLocked(key, matchesSingle)
}
// SendInitialEvents = false and resourceVersion = 0
// means that the request would like to start watching
// from Any resourceVersion
resourceVersion = w.resourceVersion
}
if resourceVersion < oldest-1 {
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
@@ -731,15 +788,15 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
indexerFunc := func(i int) *watchCacheEvent {
return w.cache[i%w.capacity]
}
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker())
return ci, nil
}
// getIntervalFromStoreLocked returns a watchCacheInterval
// that covers the entire storage state.
// This function assumes to be called under the watchCache lock.
func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) {
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
func (w *watchCache) getIntervalFromStoreLocked(key string, matchesSingle bool) (*watchCacheInterval, error) {
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc, key, matchesSingle)
if err != nil {
return nil, err
}

View File

@@ -18,6 +18,7 @@ package cacher
import (
"fmt"
"sort"
"sync"
"k8s.io/apimachinery/pkg/fields"
@@ -114,12 +115,40 @@ func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValida
}
}
type sortableWatchCacheEvents []*watchCacheEvent
func (s sortableWatchCacheEvents) Len() int {
return len(s)
}
func (s sortableWatchCacheEvents) Less(i, j int) bool {
return s[i].Key < s[j].Key
}
func (s sortableWatchCacheEvents) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events
// returned by Next() need to be events from a List() done on the underlying store of
// the watch cache.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) {
// The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{}
allItems := store.List()
var allItems []interface{}
if matchesSingle {
item, exists, err := store.GetByKey(key)
if err != nil {
return nil, err
}
if exists {
allItems = append(allItems, item)
}
} else {
allItems = store.List()
}
buffer.buffer = make([]*watchCacheEvent, len(allItems))
for i, item := range allItems {
elem, ok := item.(*storeElement)
@@ -140,6 +169,7 @@ func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getA
}
buffer.endIndex++
}
sort.Sort(sortableWatchCacheEvents(buffer.buffer))
ci := &watchCacheInterval{
startIndex: 0,
// Simulate that we already have all the events we're looking for.

View File

@@ -21,6 +21,8 @@ import (
"sync"
"time"
"google.golang.org/grpc/metadata"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -34,19 +36,20 @@ const (
progressRequestPeriod = 100 * time.Millisecond
)
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory) *conditionalProgressRequester {
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester {
pr := &conditionalProgressRequester{
clock: clock,
requestWatchProgress: requestWatchProgress,
contextMetadata: contextMetadata,
}
pr.cond = sync.NewCond(pr.mux.RLocker())
pr.cond = sync.NewCond(&pr.mux)
return pr
}
type WatchProgressRequester func(ctx context.Context) error
type TickerFactory interface {
NewTicker(time.Duration) clock.Ticker
NewTimer(time.Duration) clock.Timer
}
// conditionalProgressRequester will request progress notification if there
@@ -54,8 +57,9 @@ type TickerFactory interface {
type conditionalProgressRequester struct {
clock TickerFactory
requestWatchProgress WatchProgressRequester
contextMetadata metadata.MD
mux sync.RWMutex
mux sync.Mutex
cond *sync.Cond
waiting int
stopped bool
@@ -63,6 +67,9 @@ type conditionalProgressRequester struct {
func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
ctx := wait.ContextForChannel(stopCh)
if pr.contextMetadata != nil {
ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata)
}
go func() {
defer utilruntime.HandleCrash()
<-stopCh
@@ -71,12 +78,12 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
pr.stopped = true
pr.cond.Signal()
}()
ticker := pr.clock.NewTicker(progressRequestPeriod)
defer ticker.Stop()
timer := pr.clock.NewTimer(progressRequestPeriod)
defer timer.Stop()
for {
stopped := func() bool {
pr.mux.RLock()
defer pr.mux.RUnlock()
pr.mux.Lock()
defer pr.mux.Unlock()
for pr.waiting == 0 && !pr.stopped {
pr.cond.Wait()
}
@@ -87,15 +94,17 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
}
select {
case <-ticker.C():
case <-timer.C():
shouldRequest := func() bool {
pr.mux.RLock()
defer pr.mux.RUnlock()
pr.mux.Lock()
defer pr.mux.Unlock()
return pr.waiting > 0 && !pr.stopped
}()
if !shouldRequest {
timer.Reset(0)
continue
}
timer.Reset(progressRequestPeriod)
err := pr.requestWatchProgress(ctx)
if err != nil {
klog.V(4).InfoS("Error requesting bookmark", "err", err)
@@ -117,5 +126,4 @@ func (pr *conditionalProgressRequester) Remove() {
pr.mux.Lock()
defer pr.mux.Unlock()
pr.waiting -= 1
pr.cond.Signal()
}

View File

@@ -91,3 +91,30 @@ func EncodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
}
return base64.RawURLEncoding.EncodeToString(out), nil
}
// PrepareContinueToken prepares optional
// parameters for retrieving additional results for a paginated request.
//
// This function sets up parameters that a client can use to fetch the remaining results
// from the server if they are available.
func PrepareContinueToken(keyLastItem, keyPrefix string, resourceVersion int64, itemsCount int64, hasMoreItems bool, opts ListOptions) (string, *int64, error) {
var remainingItemCount *int64
var continueValue string
var err error
if hasMoreItems {
// Instruct the client to begin querying from immediately after the last key.
continueValue, err = EncodeContinue(keyLastItem+"\x00", keyPrefix, resourceVersion)
if err != nil {
return "", remainingItemCount, err
}
// The etcd response count includes objects that do not match the predicate.
// Instead of returning an inaccurate count for non-empty selectors, we return nil.
// We only set remainingItemCount if the predicate is empty.
if opts.Predicate.Empty() {
remainingItems := itemsCount - opts.Predicate.Limit
remainingItemCount = &remainingItems
}
}
return continueValue, remainingItemCount, err
}
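A small usage sketch of the exported helper defined above; the keys, revision, and counts are made up:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apiserver/pkg/storage"
)

func main() {
	opts := storage.ListOptions{
		Predicate: storage.SelectionPredicate{
			Label: labels.Everything(), // empty selectors, so the predicate is Empty()
			Field: fields.Everything(),
			Limit: 2,
		},
	}
	// Five keys exist under the prefix, two were returned, so more items remain.
	token, remaining, err := storage.PrepareContinueToken(
		"/registry/pods/default/pod-2", // last key that was returned
		"/registry/pods/",              // key prefix of the list
		4242,                           // resource version the list was served at
		5,                              // total item count seen while listing
		true,                           // hasMoreItems
		opts,
	)
	fmt.Println(token != "", *remaining, err) // true 3 <nil>
}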

View File

@@ -25,7 +25,10 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
)
var ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
var (
ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
ErrStorageNotReady = errors.New("storage not ready")
)
const (
ErrCodeKeyNotFound int = iota + 1
@@ -33,6 +36,7 @@ const (
ErrCodeResourceVersionConflicts
ErrCodeInvalidObj
ErrCodeUnreachable
ErrCodeTimeout
)
var errCodeToMessage = map[int]string{
@@ -41,6 +45,7 @@ var errCodeToMessage = map[int]string{
ErrCodeResourceVersionConflicts: "resource version conflicts",
ErrCodeInvalidObj: "invalid object",
ErrCodeUnreachable: "server unreachable",
ErrCodeTimeout: "request timeout",
}
func NewKeyNotFoundError(key string, rv int64) *StorageError {
@@ -75,6 +80,14 @@ func NewUnreachableError(key string, rv int64) *StorageError {
}
}
func NewTimeoutError(key, msg string) *StorageError {
return &StorageError{
Code: ErrCodeTimeout,
Key: key,
AdditionalErrorMsg: msg,
}
}
func NewInvalidObjError(key, msg string) *StorageError {
return &StorageError{
Code: ErrCodeInvalidObj,
@@ -115,6 +128,11 @@ func IsConflict(err error) bool {
return isErrCode(err, ErrCodeResourceVersionConflicts)
}
// IsRequestTimeout returns true if and only if err indicates that the request has timed out.
func IsRequestTimeout(err error) bool {
return isErrCode(err, ErrCodeTimeout)
}
// IsInvalidObj returns true if and only if err is invalid error
func IsInvalidObj(err error) bool {
return isErrCode(err, ErrCodeInvalidObj)
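A minimal sketch of how the new timeout error round-trips through the helpers added above (the key is made up):

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/storage"
)

func main() {
	err := storage.NewTimeoutError("/registry/pods/default/foo",
		"request did not complete within requested timeout")

	// IsRequestTimeout lets callers (e.g. InterpretListError in the next file)
	// map the storage error onto an HTTP-level ServerTimeout instead of an
	// internal error.
	fmt.Println(storage.IsRequestTimeout(err)) // true
	fmt.Println(err)
}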

View File

@@ -28,7 +28,7 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error
switch {
case storage.IsNotFound(err):
return errors.NewNotFound(qualifiedResource, "")
case storage.IsUnreachable(err):
case storage.IsUnreachable(err), storage.IsRequestTimeout(err):
return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)

View File

@@ -2,3 +2,6 @@
reviewers:
- wojtek-t
- serathius
labels:
- sig/etcd

View File

@@ -84,7 +84,7 @@ var (
},
[]string{"endpoint"},
)
storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"cluster"}, nil, compbasemetrics.ALPHA, "")
storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"storage_cluster_id"}, nil, compbasemetrics.STABLE, "")
storageMonitor = &monitorCollector{monitorGetter: func() ([]Monitor, error) { return nil, nil }}
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
@@ -167,6 +167,7 @@ func Register() {
legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(dbTotalSize)
legacyregistry.CustomMustRegister(storageMonitor)
legacyregistry.MustRegister(etcdEventsReceivedCounts)
legacyregistry.MustRegister(etcdBookmarkCounts)
legacyregistry.MustRegister(etcdLeaseObjectCounts)
legacyregistry.MustRegister(listStorageCount)
@@ -287,21 +288,21 @@ func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric
}
for i, m := range monitors {
cluster := fmt.Sprintf("etcd-%d", i)
storageClusterID := fmt.Sprintf("etcd-%d", i)
klog.V(4).InfoS("Start collecting storage metrics", "cluster", cluster)
klog.V(4).InfoS("Start collecting storage metrics", "storage_cluster_id", storageClusterID)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
metrics, err := m.Monitor(ctx)
cancel()
m.Close()
if err != nil {
klog.InfoS("Failed to get storage metrics", "cluster", cluster, "err", err)
klog.InfoS("Failed to get storage metrics", "storage_cluster_id", storageClusterID, "err", err)
continue
}
metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), cluster)
metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), storageClusterID)
if err != nil {
klog.ErrorS(err, "Failed to create metric", "cluster", cluster)
klog.ErrorS(err, "Failed to create metric", "storage_cluster_id", storageClusterID)
}
ch <- metric
}

View File

@@ -38,9 +38,13 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
)
@@ -138,6 +142,9 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress)
}
return s
}
@@ -584,6 +591,52 @@ func (s *store) Count(key string) (int64, error) {
return getResp.Count, nil
}
// ReadinessCheck implements storage.Interface.
func (s *store) ReadinessCheck() error {
return nil
}
// resolveGetListRev is used by GetList to resolve the rev to use in the client.KV.Get request.
func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts storage.ListOptions) (int64, error) {
var withRev int64
// Uses continueRV if this is a continuation request.
if len(continueKey) > 0 {
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return withRev, apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
withRev = continueRV
}
return withRev, nil
}
// Returns 0 if ResourceVersion is not specified.
if len(opts.ResourceVersion) == 0 {
return withRev, nil
}
parsedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return withRev, apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(parsedRV)
case "": // legacy case
if opts.Recursive && opts.Predicate.Limit > 0 && parsedRV > 0 {
withRev = int64(parsedRV)
}
default:
return withRev, fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
}
return withRev, nil
}
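A worked example of the revision resolution above. This is a simplified restatement with plain types, not the private method; error handling and resource-version parsing via the versioner are omitted:

package main

import (
	"fmt"
	"strconv"
)

func resolveRevSketch(haveContinue bool, continueRV int64, resourceVersion, match string, recursive bool, limit int64) int64 {
	if haveContinue {
		if continueRV > 0 {
			return continueRV // a continue token pins the read to its revision
		}
		return 0 // continueRV < 0 asks for the latest revision
	}
	if resourceVersion == "" {
		return 0 // unset RV: read at the latest revision
	}
	rv, _ := strconv.ParseInt(resourceVersion, 10, 64) // the real code uses versioner.ParseResourceVersion
	switch match {
	case "Exact":
		return rv
	case "NotOlderThan":
		return 0 // the constraint is checked against the response revision instead
	default: // legacy: pin only recursive, paginated reads with rv > 0
		if recursive && limit > 0 && rv > 0 {
			return rv
		}
		return 0
	}
}

func main() {
	fmt.Println(resolveRevSketch(false, 0, "1234", "Exact", true, 0))        // 1234
	fmt.Println(resolveRevSketch(false, 0, "1234", "NotOlderThan", true, 0)) // 0
	fmt.Println(resolveRevSketch(false, 0, "1234", "", true, 500))           // 1234 (legacy paginated)
	fmt.Println(resolveRevSketch(true, 987, "", "", true, 0))                // 987 (continue token)
}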
// GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
preparedKey, err := s.prepareKey(key)
@@ -636,41 +689,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
var continueRV, withRev int64
var continueKey string
switch {
case opts.Recursive && len(opts.Predicate.Continue) > 0:
if opts.Recursive && len(opts.Predicate.Continue) > 0 {
continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
preparedKey = continueKey
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
withRev = continueRV
}
case len(opts.ResourceVersion) > 0:
parsedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(parsedRV)
case "": // legacy case
if opts.Recursive && opts.Predicate.Limit > 0 && parsedRV > 0 {
withRev = int64(parsedRV)
}
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
}
}
if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil {
return err
}
if withRev != 0 {
@@ -738,10 +765,25 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
}
if err := appendListItem(v, data, uint64(kv.ModRevision), opts.Predicate, s.codec, s.versioner, newItemFunc); err != nil {
// Check if the request has already timed out before decoding the object
select {
case <-ctx.Done():
// parent context is canceled or timed out, no point in continuing
return storage.NewTimeoutError(string(kv.Key), "request did not complete within requested timeout")
default:
}
obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc)
if err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key))
return err
}
// being unable to set the version does not prevent the object from being extracted
if matched, err := opts.Predicate.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
numEvald++
// free kv early. Long lists can take O(seconds) to decode.
@@ -774,27 +816,11 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
v.Set(reflect.MakeSlice(v.Type(), 0, 0))
}
// instruct the client to begin querying from immediately after the last key we returned
// we never return a key that the client wouldn't be allowed to see
if hasMore {
// we want to start immediately after the last key
next, err := storage.EncodeContinue(string(lastKey)+"\x00", keyPrefix, withRev)
if err != nil {
return err
}
var remainingItemCount *int64
// getResp.Count counts in objects that do not match the pred.
// Instead of returning inaccurate count for non-empty selectors, we return nil.
// Only set remainingItemCount if the predicate is empty.
if opts.Predicate.Empty() {
c := int64(getResp.Count - opts.Predicate.Limit)
remainingItemCount = &c
}
return s.versioner.UpdateList(listObj, uint64(withRev), next, remainingItemCount)
continueValue, remainingItemCount, err := storage.PrepareContinueToken(string(lastKey), keyPrefix, withRev, getResp.Count, hasMore, opts)
if err != nil {
return err
}
// no continuation
return s.versioner.UpdateList(listObj, uint64(withRev), "", nil)
return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount)
}
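// storage.PrepareContinueToken replaces the inline continuation handling removed
// above. A minimal sketch of what it presumably computes, based solely on the
// removed logic (the name suffix and parameter names here are illustrative):
func prepareContinueTokenSketch(lastKey, keyPrefix string, returnedRV int64, totalCount int64, hasMore bool, opts storage.ListOptions) (string, *int64, error) {
    if !hasMore {
        return "", nil, nil
    }
    // Instruct the client to begin the next page immediately after the last returned key.
    next, err := storage.EncodeContinue(lastKey+"\x00", keyPrefix, returnedRV)
    if err != nil {
        return "", nil, err
    }
    var remainingItemCount *int64
    // The etcd count includes objects that do not match the predicate, so a
    // remaining-item count is only reported for an empty (match-all) predicate.
    if opts.Predicate.Empty() {
        c := totalCount - opts.Predicate.Limit
        remainingItemCount = &c
    }
    return next, remainingItemCount, nil
}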
// growSlice takes a slice value and grows its capacity up
@@ -1015,20 +1041,23 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
return nil
}
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) error {
// decodeListItem decodes bytes value in array into object.
func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) (runtime.Object, error) {
startedAt := time.Now()
defer func() {
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
}()
obj, _, err := codec.Decode(data, nil, newItemFunc())
if err != nil {
return err
return nil, err
}
// being unable to set the version does not prevent the object from being extracted
if err := versioner.UpdateObject(obj, rev); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
if matched, err := pred.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
return nil
return obj, nil
}
// recordDecodeError record decode error split by object type.

View File

@@ -46,8 +46,9 @@ import (
const (
// We set a buffer in order to reduce the number of context switches.
incomingBufSize = 100
outgoingBufSize = 100
incomingBufSize = 100
outgoingBufSize = 100
processEventConcurrency = 10
)
// defaultWatcherMaxLimit is used to facilitate construction tests
@@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
go wc.processEvent(&resultChanWG)
wc.processEvents(&resultChanWG)
select {
case err := <-wc.errChan:
@@ -424,10 +424,17 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
close(watchClosedCh)
}
// processEvent processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
// processEvents processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvents(wg *sync.WaitGroup) {
if utilfeature.DefaultFeatureGate.Enabled(features.ConcurrentWatchObjectDecode) {
wc.concurrentProcessEvents(wg)
} else {
wg.Add(1)
go wc.serialProcessEvents(wg)
}
}
func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case e := <-wc.incomingEventChan:
@@ -435,7 +442,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
if res == nil {
continue
}
if len(wc.resultChan) == outgoingBufSize {
if len(wc.resultChan) == cap(wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
@@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
}
}
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
p := concurrentOrderedEventProcessing{
input: wc.incomingEventChan,
processFunc: wc.transform,
output: wc.resultChan,
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
objectType: wc.watcher.objectType,
groupResource: wc.watcher.groupResource,
}
wg.Add(1)
go func() {
defer wg.Done()
p.scheduleEventProcessing(wc.ctx, wg)
}()
wg.Add(1)
go func() {
defer wg.Done()
p.collectEventProcessing(wc.ctx)
}()
}
type concurrentOrderedEventProcessing struct {
input chan *event
processFunc func(*event) *watch.Event
output chan watch.Event
processingQueue chan chan *watch.Event
// Metadata for logging
objectType string
groupResource schema.GroupResource
}
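// Note on ordering: scheduleEventProcessing enqueues a per-event response channel
// into processingQueue in arrival order before handing the event to a worker
// goroutine, and collectEventProcessing drains those channels in the same FIFO
// order. Results are therefore emitted in the original event order while up to
// processEventConcurrency transforms (the queue's capacity plus the one the
// collector is currently waiting on) run in parallel.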
func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) {
var e *event
for {
select {
case <-ctx.Done():
return
case e = <-p.input:
}
processingResponse := make(chan *watch.Event, 1)
select {
case <-ctx.Done():
return
case p.processingQueue <- processingResponse:
}
wg.Add(1)
go func(e *event, response chan<- *watch.Event) {
defer wg.Done()
select {
case <-ctx.Done():
case response <- p.processFunc(e):
}
}(e, processingResponse)
}
}
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
var processingResponse chan *watch.Event
var e *watch.Event
for {
select {
case <-ctx.Done():
return
case processingResponse = <-p.processingQueue:
}
select {
case <-ctx.Done():
return
case e = <-processingResponse:
}
if e == nil {
continue
}
if len(p.output) == cap(p.output) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case <-ctx.Done():
return
case p.output <- *e:
}
}
}
func (wc *watchChan) filter(obj runtime.Object) bool {
if wc.internalPred.Empty() {
return true

View File

@@ -0,0 +1,172 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package feature
import (
"context"
"fmt"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
var (
// Static versions used for checking the etcd version; see Kubernetes issue #123192.
v3_4_31 = version.MustParseSemantic("3.4.31")
v3_5_0 = version.MustParseSemantic("3.5.0")
v3_5_13 = version.MustParseSemantic("3.5.13")
// DefaultFeatureSupportChecker is a shared global etcd FeatureSupportChecker.
DefaultFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
)
// FeatureSupportChecker to define Supports functions.
type FeatureSupportChecker interface {
// Supports checks whether the feature is supported by consulting the internal cache.
// By default, all calls to this function before CheckClient has been called return false.
// Returns true only if all endpoints in the etcd clients support the feature.
// If client A supports the feature and client B does not, `Supports` will
// first return true at client A's initialization and then return false at client B's
// initialization; the support result can therefore flip at runtime.
Supports(feature storage.Feature) bool
// CheckClient works with an etcd client to recalculate feature support and cache the result internally.
// All etcd clients must support the feature for `Supports` to return true.
// If client A supports the feature and client B does not, `Supports` will
// first return true at client A's initialization and then return false at client B's
// initialization; the support result can therefore flip at runtime.
CheckClient(ctx context.Context, c client, feature storage.Feature)
}
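// A minimal usage sketch (call sites are illustrative, not part of this interface's
// contract): a storage backend registers its etcd client with the shared checker once,
// and readers later consult the cached result, e.g.
//
//	feature.DefaultFeatureSupportChecker.CheckClient(ctx, etcdClient, storage.RequestWatchProgress)
//	...
//	if feature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) {
//		// Safe to rely on etcd watch progress notifications.
//	}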
type defaultFeatureSupportChecker struct {
lock sync.Mutex
progressNotifySupported *bool
checkingEndpoint map[string]struct{}
}
func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker {
return &defaultFeatureSupportChecker{
checkingEndpoint: make(map[string]struct{}),
}
}
// Supports can check the feature from anywhere without storage access if it was cached before.
func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool {
switch feature {
case storage.RequestWatchProgress:
f.lock.Lock()
defer f.lock.Unlock()
return ptr.Deref(f.progressNotifySupported, false)
default:
runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature))
return false
}
}
// CheckClient accepts a client, calculates the support per endpoint, and caches it.
func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) {
switch feature {
case storage.RequestWatchProgress:
f.checkClient(ctx, c)
default:
runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature))
}
}
func (f *defaultFeatureSupportChecker) checkClient(ctx context.Context, c client) {
// Start with 10 ms, multiply by 2 each step, cap at 15 s, and stay at 15 s thereafter.
delayFunc := wait.Backoff{
Duration: 10 * time.Millisecond,
Cap: 15 * time.Second,
Factor: 2.0,
Steps: 11}.DelayFunc()
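// With these parameters the successive retry delays are roughly
// 10ms, 20ms, 40ms, ..., 5.12s, 10.24s, and 15s on every attempt thereafter.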
f.lock.Lock()
defer f.lock.Unlock()
for _, ep := range c.Endpoints() {
if _, found := f.checkingEndpoint[ep]; found {
continue
}
f.checkingEndpoint[ep] = struct{}{}
go func(ep string) {
defer runtime.HandleCrash()
err := delayFunc.Until(ctx, true, true, func(ctx context.Context) (done bool, err error) {
internalErr := f.clientSupportsRequestWatchProgress(ctx, c, ep)
return internalErr == nil, nil
})
if err != nil {
klog.ErrorS(err, "Failed to check if RequestWatchProgress is supported by etcd after retrying")
}
}(ep)
}
}
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client, ep string) error {
supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep)
if err != nil {
return err
}
f.lock.Lock()
defer f.lock.Unlock()
if !supported {
klog.Infof("RequestWatchProgress feature is not supported by %q endpoint", ep)
f.progressNotifySupported = ptr.To(false)
return nil
}
if f.progressNotifySupported == nil {
f.progressNotifySupported = ptr.To(true)
}
return nil
}
// Sub interface of etcd client.
type client interface {
// Endpoints returns list of endpoints in etcd client.
Endpoints() []string
// Status retrieves the status information from the etcd client connected to the specified endpoint.
// It takes a context.Context parameter for cancellation or timeout control.
// It returns a clientv3.StatusResponse containing the status information or an error if the operation fails.
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
}
// endpointSupportsRequestWatchProgress evaluates whether RequestWatchProgress is supported by the current version of the etcd endpoint.
// Based on these issues:
// - https://github.com/etcd-io/etcd/issues/15220 - Fixed in etcd v3.4.25+ and v3.5.8+
// - https://github.com/etcd-io/etcd/issues/17507 - Fixed in etcd v3.4.31+ and v3.5.13+
func endpointSupportsRequestWatchProgress(ctx context.Context, c client, endpoint string) (bool, error) {
resp, err := c.Status(ctx, endpoint)
if err != nil {
return false, fmt.Errorf("failed checking etcd version, endpoint: %q: %w", endpoint, err)
}
ver, err := version.ParseSemantic(resp.Version)
if err != nil {
// Assume feature is not supported if etcd version cannot be parsed.
klog.ErrorS(err, "Failed to parse etcd version", "version", resp.Version)
return false, nil
}
if ver.LessThan(v3_4_31) || ver.AtLeast(v3_5_0) && ver.LessThan(v3_5_13) {
return false, nil
}
return true, nil
}
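// Worked examples of the version check above (derived from the bounds, not an
// exhaustive support matrix): the function reports false for 3.4.30, true for
// 3.4.31, false for 3.5.0 through 3.5.12, and true for 3.5.13 and anything newer.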

View File

@@ -29,6 +29,12 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
// Feature is the name of each feature in storage that we check in feature_support_checker.
type Feature = string
// RequestWatchProgress is an etcd feature that can be checked for support.
var RequestWatchProgress Feature = "RequestWatchProgress"
// Versioner abstracts setting and retrieving metadata fields from database response
// onto the object or list. It is required to maintain storage invariants - updating an
// object twice with the same data except for the ResourceVersion and SelfLink must be
@@ -237,6 +243,9 @@ type Interface interface {
// Count returns number of different entries under the key (generally being path prefix).
Count(key string) (int64, error)
// ReadinessCheck checks if the storage is ready for accepting requests.
ReadinessCheck() error
// RequestWatchProgress requests that a watch stream progress status be sent in the
// watch response stream as soon as possible.
// Used to monitor watch progress even when the watched resources see no changes.

View File

@@ -17,10 +17,13 @@ limitations under the License.
package storage
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/request"
)
// AttrFunc returns label and field sets and the uninitialized flag for List or Watch to match.
@@ -115,7 +118,7 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set)
// MatchesSingleNamespace will return (namespace, true) if and only if s.Field matches on the object's
// namespace.
func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
if len(s.Continue) > 0 {
if len(s.Continue) > 0 || s.Field == nil {
return "", false
}
if namespace, ok := s.Field.RequiresExactMatch("metadata.namespace"); ok {
@@ -127,7 +130,7 @@ func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
// name.
func (s *SelectionPredicate) MatchesSingle() (string, bool) {
if len(s.Continue) > 0 {
if len(s.Continue) > 0 || s.Field == nil {
return "", false
}
// TODO: should be namespace.name
@@ -145,11 +148,16 @@ func (s *SelectionPredicate) Empty() bool {
// For any index defined by IndexFields, if a matcher can match only (a subset)
// of objects that return <value> for a given index, a pair (<index name>, <value>)
// will be returned.
func (s *SelectionPredicate) MatcherIndex() []MatchValue {
func (s *SelectionPredicate) MatcherIndex(ctx context.Context) []MatchValue {
var result []MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, MatchValue{IndexName: FieldIndex(field), Value: value})
} else if field == "metadata.namespace" {
// Namespace-scoped list request, e.g. /api/v1/namespaces/default/pods.
if namespace, isNamespaceScope := isNamespaceScopedRequest(ctx); isNamespaceScope {
result = append(result, MatchValue{IndexName: FieldIndex(field), Value: namespace})
}
}
}
for _, label := range s.IndexLabels {
@@ -160,6 +168,14 @@ func (s *SelectionPredicate) MatcherIndex() []MatchValue {
return result
}
func isNamespaceScopedRequest(ctx context.Context) (string, bool) {
re, _ := request.RequestInfoFrom(ctx)
if re == nil || len(re.Namespace) == 0 {
return "", false
}
return re.Namespace, true
}
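// Illustrative example for MatcherIndex above (request path and namespace are
// hypothetical): for a namespace-scoped list such as /api/v1/namespaces/default/pods
// with "metadata.namespace" in IndexFields and no exact field selector on it,
// the RequestInfo in the context supplies the namespace, so the result includes
// MatchValue{IndexName: FieldIndex("metadata.namespace"), Value: "default"}.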
// LabelIndex adds a prefix for the label index.
func LabelIndex(label string) string {
return "l:" + label

View File

@@ -20,6 +20,7 @@ import (
"time"
oteltrace "go.opentelemetry.io/otel/trace"
noopoteltrace "go.opentelemetry.io/otel/trace/noop"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -117,6 +118,6 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
HealthcheckTimeout: DefaultHealthcheckTimeout,
ReadycheckTimeout: DefaultReadinessTimeout,
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),
Transport: TransportConfig{TracerProvider: oteltrace.NewNoopTracerProvider()},
Transport: TransportConfig{TracerProvider: noopoteltrace.NewTracerProvider()},
}
}

View File

@@ -317,6 +317,7 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
tracingOpts := []otelgrpc.Option{
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
otelgrpc.WithPropagators(tracing.Propagators()),
otelgrpc.WithTracerProvider(c.TracerProvider),
}

View File

@@ -24,18 +24,12 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation/path"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)
const (
// initialEventsAnnotationKey the name of the key
// under which an annotation marking the end of list stream
// is kept.
initialEventsAnnotationKey = "k8s.io/initial-events-end"
)
type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
// SimpleUpdateFunc converts SimpleUpdateFunc into UpdateFunc
@@ -46,10 +40,6 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
}
}
func EverythingFunc(runtime.Object) bool {
return true
}
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
@@ -144,7 +134,7 @@ func AnnotateInitialEventsEndBookmark(obj runtime.Object) error {
if objAnnotations == nil {
objAnnotations = map[string]string{}
}
objAnnotations[initialEventsAnnotationKey] = "true"
objAnnotations[metav1.InitialEventsAnnotationKey] = "true"
objMeta.SetAnnotations(objAnnotations)
return nil
}
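// Example (key value taken from the constant this change replaces): after this
// call a bookmark object carries the annotation
// "k8s.io/initial-events-end": "true", which is exactly what
// HasInitialEventsEndBookmarkAnnotation below checks for.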
@@ -157,5 +147,5 @@ func HasInitialEventsEndBookmarkAnnotation(obj runtime.Object) (bool, error) {
return false, err
}
objAnnotations := objMeta.GetAnnotations()
return objAnnotations[initialEventsAnnotationKey] == "true", nil
return objAnnotations[metav1.InitialEventsAnnotationKey] == "true", nil
}