vendor/k8s.io/apiserver/pkg/storage/etcd3/OWNERS  (generated, vendored; 1 change)

@@ -3,5 +3,4 @@
 reviewers:
 - wojtek-t
-- timothysc
 - madhusudancs
 - hongchaodeng
vendor/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner.go  (generated, vendored; 9 changes)

@@ -17,6 +17,7 @@ limitations under the License.
 package etcd3
 
 import (
+    "fmt"
     "strconv"
 
     "k8s.io/apimachinery/pkg/api/meta"
@@ -45,14 +46,14 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uin
 
 // UpdateList implements Versioner
 func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string, count *int64) error {
+    if resourceVersion == 0 {
+        return fmt.Errorf("illegal resource version from storage: %d", resourceVersion)
+    }
     listAccessor, err := meta.ListAccessor(obj)
     if err != nil || listAccessor == nil {
        return err
     }
-    versionString := ""
-    if resourceVersion != 0 {
-        versionString = strconv.FormatUint(resourceVersion, 10)
-    }
+    versionString := strconv.FormatUint(resourceVersion, 10)
     listAccessor.SetResourceVersion(versionString)
     listAccessor.SetContinue(nextKey)
     listAccessor.SetRemainingItemCount(count)
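Note: UpdateList used to map resourceVersion 0 to an empty string; it now rejects 0 outright, since a zero revision can only come from a storage bug. A minimal sketch of the new contract (the PodList type is only an illustration; import paths assume this vendored snapshot):

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apiserver/pkg/storage/etcd3"
    )

    func main() {
        versioner := etcd3.APIObjectVersioner{}
        list := &v1.PodList{}

        // Revision 0 is now an error instead of an empty resourceVersion.
        fmt.Println(versioner.UpdateList(list, 0, "", nil))
        // illegal resource version from storage: 0

        // Any real revision is formatted as a decimal string, as before.
        _ = versioner.UpdateList(list, 1234, "", nil)
        fmt.Println(list.ResourceVersion) // "1234"
    }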
vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go  (generated, vendored; 2 changes)

@@ -23,7 +23,7 @@ import (
     "time"
 
     "go.etcd.io/etcd/clientv3"
-    "k8s.io/klog"
+    "k8s.io/klog/v2"
 )
 
 const (
vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go  (generated, vendored; 20 changes)

@@ -23,12 +23,13 @@ import (
 )
 
 type event struct {
-    key       string
-    value     []byte
-    prevValue []byte
-    rev       int64
-    isDeleted bool
-    isCreated bool
+    key              string
+    value            []byte
+    prevValue        []byte
+    rev              int64
+    isDeleted        bool
+    isCreated        bool
+    isProgressNotify bool
 }
 
 // parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
@@ -61,3 +62,10 @@ func parseEvent(e *clientv3.Event) (*event, error) {
     }
     return ret, nil
 }
+
+func progressNotifyEvent(rev int64) *event {
+    return &event{
+        rev:              rev,
+        isProgressNotify: true,
+    }
+}
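Note: the new isProgressNotify flag marks a synthetic event: etcd reported watch progress without any key change, so only the revision is meaningful. Inside the package it is used roughly like this (a sketch, not the exact call site):

    e := progressNotifyEvent(4711)
    // e.rev == 4711; key, value and prevValue stay empty. The watcher later
    // surfaces this as a watch.Bookmark event carrying just a resourceVersion.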
vendor/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go  (generated, vendored; 65 changes)

@@ -22,8 +22,30 @@ import (
     "time"
 
     "go.etcd.io/etcd/clientv3"
+    "k8s.io/apiserver/pkg/storage/etcd3/metrics"
 )
 
+const (
+    defaultLeaseReuseDurationSeconds = 60
+    defaultLeaseMaxObjectCount       = 1000
+)
+
+// LeaseManagerConfig is configuration for creating a lease manager.
+type LeaseManagerConfig struct {
+    // ReuseDurationSeconds specifies time in seconds that each lease is reused
+    ReuseDurationSeconds int64
+    // MaxObjectCount specifies how many objects that a lease can attach
+    MaxObjectCount int64
+}
+
+// NewDefaultLeaseManagerConfig creates a LeaseManagerConfig with default values
+func NewDefaultLeaseManagerConfig() LeaseManagerConfig {
+    return LeaseManagerConfig{
+        ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
+        MaxObjectCount:       defaultLeaseMaxObjectCount,
+    }
+}
+
 // leaseManager is used to manage leases requested from etcd. If a new write
 // needs a lease that has similar expiration time to the previous one, the old
 // lease will be reused to reduce the overhead of etcd, since lease operations
@@ -36,35 +58,33 @@ type leaseManager struct {
     prevLeaseExpirationTime time.Time
     // The period of time in seconds and percent of TTL that each lease is
     // reused. The minimum of them is used to avoid unreasonably large
-    // numbers. We use var instead of const for testing purposes.
-    leaseReuseDurationSeconds int64
-    leaseReuseDurationPercent float64
+    // numbers.
+    leaseReuseDurationSeconds   int64
+    leaseReuseDurationPercent   float64
+    leaseMaxAttachedObjectCount int64
+    leaseAttachedObjectCount    int64
 }
 
 // newDefaultLeaseManager creates a new lease manager using default setting.
-func newDefaultLeaseManager(client *clientv3.Client) *leaseManager {
-    return newLeaseManager(client, 60, 0.05)
+func newDefaultLeaseManager(client *clientv3.Client, config LeaseManagerConfig) *leaseManager {
+    if config.MaxObjectCount <= 0 {
+        config.MaxObjectCount = defaultLeaseMaxObjectCount
+    }
+    return newLeaseManager(client, config.ReuseDurationSeconds, 0.05, config.MaxObjectCount)
 }
 
 // newLeaseManager creates a new lease manager with the number of buffered
 // leases, lease reuse duration in seconds and percentage. The percentage
 // value x means x*100%.
-func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64) *leaseManager {
+func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64, maxObjectCount int64) *leaseManager {
     return &leaseManager{
-        client:                    client,
-        leaseReuseDurationSeconds: leaseReuseDurationSeconds,
-        leaseReuseDurationPercent: leaseReuseDurationPercent,
+        client:                      client,
+        leaseReuseDurationSeconds:   leaseReuseDurationSeconds,
+        leaseReuseDurationPercent:   leaseReuseDurationPercent,
+        leaseMaxAttachedObjectCount: maxObjectCount,
     }
 }
 
-// setLeaseReuseDurationSeconds is used for testing purpose. It is used to
-// reduce the extra lease duration to avoid unnecessary timeout in testing.
-func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) {
-    l.leaseMu.Lock()
-    defer l.leaseMu.Unlock()
-    l.leaseReuseDurationSeconds = duration
-}
-
 // GetLease returns a lease based on requested ttl: if the cached previous
 // lease can be reused, reuse it; otherwise request a new one from etcd.
 func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
@@ -75,9 +95,15 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI
     reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl)
     valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime)
     sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime)
-    if valid && sufficient {
+
+    // We count all operations that happened in the same lease, regardless of success or failure.
+    // Currently each GetLease call only attach 1 object
+    l.leaseAttachedObjectCount++
+
+    if valid && sufficient && l.leaseAttachedObjectCount <= l.leaseMaxAttachedObjectCount {
         return l.prevLeaseID, nil
     }
 
     // request a lease with a little extra ttl from etcd
     ttl += reuseDurationSeconds
     lcr, err := l.client.Lease.Grant(ctx, ttl)
@@ -87,6 +113,9 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI
     // cache the new lease id
     l.prevLeaseID = lcr.ID
     l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second)
+    // refresh count
+    metrics.UpdateLeaseObjectCount(l.leaseAttachedObjectCount)
+    l.leaseAttachedObjectCount = 1
     return lcr.ID, nil
 }
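Note: besides reusing a lease while its expiry window still fits, the manager now rotates to a fresh lease after MaxObjectCount attachments and reports the fan-out through etcd_lease_object_counts. A hedged sketch of wiring the new config (real callers go through the storagebackend factory; the endpoint and the nil codec/newFunc/transformer are placeholders to keep the sketch short):

    package main

    import (
        "log"

        "go.etcd.io/etcd/clientv3"
        "k8s.io/apiserver/pkg/storage/etcd3"
    )

    func main() {
        client, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
        if err != nil {
            log.Fatal(err)
        }

        cfg := etcd3.NewDefaultLeaseManagerConfig() // 60s reuse window, 1000 objects per lease
        cfg.MaxObjectCount = 500                    // tighten the per-lease cap

        // New now also takes a newFunc and the lease manager config; the nils
        // here are not meaningful defaults.
        _ = etcd3.New(client, nil, nil, "/registry", nil, true, cfg)
    }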
vendor/k8s.io/apiserver/pkg/storage/etcd3/logger.go  (generated, vendored; 4 changes)

@@ -20,7 +20,7 @@ import (
     "fmt"
 
     "go.etcd.io/etcd/clientv3"
-    "k8s.io/klog"
+    "k8s.io/klog/v2"
 )
 
 func init() {
@@ -80,5 +80,5 @@ func (klogWrapper) Fatalf(format string, args ...interface{}) {
 }
 
 func (klogWrapper) V(l int) bool {
-    return bool(klog.V(klog.Level(l)))
+    return bool(klog.V(klog.Level(l)).Enabled())
 }
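Note: the klog change is the same across compact.go, logger.go, store.go and watcher.go. klog v1's V() returned a value directly convertible to bool; in klog/v2 the check moves to the Verbose.Enabled() method. For example:

    package main

    import "k8s.io/klog/v2"

    func main() {
        // klog/v2: V() returns a Verbose whose Enabled() reports whether the
        // verbosity level is active, replacing v1's bool(klog.V(3)).
        if klog.V(3).Enabled() {
            klog.InfoS("verbose diagnostics enabled", "level", 3)
        }
    }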
vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics/OWNERS  (generated, vendored; new file, 4 additions)

@@ -0,0 +1,4 @@
+# See the OWNERS docs at https://go.k8s.io/owners
+
+approvers:
+- logicalhan
vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go  (generated, vendored; 68 changes)

@@ -26,7 +26,7 @@ import (
 
 /*
  * By default, all the following metrics are defined as falling under
- * ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/20190404-kubernetes-control-plane-metrics-stability.md#stability-classes)
+ * ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/20190404-kubernetes-control-plane-metrics-stability.md#stability-classes)
 *
 * Promoting the stability level of the metric is a responsibility of the component owner, since it
 * involves explicitly acknowledging support for the metric across multiple releases, in accordance with
@@ -35,20 +35,56 @@ import (
 var (
     etcdRequestLatency = compbasemetrics.NewHistogramVec(
         &compbasemetrics.HistogramOpts{
-            Name: "etcd_request_duration_seconds",
-            Help: "Etcd request latency in seconds for each operation and object type.",
+            Name: "etcd_request_duration_seconds",
+            Help: "Etcd request latency in seconds for each operation and object type.",
+            // Etcd request latency in seconds for each operation and object type.
+            Buckets: []float64{0.005, 0.025, 0.1, 0.25, 0.5, 1.0, 2.0, 4.0, 15.0, 30.0, 60.0},
             StabilityLevel: compbasemetrics.ALPHA,
         },
         []string{"operation", "type"},
     )
+    etcdObjectCounts = compbasemetrics.NewGaugeVec(
+        &compbasemetrics.GaugeOpts{
+            Name:              "etcd_object_counts",
+            DeprecatedVersion: "1.22.0",
+            Help:              "Number of stored objects at the time of last check split by kind. This metric is replaced by apiserver_storage_object_counts.",
+            StabilityLevel:    compbasemetrics.ALPHA,
+        },
+        []string{"resource"},
+    )
     objectCounts = compbasemetrics.NewGaugeVec(
         &compbasemetrics.GaugeOpts{
-            Name:           "etcd_object_counts",
+            Name:           "apiserver_storage_objects",
             Help:           "Number of stored objects at the time of last check split by kind.",
-            StabilityLevel: compbasemetrics.ALPHA,
+            StabilityLevel: compbasemetrics.STABLE,
         },
         []string{"resource"},
     )
     dbTotalSize = compbasemetrics.NewGaugeVec(
         &compbasemetrics.GaugeOpts{
             Name:           "etcd_db_total_size_in_bytes",
             Help:           "Total size of the etcd database file physically allocated in bytes.",
             StabilityLevel: compbasemetrics.ALPHA,
         },
         []string{"endpoint"},
     )
+    etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
+        &compbasemetrics.GaugeOpts{
+            Name:           "etcd_bookmark_counts",
+            Help:           "Number of etcd bookmarks (progress notify events) split by kind.",
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{"resource"},
+    )
+    etcdLeaseObjectCounts = compbasemetrics.NewHistogramVec(
+        &compbasemetrics.HistogramOpts{
+            Name:           "etcd_lease_object_counts",
+            Help:           "Number of objects attached to a single etcd lease.",
+            Buckets:        []float64{10, 50, 100, 500, 1000, 2500, 5000},
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{},
+    )
 )
 
 var registerMetrics sync.Once
@@ -59,12 +95,17 @@ func Register() {
     registerMetrics.Do(func() {
         legacyregistry.MustRegister(etcdRequestLatency)
         legacyregistry.MustRegister(objectCounts)
+        legacyregistry.MustRegister(etcdObjectCounts)
         legacyregistry.MustRegister(dbTotalSize)
+        legacyregistry.MustRegister(etcdBookmarkCounts)
+        legacyregistry.MustRegister(etcdLeaseObjectCounts)
     })
 }
 
-// UpdateObjectCount sets the etcd_object_counts metric.
+// UpdateObjectCount sets the apiserver_storage_object_counts and etcd_object_counts (deprecated) metric.
 func UpdateObjectCount(resourcePrefix string, count int64) {
     objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
+    etcdObjectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
 }
 
 // RecordEtcdRequestLatency sets the etcd_request_duration_seconds metrics.
@@ -72,6 +113,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
     etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
 }
 
+// RecordEtcdBookmark updates the etcd_bookmark_counts metric.
+func RecordEtcdBookmark(resource string) {
+    etcdBookmarkCounts.WithLabelValues(resource).Inc()
+}
+
 // Reset resets the etcd_request_duration_seconds metric.
 func Reset() {
     etcdRequestLatency.Reset()
@@ -81,3 +127,15 @@ func Reset() {
 func sinceInSeconds(start time.Time) float64 {
     return time.Since(start).Seconds()
 }
+
+// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
+func UpdateEtcdDbSize(ep string, size int64) {
+    dbTotalSize.WithLabelValues(ep).Set(float64(size))
+}
+
+// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
+func UpdateLeaseObjectCount(count int64) {
+    // Currently we only store one previous lease, since all the events have the same ttl.
+    // See pkg/storage/etcd3/lease_manager.go
+    etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count))
+}
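Note: dashboards keyed to etcd_object_counts keep working through the deprecation window, because UpdateObjectCount now feeds both the stable and the deprecated gauge. A usage sketch (the resource label and counts are made up):

    package main

    import (
        "net/http"

        "k8s.io/apiserver/pkg/storage/etcd3/metrics"
        "k8s.io/component-base/metrics/legacyregistry"
    )

    func main() {
        metrics.Register() // registers stable and deprecated collectors once

        // One call sets apiserver_storage_objects (STABLE) and
        // etcd_object_counts (deprecated since 1.22) to the same value.
        metrics.UpdateObjectCount("pods", 4200)

        // New signals from the watch and lease paths:
        metrics.RecordEtcdBookmark("pods") // etcd_bookmark_counts
        metrics.UpdateLeaseObjectCount(17) // etcd_lease_object_counts

        http.ListenAndServe(":8080", legacyregistry.Handler()) // expose /metrics
    }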
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go  (generated, vendored; 246 changes)

@@ -32,6 +32,8 @@ import (
 
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/meta"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/conversion"
     "k8s.io/apimachinery/pkg/runtime"
@@ -41,7 +43,7 @@ import (
     "k8s.io/apiserver/pkg/storage/etcd3/metrics"
     "k8s.io/apiserver/pkg/storage/value"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
-    "k8s.io/klog"
+    "k8s.io/klog/v2"
     utiltrace "k8s.io/utils/trace"
 )
 
@@ -62,10 +64,7 @@ func (d authenticatedDataString) AuthenticatedData() []byte {
 var _ value.Context = authenticatedDataString("")
 
 type store struct {
-    client *clientv3.Client
-    // getOpts contains additional options that should be passed
-    // to all Get() calls.
-    getOps []clientv3.OpOption
+    client        *clientv3.Client
     codec         runtime.Codec
     versioner     storage.Versioner
     transformer   value.Transformer
@@ -84,11 +83,11 @@ type objState struct {
 }
 
 // New returns an etcd3 implementation of storage.Interface.
-func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
-    return newStore(c, pagingEnabled, codec, prefix, transformer)
+func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
+    return newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig)
 }
 
-func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
+func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
     versioner := APIObjectVersioner{}
     result := &store{
         client: c,
@@ -100,8 +99,8 @@ func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefi
         // no-op for default prefix of '/registry'.
         // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
         pathPrefix:   path.Join("/", prefix),
-        watcher:      newWatcher(c, codec, versioner, transformer),
-        leaseManager: newDefaultLeaseManager(c),
+        watcher:      newWatcher(c, codec, newFunc, versioner, transformer),
+        leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
     }
     return result
 }
@@ -112,20 +111,20 @@ func (s *store) Versioner() storage.Versioner {
 }
 
 // Get implements storage.Interface.Get.
-func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
+func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
     key = path.Join(s.pathPrefix, key)
     startTime := time.Now()
-    getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
+    getResp, err := s.client.KV.Get(ctx, key)
     metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
     if err != nil {
         return err
     }
-    if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
+    if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
         return err
     }
 
     if len(getResp.Kvs) == 0 {
-        if ignoreNotFound {
+        if opts.IgnoreNotFound {
             return runtime.SetZeroValue(out)
         }
         return storage.NewKeyNotFoundError(key, 0)
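Note: Get's positional resourceVersion and ignoreNotFound parameters are folded into storage.GetOptions, matching the option structs used by List and Watch. Caller-side the change looks like this (a sketch; s, ctx and the Pod type are assumed):

    pod := &v1.Pod{}
    err := s.Get(ctx, "/pods/default/foo", storage.GetOptions{
        IgnoreNotFound:  true, // zero out instead of returning a NotFound error
        ResourceVersion: "0",  // minimum revision; "" means no constraint
    }, pod)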
@@ -186,35 +185,77 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
 }
 
 // Delete implements storage.Interface.Delete.
-func (s *store) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
+func (s *store) Delete(
+    ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
+    validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
     v, err := conversion.EnforcePtr(out)
     if err != nil {
         return fmt.Errorf("unable to convert output object to pointer: %v", err)
     }
     key = path.Join(s.pathPrefix, key)
-    return s.conditionalDelete(ctx, key, out, v, preconditions, validateDeletion)
+    return s.conditionalDelete(ctx, key, out, v, preconditions, validateDeletion, cachedExistingObject)
 }
 
-func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
-    startTime := time.Now()
-    getResp, err := s.client.KV.Get(ctx, key)
-    metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
+func (s *store) conditionalDelete(
+    ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
+    validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
+    getCurrentState := func() (*objState, error) {
+        startTime := time.Now()
+        getResp, err := s.client.KV.Get(ctx, key)
+        metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
+        if err != nil {
+            return nil, err
+        }
+        return s.getState(getResp, key, v, false)
+    }
+
+    var origState *objState
+    var err error
+    var origStateIsCurrent bool
+    if cachedExistingObject != nil {
+        origState, err = s.getStateFromObject(cachedExistingObject)
+    } else {
+        origState, err = getCurrentState()
+        origStateIsCurrent = true
+    }
+    if err != nil {
+        return err
+    }
 
     for {
-        origState, err := s.getState(getResp, key, v, false)
-        if err != nil {
-            return err
-        }
         if preconditions != nil {
             if err := preconditions.Check(key, origState.obj); err != nil {
-                return err
+                if origStateIsCurrent {
+                    return err
+                }
+
+                // It's possible we're working with stale data.
+                // Actually fetch
+                origState, err = getCurrentState()
+                if err != nil {
+                    return err
+                }
+                origStateIsCurrent = true
+                // Retry
+                continue
             }
         }
         if err := validateDeletion(ctx, origState.obj); err != nil {
-            return err
+            if origStateIsCurrent {
+                return err
+            }
+
+            // It's possible we're working with stale data.
+            // Actually fetch
+            origState, err = getCurrentState()
+            if err != nil {
+                return err
+            }
+            origStateIsCurrent = true
+            // Retry
+            continue
        }
 
        startTime := time.Now()
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
@@ -228,8 +269,13 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
            return err
        }
        if !txnResp.Succeeded {
-            getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
+            getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
+            origState, err = s.getState(getResp, key, v, false)
+            if err != nil {
+                return err
+            }
+            origStateIsCurrent = true
            continue
        }
        return decode(s.codec, s.versioner, origState.data, out, origState.rev)
@@ -239,7 +285,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
 // GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
 func (s *store) GuaranteedUpdate(
     ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
-    preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
+    preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
     trace := utiltrace.New("GuaranteedUpdate etcd3", utiltrace.Field{"type", getTypeName(out)})
     defer trace.LogIfLong(500 * time.Millisecond)
@@ -251,7 +297,7 @@ func (s *store) GuaranteedUpdate(
 
     getCurrentState := func() (*objState, error) {
         startTime := time.Now()
-        getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
+        getResp, err := s.client.KV.Get(ctx, key)
         metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
         if err != nil {
             return nil, err
@@ -260,18 +306,15 @@ func (s *store) GuaranteedUpdate(
     }
 
     var origState *objState
-    var mustCheckData bool
-    if len(suggestion) == 1 && suggestion[0] != nil {
-        origState, err = s.getStateFromObject(suggestion[0])
-        if err != nil {
-            return err
-        }
-        mustCheckData = true
+    var origStateIsCurrent bool
+    if cachedExistingObject != nil {
+        origState, err = s.getStateFromObject(cachedExistingObject)
     } else {
         origState, err = getCurrentState()
-        if err != nil {
-            return err
-        }
+        origStateIsCurrent = true
+    }
+    if err != nil {
+        return err
     }
     trace.Step("initial value restored")
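Note: conditionalDelete now mirrors GuaranteedUpdate: start from the caller-supplied cached object when present, and on a precondition or validation failure re-fetch from etcd once before failing, since the cache may be stale. The control flow, boiled down to a runnable toy (all names here are ours, not the store's):

    package main

    import (
        "errors"
        "fmt"
    )

    func check(state string) error {
        if state != "fresh" {
            return errors.New("precondition failed")
        }
        return nil
    }

    func main() {
        // Start from a possibly-stale cache, as with cachedExistingObject.
        state, stateIsCurrent := "stale-cache", false
        for {
            if err := check(state); err != nil {
                if stateIsCurrent {
                    fmt.Println("failing for real:", err) // fresh data really fails
                    return
                }
                state, stateIsCurrent = "fresh", true // re-fetch once, then retry
                continue
            }
            fmt.Println("proceed with the revision-guarded txn")
            return
        }
    }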
@@ -279,7 +322,7 @@ func (s *store) GuaranteedUpdate(
     for {
         if err := preconditions.Check(key, origState.obj); err != nil {
             // If our data is already up to date, return the error
-            if !mustCheckData {
+            if origStateIsCurrent {
                 return err
             }
 
@@ -289,7 +332,7 @@ func (s *store) GuaranteedUpdate(
             if err != nil {
                 return err
             }
-            mustCheckData = false
+            origStateIsCurrent = true
             // Retry
             continue
         }
@@ -297,7 +340,7 @@ func (s *store) GuaranteedUpdate(
         ret, ttl, err := s.updateState(origState, tryUpdate)
         if err != nil {
             // If our data is already up to date, return the error
-            if !mustCheckData {
+            if origStateIsCurrent {
                 return err
             }
 
@@ -307,7 +350,7 @@ func (s *store) GuaranteedUpdate(
             if err != nil {
                 return err
             }
-            mustCheckData = false
+            origStateIsCurrent = true
             // Retry
             continue
         }
@@ -320,12 +363,12 @@ func (s *store) GuaranteedUpdate(
         // if we skipped the original Get in this loop, we must refresh from
         // etcd in order to be sure the data in the store is equivalent to
         // our desired serialization
-        if mustCheckData {
+        if !origStateIsCurrent {
             origState, err = getCurrentState()
             if err != nil {
                 return err
             }
-            mustCheckData = false
+            origStateIsCurrent = true
             if !bytes.Equal(data, origState.data) {
                 // original data changed, restart loop
                 continue
@@ -369,7 +412,7 @@ func (s *store) GuaranteedUpdate(
             return err
         }
         trace.Step("Retry value restored")
-        mustCheckData = false
+        origStateIsCurrent = true
         continue
     }
     putResp := txnResp.Responses[0].GetResponsePut()
@@ -379,10 +422,14 @@ func (s *store) GuaranteedUpdate(
 }
 
 // GetToList implements storage.Interface.GetToList.
-func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
+func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error {
+    resourceVersion := listOpts.ResourceVersion
+    match := listOpts.ResourceVersionMatch
+    pred := listOpts.Predicate
     trace := utiltrace.New("GetToList etcd3",
         utiltrace.Field{"key", key},
         utiltrace.Field{"resourceVersion", resourceVersion},
+        utiltrace.Field{"resourceVersionMatch", match},
         utiltrace.Field{"limit", pred.Limit},
         utiltrace.Field{"continue", pred.Continue})
     defer trace.LogIfLong(500 * time.Millisecond)
@@ -399,12 +446,21 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
 
     key = path.Join(s.pathPrefix, key)
     startTime := time.Now()
-    getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
+    var opts []clientv3.OpOption
+    if len(resourceVersion) > 0 && match == metav1.ResourceVersionMatchExact {
+        rv, err := s.versioner.ParseResourceVersion(resourceVersion)
+        if err != nil {
+            return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
+        }
+        opts = append(opts, clientv3.WithRev(int64(rv)))
+    }
+
+    getResp, err := s.client.KV.Get(ctx, key, opts...)
     metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
     if err != nil {
         return err
     }
-    if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
+    if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
         return err
     }
@@ -440,6 +496,14 @@ func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Obje
 
 func (s *store) Count(key string) (int64, error) {
     key = path.Join(s.pathPrefix, key)
+
+    // We need to make sure the key ended with "/" so that we only get children "directories".
+    // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
+    // while with prefix "/a/" will return only "/a/b" which is the correct answer.
+    if !strings.HasSuffix(key, "/") {
+        key += "/"
+    }
+
     startTime := time.Now()
     getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
     metrics.RecordEtcdRequestLatency("listWithCount", key, startTime)
@@ -510,10 +574,14 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
 }
 
 // List implements storage.Interface.List.
-func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
+func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
+    resourceVersion := opts.ResourceVersion
+    match := opts.ResourceVersionMatch
+    pred := opts.Predicate
     trace := utiltrace.New("List etcd3",
         utiltrace.Field{"key", key},
         utiltrace.Field{"resourceVersion", resourceVersion},
+        utiltrace.Field{"resourceVersionMatch", match},
         utiltrace.Field{"limit", pred.Limit},
         utiltrace.Field{"continue", pred.Continue})
     defer trace.LogIfLong(500 * time.Millisecond)
@@ -547,7 +615,16 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
 
     newItemFunc := getNewItemFunc(listObj, v)
 
-    var returnedRV, continueRV int64
+    var fromRV *uint64
+    if len(resourceVersion) > 0 {
+        parsedRV, err := s.versioner.ParseResourceVersion(resourceVersion)
+        if err != nil {
+            return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
+        }
+        fromRV = &parsedRV
+    }
+
+    var returnedRV, continueRV, withRev int64
     var continueKey string
     switch {
     case s.pagingEnabled && len(pred.Continue) > 0:
@@ -568,27 +645,50 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
         // continueRV==0 is invalid.
         // If continueRV < 0, the request is for the latest resource version.
         if continueRV > 0 {
-            options = append(options, clientv3.WithRev(continueRV))
+            withRev = continueRV
             returnedRV = continueRV
         }
     case s.pagingEnabled && pred.Limit > 0:
-        if len(resourceVersion) > 0 {
-            fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
-            if err != nil {
-                return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
-            }
-            if fromRV > 0 {
-                options = append(options, clientv3.WithRev(int64(fromRV)))
-            }
-            returnedRV = int64(fromRV)
+        if fromRV != nil {
+            switch match {
+            case metav1.ResourceVersionMatchNotOlderThan:
+                // The not older than constraint is checked after we get a response from etcd,
+                // and returnedRV is then set to the revision we get from the etcd response.
+            case metav1.ResourceVersionMatchExact:
+                returnedRV = int64(*fromRV)
+                withRev = returnedRV
+            case "": // legacy case
+                if *fromRV > 0 {
+                    returnedRV = int64(*fromRV)
+                    withRev = returnedRV
+                }
+            default:
+                return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
+            }
         }
 
         rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
         options = append(options, clientv3.WithRange(rangeEnd))
-
     default:
+        if fromRV != nil {
+            switch match {
+            case metav1.ResourceVersionMatchNotOlderThan:
+                // The not older than constraint is checked after we get a response from etcd,
+                // and returnedRV is then set to the revision we get from the etcd response.
+            case metav1.ResourceVersionMatchExact:
+                returnedRV = int64(*fromRV)
+                withRev = returnedRV
+            case "": // legacy case
+            default:
+                return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
+            }
+        }
+
         options = append(options, clientv3.WithPrefix())
     }
+    if withRev != 0 {
+        options = append(options, clientv3.WithRev(withRev))
+    }
 
     // loop until we have filled the requested limit from etcd or there are no more results
     var lastKey []byte
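Note: the fromRV/withRev plumbing gives the three ResourceVersionMatch modes one code path: Exact pins the etcd read to the requested revision via WithRev, NotOlderThan lets etcd serve any recent snapshot and relies on validateMinimumResourceVersion afterwards, and the empty value preserves the legacy heuristic. From a caller's perspective (a sketch; s, ctx and podList are assumed):

    // Exact: list the keyspace at revision 500, or fail if it was compacted.
    err := s.List(ctx, "/pods", storage.ListOptions{
        ResourceVersion:      "500",
        ResourceVersionMatch: metav1.ResourceVersionMatchExact,
        Predicate:            storage.Everything,
    }, podList)

    // NotOlderThan: any snapshot at revision >= 500 is acceptable.
    err = s.List(ctx, "/pods", storage.ListOptions{
        ResourceVersion:      "500",
        ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
        Predicate:            storage.Everything,
    }, podList)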
@@ -601,7 +701,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
         if err != nil {
             return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
         }
-        if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
+        if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
             return err
         }
         hasMore = getResp.More
@@ -650,6 +750,10 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
             break
         }
         key = string(lastKey) + "\x00"
+        if withRev == 0 {
+            withRev = returnedRV
+            options = append(options, clientv3.WithRev(withRev))
+        }
     }
 
     // instruct the client to begin querying from immediately after the last key we returned
@@ -709,22 +813,22 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
 }
 
 // Watch implements storage.Interface.Watch.
-func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
-    return s.watch(ctx, key, resourceVersion, pred, false)
+func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
+    return s.watch(ctx, key, opts, false)
 }
 
 // WatchList implements storage.Interface.WatchList.
-func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
-    return s.watch(ctx, key, resourceVersion, pred, true)
+func (s *store) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
+    return s.watch(ctx, key, opts, true)
 }
 
-func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
-    rev, err := s.versioner.ParseResourceVersion(rv)
+func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, recursive bool) (watch.Interface, error) {
+    rev, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
     if err != nil {
         return nil, err
     }
     key = path.Join(s.pathPrefix, key)
-    return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
+    return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate)
 }
 
 func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
@@ -818,9 +922,9 @@ func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, er
     return []clientv3.OpOption{clientv3.WithLease(id)}, nil
 }
 
-// ensureMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
+// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
 // greater than the most recent actualRevision available from storage.
-func (s *store) ensureMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
+func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
     if minimumResourceVersion == "" {
         return nil
     }
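Note: Watch and WatchList now take storage.ListOptions too, which is how the new ProgressNotify flag reaches the etcd watch in watcher.go below. A sketch (s and ctx assumed):

    w, err := s.Watch(ctx, "/pods", storage.ListOptions{
        ResourceVersion: "1234",             // resume just after this revision
        Predicate:       storage.Everything, // no server-side filtering
        ProgressNotify:  true,               // request periodic progress events
    })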
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go  (generated, vendored; 69 changes)

@@ -21,6 +21,7 @@ import (
     "errors"
     "fmt"
     "os"
+    "reflect"
     "strconv"
     "strings"
     "sync"
@@ -29,10 +30,11 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/watch"
     "k8s.io/apiserver/pkg/storage"
+    "k8s.io/apiserver/pkg/storage/etcd3/metrics"
     "k8s.io/apiserver/pkg/storage/value"
 
     "go.etcd.io/etcd/clientv3"
-    "k8s.io/klog"
+    "k8s.io/klog/v2"
 )
 
 const (
@@ -68,6 +70,8 @@ func TestOnlySetFatalOnDecodeError(b bool) {
 type watcher struct {
     client      *clientv3.Client
     codec       runtime.Codec
+    newFunc     func() runtime.Object
+    objectType  string
     versioner   storage.Versioner
     transformer value.Transformer
 }
@@ -78,6 +82,7 @@ type watchChan struct {
     key               string
     initialRev        int64
     recursive         bool
+    progressNotify    bool
     internalPred      storage.SelectionPredicate
     ctx               context.Context
     cancel            context.CancelFunc
@@ -86,13 +91,20 @@ type watchChan struct {
     errChan           chan error
 }
 
-func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
-    return &watcher{
+func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
+    res := &watcher{
         client:      client,
         codec:       codec,
+        newFunc:     newFunc,
        versioner:   versioner,
        transformer: transformer,
     }
+    if newFunc == nil {
+        res.objectType = "<unknown>"
+    } else {
+        res.objectType = reflect.TypeOf(newFunc()).String()
+    }
+    return res
 }
 
 // Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@@ -102,21 +114,22 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.
 // If recursive is false, it watches on given key.
 // If recursive is true, it watches any children and directories under the key, excluding the root key itself.
 // pred must be non-nil. Only if pred matches the change, it will be returned.
-func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
+func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) {
     if recursive && !strings.HasSuffix(key, "/") {
         key += "/"
     }
-    wc := w.createWatchChan(ctx, key, rev, recursive, pred)
+    wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
     go wc.run()
     return wc, nil
 }
 
-func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
+func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
     wc := &watchChan{
         watcher:           w,
         key:               key,
         initialRev:        rev,
         recursive:         recursive,
+        progressNotify:    progressNotify,
         internalPred:      pred,
         incomingEventChan: make(chan *event, incomingBufSize),
         resultChan:        make(chan watch.Event, outgoingBufSize),
@@ -126,7 +139,15 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
         // The filter doesn't filter out any object.
         wc.internalPred = storage.Everything
     }
-    wc.ctx, wc.cancel = context.WithCancel(ctx)
+
+    // The etcd server waits until it cannot find a leader for 3 election
+    // timeouts to cancel existing streams. 3 is currently a hard coded
+    // constant. The election timeout defaults to 1000ms. If the cluster is
+    // healthy, when the leader is stopped, the leadership transfer should be
+    // smooth. (leader transfers its leadership before stopping). If leader is
+    // hard killed, other servers will take an election timeout to realize
+    // leader lost and start campaign.
+    wc.ctx, wc.cancel = context.WithCancel(clientv3.WithRequireLeader(ctx))
     return wc
 }
@@ -215,6 +236,9 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
     if wc.recursive {
         opts = append(opts, clientv3.WithPrefix())
     }
+    if wc.progressNotify {
+        opts = append(opts, clientv3.WithProgressNotify())
+    }
     wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
     for wres := range wch {
         if wres.Err() != nil {
@@ -224,6 +248,12 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
             wc.sendError(err)
             return
         }
+        if wres.IsProgressNotify() {
+            wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
+            metrics.RecordEtcdBookmark(wc.watcher.objectType)
+            continue
+        }
+
         for _, e := range wres.Events {
             parsedEvent, err := parseEvent(e)
             if err != nil {
@@ -253,8 +283,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
             continue
         }
         if len(wc.resultChan) == outgoingBufSize {
-            klog.V(3).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
-                "Probably caused by slow dispatching events to watchers", outgoingBufSize)
+            klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize)
         }
         // If user couldn't receive results fast enough, we also block incoming events from watcher.
         // Because storing events in local will cause more memory usage.
@@ -292,6 +321,19 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
     }
 
     switch {
+    case e.isProgressNotify:
+        if wc.watcher.newFunc == nil {
+            return nil
+        }
+        object := wc.watcher.newFunc()
+        if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
+            klog.Errorf("failed to propagate object version: %v", err)
+            return nil
+        }
+        res = &watch.Event{
+            Type:   watch.Bookmark,
+            Object: object,
+        }
     case e.isDeleted:
         if !wc.filter(oldObj) {
             return nil
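Note: on the client side of the watch, progress notifies arrive as watch.Bookmark events whose object carries nothing but a resourceVersion. A consumer sketch (w is the watch.Interface from the earlier Watch call; handle and lastSeenRV are ours):

    for ev := range w.ResultChan() {
        if ev.Type == watch.Bookmark {
            // No object change is implied; just record a safe restart point.
            rv, _ := meta.NewAccessor().ResourceVersion(ev.Object)
            lastSeenRV = rv
            continue
        }
        handle(ev) // Added/Modified/Deleted as before
    }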
@@ -360,9 +402,7 @@ func (wc *watchChan) sendError(err error) {
 
 func (wc *watchChan) sendEvent(e *event) {
     if len(wc.incomingEventChan) == incomingBufSize {
-        klog.V(3).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
-            "Probably caused by slow decoding, user not receiving fast, or other processing logic",
-            incomingBufSize)
+        klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize)
     }
     select {
     case wc.incomingEventChan <- e:
@@ -371,6 +411,11 @@ func (wc *watchChan) sendEvent(e *event) {
 }
 
 func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
+    if e.isProgressNotify {
+        // progressNotify events doesn't contain neither current nor previous object version,
+        return nil, nil, nil
+    }
+
     if !e.isDeleted {
         data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
         if err != nil {