update dependencies (#6267)

Signed-off-by: hongming <coder.scala@gmail.com>
(cherry picked from commit cfebd96a1f)
hongming
2025-03-11 14:19:32 +08:00
parent 742c1e52db
commit 39eab5ee5c
4246 changed files with 341171 additions and 131193 deletions

View File

@@ -59,6 +59,12 @@ type Config struct {
// FullResyncPeriod is the period at which ShouldResync is considered.
FullResyncPeriod time.Duration
// MinWatchTimeout, if set, will define the minimum timeout for watch requests sent
// to kube-apiserver. However, values lower than 5m will not be honored to avoid
// negative performance impact on the control plane.
// Optional - if unset, a default value of 5m will be used.
MinWatchTimeout time.Duration
// ShouldResync is periodically used by the reflector to determine
// whether to Resync the Queue. If ShouldResync is `nil` or
// returns true, it means the reflector should proceed with the
@@ -138,6 +144,7 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.config.Queue,
ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
MinWatchTimeout: c.config.MinWatchTimeout,
TypeDescription: c.config.ObjectDescription,
Clock: c.clock,
},
@@ -336,6 +343,68 @@ func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
return MetaNamespaceKeyFunc(obj)
}
// DeletionHandlingObjectToName checks for
// DeletedFinalStateUnknown objects before calling
// ObjectToName.
func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return ParseObjectName(d.Key)
}
return ObjectToName(obj)
}
// InformerOptions configure a Reflector.
type InformerOptions struct {
// ListerWatcher implements List and Watch functions for the source of the resource
// the informer will be informing about.
ListerWatcher ListerWatcher
// ObjectType is an object of the type that informer is expected to receive.
ObjectType runtime.Object
// Handler defines functions that should be called on object mutations.
Handler ResourceEventHandler
// ResyncPeriod is the underlying Reflector's resync period. If non-zero, the store
// is re-synced with that frequency - Modify events are delivered even if objects
// didn't change.
// This is useful for synchronizing objects that configure external resources
// (e.g. configure cloud provider functionalities).
// Optional - if unset, the store is not resynced periodically.
ResyncPeriod time.Duration
// MinWatchTimeout, if set, will define the minimum timeout for watch requests sent
// to kube-apiserver. However, values lower than 5m will not be honored to avoid
// negative performance impact on the control plane.
// Optional - if unset, a default value of 5m will be used.
MinWatchTimeout time.Duration
// Indexers, if set, are the indexers for the received objects to optimize
// certain queries.
// Optional - if unset, no indexes are maintained.
Indexers Indexers
// Transform function, if set, will be called on all objects before they are
// put into the Store, and corresponding Add/Modify/Delete handlers will be
// invoked for them.
// Optional - if unset, no additional transformation happens.
Transform TransformFunc
}
// NewInformerWithOptions returns a Store and a controller for populating the store
// while also providing event notifications. You should only use the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
func NewInformerWithOptions(options InformerOptions) (Store, Controller) {
var clientState Store
if options.Indexers == nil {
clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
} else {
clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers)
}
return clientState, newInformer(clientState, options)
}
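For illustration, a minimal usage sketch of the options-based constructor (the pod list-watcher, handler body, and the v1/time imports are assumptions, not part of this diff):
// Hypothetical sketch: wiring an informer through InformerOptions.
func runPodInformer(podListWatcher ListerWatcher, stopCh <-chan struct{}) Store {
	store, ctrl := NewInformerWithOptions(InformerOptions{
		ListerWatcher: podListWatcher,
		ObjectType:    &v1.Pod{},
		Handler: ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { /* react to adds */ },
		},
		ResyncPeriod:    30 * time.Minute,
		MinWatchTimeout: 10 * time.Minute, // values below 5m fall back to the 5m default
	})
	go ctrl.Run(stopCh)
	return store // use only for Get/List; do not Add/Modify/Delete
}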
// NewInformer returns a Store and a controller for populating the store
// while also providing event notifications. You should only use the returned
// Store for Get/List operations; Add/Modify/Deletes will cause the event
@@ -350,6 +419,8 @@ func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
// long as possible (until the upstream source closes the watch or times out,
// or you stop the controller).
// - h is the object you want notifications sent to.
//
// Deprecated: Use NewInformerWithOptions instead.
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
@@ -359,7 +430,13 @@ func NewInformer(
// This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
options := InformerOptions{
ListerWatcher: lw,
ObjectType: objType,
Handler: h,
ResyncPeriod: resyncPeriod,
}
return clientState, newInformer(clientState, options)
}
// NewIndexerInformer returns an Indexer and a Controller for populating the index
@@ -377,6 +454,8 @@ func NewInformer(
// or you stop the controller).
// - h is the object you want notifications sent to.
// - indexers is the indexer for the received object type.
//
// Deprecated: Use NewInformerWithOptions instead.
func NewIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
@@ -387,7 +466,14 @@ func NewIndexerInformer(
// This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
options := InformerOptions{
ListerWatcher: lw,
ObjectType: objType,
Handler: h,
ResyncPeriod: resyncPeriod,
Indexers: indexers,
}
return clientState, newInformer(clientState, options)
}
// NewTransformingInformer returns a Store and a controller for populating
@@ -397,6 +483,8 @@ func NewIndexerInformer(
// The given transform function will be called on all objects before they are
// put into the Store, and corresponding Add/Modify/Delete handlers will
// be invoked for them.
//
// Deprecated: Use NewInformerWithOptions instead.
func NewTransformingInformer(
lw ListerWatcher,
objType runtime.Object,
@@ -407,7 +495,14 @@ func NewTransformingInformer(
// This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
options := InformerOptions{
ListerWatcher: lw,
ObjectType: objType,
Handler: h,
ResyncPeriod: resyncPeriod,
Transform: transformer,
}
return clientState, newInformer(clientState, options)
}
// NewTransformingIndexerInformer returns an Indexer and a controller for
@@ -417,6 +512,8 @@ func NewTransformingInformer(
// The given transform function will be called on all objects before they are
// put into the Index, and corresponding Add/Modify/Delete handlers will
// be invoked for them.
//
// Deprecated: Use NewInformerWithOptions instead.
func NewTransformingIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
@@ -428,7 +525,15 @@ func NewTransformingIndexerInformer(
// This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
options := InformerOptions{
ListerWatcher: lw,
ObjectType: objType,
Handler: h,
ResyncPeriod: resyncPeriod,
Indexers: indexers,
Transform: transformer,
}
return clientState, newInformer(clientState, options)
}
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
@@ -471,42 +576,29 @@ func processDeltas(
// providing event notifications.
//
// Parameters
// - lw is list and watch functions for the source of the resource you want to
// be informed of.
// - objType is an object of the type that you expect to receive.
// - resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
// calls, even if nothing changed). Otherwise, re-list will be delayed as
// long as possible (until the upstream source closes the watch or times out,
// or you stop the controller).
// - h is the object you want notifications sent to.
// - clientState is the store you want to populate
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
transformer TransformFunc,
) Controller {
// - options contain the options to configure the controller
func newInformer(clientState Store, options InformerOptions) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transformer,
Transformer: options.Transform,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
ListerWatcher: options.ListerWatcher,
ObjectType: options.ObjectType,
FullResyncPeriod: options.ResyncPeriod,
MinWatchTimeout: options.MinWatchTimeout,
RetryOnError: false,
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, deltas, isInInitialList)
return processDeltas(options.Handler, clientState, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
},

View File

@@ -139,20 +139,17 @@ type DeltaFIFO struct {
}
// TransformFunc allows for transforming an object before it will be processed.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
//
// New in v1.27: In such cases, the contained object will already have gone
// through the transform function separately (when it was added / updated prior
// to the delete), so the TransformFunc can likely safely ignore such objects
// (i.e., just return the input object).
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
// sees the object before any other actor, and it is now safe to mutate the
// object in place instead of making a copy.
// New in v1.27: TransformFunc sees the object before any other actor, and it
// is now safe to mutate the object in place instead of making a copy.
//
// It's recommended for the TransformFunc to be idempotent.
// It MUST be idempotent if objects already present in the cache are passed to
// Replace() to avoid re-mutating them. Default informers do not pass
// existing objects to Replace though.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
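As a concrete illustration of the contract above, a hedged sketch of an idempotent TransformFunc that strips managedFields to reduce memory (the helper name is hypothetical, and the meta.Accessor import from k8s.io/apimachinery/pkg/api/meta is assumed):
// Hypothetical sketch of an idempotent TransformFunc.
func stripManagedFields(obj interface{}) (interface{}, error) {
	if _, ok := obj.(DeletedFinalStateUnknown); ok {
		return obj, nil // tombstone: the inner object was already transformed
	}
	if accessor, err := meta.Accessor(obj); err == nil {
		accessor.SetManagedFields(nil) // safe to apply twice (idempotent)
	}
	return obj, nil
}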
@@ -440,22 +437,38 @@ func isDeletionDup(a, b *Delta) *Delta {
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
return f.queueActionInternalLocked(actionType, actionType, obj)
}
// queueActionInternalLocked appends to the delta list for the object.
// The actionType is emitted and must honor emitDeltaTypeReplaced.
// The internalActionType is only used within this function and must
// ignore emitDeltaTypeReplaced.
// Caller must lock first.
func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// Every object comes through this code path once, so this is a good
// place to call the transform func. If obj is a
// DeletedFinalStateUnknown tombstone, then the contained inner object
// will already have gone through the transformer, but we document that
// this can happen. In cases involving Replace(), such an object can
// come through multiple times.
// place to call the transform func.
//
// If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync,
// then the object has already gone through the transformer.
//
// If objects already present in the cache are passed to Replace(),
// the transformer must be idempotent to avoid re-mutating them,
// or must coordinate with all readers of the cache to avoid data races.
// Default informers do not pass existing objects to Replace.
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
_, isTombstone := obj.(DeletedFinalStateUnknown)
if !isTombstone && internalActionType != Sync {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
}
@@ -638,7 +651,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
if err := f.queueActionInternalLocked(action, Replaced, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}

View File

@@ -50,8 +50,7 @@ type Indexer interface {
// GetIndexers return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
AddIndexers(newIndexers Indexers) error
}
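A short sketch of what the relaxed contract permits (index name, index function, and the v1/metav1 imports are hypothetical assumptions):
// Hypothetical: adding an indexer after the store already holds items.
pod := &v1.Pod{
	ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"},
	Spec:       v1.PodSpec{NodeName: "node-1"},
}
indexer := NewIndexer(MetaNamespaceKeyFunc, Indexers{})
_ = indexer.Add(pod) // data is present before the index exists
err := indexer.AddIndexers(Indexers{
	"byNode": func(obj interface{}) ([]string, error) {
		return []string{obj.(*v1.Pod).Spec.NodeName}, nil
	},
})
// Existing items are back-filled into the new index, so this finds the pod:
podsOnNode, _ := indexer.ByIndex("byNode", "node-1")
_, _ = podsOnNode, err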

View File

@@ -30,7 +30,7 @@ import (
// AppendFunc is used to add a matching item to whatever list the caller is using
type AppendFunc func(interface{})
// ListAll calls appendFn with each value retrieved from store which matches the selector.
// ListAll lists items in the store matching the given selector, calling appendFn on each one.
func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
selectAll := selector.Empty()
for _, m := range store.List() {
@@ -51,7 +51,9 @@ func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
return nil
}
// ListAllByNamespace used to list items belongs to namespace from Indexer.
// ListAllByNamespace lists items in the given namespace in the store matching the given selector,
// calling appendFn on each one.
// If a blank namespace (NamespaceAll) is specified, this delegates to ListAll().
func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
if namespace == metav1.NamespaceAll {
return ListAll(indexer, selector, appendFn)

View File

@@ -36,6 +36,10 @@ type Lister interface {
// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
//
// If Watch returns an error, it should handle its own cleanup, including
// but not limited to calling Stop() on the watch, if one was constructed.
// This allows the caller to ignore the watch, if the error is non-nil.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
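A sketch of a Watcher honoring this cleanup contract (the wrapper type is hypothetical):
// Hypothetical wrapper demonstrating the contract: if Watch fails, stop
// any watch that was constructed before returning the error, so the
// caller can safely ignore the returned watch.
type cleanupWatcher struct{ inner Watcher }

func (c *cleanupWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
	w, err := c.inner.Watch(options)
	if err != nil {
		if w != nil {
			w.Stop() // clean up before surfacing the error
		}
		return nil, err
	}
	return w, nil
}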

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"io"
"math/rand"
"os"
"reflect"
"strings"
"sync"
@@ -39,15 +38,23 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
"k8s.io/client-go/tools/pager"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
"k8s.io/utils/trace"
)
const defaultExpectedTypeName = "<unspecified>"
var (
// We try to spread the load on apiserver by setting timeouts for
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
defaultMinWatchTimeout = 5 * time.Minute
)
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
@@ -71,6 +78,8 @@ type Reflector struct {
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
resyncPeriod time.Duration
// minWatchTimeout defines the minimum timeout for watch requests.
minWatchTimeout time.Duration
// clock allows tests to manipulate time
clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
@@ -107,7 +116,9 @@ type Reflector struct {
// might result in an increased memory consumption of the APIServer.
//
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
UseWatchList bool
//
// TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work.
UseWatchList *bool
}
// ResourceVersionUpdater is an interface that allows store implementation to
@@ -148,12 +159,6 @@ func DefaultWatchErrorHandler(r *Reflector, err error) {
}
}
var (
// We try to spread the load on apiserver by setting timeouts for
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
minWatchTimeout = 5 * time.Minute
)
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
@@ -191,6 +196,10 @@ type ReflectorOptions struct {
// (do not resync).
ResyncPeriod time.Duration
// MinWatchTimeout, if non-zero, defines the minimum timeout for watch requests sent to kube-apiserver.
// However, values lower than 5m will not be honored to avoid negative performance impact on the control plane.
MinWatchTimeout time.Duration
// Clock allows tests to control time. If unset defaults to clock.RealClock{}
Clock clock.Clock
}
@@ -210,9 +219,14 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
if reflectorClock == nil {
reflectorClock = clock.RealClock{}
}
minWatchTimeout := defaultMinWatchTimeout
if options.MinWatchTimeout > defaultMinWatchTimeout {
minWatchTimeout = options.MinWatchTimeout
}
r := &Reflector{
name: options.Name,
resyncPeriod: options.ResyncPeriod,
minWatchTimeout: minWatchTimeout,
typeDescription: options.TypeDescription,
listerWatcher: lw,
store: store,
@@ -237,8 +251,10 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
r.expectedGVK = getExpectedGVKFromObject(expectedType)
}
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
r.UseWatchList = true
// don't overwrite UseWatchList if already set
// because the higher layers (e.g. storage/cacher) disabled it on purpose
if r.UseWatchList == nil {
r.UseWatchList = ptr.To(clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient))
}
return r
@@ -325,9 +341,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error
var w watch.Interface
fallbackToList := !r.UseWatchList
useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList
if r.UseWatchList {
if useWatchList {
w, err = r.watchList(stopCh)
if w == nil && err == nil {
// stopCh was closed
@@ -349,12 +366,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(w, stopCh, resyncerrc)
return r.watchWithResync(w, stopCh)
}
// startResync periodically calls r.store.Resync() method.
@@ -385,6 +397,15 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}
}
}
// watchWithResync runs watch with startResync in the background.
func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}) error {
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(w, stopCh, resyncerrc)
}
// watch simply starts a watch request with the server.
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
var err error
@@ -407,7 +428,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
start := r.clock.Now()
if w == nil {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
// We want to avoid situations of hanging watchers. Stop any watchers that do not
@@ -434,13 +455,14 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
}
}
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
err = handleWatch(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc, stopCh)
// Ensure that watch will not be reused across iterations.
w.Stop()
w = nil
retry.After(err)
if err != nil {
if err != errorStopRequested {
if !errors.Is(err, errorStopRequested) {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
@@ -634,7 +656,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// TODO(#115478): large "list", slow clients, slow network, p&f
// might slow down streaming and eventually fail.
// maybe in such a case we should retry with an increased timeout?
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: lastKnownRV,
AllowWatchBookmarks: true,
@@ -651,14 +673,12 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
}
return nil, err
}
bookmarkReceived := pointer.Bool(false)
err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
watchListBookmarkReceived, err := handleListWatch(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string) { resourceVersion = rv },
bookmarkReceived,
r.clock, make(chan error), stopCh)
if err != nil {
w.Stop() // stop and retry with clean state
if err == errorStopRequested {
if errors.Is(err, errorStopRequested) {
return nil, nil
}
if isErrorRetriableWithSideEffectsFn(err) {
@@ -666,7 +686,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
}
return nil, err
}
if *bookmarkReceived {
if watchListBookmarkReceived {
break
}
}
@@ -678,10 +698,10 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content.
checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore)
checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List)
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
@@ -698,8 +718,12 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
return r.store.Replace(found, resourceVersion)
}
// watchHandler watches w and sets setLastSyncResourceVersion
func watchHandler(start time.Time,
// handleListWatch consumes events from w, updates the Store, and records the
// last seen ResourceVersion, to allow continuing from that ResourceVersion on
// retry. If successful, the watcher will be left open after receiving the
// initial set of objects, to allow watching for future events.
func handleListWatch(
start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
@@ -707,31 +731,77 @@ func watchHandler(start time.Time,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnInitialEventsEndBookmark *bool,
clock clock.Clock,
errc chan error,
errCh chan error,
stopCh <-chan struct{},
) (bool, error) {
exitOnWatchListBookmarkReceived := true
return handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh)
}
// handleWatch consumes events from w, updates the Store, and records the
// last seen ResourceVersion, to allow continuing from that ResourceVersion on
// retry. The watcher will always be stopped on exit.
func handleWatch(
start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
errCh chan error,
stopCh <-chan struct{},
) error {
exitOnWatchListBookmarkReceived := false
_, err := handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh)
return err
}
// handleAnyWatch consumes events from w, updates the Store, and records the last
// seen ResourceVersion, to allow continuing from that ResourceVersion on retry.
// If exitOnWatchListBookmarkReceived is true, the watch events will be consumed
// until a bookmark event is received with the WatchList annotation present.
// Returns true (watchListBookmarkReceived) if the WatchList bookmark was
// received, even if exitOnWatchListBookmarkReceived is false.
// The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is
// true and watchListBookmarkReceived is true. This allows the same watch stream
// to be re-used by the caller to continue watching for new events.
func handleAnyWatch(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnWatchListBookmarkReceived bool,
clock clock.Clock,
errCh chan error,
stopCh <-chan struct{},
) (bool, error) {
watchListBookmarkReceived := false
eventCount := 0
if exitOnInitialEventsEndBookmark != nil {
// set it to false just in case somebody
// made it positive
*exitOnInitialEventsEndBookmark = false
}
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnWatchListBookmarkReceived)
defer initialEventsEndBookmarkWarningTicker.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
return watchListBookmarkReceived, errorStopRequested
case err := <-errCh:
return watchListBookmarkReceived, err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
return watchListBookmarkReceived, apierrors.FromObject(event.Object)
}
if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
@@ -772,10 +842,8 @@ loop:
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
watchListBookmarkReceived = true
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
@@ -785,20 +853,23 @@ loop:
rvu.UpdateResourceVersion(resourceVersion)
}
eventCount++
if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
watchDuration := clock.Since(start)
klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
return nil
return watchListBookmarkReceived, nil
}
initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now())
case <-initialEventsEndBookmarkWarningTicker.C():
initialEventsEndBookmarkWarningTicker.warnIfExpired()
}
}
watchDuration := clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
return watchListBookmarkReceived, fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
return nil
return watchListBookmarkReceived, nil
}
// LastSyncResourceVersion is the resource version observed when last sync with the underlying store
@@ -910,3 +981,95 @@ func isWatchErrorRetriable(err error) bool {
}
return false
}
// wrapListFuncWithContext simply wraps a ListFunc into another function that accepts a context and ignores it.
func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) {
return listFn(options)
}
}
// initialEventsEndBookmarkTicker is a ticker that produces a warning if the bookmark event,
// which marks the end of the watch stream, has not been received within the defined tick interval.
//
// Note:
// The methods exposed by this type are not thread-safe.
type initialEventsEndBookmarkTicker struct {
clock.Ticker
clock clock.Clock
name string
watchStart time.Time
tickInterval time.Duration
lastEventObserveTime time.Time
}
// newInitialEventsEndBookmarkTicker returns a noop ticker if exitOnWatchListBookmarkReceived is false.
// Otherwise, it returns a ticker that exposes a method producing a warning if the bookmark event,
// which marks the end of the watch stream, has not been received within the defined tick interval.
//
// Note that the caller controls whether to call t.C() and t.Stop().
//
// In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method.
func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived)
}
func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
clockWithTicker, ok := c.(clock.WithTicker)
if !ok || !exitOnWatchListBookmarkReceived {
if exitOnWatchListBookmarkReceived {
klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested")
}
return &initialEventsEndBookmarkTicker{
Ticker: &noopTicker{},
}
}
return &initialEventsEndBookmarkTicker{
Ticker: clockWithTicker.NewTicker(tickInterval),
clock: c,
name: name,
watchStart: watchStart,
tickInterval: tickInterval,
}
}
func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObserveTime time.Time) {
t.lastEventObserveTime = lastEventObserveTime
}
func (t *initialEventsEndBookmarkTicker) warnIfExpired() {
if err := t.produceWarningIfExpired(); err != nil {
klog.Warning(err)
}
}
// produceWarningIfExpired returns an error that represents a warning when
// the time elapsed since the last received event exceeds the tickInterval.
//
// Note that this method should be called when t.C() yields a value.
func (t *initialEventsEndBookmarkTicker) produceWarningIfExpired() error {
if _, ok := t.Ticker.(*noopTicker); ok {
return nil /*noop ticker*/
}
if t.lastEventObserveTime.IsZero() {
return fmt.Errorf("%s: awaiting required bookmark event for initial events stream, no events received for %v", t.name, t.clock.Since(t.watchStart))
}
elapsedTime := t.clock.Now().Sub(t.lastEventObserveTime)
hasBookmarkTimerExpired := elapsedTime >= t.tickInterval
if !hasBookmarkTimerExpired {
return nil
}
return fmt.Errorf("%s: hasn't received required bookmark event marking the end of initial events stream, received last event %v ago", t.name, elapsedTime)
}
var _ clock.Ticker = &noopTicker{}
// TODO(#115478): move to k8s/utils repo
type noopTicker struct{}
func (t *noopTicker) C() <-chan time.Time { return nil }
func (t *noopTicker) Stop() {}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2023 The Kubernetes Authors.
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -18,102 +18,26 @@ package cache
import (
"context"
"os"
"sort"
"strconv"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/client-go/util/consistencydetector"
)
var dataConsistencyDetectionEnabled = false
func init() {
dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
}
// checkWatchListConsistencyIfRequested performs a data consistency check only when
// checkWatchListDataConsistencyIfRequested performs a data consistency check only when
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
//
// The consistency check is meant to be enforced only in the CI, not in production.
// The check ensures that data retrieved by the watch-list api call
// is exactly the same as data received by the standard list api call.
// is exactly the same as data received by the standard list api call against etcd.
//
// Note that this function will panic when data inconsistency is detected.
// This is intentional because we want to catch it in the CI.
func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
if !dataConsistencyDetectionEnabled {
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
if !consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
return
}
checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store)
}
// checkWatchListConsistency exists solely for testing purposes.
// we cannot use checkWatchListConsistencyIfRequested because
// it is guarded by an environmental variable.
// we cannot manipulate the environmental variable because
// it will affect other tests in this package.
func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity)
opts := metav1.ListOptions{
ResourceVersion: lastSyncedResourceVersion,
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
}
var list runtime.Object
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) {
list, err = listerWatcher.List(opts)
if err != nil {
// the consistency check will only be enabled in the CI
// and LIST calls in general will be retried by the client-go library
// if we fail simply log and retry
klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
return false, nil
}
return true, nil
})
if err != nil {
klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err)
return
}
rawListItems, err := meta.ExtractListWithAlloc(list)
if err != nil {
panic(err) // this should never happen
}
listItems := toMetaObjectSliceOrDie(rawListItems)
storeItems := toMetaObjectSliceOrDie(store.List())
sort.Sort(byUID(listItems))
sort.Sort(byUID(storeItems))
if !cmp.Equal(listItems, storeItems) {
klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems))
msg := "data inconsistency detected for the watch-list feature, panicking!"
panic(msg)
}
}
type byUID []metav1.Object
func (a byUID) Len() int { return len(a) }
func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
result := make([]metav1.Object, len(s))
for i, v := range s {
m, err := meta.Accessor(v)
if err != nil {
panic(err)
}
result[i] = m
}
return result
// for informers we pass an empty ListOptions because
// listFn might be wrapped for filtering during informer construction.
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
}

View File

@@ -31,6 +31,8 @@ import (
"k8s.io/utils/clock"
"k8s.io/klog/v2"
clientgofeaturegate "k8s.io/client-go/features"
)
// SharedInformer provides eventually consistent linkage of its
@@ -409,6 +411,10 @@ func (v *dummyController) HasSynced() bool {
}
func (v *dummyController) LastSyncResourceVersion() string {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) {
return v.informer.LastSyncResourceVersion()
}
return ""
}
@@ -540,8 +546,8 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
if s.stopped {
return fmt.Errorf("indexer was not added because it has stopped already")
}
return s.indexer.AddIndexers(indexers)

View File

@@ -52,8 +52,7 @@ type ThreadSafeStore interface {
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
AddIndexers(newIndexers Indexers) error
// Resync is a no-op and is deprecated
Resync() error
@@ -135,50 +134,66 @@ func (i *storeIndex) addIndexers(newIndexers Indexers) error {
return nil
}
// updateSingleIndex modifies the object's location in the named index:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
// updateSingleIndex must be called from a function that already has a lock on the cache
func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
var oldIndexValues, indexValues []string
indexFunc, ok := i.indexers[name]
if !ok {
// Should never happen. Caller is responsible for ensuring this exists, and should call with lock
// held to avoid any races.
panic(fmt.Errorf("indexer %q does not exist", name))
}
if oldObj != nil {
var err error
oldIndexValues, err = indexFunc(oldObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
} else {
oldIndexValues = oldIndexValues[:0]
}
if newObj != nil {
var err error
indexValues, err = indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
} else {
indexValues = indexValues[:0]
}
index := i.indices[name]
if index == nil {
index = Index{}
i.indices[name] = index
}
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
// We optimize for the most common case where indexFunc returns a single value which has not been changed
return
}
for _, value := range oldIndexValues {
i.deleteKeyFromIndex(key, value, index)
}
for _, value := range indexValues {
i.addKeyToIndex(key, value, index)
}
}
// updateIndices modifies the object's location in the managed indexes:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
var oldIndexValues, indexValues []string
var err error
for name, indexFunc := range i.indexers {
if oldObj != nil {
oldIndexValues, err = indexFunc(oldObj)
} else {
oldIndexValues = oldIndexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
if newObj != nil {
indexValues, err = indexFunc(newObj)
} else {
indexValues = indexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := i.indices[name]
if index == nil {
index = Index{}
i.indices[name] = index
}
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
// We optimize for the most common case where indexFunc returns a single value which has not been changed
continue
}
for _, value := range oldIndexValues {
i.deleteKeyFromIndex(key, value, index)
}
for _, value := range indexValues {
i.addKeyToIndex(key, value, index)
}
for name := range i.indexers {
i.updateSingleIndex(name, oldObj, newObj, key)
}
}
@@ -339,11 +354,18 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
c.lock.Lock()
defer c.lock.Unlock()
if len(c.items) > 0 {
return fmt.Errorf("cannot add indexers to running index")
if err := c.index.addIndexers(newIndexers); err != nil {
return err
}
return c.index.addIndexers(newIndexers)
// If there are already items, index them
for key, item := range c.items {
for name := range newIndexers {
c.index.updateSingleIndex(name, nil, item, key)
}
}
return nil
}
func (c *threadSafeMap) Resync() error {

View File

@@ -16,4 +16,4 @@ limitations under the License.
// +k8s:deepcopy-gen=package
package api
package api // import "k8s.io/client-go/tools/clientcmd/api"

View File

@@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"os"
"path"
"path/filepath"
"reflect"
"strings"
@@ -115,7 +114,7 @@ func ShortenConfig(config *Config) {
// FlattenConfig changes the config object into a self-contained config (useful for making secrets)
func FlattenConfig(config *Config) error {
for key, authInfo := range config.AuthInfos {
baseDir, err := MakeAbs(path.Dir(authInfo.LocationOfOrigin), "")
baseDir, err := MakeAbs(filepath.Dir(authInfo.LocationOfOrigin), "")
if err != nil {
return err
}
@@ -130,7 +129,7 @@ func FlattenConfig(config *Config) error {
config.AuthInfos[key] = authInfo
}
for key, cluster := range config.Clusters {
baseDir, err := MakeAbs(path.Dir(cluster.LocationOfOrigin), "")
baseDir, err := MakeAbs(filepath.Dir(cluster.LocationOfOrigin), "")
if err != nil {
return err
}

View File

@@ -18,4 +18,4 @@ limitations under the License.
// +k8s:deepcopy-gen=package
// +k8s:defaulter-gen=Kind
package v1
package v1 // import "k8s.io/client-go/tools/clientcmd/api/v1"

View File

@@ -72,6 +72,13 @@ type ClientConfig interface {
ConfigAccess() ConfigAccess
}
// OverridingClientConfig is used to enable overriding the raw KubeConfig
type OverridingClientConfig interface {
ClientConfig
// MergedRawConfig returns the RawConfig merged with all overrides.
MergedRawConfig() (clientcmdapi.Config, error)
}
type PersistAuthProviderConfigForUser func(user string) restclient.AuthProviderConfigPersister
type promptedCredentials struct {
@@ -91,22 +98,22 @@ type DirectClientConfig struct {
}
// NewDefaultClientConfig creates a DirectClientConfig using the config.CurrentContext as the context name
func NewDefaultClientConfig(config clientcmdapi.Config, overrides *ConfigOverrides) ClientConfig {
func NewDefaultClientConfig(config clientcmdapi.Config, overrides *ConfigOverrides) OverridingClientConfig {
return &DirectClientConfig{config, config.CurrentContext, overrides, nil, NewDefaultClientConfigLoadingRules(), promptedCredentials{}}
}
// NewNonInteractiveClientConfig creates a DirectClientConfig using the passed context name and does not have a fallback reader for auth information
func NewNonInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, configAccess ConfigAccess) ClientConfig {
func NewNonInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, configAccess ConfigAccess) OverridingClientConfig {
return &DirectClientConfig{config, contextName, overrides, nil, configAccess, promptedCredentials{}}
}
// NewInteractiveClientConfig creates a DirectClientConfig using the passed context name and a reader in case auth information is not provided via files or flags
func NewInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, fallbackReader io.Reader, configAccess ConfigAccess) ClientConfig {
func NewInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, fallbackReader io.Reader, configAccess ConfigAccess) OverridingClientConfig {
return &DirectClientConfig{config, contextName, overrides, fallbackReader, configAccess, promptedCredentials{}}
}
// NewClientConfigFromBytes takes your kubeconfig and gives you back a ClientConfig
func NewClientConfigFromBytes(configBytes []byte) (ClientConfig, error) {
func NewClientConfigFromBytes(configBytes []byte) (OverridingClientConfig, error) {
config, err := Load(configBytes)
if err != nil {
return nil, err
@@ -129,6 +136,40 @@ func (config *DirectClientConfig) RawConfig() (clientcmdapi.Config, error) {
return config.config, nil
}
// MergedRawConfig returns the raw kube config merged with the overrides
func (config *DirectClientConfig) MergedRawConfig() (clientcmdapi.Config, error) {
if err := config.ConfirmUsable(); err != nil {
return clientcmdapi.Config{}, err
}
merged := config.config.DeepCopy()
// set the AuthInfo merged with overrides in the merged config
mergedAuthInfo, err := config.getAuthInfo()
if err != nil {
return clientcmdapi.Config{}, err
}
mergedAuthInfoName, _ := config.getAuthInfoName()
merged.AuthInfos[mergedAuthInfoName] = &mergedAuthInfo
// set the Context merged with overrides in the merged config
mergedContext, err := config.getContext()
if err != nil {
return clientcmdapi.Config{}, err
}
mergedContextName, _ := config.getContextName()
merged.Contexts[mergedContextName] = &mergedContext
merged.CurrentContext = mergedContextName
// set the Cluster merged with overrides in the merged config
configClusterInfo, err := config.getCluster()
if err != nil {
return clientcmdapi.Config{}, err
}
configClusterName, _ := config.getClusterName()
merged.Clusters[configClusterName] = &configClusterInfo
return *merged, nil
}
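A hedged usage sketch of the new method (the context name, rawConfig variable, and error handling are assumptions):
// Hypothetical: materialize a kubeconfig with CLI-style overrides applied.
cc := NewDefaultClientConfig(rawConfig, &ConfigOverrides{
	CurrentContext: "staging", // assumed context name
})
merged, err := cc.MergedRawConfig()
if err != nil {
	return err // config was not usable
}
// merged is a self-contained clientcmdapi.Config with overrides folded in.
_ = WriteToFile(merged, "/tmp/merged-kubeconfig")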
// ClientConfig implements ClientConfig
func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
// check that getAuthInfo, getContext, and getCluster do not return an error.

View File

@@ -19,7 +19,6 @@ package clientcmd
import (
"errors"
"os"
"path"
"path/filepath"
"reflect"
"sort"
@@ -148,7 +147,7 @@ func NewDefaultPathOptions() *PathOptions {
EnvVar: RecommendedConfigPathEnvVar,
ExplicitFileFlag: RecommendedConfigPathFlag,
GlobalFileSubpath: path.Join(RecommendedHomeDir, RecommendedFileName),
GlobalFileSubpath: filepath.Join(RecommendedHomeDir, RecommendedFileName),
LoadingRules: NewDefaultClientConfigLoadingRules(),
}

View File

@@ -404,7 +404,15 @@ type eventBroadcasterAdapterImpl struct {
// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
// migration of individual components to the new Event API.
//
//logcheck:context // NewEventBroadcasterAdapterWithContext should be used instead because record.NewBroadcaster is called and works better when a context is supplied (contextual logging, cancellation).
func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {
return NewEventBroadcasterAdapterWithContext(context.Background(), client)
}
// NewEventBroadcasterAdapterWithContext creates a wrapper around new and legacy broadcasters to simplify
// migration of individual components to the new Event API.
func NewEventBroadcasterAdapterWithContext(ctx context.Context, client clientset.Interface) EventBroadcasterAdapter {
eventClient := &eventBroadcasterAdapterImpl{}
if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {
eventClient.eventsv1Client = client.EventsV1()
@@ -414,7 +422,7 @@ func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdap
// we create it unconditionally because its overhead is minor and will simplify the usage
// patterns of this library in all components.
eventClient.coreClient = client.CoreV1()
eventClient.coreBroadcaster = record.NewBroadcaster()
eventClient.coreBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
return eventClient
}
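A brief usage sketch of the context-aware constructor (clientset construction is assumed):
// Hypothetical: prefer the context-aware constructor so the broadcaster
// inherits contextual logging and stops when ctx is cancelled.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
adapter := NewEventBroadcasterAdapterWithContext(ctx, clientset)
defer adapter.Shutdown()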

View File

@@ -159,6 +159,10 @@ type LeaderElectionConfig struct {
// Name is the name of the resource lock for debugging
Name string
// Coordinated will use the Coordinated Leader Election feature
// WARNING: Coordinated leader election is ALPHA.
Coordinated bool
}
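A hedged configuration sketch for the new field (lock construction, timings, and callback bodies are assumptions):
// Hypothetical: opting into coordinated leader election (ALPHA).
cfg := LeaderElectionConfig{
	Lock:          lock, // e.g. a *resourcelock.LeaseLock
	LeaseDuration: 15 * time.Second,
	RenewDeadline: 10 * time.Second,
	RetryPeriod:   2 * time.Second,
	Name:          "my-controller",
	Coordinated:   true, // requires the CoordinatedLeaderElection feature gate
	Callbacks: LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) { /* do leader work */ },
		OnStoppedLeading: func() { /* stop leader work */ },
	},
}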
// LeaderCallbacks are callbacks that are triggered during certain
@@ -249,7 +253,11 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
desc := le.config.Lock.Describe()
klog.Infof("attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() {
succeeded = le.tryAcquireOrRenew(ctx)
if !le.config.Coordinated {
succeeded = le.tryAcquireOrRenew(ctx)
} else {
succeeded = le.tryCoordinatedRenew(ctx)
}
le.maybeReportTransition()
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
@@ -272,7 +280,11 @@ func (le *LeaderElector) renew(ctx context.Context) {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(timeoutCtx), nil
if !le.config.Coordinated {
return le.tryAcquireOrRenew(timeoutCtx), nil
} else {
return le.tryCoordinatedRenew(timeoutCtx), nil
}
}, timeoutCtx.Done())
le.maybeReportTransition()
@@ -304,7 +316,9 @@ func (le *LeaderElector) release() bool {
RenewTime: now,
AcquireTime: now,
}
if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), le.config.RenewDeadline)
defer timeoutCancel()
if err := le.config.Lock.Update(timeoutCtx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
return false
}
@@ -313,6 +327,81 @@ func (le *LeaderElector) release() bool {
return true
}
// tryCoordinatedRenew checks if it acquired a lease and tries to renew the
// lease if it has already been acquired. Returns true on success else returns
// false.
func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
now := metav1.NewTime(le.clock.Now())
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}
// 1. obtain the electionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
return false
}
// 2. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord
}
hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time)
if hasExpired {
klog.Infof("lock has expired: %v", le.config.Lock.Describe())
return false
}
if !le.IsLeader() {
klog.V(6).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe())
return false
}
// 2b. If the lease has been marked as "end of term", don't renew it
if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" {
klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe())
// TODO: Instead of letting the lease expire, the holder may delete it directly.
// This will not be compatible with all controllers, so it needs to be opt-in behavior.
// We must ensure all code guarded by this lease has successfully completed
// prior to releasing or there may be two processes
// simultaneously acting on the critical path.
// Usually once this returns false, the process is terminated.
// xref: OnStoppedLeading
return false
}
// 3. We're going to try to update. The leaderElectionRecord is set to its default
// here. Let's correct it before updating.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
leaderElectionRecord.Strategy = oldLeaderElectionRecord.Strategy
le.metrics.slowpathExercised(le.config.Name)
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
// update the lock itself
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}
le.setObservedRecord(&leaderElectionRecord)
return true
}
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
@@ -325,7 +414,22 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
AcquireTime: now,
}
// 1. obtain or create the ElectionRecord
// 1. fast path for the leader to update optimistically assuming that the record observed
// last time is the current version.
if le.IsLeader() && le.isLeaseValid(now.Time) {
oldObservedRecord := le.getObservedRecord()
leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
err := le.config.Lock.Update(ctx, leaderElectionRecord)
if err == nil {
le.setObservedRecord(&leaderElectionRecord)
return true
}
klog.Errorf("Failed to update lock optimitically: %v, falling back to slow path", err)
}
// 2. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
@@ -342,24 +446,23 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
return true
}
// 2. Record obtained, check the Identity & Time
// 3. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
!le.IsLeader() {
if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 3. We're going to try to update. The leaderElectionRecord is set to its default
// 4. We're going to try to update. The leaderElectionRecord is set to its default
// here. Let's correct it before updating.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
le.metrics.slowpathExercised(le.config.Name)
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
@@ -400,6 +503,10 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
return nil
}
func (le *LeaderElector) isLeaseValid(now time.Time) bool {
return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
}
// setObservedRecord will set a new observedRecord and update observedTime to the current time.
// Protect critical sections with lock.
func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {

View File

@@ -0,0 +1,202 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"context"
"reflect"
"time"
v1 "k8s.io/api/coordination/v1"
v1alpha1 "k8s.io/api/coordination/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
coordinationv1alpha1client "k8s.io/client-go/kubernetes/typed/coordination/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
const requeueInterval = 5 * time.Minute
type CacheSyncWaiter interface {
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
type LeaseCandidate struct {
leaseClient coordinationv1alpha1client.LeaseCandidateInterface
leaseCandidateInformer cache.SharedIndexInformer
informerFactory informers.SharedInformerFactory
hasSynced cache.InformerSynced
// At most there will be one item in this Queue (since we only watch one item)
queue workqueue.TypedRateLimitingInterface[int]
name string
namespace string
// controller lease
leaseName string
clock clock.Clock
binaryVersion, emulationVersion string
preferredStrategies []v1.CoordinatedLeaseStrategy
}
// NewCandidate creates a new LeaseCandidate controller that creates a
// LeaseCandidate object if it does not exist, watches changes
// to the corresponding object, and renews it when PingTime is set.
// WARNING: This is an ALPHA feature. Ensure that the CoordinatedLeaderElection
// feature gate is on.
func NewCandidate(clientset kubernetes.Interface,
candidateNamespace string,
candidateName string,
targetLease string,
binaryVersion, emulationVersion string,
preferredStrategies []v1.CoordinatedLeaseStrategy,
) (*LeaseCandidate, CacheSyncWaiter, error) {
fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String()
// A separate informer factory is required because this must start before informerFactories
// are started for leader elected components
informerFactory := informers.NewSharedInformerFactoryWithOptions(
clientset, 5*time.Minute,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector
}),
)
leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer()
lc := &LeaseCandidate{
leaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace),
leaseCandidateInformer: leaseCandidateInformer,
informerFactory: informerFactory,
name: candidateName,
namespace: candidateNamespace,
leaseName: targetLease,
clock: clock.RealClock{},
binaryVersion: binaryVersion,
emulationVersion: emulationVersion,
preferredStrategies: preferredStrategies,
}
lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"})
h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok {
if leasecandidate.Spec.PingTime != nil && leasecandidate.Spec.PingTime.After(leasecandidate.Spec.RenewTime.Time) {
lc.enqueueLease()
}
}
},
})
if err != nil {
return nil, nil, err
}
lc.hasSynced = h.HasSynced
return lc, informerFactory, nil
}
func (c *LeaseCandidate) Run(ctx context.Context) {
defer c.queue.ShutDown()
c.informerFactory.Start(ctx.Done())
if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) {
return
}
c.enqueueLease()
go c.runWorker(ctx)
<-ctx.Done()
}
func (c *LeaseCandidate) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *LeaseCandidate) processNextWorkItem(ctx context.Context) bool {
key, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(key)
err := c.ensureLease(ctx)
if err == nil {
c.queue.AddAfter(key, requeueInterval)
return true
}
utilruntime.HandleError(err)
c.queue.AddRateLimited(key)
return true
}
func (c *LeaseCandidate) enqueueLease() {
c.queue.Add(0)
}
// ensureLease creates the lease candidate if it does not exist and renews it if it
// does. Returns any error that occurs.
func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
lease, err := c.leaseClient.Get(ctx, c.name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
klog.V(2).Infof("Creating lease candidate")
// lease does not exist, create it.
leaseToCreate := c.newLeaseCandidate()
if _, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}); err != nil {
return err
}
klog.V(2).Infof("Created lease candidate")
return nil
} else if err != nil {
return err
}
klog.V(2).Infof("lease candidate exists. Renewing.")
clone := lease.DeepCopy()
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
_, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
func (c *LeaseCandidate) newLeaseCandidate() *v1alpha1.LeaseCandidate {
lc := &v1alpha1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: c.name,
Namespace: c.namespace,
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: c.leaseName,
BinaryVersion: c.binaryVersion,
EmulationVersion: c.emulationVersion,
PreferredStrategies: c.preferredStrategies,
},
}
lc.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
return lc
}
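A rough usage sketch for this alpha controller; the namespace, names, and versions below are illustrative placeholders:

import (
    "context"

    v1 "k8s.io/api/coordination/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/klog/v2"
)

func startCandidate(ctx context.Context, clientset kubernetes.Interface) {
    candidate, waiter, err := leaderelection.NewCandidate(
        clientset,
        "kube-system",            // namespace for the LeaseCandidate object
        "my-component-candidate", // LeaseCandidate name (hypothetical)
        "my-component",           // the target Lease this candidate competes for
        "1.31.0", "1.31.0",       // binary and emulation versions
        []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
    )
    if err != nil {
        klog.Fatalf("creating lease candidate: %v", err)
    }
    go candidate.Run(ctx)
    // Block until the candidate's informer has synced before starting the
    // component's own leader-elected informers.
    waiter.WaitForCacheSync(ctx.Done())
}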

View File

@@ -26,24 +26,26 @@ import (
type leaderMetricsAdapter interface {
leaderOn(name string)
leaderOff(name string)
slowpathExercised(name string)
}
// GaugeMetric represents a single numerical value that can arbitrarily go up
// and down.
type SwitchMetric interface {
// LeaderMetric instruments metrics used in leader election.
type LeaderMetric interface {
On(name string)
Off(name string)
SlowpathExercised(name string)
}
type noopMetric struct{}
func (noopMetric) On(name string) {}
func (noopMetric) Off(name string) {}
func (noopMetric) On(name string) {}
func (noopMetric) Off(name string) {}
func (noopMetric) SlowpathExercised(name string) {}
// defaultLeaderMetrics expects the caller to lock before setting any metrics.
type defaultLeaderMetrics struct {
// leader's value indicates if the current process is the owner of name lease
leader SwitchMetric
leader LeaderMetric
}
func (m *defaultLeaderMetrics) leaderOn(name string) {
@@ -60,19 +62,27 @@ func (m *defaultLeaderMetrics) leaderOff(name string) {
m.leader.Off(name)
}
func (m *defaultLeaderMetrics) slowpathExercised(name string) {
if m == nil {
return
}
m.leader.SlowpathExercised(name)
}
type noMetrics struct{}
func (noMetrics) leaderOn(name string) {}
func (noMetrics) leaderOff(name string) {}
func (noMetrics) leaderOn(name string) {}
func (noMetrics) leaderOff(name string) {}
func (noMetrics) slowpathExercised(name string) {}
// MetricsProvider generates various metrics used by the leader election.
type MetricsProvider interface {
NewLeaderMetric() SwitchMetric
NewLeaderMetric() LeaderMetric
}
type noopMetricsProvider struct{}
func (_ noopMetricsProvider) NewLeaderMetric() SwitchMetric {
func (noopMetricsProvider) NewLeaderMetric() LeaderMetric {
return noopMetric{}
}
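To surface these metrics, a consumer implements LeaderMetric and a MetricsProvider. A minimal sketch with placeholder method bodies; the SetProvider registration hook is assumed to exist in this package and must be called before any LeaderElector is built:

import "k8s.io/client-go/tools/leaderelection"

// promLeaderMetric is a hypothetical adapter, e.g. around Prometheus metrics.
type promLeaderMetric struct{}

func (promLeaderMetric) On(name string)                { /* set leader gauge to 1 */ }
func (promLeaderMetric) Off(name string)               { /* set leader gauge to 0 */ }
func (promLeaderMetric) SlowpathExercised(name string) { /* increment slow-path counter */ }

type promProvider struct{}

func (promProvider) NewLeaderMetric() leaderelection.LeaderMetric { return promLeaderMetric{} }

func init() {
    leaderelection.SetProvider(promProvider{}) // assumed hook; must run before elections start
}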

View File

@@ -19,14 +19,15 @@ package resourcelock
import (
"context"
"fmt"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"time"
v1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
)
const (
@@ -114,11 +115,13 @@ type LeaderElectionRecord struct {
// attempt to acquire leases with empty identities and will wait for the full lease
// interval to expire before attempting to reacquire. This value is set to empty when
// a client voluntarily steps down.
HolderIdentity string `json:"holderIdentity"`
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
AcquireTime metav1.Time `json:"acquireTime"`
RenewTime metav1.Time `json:"renewTime"`
LeaderTransitions int `json:"leaderTransitions"`
HolderIdentity string `json:"holderIdentity"`
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
AcquireTime metav1.Time `json:"acquireTime"`
RenewTime metav1.Time `json:"renewTime"`
LeaderTransitions int `json:"leaderTransitions"`
Strategy v1.CoordinatedLeaseStrategy `json:"strategy"`
PreferredHolder string `json:"preferredHolder"`
}
// EventRecorder records a change in the ResourceLock.

View File

@@ -122,6 +122,12 @@ func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElec
if spec.RenewTime != nil {
r.RenewTime = metav1.Time{Time: spec.RenewTime.Time}
}
if spec.PreferredHolder != nil {
r.PreferredHolder = *spec.PreferredHolder
}
if spec.Strategy != nil {
r.Strategy = *spec.Strategy
}
return &r
}
@@ -129,11 +135,18 @@ func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElec
func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec {
leaseDurationSeconds := int32(ler.LeaseDurationSeconds)
leaseTransitions := int32(ler.LeaderTransitions)
return coordinationv1.LeaseSpec{
spec := coordinationv1.LeaseSpec{
HolderIdentity: &ler.HolderIdentity,
LeaseDurationSeconds: &leaseDurationSeconds,
AcquireTime: &metav1.MicroTime{Time: ler.AcquireTime.Time},
RenewTime: &metav1.MicroTime{Time: ler.RenewTime.Time},
LeaseTransitions: &leaseTransitions,
}
if ler.PreferredHolder != "" {
spec.PreferredHolder = &ler.PreferredHolder
}
if ler.Strategy != "" {
spec.Strategy = &ler.Strategy
}
return spec
}
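A small round-trip sketch showing that the new Strategy and PreferredHolder fields survive both conversions (field values are illustrative):

import (
    coordinationv1 "k8s.io/api/coordination/v1"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func roundTrip() {
    rec := &resourcelock.LeaderElectionRecord{
        HolderIdentity:       "node-a",
        LeaseDurationSeconds: 15,
        Strategy:             coordinationv1.OldestEmulationVersion,
        PreferredHolder:      "node-b",
    }
    spec := resourcelock.LeaderElectionRecordToLeaseSpec(rec)
    back := resourcelock.LeaseSpecToLeaderElectionRecord(&spec)
    _ = back // back.Strategy and back.PreferredHolder match rec; empty values stay nil in the spec
}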

View File

@@ -198,16 +198,29 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
ctx := c.Context
if ctx == nil {
ctx = context.Background()
} else {
}
// There are two scenarios where it makes no sense to wait for context cancelation:
// - The context was nil.
// - The context was context.Background() to begin with.
//
// Both cases get checked here: we have cancelation if (and only if) there is a channel.
haveCtxCancelation := ctx.Done() != nil
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
if haveCtxCancelation {
// Calling Shutdown is not required when a context was provided:
// when the context is canceled, this goroutine will shut down
// the broadcaster.
//
// If Shutdown is called first, then this goroutine will
// also stop.
go func() {
<-ctx.Done()
<-eventBroadcaster.cancelationCtx.Done()
eventBroadcaster.Broadcaster.Shutdown()
}()
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
return eventBroadcaster
}
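A sketch of the context-aware construction this hunk enables; record.WithContext is assumed to be the option that populates c.Context, and the component name is a placeholder:

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/record"
)

func newRecorder(ctx context.Context, clientset kubernetes.Interface) record.EventRecorder {
    // With a cancelable ctx, calling Shutdown explicitly becomes optional: the
    // goroutine installed above stops the broadcaster when ctx is canceled.
    broadcaster := record.NewBroadcaster(record.WithContext(ctx))
    broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
        Interface: clientset.CoreV1().Events(""),
    })
    return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"})
}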
@@ -382,7 +395,11 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher, err := e.Watch()
if err != nil {
// This function traditionally returns no error even though it can fail.
// Instead, it logs the error and returns an empty watch. The empty
// watch ensures that callers don't crash when calling Stop.
klog.FromContext(e.cancelationCtx).Error(err, "Unable to start event watcher (will not retry!)")
return watch.NewEmptyWatch()
}
go func() {
defer utilruntime.HandleCrash()

vendor/k8s.io/client-go/tools/remotecommand/OWNERS generated vendored Normal file
View File

@@ -0,0 +1,10 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- aojea
- liggitt
- seans3
reviewers:
- aojea
- liggitt
- seans3

View File

@@ -18,11 +18,13 @@ package remotecommand
import (
"context"
"k8s.io/klog/v2"
)
var _ Executor = &fallbackExecutor{}
var _ Executor = &FallbackExecutor{}
type fallbackExecutor struct {
type FallbackExecutor struct {
primary Executor
secondary Executor
shouldFallback func(error) bool
@@ -33,7 +35,7 @@ type fallbackExecutor struct {
// websocket "StreamWithContext" call fails.
// func NewFallbackExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error) bool) (Executor, error) {
return &fallbackExecutor{
return &FallbackExecutor{
primary: primary,
secondary: secondary,
shouldFallback: shouldFallback,
@@ -41,16 +43,17 @@ func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error)
}
// Stream is deprecated. Please use "StreamWithContext".
func (f *fallbackExecutor) Stream(options StreamOptions) error {
func (f *FallbackExecutor) Stream(options StreamOptions) error {
return f.StreamWithContext(context.Background(), options)
}
// StreamWithContext initially attempts to call "StreamWithContext" using the
// primary executor, falling back to calling the secondary executor if the
// initial primary call to upgrade to a websocket connection fails.
func (f *fallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
func (f *FallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
err := f.primary.StreamWithContext(ctx, options)
if f.shouldFallback(err) {
klog.V(4).Infof("RemoteCommand fallback: %v", err)
return f.secondary.StreamWithContext(ctx, options)
}
return err
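A sketch of how the now-exported FallbackExecutor is typically assembled: a websocket primary with a SPDY secondary, falling back when the upgrade is rejected. The fallback predicate shown is an assumption mirroring common usage:

import (
    "net/url"

    "k8s.io/apimachinery/pkg/util/httpstream"
    restclient "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/remotecommand"
)

func newExecutor(config *restclient.Config, method string, u *url.URL) (remotecommand.Executor, error) {
    websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", u.String())
    if err != nil {
        return nil, err
    }
    spdyExec, err := remotecommand.NewSPDYExecutor(config, method, u)
    if err != nil {
        return nil, err
    }
    // Fall back to SPDY when the websocket upgrade fails, e.g. against an
    // older server or an intercepting HTTPS proxy.
    return remotecommand.NewFallbackExecutor(websocketExec, spdyExec, func(err error) bool {
        return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
    })
}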

View File

@@ -36,13 +36,9 @@ import (
"k8s.io/klog/v2"
)
// writeDeadline defines the time that a write to the websocket connection
// must complete by, otherwise an i/o timeout occurs. The writeDeadline
// has nothing to do with a response from the other websocket connection
// endpoint; only that the message was successfully processed by the
// local websocket connection. The typical write deadline within the websocket
// library is one second.
const writeDeadline = 2 * time.Second
// writeDeadline defines the time that a client-side write to the websocket
// connection must complete before an i/o timeout occurs.
const writeDeadline = 60 * time.Second
var (
_ Executor = &wsStreamExecutor{}
@@ -65,8 +61,8 @@ const (
// "pong" message before a timeout error occurs for websocket reading.
// This duration must always be greater than the "pingPeriod". By defining
// this deadline in terms of the ping period, we are essentially saying
// we can drop "X-1" (e.g. 3-1=2) pings before firing the timeout.
pingReadDeadline = (pingPeriod * 3) + (1 * time.Second)
// we can drop "X" (e.g. 12) pings before firing the timeout.
pingReadDeadline = (pingPeriod * 12) + (1 * time.Second)
)
// wsStreamExecutor handles transporting standard shell streams over an httpstream connection.
@@ -187,6 +183,9 @@ type wsStreamCreator struct {
// map of stream id to stream; multiple streams read/write the connection
streams map[byte]*stream
streamsMu sync.Mutex
// setStreamErr holds the error to return to anyone calling setStream.
// This is populated in closeAllStreamReaders.
setStreamErr error
}
func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator {
@@ -202,10 +201,14 @@ func (c *wsStreamCreator) getStream(id byte) *stream {
return c.streams[id]
}
func (c *wsStreamCreator) setStream(id byte, s *stream) {
func (c *wsStreamCreator) setStream(id byte, s *stream) error {
c.streamsMu.Lock()
defer c.streamsMu.Unlock()
if c.setStreamErr != nil {
return c.setStreamErr
}
c.streams[id] = s
return nil
}
// CreateStream uses id from passed headers to create a stream over "c.conn" connection.
@@ -228,7 +231,11 @@ func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream,
connWriteLock: &c.connWriteLock,
id: id,
}
c.setStream(id, s)
if err := c.setStream(id, s); err != nil {
_ = s.writePipe.Close()
_ = s.readPipe.Close()
return nil, err
}
return s, nil
}
@@ -312,7 +319,7 @@ func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, de
}
// closeAllStreamReaders closes readers in all streams.
// This unblocks all stream.Read() calls.
// This unblocks all stream.Read() calls, and keeps any future streams from being created.
func (c *wsStreamCreator) closeAllStreamReaders(err error) {
c.streamsMu.Lock()
defer c.streamsMu.Unlock()
@@ -320,6 +327,12 @@ func (c *wsStreamCreator) closeAllStreamReaders(err error) {
// Closing writePipe unblocks all readPipe.Read() callers and prevents any future writes.
_ = s.writePipe.CloseWithError(err)
}
// ensure callers of setStream receive an error after this point
if err != nil {
c.setStreamErr = err
} else {
c.setStreamErr = fmt.Errorf("closed all streams")
}
}
type stream struct {
@@ -480,7 +493,7 @@ func (h *heartbeat) start() {
// "WriteControl" does not need to be protected by a mutex. According to
// gorilla/websockets library docs: "The Close and WriteControl methods can
// be called concurrently with all other methods."
if err := h.conn.WriteControl(gwebsocket.PingMessage, h.message, time.Now().Add(writeDeadline)); err == nil {
if err := h.conn.WriteControl(gwebsocket.PingMessage, h.message, time.Now().Add(pingReadDeadline)); err == nil {
klog.V(8).Infof("Websocket Ping succeeded")
} else {
klog.Errorf("Websocket Ping failed: %v", err)

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -53,6 +54,7 @@ type RetryWatcher struct {
stopChan chan struct{}
doneChan chan struct{}
minRestartDelay time.Duration
stopChanLock sync.Mutex
}
// NewRetryWatcher creates a new RetryWatcher.
@@ -286,7 +288,15 @@ func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
// Stop implements Interface.
func (rw *RetryWatcher) Stop() {
close(rw.stopChan)
rw.stopChanLock.Lock()
defer rw.stopChanLock.Unlock()
// Prevent closing an already closed channel to prevent a panic
select {
case <-rw.stopChan:
default:
close(rw.stopChan)
}
}
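A short sketch exercising the now-idempotent Stop; the client and the initial resourceVersion "1" are assumed placeholders:

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    watchtools "k8s.io/client-go/tools/watch"
)

func watchConfigMaps(ctx context.Context, client kubernetes.Interface) error {
    lw := &cache.ListWatch{
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            return client.CoreV1().ConfigMaps("default").Watch(ctx, options)
        },
    }
    rw, err := watchtools.NewRetryWatcher("1", lw)
    if err != nil {
        return err
    }
    defer rw.Stop() // the second Stop is now safe thanks to the guarded close
    for range rw.ResultChan() {
        rw.Stop() // explicit stop from the consumer side
        break
    }
    return nil
}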
// Done allows the caller to be notified when Retry watcher stops.