improve IAM module

Signed-off-by: hongming <talonwan@yunify.com>
This commit is contained in:
hongming
2020-05-22 09:35:05 +08:00
parent 0d12529051
commit 8f93266ec0
640 changed files with 50221 additions and 18179 deletions

View File

@@ -301,8 +301,6 @@ type Cacher struct {
watchersToStop []*cacheWatcher
// Maintain a timeout queue to send the bookmark event before the watcher times out.
bookmarkWatchers *watcherBookmarkTimeBuckets
// watchBookmark feature-gate
watchBookmarkEnabled bool
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@@ -355,11 +353,10 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh,
clock: clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock),
watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark),
stopCh: stopCh,
clock: clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock),
}
// Ensure that timer is stopped.
@@ -516,8 +513,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
// Add it to the queue only when server and client support watch bookmarks.
if c.watchBookmarkEnabled && watcher.allowWatchBookmarks {
// Add it to the queue only when the client support watch bookmarks.
if watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
@@ -624,7 +621,10 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil || listVal.Kind() != reflect.Slice {
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)
@@ -693,7 +693,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil || listVal.Kind() != reflect.Slice {
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)
@@ -748,17 +751,25 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
// baseObjectThreadUnsafe omits locking for cachingObject.
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
if co, ok := object.(*cachingObject); ok {
return co.object
}
return object
}
func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bool) {
if c.indexedTrigger == nil {
return nil, false
}
result := make([]string, 0, 2)
result = append(result, c.indexedTrigger.indexerFunc(event.Object))
result = append(result, c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.Object)))
if event.PrevObject == nil {
return result, true
}
prevTriggerValue := c.indexedTrigger.indexerFunc(event.PrevObject)
prevTriggerValue := c.indexedTrigger.indexerFunc(baseObjectThreadUnsafe(event.PrevObject))
if result[0] != prevTriggerValue {
result = append(result, prevTriggerValue)
}
@@ -776,10 +787,6 @@ func (c *Cacher) processEvent(event *watchCacheEvent) {
func (c *Cacher) dispatchEvents() {
// Jitter to help level out any aggregate load.
bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
// Stop the timer when watchBookmarkFeatureGate is not enabled.
if !c.watchBookmarkEnabled && !bookmarkTimer.Stop() {
<-bookmarkTimer.C()
}
defer bookmarkTimer.Stop()
lastProcessedResourceVersion := uint64(0)
@@ -816,6 +823,37 @@ func (c *Cacher) dispatchEvents() {
}
}
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
switch event.Type {
case watch.Added, watch.Modified:
if object, err := newCachingObject(event.Object); err == nil {
event.Object = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
// Don't wrap PrevObject for update event (for create events it is nil).
// We only encode those to deliver DELETE watch events, so if
// event.Object is not nil it can be used only for watchers for which
// selector was satisfied for its previous version and is no longer
// satisfied for the current version.
// This is rare enough that it doesn't justify making deep-copy of the
// object (done by newCachingObject) every time.
case watch.Deleted:
// Don't wrap Object for delete events - these are not to deliver any
// events. Only wrap PrevObject.
if object, err := newCachingObject(event.PrevObject); err == nil {
// Update resource version of the underlying object.
// event.PrevObject is used to deliver DELETE watch events and
// for them, we set resourceVersion to <current> instead of
// the resourceVersion of the last modification of the object.
updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
event.PrevObject = object
} else {
klog.Errorf("couldn't create cachingObject from: %#v", event.Object)
}
}
}
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
@@ -829,6 +867,23 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
watcher.nonblockingAdd(event)
}
} else {
// Set up caching of object serializations only for dispatching this event.
//
// Storing serializations in memory would result in increased memory usage,
// but it would help for caching encodings for watches started from old
// versions. However, we still don't have a convincing data that the gain
// from it justifies increased memory usage, so for now we drop the cached
// serializations after dispatching this event.
//
// Given the deep-copies that are done to create cachingObjects,
// we try to cache serializations only if there are at least 3 watchers.
if len(c.watchersBuffer) >= 3 {
// Make a shallow copy to allow overwriting Object and PrevObject.
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent
}
c.blockedWatchers = c.blockedWatchers[:0]
for _, watcher := range c.watchersBuffer {
if !watcher.nonblockingAdd(event) {
@@ -886,7 +941,10 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
// startDispatching chooses watchers potentially interested in a given event
// a marks dispatching as true.
func (c *Cacher) startDispatching(event *watchCacheEvent) {
triggerValues, supported := c.triggerValues(event)
// It is safe to call triggerValuesThreadUnsafe here, because at this
// point only this thread can access this event (we create a separate
// watchCacheEvent for every dispatch).
triggerValues, supported := c.triggerValuesThreadUnsafe(event)
c.Lock()
defer c.Unlock()
@@ -1159,7 +1217,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String())
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
c.forget()
}
@@ -1187,6 +1245,25 @@ func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
return c.deadline.Add(-2 * time.Second), true
}
func getEventObject(object runtime.Object) runtime.Object {
if _, ok := object.(runtime.CacheableObject); ok {
// It is safe to return without deep-copy, because the underlying
// object was already deep-copied during construction.
return object
}
return object.DeepCopyObject()
}
func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
if _, ok := object.(*cachingObject); ok {
// We assume that for cachingObject resourceVersion was already propagated before.
return
}
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
}
}
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
if event.Type == watch.Bookmark {
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
@@ -1204,15 +1281,13 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
switch {
case curObjPasses && !oldObjPasses:
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
return &watch.Event{Type: watch.Added, Object: getEventObject(event.Object)}
case curObjPasses && oldObjPasses:
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
return &watch.Event{Type: watch.Modified, Object: getEventObject(event.Object)}
case !curObjPasses && oldObjPasses:
// return a delete event with the previous object content, but with the event's resource version
oldObj := event.PrevObject.DeepCopyObject()
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
}
oldObj := getEventObject(event.PrevObject)
updateResourceVersionIfNeeded(oldObj, c.versioner, event.ResourceVersion)
return &watch.Event{Type: watch.Deleted, Object: oldObj}
}

