Upgrade k8s package version (#5358)

* upgrade k8s package version

Signed-off-by: hongzhouzi <hongzhouzi@kubesphere.io>

* Script upgrade and code formatting.

Signed-off-by: hongzhouzi <hongzhouzi@kubesphere.io>

Signed-off-by: hongzhouzi <hongzhouzi@kubesphere.io>
hongzhouzi
2022-11-15 14:56:38 +08:00
committed by GitHub
parent 5f91c1663a
commit 44167aa47a
3106 changed files with 321340 additions and 172080 deletions


@@ -31,20 +31,23 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
utiltrace "k8s.io/utils/trace"
)
var (
emptyFunc = func() {}
emptyFunc = func(bool) {}
)
const (
@@ -144,6 +147,10 @@ func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool
}
func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) {
// note that we don't have to call setDrainInputBufferLocked method on the watchers
// because we take advantage of the default value - stop immediately
// also, watchers that have already had their draining strategy set
// are no longer available (they were removed from the allWatchers and the valueWatchers maps)
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
klog.Warningf("Terminating all watchers from cacher %v", objectType)
}
@@ -180,6 +187,10 @@ func newTimeBucketWatchers(clock clock.Clock, bookmarkFrequency time.Duration) *
// adds a watcher to the bucket, if the deadline is before the start, it will be
// added to the first one.
func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
// note that the returned time can be before t.createTime,
// especially in cases when the nextBookmarkTime method
// gives us the zero value of type Time,
// so bucketID can hold a negative value
nextTime, ok := w.nextBookmarkTime(t.clock.Now(), t.bookmarkFrequency)
if !ok {
return false
@@ -190,7 +201,7 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
if bucketID < t.startBucketID {
bucketID = t.startBucketID
}
watchers, _ := t.watchersBuckets[bucketID]
watchers := t.watchersBuckets[bucketID]
t.watchersBuckets[bucketID] = append(watchers, w)
return true
}
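The "watchers, _ :=" form dropped above was redundant: indexing a Go map with a missing key already yields the zero value of the element type (a nil slice here), and append handles nil slices, so the blank ok variable added nothing. A standalone illustration of that behaviour (toy names, not from this patch):

package main

import "fmt"

func main() {
	buckets := map[int64][]string{}
	// Missing key: no ok value needed, we simply get a nil slice back.
	watchers := buckets[42]
	// append allocates as needed, even when starting from a nil slice.
	buckets[42] = append(watchers, "w1")
	fmt.Println(buckets[42]) // [w1]
}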
@@ -230,6 +241,8 @@ type Cacher struct {
// Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent
resourcePrefix string
sync.RWMutex
// Before accessing the cacher's cache, wait for the ready to be ok.
@@ -292,6 +305,8 @@ type Cacher struct {
watchersToStop []*cacheWatcher
// Maintain a timeout queue to send the bookmark event before the watcher times out.
bookmarkWatchers *watcherBookmarkTimeBuckets
// expiredBookmarkWatchers is a list of watchers that have expired and need to be scheduled for the next bookmark event
expiredBookmarkWatchers []*cacheWatcher
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@@ -328,6 +343,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}
objType := reflect.TypeOf(obj)
cacher := &Cacher{
resourcePrefix: config.ResourcePrefix,
ready: newReady(),
storage: config.Storage,
objectType: objType,
@@ -341,7 +357,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
},
// TODO: Figure out the correct value for the buffer size.
incoming: make(chan watchCacheEvent, 100),
dispatchTimeoutBudget: newTimeBudget(stopCh),
dispatchTimeoutBudget: newTimeBudget(),
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch
@@ -369,6 +385,10 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
// Configure the reflector's pager to use an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
reflector.WatchListPageSize = storageWatchListPageSize
// When etcd loses its leader for 3 cycles, it returns a "no leader" error.
// We don't want to terminate all watchers, as recreating them all puts a high load on the api-server.
// In most cases, the leader is re-elected within a few cycles.
reflector.MaxInternalErrorRetryDuration = time.Second * 30
cacher.watchCache = watchCache
cacher.reflector = reflector
@@ -403,6 +423,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
successfulList = true
c.ready.set(true)
klog.V(1).Infof("cacher (%v): initialized", c.objectType.String())
metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc()
})
defer func() {
if successfulList {
@@ -456,7 +477,9 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return nil, err
}
c.ready.wait()
if err := c.ready.wait(); err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}
triggerValue, triggerSupported := "", false
if c.indexedTrigger != nil {
@@ -469,21 +492,12 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
}
}
// If there is indexedTrigger defined, but triggerSupported is false,
// we can't narrow the amount of events significantly at this point.
//
// That said, currently indexedTrigger is defined only for a couple of resources:
// Pods, Nodes, Secrets and ConfigMaps and there is only a constant
// number of watchers for which triggerSupported is false (excluding those
// issued explicitly by users).
// Thus, to reduce the risk of those watchers blocking all watchers of a
// given resource in the system, we increase the sizes of buffers for them.
chanSize := 10
if c.indexedTrigger != nil && !triggerSupported {
// TODO: We should tune this value and ideally make it dependent on the
// number of objects of a given type and/or their churn.
chanSize = 1000
}
// It boils down to a tradeoff between:
// - having it as small as possible to reduce memory usage
// - having it large enough to ensure that watchers that need to process
// a bunch of changes have enough buffer to avoid blocking other
// watchers while our watcher has a processing hiccup
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// Determine watch timeout('0' means deadline is not set, ignore checking)
deadline, _ := ctx.Deadline()
@@ -503,7 +517,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@@ -511,18 +525,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newErrWatcher(err), nil
}
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher a second time, causing it to go back in time.
if len(initEvents) > 0 {
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}
func() {
c.Lock()
defer c.Unlock()
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
// Add it to the queue only when the client support watch bookmarks.
@@ -532,15 +539,10 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watcherIdx++
}()
go watcher.process(ctx, initEvents, watchRV)
go watcher.processInterval(ctx, cacheInterval, watchRV)
return watcher, nil
}
// WatchList implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return c.Watch(ctx, key, opts)
}
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
if opts.ResourceVersion == "" {
@@ -565,7 +567,9 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
c.ready.wait()
if err := c.ready.wait(); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
objVal, err := conversion.EnforcePtr(objPtr)
if err != nil {
@@ -592,95 +596,46 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return nil
}
// GetToList implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
// NOTICE: Keep in sync with shouldListFromStorage function in
//
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
func shouldDelegateList(opts storage.ListOptions) bool {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
return c.storage.GetToList(ctx, key, opts, listObj)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least as
// fresh as the given resourceVersion.
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetToList(ctx, key, opts, listObj)
}
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
trace.Step("Ready")
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return err
}
trace.Step("Got from cache")
if exists {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Labels, elem.Fields) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
}
}
return nil
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
}
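A hedged usage sketch of the new helper (illustrative only; it assumes it sits alongside the code above and that the APIListChunking feature gate is enabled):

func exampleDelegation() {
	opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything}
	_ = shouldDelegateList(opts) // true: no resourceVersion, read from etcd for backward compatibility

	opts.ResourceVersion = "0"
	_ = shouldDelegateList(opts) // false: can be served from the watch cache

	opts.Predicate.Continue = "next-page-token" // hypothetical continuation token
	_ = shouldDelegateList(opts) // true: the watch cache cannot serve continuations
}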
// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPredicate, trace *utiltrace.Trace, recursive bool) ([]interface{}, uint64, string, error) {
if !recursive {
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return nil, 0, "", err
}
if exists {
return []interface{}{obj}, readResourceVersion, "", nil
}
return nil, readResourceVersion, "", nil
}
return c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
}
// GetList implements storage.Interface
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero.
return c.storage.List(ctx, key, opts, listObj)
if shouldDelegateList(opts) {
return c.storage.GetList(ctx, key, opts, listObj)
}
// If resourceVersion is specified, serve it from cache.
@@ -694,13 +649,17 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.List(ctx, key, opts, listObj)
return c.storage.GetList(ctx, key, opts, listObj)
}
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
trace := utiltrace.New("cacher list",
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
utiltrace.Field{Key: "type", Value: c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)
c.ready.wait()
if err := c.ready.wait(); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
trace.Step("Ready")
// List elements with at least 'listRV' from cache.
@@ -717,11 +676,11 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
}
filter := filterWithAttrsFunction(key, pred)
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
objs, readResourceVersion, indexUsed, err := c.listItems(listRV, key, pred, trace, recursive)
if err != nil {
return err
}
trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
trace.Step("Listed items from cache", utiltrace.Field{Key: "count", Value: len(objs)})
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
// Resize the slice appropriately, since we already know that none
// of the elements will be filtered out.
@@ -737,18 +696,19 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
trace.Step("Filtered items", utiltrace.Field{Key: "count", Value: listVal.Len()})
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
}
}
metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), listVal.Len())
return nil
}
// GuaranteedUpdate implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ runtime.Object) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
@@ -758,10 +718,10 @@ func (c *Cacher) GuaranteedUpdate(
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem.(*storeElement).Object.DeepCopyObject()
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, currObj)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, nil)
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
}
// Count implements storage.Interface.
@@ -828,6 +788,7 @@ func (c *Cacher) dispatchEvents() {
c.dispatchEvent(&event)
}
lastProcessedResourceVersion = event.ResourceVersion
metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc()
case <-bookmarkTimer.C():
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
// Never send a bookmark event if we did not see an event here, this is fine
@@ -872,11 +833,11 @@ func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
// Don't wrap Object for delete events - these are not to deliver any
// events. Only wrap PrevObject.
if object, err := newCachingObject(event.PrevObject); err == nil {
// Update resource version of the underlying object.
// Update resource version of the object.
// event.PrevObject is used to deliver DELETE watch events and
// for them, we set resourceVersion to <current> instead of
// the resourceVersion of the last modification of the object.
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
updateResourceVersion(object, versioner, event.ResourceVersion)
event.PrevObject = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
@@ -905,14 +866,14 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
// from it justifies increased memory usage, so for now we drop the cached
// serializations after dispatching this event.
//
// Given the deep-copies that are done to create cachingObjects,
// we try to cache serializations only if there are at least 3 watchers.
if len(c.watchersBuffer) >= 3 {
// Make a shallow copy to allow overwriting Object and PrevObject.
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent
}
// Given that CachingObject is just wrapping the object and not performing
// deep-copying (until some field is explicitly being modified), we create
// it unconditionally to ensure safety and reduce deep-copying.
//
// Make a shallow copy to allow overwriting Object and PrevObject.
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent
c.blockedWatchers = c.blockedWatchers[:0]
for _, watcher := range c.watchersBuffer {
@@ -928,8 +889,11 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
timeout := c.dispatchTimeoutBudget.takeAvailable()
c.timer.Reset(timeout)
// Make sure every watcher will try to send event without blocking first,
// even if the timer has already expired.
// Send event to all blocked watchers. As long as timer is running,
// `add` will wait for the watcher to unblock. After timeout,
// `add` will not wait, but immediately close a still blocked watcher.
// Hence, every watcher gets the chance to unblock itself while timer
// is running, not only the first ones in the list.
timer := c.timer
for _, watcher := range c.blockedWatchers {
if !watcher.add(event, timer) {
@@ -950,7 +914,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
}
}
func (c *Cacher) startDispatchingBookmarkEvents() {
func (c *Cacher) startDispatchingBookmarkEventsLocked() {
// Pop already expired watchers. However, explicitly ignore stopped ones,
// as we don't delete watcher from bookmarkWatchers when it is stopped.
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
@@ -961,8 +925,7 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
continue
}
c.watchersBuffer = append(c.watchersBuffer, watcher)
// Requeue the watcher for the next bookmark if needed.
c.bookmarkWatchers.addWatcher(watcher)
c.expiredBookmarkWatchers = append(c.expiredBookmarkWatchers, watcher)
}
}
}
@@ -987,7 +950,7 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) {
c.watchersBuffer = c.watchersBuffer[:0]
if event.Type == watch.Bookmark {
c.startDispatchingBookmarkEvents()
c.startDispatchingBookmarkEventsLocked()
// return here to reduce following code indentation and diff
return
}
@@ -1028,22 +991,31 @@ func (c *Cacher) finishDispatching() {
defer c.Unlock()
c.dispatching = false
for _, watcher := range c.watchersToStop {
watcher.stopThreadUnsafe()
watcher.stopLocked()
}
c.watchersToStop = c.watchersToStop[:0]
for _, watcher := range c.expiredBookmarkWatchers {
if watcher.stopped {
continue
}
// requeue the watcher for the next bookmark if needed.
c.bookmarkWatchers.addWatcher(watcher)
}
c.expiredBookmarkWatchers = c.expiredBookmarkWatchers[:0]
}
func (c *Cacher) terminateAllWatchers() {
c.Lock()
defer c.Unlock()
c.watchers.terminateAll(c.objectType, c.stopWatcherThreadUnsafe)
c.watchers.terminateAll(c.objectType, c.stopWatcherLocked)
}
func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) {
if c.dispatching {
c.watchersToStop = append(c.watchersToStop, watcher)
} else {
watcher.stopThreadUnsafe()
watcher.stopLocked()
}
}
@@ -1062,20 +1034,23 @@ func (c *Cacher) Stop() {
return
}
c.stopped = true
c.ready.stop()
c.stopLock.Unlock()
close(c.stopCh)
c.stopWg.Wait()
}
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func() {
return func() {
func forgetWatcher(c *Cacher, w *cacheWatcher, index int, triggerValue string, triggerSupported bool) func(bool) {
return func(drainWatcher bool) {
c.Lock()
defer c.Unlock()
w.setDrainInputBufferLocked(drainWatcher)
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe()
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherLocked)
}
}
@@ -1091,7 +1066,9 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait()
if err := c.ready.wait(); err != nil {
return 0, errors.NewServiceUnavailable(err.Error())
}
resourceVersion := c.reflector.LastSyncResourceVersion()
return c.versioner.ParseResourceVersion(resourceVersion)
@@ -1123,7 +1100,12 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
Continue: options.Continue,
}
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersionMatch: options.ResourceVersionMatch, Predicate: pred}, list); err != nil {
storageOpts := storage.ListOptions{
ResourceVersionMatch: options.ResourceVersionMatch,
Predicate: pred,
Recursive: true,
}
if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
return nil, err
}
return list, nil
@@ -1134,11 +1116,10 @@ func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interfac
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
Recursive: true,
ProgressNotify: true,
}
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
opts.ProgressNotify = true
}
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts)
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
}
// errWatcher implements watch.Interface to return a single error
@@ -1189,7 +1170,7 @@ type cacheWatcher struct {
done chan struct{}
filter filterWithAttrsFunc
stopped bool
forget func()
forget func(bool)
versioner storage.Versioner
// The watcher will be closed by server after the deadline,
// save it here to send bookmark events before that.
@@ -1201,9 +1182,13 @@ type cacheWatcher struct {
// human-readable identifier that helps associate a cacheWatcher
// instance with a request
identifier string
// drainInputBuffer indicates whether we should delay closing this watcher
// and send all events in the input buffer.
drainInputBuffer bool
}
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher {
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher {
return &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
@@ -1226,16 +1211,29 @@ func (c *cacheWatcher) ResultChan() <-chan watch.Event {
// Implements watch.Interface.
func (c *cacheWatcher) Stop() {
c.forget()
c.forget(false)
}
// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock()
func (c *cacheWatcher) stopThreadUnsafe() {
// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
func (c *cacheWatcher) stopLocked() {
if !c.stopped {
c.stopped = true
close(c.done)
// stop without draining the input channel was requested.
if !c.drainInputBuffer {
close(c.done)
}
close(c.input)
}
// Even if the watcher was already stopped, if it previously was
// using draining mode and it's not using it now, we need to
// close the done channel now. Otherwise we could leak the
// processing goroutine: it would keep trying to put more objects
// into the result channel, that channel would fill up, and there would
// no longer be anyone processing the events on the receiving end.
if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
close(c.done)
}
}
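The net effect of the drain flag is two shutdown modes: without draining, both done and input end up closed so the processing goroutine bails out immediately; with draining, only input is closed, so events still sitting in the buffer are delivered before the result channel is closed by processInterval. A simplified sketch of that pattern with toy types (assumed for illustration, not the real cacheWatcher):

type toyWatcher struct {
	input chan string   // buffered events waiting to be processed
	done  chan struct{} // closed to interrupt processing immediately
	drain bool
}

func (w *toyWatcher) stop() {
	if !w.drain {
		close(w.done) // interrupt the consumer right away
	}
	close(w.input) // no more events will be produced
}

func (w *toyWatcher) run(result chan<- string) {
	defer close(result)
	for {
		select {
		case <-w.done:
			return // immediate stop requested
		case ev, ok := <-w.input:
			if !ok {
				return // input drained, finish gracefully
			}
			result <- ev
		}
	}
}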
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
@@ -1259,8 +1257,8 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// Since we don't want to block on it infinitely,
// we simply terminate it.
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.objectType.String(), c.identifier, len(c.input), len(c.result))
terminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
c.forget()
metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
c.forget(false)
}
if timer == nil {
@@ -1280,12 +1278,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
// We try to send bookmarks:
// (a) roughly every minute
// (b) right before the watcher timeout - for now we simply set it 2s before
//
// (a) right before the watcher timeout - for now we simply set it 2s before
// the deadline
// The former gives us periodicity if the watch breaks due to unexpected
// conditions, the later ensures that on timeout the watcher is as close to
//
// (b) roughly every minute
//
// (b) gives us periodicity if the watch breaks due to unexpected
// conditions, (a) ensures that on timeout the watcher is as close to
// now as possible - this covers 99% of cases.
heartbeatTime := now.Add(bookmarkFrequency)
if c.deadline.IsZero() {
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
@@ -1302,20 +1304,33 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Du
return heartbeatTime, true
}
func getEventObject(object runtime.Object) runtime.Object {
if _, ok := object.(runtime.CacheableObject); ok {
// setDrainInputBufferLocked, if set to true, indicates that we should delay closing this watcher
// until we have sent all events residing in the input buffer.
func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
c.drainInputBuffer = drain
}
// isDoneChannelClosedLocked checks if the c.done channel is closed
func (c *cacheWatcher) isDoneChannelClosedLocked() bool {
select {
case <-c.done:
return true
default:
}
return false
}
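isDoneChannelClosedLocked relies on the standard non-blocking receive idiom: a select with a default case returns immediately, so a closed signal channel is observed without ever waiting on it. The same idiom in isolation (generic helper name assumed for illustration):

func isClosed(done <-chan struct{}) bool {
	select {
	case <-done:
		return true // receive on a closed channel succeeds immediately
	default:
		return false // channel still open and empty, don't block
	}
}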
func getMutableObject(object runtime.Object) runtime.Object {
if _, ok := object.(*cachingObject); ok {
// It is safe to return without deep-copy, because the underlying
// object was already deep-copied during construction.
// object will lazily perform deep-copy on the first try to change
// any of its fields.
return object
}
return object.DeepCopyObject()
}
func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
if _, ok := object.(*cachingObject); ok {
// We assume that for cachingObject resourceVersion was already propagated before.
return
}
func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
}
@@ -1338,13 +1353,17 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
switch {
case curObjPasses && !oldObjPasses:
return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)}
return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)}
case curObjPasses && oldObjPasses:
return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)}
return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)}
case !curObjPasses && oldObjPasses:
// return a delete event with the previous object content, but with the event's resource version
oldObj := getEventObject(event.PrevObject)
updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion)
oldObj := getMutableObject(event.PrevObject)
// We know that if oldObj is cachingObject (which can only be set via
// setCachingObjects), its resourceVersion is already set correctly and
// we don't need to update it. However, since cachingObject efficiently
// handles noop updates, we avoid this microoptimization here.
updateResourceVersion(oldObj, c.versioner, event.ResourceVersion)
return &watch.Event{Type: watch.Deleted, Object: oldObj}
}
@@ -1366,7 +1385,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
// would give us non-determinism.
// At the same time, we don't want to block infinitely on putting
// to c.result, when c.done is already closed.
//
// This ensures that with c.done already closed, we at most once go
// into the next select after this. With that, no matter which
// statement we choose there, we will deliver only consecutive
@@ -1383,8 +1402,10 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
}
}
func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
defer utilruntime.HandleCrash()
defer close(c.result)
defer c.Stop()
// Check how long we are processing initEvents.
// As long as these are not processed, we are not processing
@@ -1401,20 +1422,60 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
// consider increasing the size of the result buffer in those cases.
const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now()
for _, event := range initEvents {
initEventCount := 0
for {
event, err := cacheInterval.Next()
if err != nil {
// An error indicates that the cache interval
// has been invalidated and can no longer serve
// events.
//
// Initially we considered sending an "out-of-history"
// Error event in this case, but because historically
// such events weren't sent out of the watchCache, we
// decided not to. This is still ok, because on watch
// closure, the watcher will try to re-instantiate the
// watch and then will get an explicit "out-of-history"
// window. There is potential for optimization, but for
// now, in order to be on the safe side and not break
// custom clients, the cost of it is something that we
// are fully accepting.
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
return
}
if event == nil {
break
}
c.sendWatchCacheEvent(event)
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher a second time, causing it to go back in time.
resourceVersion = event.ResourceVersion
initEventCount++
}
objType := c.objectType.String()
if len(initEvents) > 0 {
initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
if initEventCount > 0 {
metrics.InitCounter.WithLabelValues(objType).Add(float64(initEventCount))
}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime)
}
defer close(c.result)
defer c.Stop()
c.process(ctx, resourceVersion)
}
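processInterval replaces the old slice of initEvents with a pull-style interval: Next() returns an error when the interval has been invalidated, a nil event when it is exhausted, and the caller counts delivered events for the init metrics. The consumption contract, restated as a standalone helper (eventIterator and drainInitEvents are illustrative names, not part of the patch):

type eventIterator interface {
	Next() (*watchCacheEvent, error)
}

func drainInitEvents(it eventIterator, send func(*watchCacheEvent)) (int, error) {
	count := 0
	for {
		ev, err := it.Next()
		if err != nil {
			// The interval was invalidated and can no longer serve events;
			// the caller is expected to stop the watch.
			return count, err
		}
		if ev == nil {
			// All buffered initial events have been delivered.
			return count, nil
		}
		send(ev)
		count++
	}
}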
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
// At this point we already start processing incoming watch events.
// However, the init events can still be processed because their serialization
// and sending to the client happen asynchronously.
// TODO: As described in the KEP, we would like to estimate that by delaying
// the initialization signal proportionally to the number of events to
// process, but we're leaving this to the tuning phase.
utilflowcontrol.WatchInitialized(ctx)
for {
select {
case event, ok := <-c.input:
@@ -1430,36 +1491,3 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
}
}
}
type ready struct {
ok bool
c *sync.Cond
}
func newReady() *ready {
return &ready{c: sync.NewCond(&sync.RWMutex{})}
}
func (r *ready) wait() {
r.c.L.Lock()
for !r.ok {
r.c.Wait()
}
r.c.L.Unlock()
}
// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
func (r *ready) check() bool {
rwMutex := r.c.L.(*sync.RWMutex)
rwMutex.RLock()
defer rwMutex.RUnlock()
return r.ok
}
func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
r.ok = ok
r.c.Broadcast()
}
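Several call sites above now check an error from c.ready.wait() and Cacher.Stop() calls c.ready.stop(), while the ready type shown here can only block until set(true). A minimal sketch of a readiness gate with those semantics, based only on what the diff shows (assumed names, imports "sync" and "fmt"; this is not the upstream implementation):

type readyGate struct {
	mu      sync.Mutex
	cond    *sync.Cond
	ok      bool
	stopped bool
}

func newReadyGate() *readyGate {
	r := &readyGate{}
	r.cond = sync.NewCond(&r.mu)
	return r
}

// wait blocks until the gate is ready, or returns an error once it is stopped,
// which the callers above translate into errors.NewServiceUnavailable.
func (r *readyGate) wait() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	for !r.ok && !r.stopped {
		r.cond.Wait()
	}
	if r.stopped {
		return fmt.Errorf("apiserver cacher is stopped")
	}
	return nil
}

func (r *readyGate) set(ok bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ok = ok
	r.cond.Broadcast()
}

func (r *readyGate) stop() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.stopped = true
	r.cond.Broadcast()
}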