View File

@@ -0,0 +1,397 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"bytes"
"fmt"
"io"
"reflect"
"runtime/debug"
"sync"
"sync/atomic"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
)
var _ runtime.CacheableObject = &cachingObject{}
// metaRuntimeInterface implements runtime.Object and
// metav1.Object interfaces.
type metaRuntimeInterface interface {
runtime.Object
metav1.Object
}
// serializationResult captures a result of serialization.
type serializationResult struct {
// once should be used to ensure serialization is computed once.
once sync.Once
// raw is serialized object.
raw []byte
// err is error from serialization.
err error
}
// serializationsCache is a type for caching serialization results.
type serializationsCache map[runtime.Identifier]*serializationResult
// cachingObject is an object that is able to cache its serializations
// so that each of those is computed exactly once.
//
// cachingObject implements the metav1.Object interface (accessors for
// all metadata fields). However, setters for all fields except from
// SelfLink (which is set lately in the path) are ignored.
type cachingObject struct {
lock sync.RWMutex
// Object for which serializations are cached.
object metaRuntimeInterface
// serializations is a cache containing object`s serializations.
// The value stored in atomic.Value is of type serializationsCache.
// The atomic.Value type is used to allow fast-path.
serializations atomic.Value
}
// newCachingObject performs a deep copy of the given object and wraps it
// into a cachingObject.
// An error is returned if it's not possible to cast the object to
// metav1.Object type.
func newCachingObject(object runtime.Object) (*cachingObject, error) {
if obj, ok := object.(metaRuntimeInterface); ok {
result := &cachingObject{object: obj.DeepCopyObject().(metaRuntimeInterface)}
result.serializations.Store(make(serializationsCache))
return result, nil
}
return nil, fmt.Errorf("can't cast object to metav1.Object: %#v", object)
}
func (o *cachingObject) getSerializationResult(id runtime.Identifier) *serializationResult {
// Fast-path for getting from cache.
serializations := o.serializations.Load().(serializationsCache)
if result, exists := serializations[id]; exists {
return result
}
// Slow-path (that may require insert).
o.lock.Lock()
defer o.lock.Unlock()
serializations = o.serializations.Load().(serializationsCache)
// Check if in the meantime it wasn't inserted.
if result, exists := serializations[id]; exists {
return result
}
// Insert an entry for <id>. This requires copy of existing map.
newSerializations := make(serializationsCache)
for k, v := range serializations {
newSerializations[k] = v
}
result := &serializationResult{}
newSerializations[id] = result
o.serializations.Store(newSerializations)
return result
}
// CacheEncode implements runtime.CacheableObject interface.
// It serializes the object and writes the result to given io.Writer trying
// to first use the already cached result and falls back to a given encode
// function in case of cache miss.
// It assumes that for a given identifier, the encode function always encodes
// each input object into the same output format.
func (o *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
result := o.getSerializationResult(id)
result.once.Do(func() {
buffer := bytes.NewBuffer(nil)
result.err = encode(o.GetObject(), buffer)
result.raw = buffer.Bytes()
})
// Once invoked, fields of serialization will not change.
if result.err != nil {
return result.err
}
_, err := w.Write(result.raw)
return err
}
// GetObject implements runtime.CacheableObject interface.
// It returns deep-copy of the wrapped object to return ownership of it
// to the called according to the contract of the interface.
func (o *cachingObject) GetObject() runtime.Object {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.DeepCopyObject().(metaRuntimeInterface)
}
// GetObjectKind implements runtime.Object interface.
func (o *cachingObject) GetObjectKind() schema.ObjectKind {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetObjectKind()
}
// DeepCopyObject implements runtime.Object interface.
func (o *cachingObject) DeepCopyObject() runtime.Object {
// DeepCopyObject on cachingObject is not expected to be called anywhere.
// However, to be on the safe-side, we implement it, though given the
// cache is only an optimization we ignore copying it.
result := &cachingObject{}
result.serializations.Store(make(serializationsCache))
o.lock.RLock()
defer o.lock.RUnlock()
result.object = o.object.DeepCopyObject().(metaRuntimeInterface)
return result
}
var (
invalidationCacheTimestampLock sync.Mutex
invalidationCacheTimestamp time.Time
)
// shouldLogCacheInvalidation allows for logging cache-invalidation
// at most once per second (to avoid spamming logs in case of issues).
func shouldLogCacheInvalidation(now time.Time) bool {
invalidationCacheTimestampLock.Lock()
defer invalidationCacheTimestampLock.Unlock()
if invalidationCacheTimestamp.Add(time.Second).Before(now) {
invalidationCacheTimestamp = now
return true
}
return false
}
func (o *cachingObject) invalidateCacheLocked() {
if cache, ok := o.serializations.Load().(serializationsCache); ok && len(cache) == 0 {
return
}
// We don't expect cache invalidation to happen - so we want
// to log the stacktrace to allow debugging if that will happen.
// OTOH, we don't want to spam logs with it.
// So we try to log it at most once per second.
if shouldLogCacheInvalidation(time.Now()) {
klog.Warningf("Unexpected cache invalidation for %#v\n%s", o.object, string(debug.Stack()))
}
o.serializations.Store(make(serializationsCache))
}
// The following functions implement metav1.Object interface:
// - getters simply delegate for the underlying object
// - setters check if operations isn't noop and if so,
// invalidate the cache and delegate for the underlying object
func (o *cachingObject) conditionalSet(isNoop func() bool, set func()) {
if fastPath := func() bool {
o.lock.RLock()
defer o.lock.RUnlock()
return isNoop()
}(); fastPath {
return
}
o.lock.Lock()
defer o.lock.Unlock()
if isNoop() {
return
}
o.invalidateCacheLocked()
set()
}
func (o *cachingObject) GetNamespace() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetNamespace()
}
func (o *cachingObject) SetNamespace(namespace string) {
o.conditionalSet(
func() bool { return o.object.GetNamespace() == namespace },
func() { o.object.SetNamespace(namespace) },
)
}
func (o *cachingObject) GetName() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetName()
}
func (o *cachingObject) SetName(name string) {
o.conditionalSet(
func() bool { return o.object.GetName() == name },
func() { o.object.SetName(name) },
)
}
func (o *cachingObject) GetGenerateName() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetGenerateName()
}
func (o *cachingObject) SetGenerateName(name string) {
o.conditionalSet(
func() bool { return o.object.GetGenerateName() == name },
func() { o.object.SetGenerateName(name) },
)
}
func (o *cachingObject) GetUID() types.UID {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetUID()
}
func (o *cachingObject) SetUID(uid types.UID) {
o.conditionalSet(
func() bool { return o.object.GetUID() == uid },
func() { o.object.SetUID(uid) },
)
}
func (o *cachingObject) GetResourceVersion() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetResourceVersion()
}
func (o *cachingObject) SetResourceVersion(version string) {
o.conditionalSet(
func() bool { return o.object.GetResourceVersion() == version },
func() { o.object.SetResourceVersion(version) },
)
}
func (o *cachingObject) GetGeneration() int64 {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetGeneration()
}
func (o *cachingObject) SetGeneration(generation int64) {
o.conditionalSet(
func() bool { return o.object.GetGeneration() == generation },
func() { o.object.SetGeneration(generation) },
)
}
func (o *cachingObject) GetSelfLink() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetSelfLink()
}
func (o *cachingObject) SetSelfLink(selfLink string) {
o.conditionalSet(
func() bool { return o.object.GetSelfLink() == selfLink },
func() { o.object.SetSelfLink(selfLink) },
)
}
func (o *cachingObject) GetCreationTimestamp() metav1.Time {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetCreationTimestamp()
}
func (o *cachingObject) SetCreationTimestamp(timestamp metav1.Time) {
o.conditionalSet(
func() bool { return o.object.GetCreationTimestamp() == timestamp },
func() { o.object.SetCreationTimestamp(timestamp) },
)
}
func (o *cachingObject) GetDeletionTimestamp() *metav1.Time {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetDeletionTimestamp()
}
func (o *cachingObject) SetDeletionTimestamp(timestamp *metav1.Time) {
o.conditionalSet(
func() bool { return o.object.GetDeletionTimestamp() == timestamp },
func() { o.object.SetDeletionTimestamp(timestamp) },
)
}
func (o *cachingObject) GetDeletionGracePeriodSeconds() *int64 {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetDeletionGracePeriodSeconds()
}
func (o *cachingObject) SetDeletionGracePeriodSeconds(gracePeriodSeconds *int64) {
o.conditionalSet(
func() bool { return o.object.GetDeletionGracePeriodSeconds() == gracePeriodSeconds },
func() { o.object.SetDeletionGracePeriodSeconds(gracePeriodSeconds) },
)
}
func (o *cachingObject) GetLabels() map[string]string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetLabels()
}
func (o *cachingObject) SetLabels(labels map[string]string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetLabels(), labels) },
func() { o.object.SetLabels(labels) },
)
}
func (o *cachingObject) GetAnnotations() map[string]string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetAnnotations()
}
func (o *cachingObject) SetAnnotations(annotations map[string]string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetAnnotations(), annotations) },
func() { o.object.SetAnnotations(annotations) },
)
}
func (o *cachingObject) GetFinalizers() []string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetFinalizers()
}
func (o *cachingObject) SetFinalizers(finalizers []string) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetFinalizers(), finalizers) },
func() { o.object.SetFinalizers(finalizers) },
)
}
func (o *cachingObject) GetOwnerReferences() []metav1.OwnerReference {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetOwnerReferences()
}
func (o *cachingObject) SetOwnerReferences(references []metav1.OwnerReference) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetOwnerReferences(), references) },
func() { o.object.SetOwnerReferences(references) },
)
}
func (o *cachingObject) GetClusterName() string {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetClusterName()
}
func (o *cachingObject) SetClusterName(clusterName string) {
o.conditionalSet(
func() bool { return o.object.GetClusterName() == clusterName },
func() { o.object.SetClusterName(clusterName) },
)
}
func (o *cachingObject) GetManagedFields() []metav1.ManagedFieldsEntry {
o.lock.RLock()
defer o.lock.RUnlock()
return o.object.GetManagedFields()
}
func (o *cachingObject) SetManagedFields(managedFields []metav1.ManagedFieldsEntry) {
o.conditionalSet(
func() bool { return reflect.DeepEqual(o.object.GetManagedFields(), managedFields) },
func() { o.object.SetManagedFields(managedFields) },
)
}

View File

@@ -40,6 +40,10 @@ const (
// before terminating request and returning Timeout error with retry
// after suggestion.
blockTimeout = 3 * time.Second
// resourceVersionTooHighRetrySeconds is the seconds before a operation should be retried by the client
// after receiving a 'too high resource version' error.
resourceVersionTooHighRetrySeconds = 1
)
// watchCacheEvent is a single "watch event" that is send to users of
@@ -219,7 +223,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
return err
}
watchCacheEvent := &watchCacheEvent{
wcEvent := &watchCacheEvent{
Type: event.Type,
Object: elem.Object,
ObjLabels: elem.Labels,
@@ -242,12 +246,12 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
}
if exists {
previousElem := previous.(*storeElement)
watchCacheEvent.PrevObject = previousElem.Object
watchCacheEvent.PrevObjLabels = previousElem.Labels
watchCacheEvent.PrevObjFields = previousElem.Fields
wcEvent.PrevObject = previousElem.Object
wcEvent.PrevObjLabels = previousElem.Labels
wcEvent.PrevObjFields = previousElem.Fields
}
w.updateCache(watchCacheEvent)
w.updateCache(wcEvent)
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()
@@ -260,7 +264,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
// This is safe as long as there is at most one call to processEvent in flight
// at any point in time.
if w.eventHandler != nil {
w.eventHandler(watchCacheEvent)
w.eventHandler(wcEvent)
}
return nil
}
@@ -303,8 +307,8 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= blockTimeout {
// Timeout with retry after 1 second.
return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
// Request that the client retry after 'resourceVersionTooHighRetrySeconds' seconds.
return storage.NewTooLargeResourceVersionError(resourceVersion, w.resourceVersion, resourceVersionTooHighRetrySeconds)
}
w.cond.Wait()
}
@@ -464,7 +468,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
return result, nil
}
if resourceVersion < oldest-1 {
return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
}
// Binary search the smallest index at which resourceVersion is greater than the given one.