feat: kubesphere 4.0 (#6115)
* feat: kubesphere 4.0 Signed-off-by: ci-bot <ci-bot@kubesphere.io> * feat: kubesphere 4.0 Signed-off-by: ci-bot <ci-bot@kubesphere.io> --------- Signed-off-by: ci-bot <ci-bot@kubesphere.io> Co-authored-by: ks-ci-bot <ks-ci-bot@example.com> Co-authored-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
committed by
GitHub
parent
b5015ec7b9
commit
447a51f08b
11
vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go
generated
vendored
11
vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go
generated
vendored
@@ -30,6 +30,17 @@ type event struct {
|
||||
isDeleted bool
|
||||
isCreated bool
|
||||
isProgressNotify bool
|
||||
// isInitialEventsEndBookmark helps us keep track
|
||||
// of whether we have sent an annotated bookmark event.
|
||||
//
|
||||
// when this variable is set to true,
|
||||
// a special annotation will be added
|
||||
// to the bookmark event.
|
||||
//
|
||||
// note that we decided to extend the event
|
||||
// struct field to eliminate contention
|
||||
// between startWatching and processEvent
|
||||
isInitialEventsEndBookmark bool
|
||||
}
|
||||
|
||||
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
|
||||
|
||||
1
vendor/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go
generated
vendored
1
vendor/k8s.io/apiserver/pkg/storage/etcd3/healthcheck.go
generated
vendored
@@ -28,6 +28,7 @@ type etcdHealth struct {
|
||||
}
|
||||
|
||||
// EtcdHealthCheck decodes data returned from etcd /healthz handler.
|
||||
// Deprecated: Validate health by passing storagebackend.Config directly to storagefactory.CreateProber.
|
||||
func EtcdHealthCheck(data []byte) error {
|
||||
obj := etcdHealth{}
|
||||
if err := json.Unmarshal(data, &obj); err != nil {
|
||||
|
||||
3
vendor/k8s.io/apiserver/pkg/storage/etcd3/latency_tracker.go
generated
vendored
3
vendor/k8s.io/apiserver/pkg/storage/etcd3/latency_tracker.go
generated
vendored
@@ -47,8 +47,7 @@ func NewETCDLatencyTracker(delegate clientv3.KV) clientv3.KV {
|
||||
// tracking function TrackStorageLatency is thread safe.
|
||||
//
|
||||
// NOTE: Compact is an asynchronous process and is not associated with
|
||||
//
|
||||
// any request, so we will not be tracking its latency.
|
||||
// any request, so we will not be tracking its latency.
|
||||
type clientV3KVLatencyTracker struct {
|
||||
clientv3.KV
|
||||
}
|
||||
|
||||
147
vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go
generated
vendored
147
vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go
generated
vendored
@@ -17,11 +17,14 @@ limitations under the License.
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
compbasemetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -47,23 +50,51 @@ var (
|
||||
},
|
||||
[]string{"operation", "type"},
|
||||
)
|
||||
etcdRequestCounts = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Name: "etcd_requests_total",
|
||||
Help: "Etcd request counts for each operation and object type.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{"operation", "type"},
|
||||
)
|
||||
etcdRequestErrorCounts = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Name: "etcd_request_errors_total",
|
||||
Help: "Etcd failed request counts for each operation and object type.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{"operation", "type"},
|
||||
)
|
||||
objectCounts = compbasemetrics.NewGaugeVec(
|
||||
&compbasemetrics.GaugeOpts{
|
||||
Name: "apiserver_storage_objects",
|
||||
Help: "Number of stored objects at the time of last check split by kind.",
|
||||
Help: "Number of stored objects at the time of last check split by kind. In case of a fetching error, the value will be -1.",
|
||||
StabilityLevel: compbasemetrics.STABLE,
|
||||
},
|
||||
[]string{"resource"},
|
||||
)
|
||||
dbTotalSize = compbasemetrics.NewGaugeVec(
|
||||
&compbasemetrics.GaugeOpts{
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_db_total_size_in_bytes",
|
||||
Help: "Total size of the storage database file physically allocated in bytes.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_db_total_size_in_bytes",
|
||||
Help: "Total size of the storage database file physically allocated in bytes.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
DeprecatedVersion: "1.28.0",
|
||||
},
|
||||
[]string{"endpoint"},
|
||||
)
|
||||
storageSizeDescription = compbasemetrics.NewDesc("apiserver_storage_size_bytes", "Size of the storage database file physically allocated in bytes.", []string{"cluster"}, nil, compbasemetrics.ALPHA, "")
|
||||
storageMonitor = &monitorCollector{monitorGetter: func() ([]Monitor, error) { return nil, nil }}
|
||||
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Subsystem: "apiserver",
|
||||
Name: "storage_events_received_total",
|
||||
Help: "Number of etcd events received split by kind.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{"resource"},
|
||||
)
|
||||
etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
|
||||
&compbasemetrics.GaugeOpts{
|
||||
Name: "etcd_bookmark_counts",
|
||||
@@ -113,6 +144,15 @@ var (
|
||||
},
|
||||
[]string{"resource"},
|
||||
)
|
||||
decodeErrorCounts = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Namespace: "apiserver",
|
||||
Name: "storage_decode_errors_total",
|
||||
Help: "Number of stored object decode errors split by object type",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{"resource"},
|
||||
)
|
||||
)
|
||||
|
||||
var registerMetrics sync.Once
|
||||
@@ -122,14 +162,18 @@ func Register() {
|
||||
// Register the metrics.
|
||||
registerMetrics.Do(func() {
|
||||
legacyregistry.MustRegister(etcdRequestLatency)
|
||||
legacyregistry.MustRegister(etcdRequestCounts)
|
||||
legacyregistry.MustRegister(etcdRequestErrorCounts)
|
||||
legacyregistry.MustRegister(objectCounts)
|
||||
legacyregistry.MustRegister(dbTotalSize)
|
||||
legacyregistry.CustomMustRegister(storageMonitor)
|
||||
legacyregistry.MustRegister(etcdBookmarkCounts)
|
||||
legacyregistry.MustRegister(etcdLeaseObjectCounts)
|
||||
legacyregistry.MustRegister(listStorageCount)
|
||||
legacyregistry.MustRegister(listStorageNumFetched)
|
||||
legacyregistry.MustRegister(listStorageNumSelectorEvals)
|
||||
legacyregistry.MustRegister(listStorageNumReturned)
|
||||
legacyregistry.MustRegister(decodeErrorCounts)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -138,9 +182,20 @@ func UpdateObjectCount(resourcePrefix string, count int64) {
|
||||
objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
|
||||
}
|
||||
|
||||
// RecordEtcdRequestLatency sets the etcd_request_duration_seconds metrics.
|
||||
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
|
||||
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
|
||||
// RecordEtcdRequest updates and sets the etcd_request_duration_seconds,
|
||||
// etcd_request_total, etcd_request_errors_total metrics.
|
||||
func RecordEtcdRequest(verb, resource string, err error, startTime time.Time) {
|
||||
v := []string{verb, resource}
|
||||
etcdRequestLatency.WithLabelValues(v...).Observe(sinceInSeconds(startTime))
|
||||
etcdRequestCounts.WithLabelValues(v...).Inc()
|
||||
if err != nil {
|
||||
etcdRequestErrorCounts.WithLabelValues(v...).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
// RecordEtcdEvent updated the etcd_events_received_total metric.
|
||||
func RecordEtcdEvent(resource string) {
|
||||
etcdEventsReceivedCounts.WithLabelValues(resource).Inc()
|
||||
}
|
||||
|
||||
// RecordEtcdBookmark updates the etcd_bookmark_counts metric.
|
||||
@@ -148,21 +203,34 @@ func RecordEtcdBookmark(resource string) {
|
||||
etcdBookmarkCounts.WithLabelValues(resource).Inc()
|
||||
}
|
||||
|
||||
// RecordDecodeError sets the storage_decode_errors metrics.
|
||||
func RecordDecodeError(resource string) {
|
||||
decodeErrorCounts.WithLabelValues(resource).Inc()
|
||||
}
|
||||
|
||||
// Reset resets the etcd_request_duration_seconds metric.
|
||||
func Reset() {
|
||||
etcdRequestLatency.Reset()
|
||||
}
|
||||
|
||||
// sinceInSeconds gets the time since the specified start in seconds.
|
||||
func sinceInSeconds(start time.Time) float64 {
|
||||
//
|
||||
// This is a variable to facilitate testing.
|
||||
var sinceInSeconds = func(start time.Time) float64 {
|
||||
return time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
|
||||
// Deprecated: Metric etcd_db_total_size_in_bytes will be replaced with apiserver_storage_size_bytes
|
||||
func UpdateEtcdDbSize(ep string, size int64) {
|
||||
dbTotalSize.WithLabelValues(ep).Set(float64(size))
|
||||
}
|
||||
|
||||
// SetStorageMonitorGetter sets monitor getter to allow monitoring etcd stats.
|
||||
func SetStorageMonitorGetter(getter func() ([]Monitor, error)) {
|
||||
storageMonitor.setGetter(getter)
|
||||
}
|
||||
|
||||
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
|
||||
func UpdateLeaseObjectCount(count int64) {
|
||||
// Currently we only store one previous lease, since all the events have the same ttl.
|
||||
@@ -177,3 +245,64 @@ func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned
|
||||
listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald))
|
||||
listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned))
|
||||
}
|
||||
|
||||
type Monitor interface {
|
||||
Monitor(ctx context.Context) (StorageMetrics, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type StorageMetrics struct {
|
||||
Size int64
|
||||
}
|
||||
|
||||
type monitorCollector struct {
|
||||
compbasemetrics.BaseStableCollector
|
||||
|
||||
mutex sync.Mutex
|
||||
monitorGetter func() ([]Monitor, error)
|
||||
}
|
||||
|
||||
func (m *monitorCollector) setGetter(monitorGetter func() ([]Monitor, error)) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
m.monitorGetter = monitorGetter
|
||||
}
|
||||
|
||||
func (m *monitorCollector) getGetter() func() ([]Monitor, error) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
return m.monitorGetter
|
||||
}
|
||||
|
||||
// DescribeWithStability implements compbasemetrics.StableColletor
|
||||
func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
|
||||
ch <- storageSizeDescription
|
||||
}
|
||||
|
||||
// CollectWithStability implements compbasemetrics.StableColletor
|
||||
func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
|
||||
monitors, err := c.getGetter()()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for i, m := range monitors {
|
||||
cluster := fmt.Sprintf("etcd-%d", i)
|
||||
|
||||
klog.V(4).InfoS("Start collecting storage metrics", "cluster", cluster)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
metrics, err := m.Monitor(ctx)
|
||||
cancel()
|
||||
m.Close()
|
||||
if err != nil {
|
||||
klog.InfoS("Failed to get storage metrics", "cluster", cluster, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
metric, err := compbasemetrics.NewConstMetric(storageSizeDescription, compbasemetrics.GaugeValue, float64(metrics.Size), cluster)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to create metric", "cluster", cluster)
|
||||
}
|
||||
ch <- metric
|
||||
}
|
||||
}
|
||||
|
||||
289
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
generated
vendored
289
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
generated
vendored
@@ -32,18 +32,15 @@ import (
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/audit"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/tracing"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@@ -80,10 +77,15 @@ type store struct {
|
||||
groupResource schema.GroupResource
|
||||
groupResourceString string
|
||||
watcher *watcher
|
||||
pagingEnabled bool
|
||||
leaseManager *leaseManager
|
||||
}
|
||||
|
||||
func (s *store) RequestWatchProgress(ctx context.Context) error {
|
||||
// Use watchContext to match ctx metadata provided when creating the watch.
|
||||
// In best case scenario we would use the same context that watch was created, but there is no way access it from watchCache.
|
||||
return s.client.RequestProgress(s.watchContext(ctx))
|
||||
}
|
||||
|
||||
type objState struct {
|
||||
obj runtime.Object
|
||||
meta *storage.ResponseMeta
|
||||
@@ -93,11 +95,11 @@ type objState struct {
|
||||
}
|
||||
|
||||
// New returns an etcd3 implementation of storage.Interface.
|
||||
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
|
||||
return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
|
||||
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface {
|
||||
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig)
|
||||
}
|
||||
|
||||
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
|
||||
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store {
|
||||
versioner := storage.APIObjectVersioner{}
|
||||
// for compatibility with etcd2 impl.
|
||||
// no-op for default prefix of '/registry'.
|
||||
@@ -107,19 +109,36 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
|
||||
// Ensure the pathPrefix ends in "/" here to simplify key concatenation later.
|
||||
pathPrefix += "/"
|
||||
}
|
||||
result := &store{
|
||||
|
||||
w := &watcher{
|
||||
client: c,
|
||||
codec: codec,
|
||||
newFunc: newFunc,
|
||||
groupResource: groupResource,
|
||||
versioner: versioner,
|
||||
transformer: transformer,
|
||||
}
|
||||
if newFunc == nil {
|
||||
w.objectType = "<unknown>"
|
||||
} else {
|
||||
w.objectType = reflect.TypeOf(newFunc()).String()
|
||||
}
|
||||
s := &store{
|
||||
client: c,
|
||||
codec: codec,
|
||||
versioner: versioner,
|
||||
transformer: transformer,
|
||||
pagingEnabled: pagingEnabled,
|
||||
pathPrefix: pathPrefix,
|
||||
groupResource: groupResource,
|
||||
groupResourceString: groupResource.String(),
|
||||
watcher: newWatcher(c, codec, groupResource, newFunc, versioner),
|
||||
watcher: w,
|
||||
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
||||
}
|
||||
return result
|
||||
|
||||
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
|
||||
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Versioner implements storage.Interface.Versioner.
|
||||
@@ -135,7 +154,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
|
||||
}
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(ctx, preparedKey)
|
||||
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
|
||||
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -156,7 +175,12 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
|
||||
return storage.NewInternalError(err.Error())
|
||||
}
|
||||
|
||||
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
||||
err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
||||
if err != nil {
|
||||
recordDecodeError(s.groupResourceString, preparedKey)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create implements storage.Interface.Create.
|
||||
@@ -173,7 +197,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
||||
)
|
||||
defer span.End(500 * time.Millisecond)
|
||||
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
||||
return errors.New("resourceVersion should not be set on objects to be created")
|
||||
return storage.ErrResourceVersionSetOnCreate
|
||||
}
|
||||
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
|
||||
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
||||
@@ -204,7 +228,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
||||
).Then(
|
||||
clientv3.OpPut(preparedKey, string(newData), opts...),
|
||||
).Commit()
|
||||
metrics.RecordEtcdRequestLatency("create", s.groupResourceString, startTime)
|
||||
metrics.RecordEtcdRequest("create", s.groupResourceString, err, startTime)
|
||||
if err != nil {
|
||||
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
|
||||
return err
|
||||
@@ -220,6 +244,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
||||
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
||||
if err != nil {
|
||||
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
|
||||
recordDecodeError(s.groupResourceString, preparedKey)
|
||||
return err
|
||||
}
|
||||
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
|
||||
@@ -245,15 +270,7 @@ func (s *store) Delete(
|
||||
func (s *store) conditionalDelete(
|
||||
ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
|
||||
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
|
||||
getCurrentState := func() (*objState, error) {
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(ctx, key)
|
||||
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.getState(ctx, getResp, key, v, false)
|
||||
}
|
||||
getCurrentState := s.getCurrentState(ctx, key, v, false)
|
||||
|
||||
var origState *objState
|
||||
var err error
|
||||
@@ -330,7 +347,7 @@ func (s *store) conditionalDelete(
|
||||
).Else(
|
||||
clientv3.OpGet(key),
|
||||
).Commit()
|
||||
metrics.RecordEtcdRequestLatency("delete", s.groupResourceString, startTime)
|
||||
metrics.RecordEtcdRequest("delete", s.groupResourceString, err, startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -352,7 +369,12 @@ func (s *store) conditionalDelete(
|
||||
if deleteResp.Header == nil {
|
||||
return errors.New("invalid DeleteRange response - nil header")
|
||||
}
|
||||
return decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
|
||||
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
|
||||
if err != nil {
|
||||
recordDecodeError(s.groupResourceString, key)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -376,15 +398,7 @@ func (s *store) GuaranteedUpdate(
|
||||
return fmt.Errorf("unable to convert output object to pointer: %v", err)
|
||||
}
|
||||
|
||||
getCurrentState := func() (*objState, error) {
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(ctx, preparedKey)
|
||||
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.getState(ctx, getResp, preparedKey, v, ignoreNotFound)
|
||||
}
|
||||
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound)
|
||||
|
||||
var origState *objState
|
||||
var origStateIsCurrent bool
|
||||
@@ -470,7 +484,12 @@ func (s *store) GuaranteedUpdate(
|
||||
}
|
||||
// recheck that the data from etcd is not stale before short-circuiting a write
|
||||
if !origState.stale {
|
||||
return decode(s.codec, s.versioner, origState.data, destination, origState.rev)
|
||||
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev)
|
||||
if err != nil {
|
||||
recordDecodeError(s.groupResourceString, preparedKey)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -495,7 +514,7 @@ func (s *store) GuaranteedUpdate(
|
||||
).Else(
|
||||
clientv3.OpGet(preparedKey),
|
||||
).Commit()
|
||||
metrics.RecordEtcdRequestLatency("update", s.groupResourceString, startTime)
|
||||
metrics.RecordEtcdRequest("update", s.groupResourceString, err, startTime)
|
||||
if err != nil {
|
||||
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
|
||||
return err
|
||||
@@ -518,6 +537,7 @@ func (s *store) GuaranteedUpdate(
|
||||
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
|
||||
if err != nil {
|
||||
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
|
||||
recordDecodeError(s.groupResourceString, preparedKey)
|
||||
return err
|
||||
}
|
||||
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
|
||||
@@ -557,7 +577,7 @@ func (s *store) Count(key string) (int64, error) {
|
||||
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(context.Background(), preparedKey, clientv3.WithRange(clientv3.GetPrefixRangeEnd(preparedKey)), clientv3.WithCountOnly())
|
||||
metrics.RecordEtcdRequestLatency("listWithCount", preparedKey, startTime)
|
||||
metrics.RecordEtcdRequest("listWithCount", preparedKey, err, startTime)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -570,17 +590,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
recursive := opts.Recursive
|
||||
resourceVersion := opts.ResourceVersion
|
||||
match := opts.ResourceVersionMatch
|
||||
pred := opts.Predicate
|
||||
ctx, span := tracing.Start(ctx, fmt.Sprintf("List(recursive=%v) etcd3", recursive),
|
||||
ctx, span := tracing.Start(ctx, fmt.Sprintf("List(recursive=%v) etcd3", opts.Recursive),
|
||||
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
|
||||
attribute.String("key", key),
|
||||
attribute.String("resourceVersion", resourceVersion),
|
||||
attribute.String("resourceVersionMatch", string(match)),
|
||||
attribute.Int("limit", int(pred.Limit)),
|
||||
attribute.String("continue", pred.Continue))
|
||||
attribute.String("resourceVersion", opts.ResourceVersion),
|
||||
attribute.String("resourceVersionMatch", string(opts.ResourceVersionMatch)),
|
||||
attribute.Int("limit", int(opts.Predicate.Limit)),
|
||||
attribute.String("continue", opts.Predicate.Continue))
|
||||
defer span.End(500 * time.Millisecond)
|
||||
listPtr, err := meta.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
@@ -595,97 +611,68 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
|
||||
// with prefix "/a" will return all three, while with prefix "/a/" will return only
|
||||
// "/a/b" which is the correct answer.
|
||||
if recursive && !strings.HasSuffix(preparedKey, "/") {
|
||||
if opts.Recursive && !strings.HasSuffix(preparedKey, "/") {
|
||||
preparedKey += "/"
|
||||
}
|
||||
keyPrefix := preparedKey
|
||||
|
||||
// set the appropriate clientv3 options to filter the returned data set
|
||||
var limitOption *clientv3.OpOption
|
||||
limit := pred.Limit
|
||||
limit := opts.Predicate.Limit
|
||||
var paging bool
|
||||
options := make([]clientv3.OpOption, 0, 4)
|
||||
if s.pagingEnabled && pred.Limit > 0 {
|
||||
if opts.Predicate.Limit > 0 {
|
||||
paging = true
|
||||
options = append(options, clientv3.WithLimit(limit))
|
||||
limitOption = &options[len(options)-1]
|
||||
}
|
||||
|
||||
newItemFunc := getNewItemFunc(listObj, v)
|
||||
|
||||
var fromRV *uint64
|
||||
if len(resourceVersion) > 0 {
|
||||
parsedRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||
}
|
||||
fromRV = &parsedRV
|
||||
if opts.Recursive {
|
||||
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
|
||||
options = append(options, clientv3.WithRange(rangeEnd))
|
||||
}
|
||||
|
||||
var returnedRV, continueRV, withRev int64
|
||||
newItemFunc := getNewItemFunc(listObj, v)
|
||||
|
||||
var continueRV, withRev int64
|
||||
var continueKey string
|
||||
switch {
|
||||
case recursive && s.pagingEnabled && len(pred.Continue) > 0:
|
||||
continueKey, continueRV, err = storage.DecodeContinue(pred.Continue, keyPrefix)
|
||||
case opts.Recursive && len(opts.Predicate.Continue) > 0:
|
||||
continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
|
||||
}
|
||||
|
||||
if len(resourceVersion) > 0 && resourceVersion != "0" {
|
||||
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
|
||||
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
|
||||
}
|
||||
|
||||
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
|
||||
options = append(options, clientv3.WithRange(rangeEnd))
|
||||
preparedKey = continueKey
|
||||
|
||||
// If continueRV > 0, the LIST request needs a specific resource version.
|
||||
// continueRV==0 is invalid.
|
||||
// If continueRV < 0, the request is for the latest resource version.
|
||||
if continueRV > 0 {
|
||||
withRev = continueRV
|
||||
returnedRV = continueRV
|
||||
}
|
||||
case recursive && s.pagingEnabled && pred.Limit > 0:
|
||||
if fromRV != nil {
|
||||
switch match {
|
||||
case metav1.ResourceVersionMatchNotOlderThan:
|
||||
// The not older than constraint is checked after we get a response from etcd,
|
||||
// and returnedRV is then set to the revision we get from the etcd response.
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
returnedRV = int64(*fromRV)
|
||||
withRev = returnedRV
|
||||
case "": // legacy case
|
||||
if *fromRV > 0 {
|
||||
returnedRV = int64(*fromRV)
|
||||
withRev = returnedRV
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
|
||||
case len(opts.ResourceVersion) > 0:
|
||||
parsedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||
}
|
||||
switch opts.ResourceVersionMatch {
|
||||
case metav1.ResourceVersionMatchNotOlderThan:
|
||||
// The not older than constraint is checked after we get a response from etcd,
|
||||
// and returnedRV is then set to the revision we get from the etcd response.
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
withRev = int64(parsedRV)
|
||||
case "": // legacy case
|
||||
if opts.Recursive && opts.Predicate.Limit > 0 && parsedRV > 0 {
|
||||
withRev = int64(parsedRV)
|
||||
}
|
||||
}
|
||||
|
||||
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
|
||||
options = append(options, clientv3.WithRange(rangeEnd))
|
||||
default:
|
||||
if fromRV != nil {
|
||||
switch match {
|
||||
case metav1.ResourceVersionMatchNotOlderThan:
|
||||
// The not older than constraint is checked after we get a response from etcd,
|
||||
// and returnedRV is then set to the revision we get from the etcd response.
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
returnedRV = int64(*fromRV)
|
||||
withRev = returnedRV
|
||||
case "": // legacy case
|
||||
default:
|
||||
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
|
||||
}
|
||||
}
|
||||
|
||||
if recursive {
|
||||
options = append(options, clientv3.WithPrefix())
|
||||
default:
|
||||
return fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
|
||||
}
|
||||
}
|
||||
|
||||
if withRev != 0 {
|
||||
options = append(options, clientv3.WithRev(withRev))
|
||||
}
|
||||
@@ -702,19 +689,21 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
numReturn := v.Len()
|
||||
metrics.RecordStorageListMetrics(s.groupResourceString, numFetched, numEvald, numReturn)
|
||||
}()
|
||||
|
||||
metricsOp := "get"
|
||||
if opts.Recursive {
|
||||
metricsOp = "list"
|
||||
}
|
||||
|
||||
for {
|
||||
startTime := time.Now()
|
||||
getResp, err = s.client.KV.Get(ctx, preparedKey, options...)
|
||||
if recursive {
|
||||
metrics.RecordEtcdRequestLatency("list", s.groupResourceString, startTime)
|
||||
} else {
|
||||
metrics.RecordEtcdRequestLatency("get", s.groupResourceString, startTime)
|
||||
}
|
||||
metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime)
|
||||
if err != nil {
|
||||
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
|
||||
return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix)
|
||||
}
|
||||
numFetched += len(getResp.Kvs)
|
||||
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
return err
|
||||
}
|
||||
hasMore = getResp.More
|
||||
@@ -722,10 +711,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
if len(getResp.Kvs) == 0 && getResp.More {
|
||||
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
|
||||
}
|
||||
// indicate to the client which resource version was returned, and use the same resource version for subsequent requests.
|
||||
if withRev == 0 {
|
||||
withRev = getResp.Header.Revision
|
||||
options = append(options, clientv3.WithRev(withRev))
|
||||
}
|
||||
|
||||
// avoid small allocations for the result slice, since this can be called in many
|
||||
// different contexts and we don't know how significantly the result will be filtered
|
||||
if pred.Empty() {
|
||||
if opts.Predicate.Empty() {
|
||||
growSlice(v, len(getResp.Kvs))
|
||||
} else {
|
||||
growSlice(v, 2048, len(getResp.Kvs))
|
||||
@@ -733,7 +727,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
|
||||
// take items from the response until the bucket is full, filtering as we go
|
||||
for i, kv := range getResp.Kvs {
|
||||
if paging && int64(v.Len()) >= pred.Limit {
|
||||
if paging && int64(v.Len()) >= opts.Predicate.Limit {
|
||||
hasMore = true
|
||||
break
|
||||
}
|
||||
@@ -744,7 +738,8 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
|
||||
}
|
||||
|
||||
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
|
||||
if err := appendListItem(v, data, uint64(kv.ModRevision), opts.Predicate, s.codec, s.versioner, newItemFunc); err != nil {
|
||||
recordDecodeError(s.groupResourceString, string(kv.Key))
|
||||
return err
|
||||
}
|
||||
numEvald++
|
||||
@@ -753,17 +748,12 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
getResp.Kvs[i] = nil
|
||||
}
|
||||
|
||||
// indicate to the client which resource version was returned
|
||||
if returnedRV == 0 {
|
||||
returnedRV = getResp.Header.Revision
|
||||
}
|
||||
|
||||
// no more results remain or we didn't request paging
|
||||
if !hasMore || !paging {
|
||||
break
|
||||
}
|
||||
// we're paging but we have filled our bucket
|
||||
if int64(v.Len()) >= pred.Limit {
|
||||
if int64(v.Len()) >= opts.Predicate.Limit {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -777,17 +767,18 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
*limitOption = clientv3.WithLimit(limit)
|
||||
}
|
||||
preparedKey = string(lastKey) + "\x00"
|
||||
if withRev == 0 {
|
||||
withRev = returnedRV
|
||||
options = append(options, clientv3.WithRev(withRev))
|
||||
}
|
||||
}
|
||||
|
||||
if v.IsNil() {
|
||||
// Ensure that we never return a nil Items pointer in the result for consistency.
|
||||
v.Set(reflect.MakeSlice(v.Type(), 0, 0))
|
||||
}
|
||||
|
||||
// instruct the client to begin querying from immediately after the last key we returned
|
||||
// we never return a key that the client wouldn't be allowed to see
|
||||
if hasMore {
|
||||
// we want to start immediately after the last key
|
||||
next, err := storage.EncodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
|
||||
next, err := storage.EncodeContinue(string(lastKey)+"\x00", keyPrefix, withRev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -795,17 +786,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
// getResp.Count counts in objects that do not match the pred.
|
||||
// Instead of returning inaccurate count for non-empty selectors, we return nil.
|
||||
// Only set remainingItemCount if the predicate is empty.
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.RemainingItemCount) {
|
||||
if pred.Empty() {
|
||||
c := int64(getResp.Count - pred.Limit)
|
||||
remainingItemCount = &c
|
||||
}
|
||||
if opts.Predicate.Empty() {
|
||||
c := int64(getResp.Count - opts.Predicate.Limit)
|
||||
remainingItemCount = &c
|
||||
}
|
||||
return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount)
|
||||
return s.versioner.UpdateList(listObj, uint64(withRev), next, remainingItemCount)
|
||||
}
|
||||
|
||||
// no continuation
|
||||
return s.versioner.UpdateList(listObj, uint64(returnedRV), "", nil)
|
||||
return s.versioner.UpdateList(listObj, uint64(withRev), "", nil)
|
||||
}
|
||||
|
||||
// growSlice takes a slice value and grows its capacity up
|
||||
@@ -849,7 +838,30 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.watcher.Watch(ctx, preparedKey, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate)
|
||||
return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts)
|
||||
}
|
||||
|
||||
func (s *store) watchContext(ctx context.Context) context.Context {
|
||||
// The etcd server waits until it cannot find a leader for 3 election
|
||||
// timeouts to cancel existing streams. 3 is currently a hard coded
|
||||
// constant. The election timeout defaults to 1000ms. If the cluster is
|
||||
// healthy, when the leader is stopped, the leadership transfer should be
|
||||
// smooth. (leader transfers its leadership before stopping). If leader is
|
||||
// hard killed, other servers will take an election timeout to realize
|
||||
// leader lost and start campaign.
|
||||
return clientv3.WithRequireLeader(ctx)
|
||||
}
|
||||
|
||||
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool) func() (*objState, error) {
|
||||
return func() (*objState, error) {
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(ctx, key)
|
||||
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.getState(ctx, getResp, key, v, ignoreNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
|
||||
@@ -880,6 +892,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
|
||||
state.data = data
|
||||
state.stale = stale
|
||||
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
|
||||
recordDecodeError(s.groupResourceString, key)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@@ -1018,6 +1031,12 @@ func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.Selec
|
||||
return nil
|
||||
}
|
||||
|
||||
// recordDecodeError record decode error split by object type.
|
||||
func recordDecodeError(resource string, key string) {
|
||||
metrics.RecordDecodeError(resource)
|
||||
klog.V(4).Infof("Decoding %s \"%s\" failed", resource, key)
|
||||
}
|
||||
|
||||
func notFound(key string) clientv3.Cmp {
|
||||
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
|
||||
}
|
||||
|
||||
287
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
generated
vendored
287
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
generated
vendored
@@ -18,23 +18,29 @@ package etcd3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
grpccodes "google.golang.org/grpc/codes"
|
||||
grpcstatus "google.golang.org/grpc/status"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
@@ -44,6 +50,9 @@ const (
|
||||
outgoingBufSize = 100
|
||||
)
|
||||
|
||||
// defaultWatcherMaxLimit is used to facilitate construction tests
|
||||
var defaultWatcherMaxLimit int64 = maxLimit
|
||||
|
||||
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
|
||||
var fatalOnDecodeError = false
|
||||
|
||||
@@ -59,18 +68,19 @@ func TestOnlySetFatalOnDecodeError(b bool) {
|
||||
}
|
||||
|
||||
type watcher struct {
|
||||
client *clientv3.Client
|
||||
codec runtime.Codec
|
||||
newFunc func() runtime.Object
|
||||
objectType string
|
||||
groupResource schema.GroupResource
|
||||
versioner storage.Versioner
|
||||
client *clientv3.Client
|
||||
codec runtime.Codec
|
||||
newFunc func() runtime.Object
|
||||
objectType string
|
||||
groupResource schema.GroupResource
|
||||
versioner storage.Versioner
|
||||
transformer value.Transformer
|
||||
getCurrentStorageRV func(context.Context) (uint64, error)
|
||||
}
|
||||
|
||||
// watchChan implements watch.Interface.
|
||||
type watchChan struct {
|
||||
watcher *watcher
|
||||
transformer value.Transformer
|
||||
key string
|
||||
initialRev int64
|
||||
recursive bool
|
||||
@@ -83,35 +93,26 @@ type watchChan struct {
|
||||
errChan chan error
|
||||
}
|
||||
|
||||
func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner) *watcher {
|
||||
res := &watcher{
|
||||
client: client,
|
||||
codec: codec,
|
||||
groupResource: groupResource,
|
||||
newFunc: newFunc,
|
||||
versioner: versioner,
|
||||
}
|
||||
if newFunc == nil {
|
||||
res.objectType = "<unknown>"
|
||||
} else {
|
||||
res.objectType = reflect.TypeOf(newFunc()).String()
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
|
||||
// If rev is zero, it will return the existing object(s) and then start watching from
|
||||
// the maximum revision+1 from returned objects.
|
||||
// If rev is non-zero, it will watch events happened after given revision.
|
||||
// If recursive is false, it watches on given key.
|
||||
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
|
||||
// pred must be non-nil. Only if pred matches the change, it will be returned.
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||
if recursive && !strings.HasSuffix(key, "/") {
|
||||
// If opts.Recursive is false, it watches on given key.
|
||||
// If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself.
|
||||
// pred must be non-nil. Only if opts.Predicate matches the change, it will be returned.
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage.ListOptions) (watch.Interface, error) {
|
||||
if opts.Recursive && !strings.HasSuffix(key, "/") {
|
||||
key += "/"
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, transformer, pred)
|
||||
go wc.run()
|
||||
if opts.ProgressNotify && w.newFunc == nil {
|
||||
return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
|
||||
}
|
||||
startWatchRV, err := w.getStartWatchResourceVersion(ctx, rev, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, startWatchRV, opts.Recursive, opts.ProgressNotify, opts.Predicate)
|
||||
go wc.run(isInitialEventsEndBookmarkRequired(opts), areInitialEventsRequired(rev, opts))
|
||||
|
||||
// For etcd watch we don't have an easy way to answer whether the watch
|
||||
// has already caught up. So in the initial version (given that watchcache
|
||||
@@ -123,10 +124,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p
|
||||
return wc, nil
|
||||
}
|
||||
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan {
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
|
||||
wc := &watchChan{
|
||||
watcher: w,
|
||||
transformer: transformer,
|
||||
key: key,
|
||||
initialRev: rev,
|
||||
recursive: recursive,
|
||||
@@ -140,21 +140,94 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
|
||||
// The filter doesn't filter out any object.
|
||||
wc.internalPred = storage.Everything
|
||||
}
|
||||
|
||||
// The etcd server waits until it cannot find a leader for 3 election
|
||||
// timeouts to cancel existing streams. 3 is currently a hard coded
|
||||
// constant. The election timeout defaults to 1000ms. If the cluster is
|
||||
// healthy, when the leader is stopped, the leadership transfer should be
|
||||
// smooth. (leader transfers its leadership before stopping). If leader is
|
||||
// hard killed, other servers will take an election timeout to realize
|
||||
// leader lost and start campaign.
|
||||
wc.ctx, wc.cancel = context.WithCancel(clientv3.WithRequireLeader(ctx))
|
||||
wc.ctx, wc.cancel = context.WithCancel(ctx)
|
||||
return wc
|
||||
}
|
||||
|
||||
func (wc *watchChan) run() {
|
||||
// getStartWatchResourceVersion returns a ResourceVersion
|
||||
// the watch will be started from.
|
||||
// Depending on the input parameters the semantics of the returned ResourceVersion are:
|
||||
// - start at Exact (return resourceVersion)
|
||||
// - start at Most Recent (return an RV from etcd)
|
||||
func (w *watcher) getStartWatchResourceVersion(ctx context.Context, resourceVersion int64, opts storage.ListOptions) (int64, error) {
|
||||
if resourceVersion > 0 {
|
||||
return resourceVersion, nil
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
||||
return 0, nil
|
||||
}
|
||||
if opts.SendInitialEvents == nil || *opts.SendInitialEvents {
|
||||
// note that when opts.SendInitialEvents=true
|
||||
// we will be issuing a consistent LIST request
|
||||
// against etcd followed by the special bookmark event
|
||||
return 0, nil
|
||||
}
|
||||
// at this point the clients is interested
|
||||
// only in getting a stream of events
|
||||
// starting at the MostRecent point in time (RV)
|
||||
currentStorageRV, err := w.getCurrentStorageRV(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// currentStorageRV is taken from resp.Header.Revision (int64)
|
||||
// and cast to uint64, so it is safe to do reverse
|
||||
// at some point we should unify the interface but that
|
||||
// would require changing Versioner.UpdateList
|
||||
return int64(currentStorageRV), nil
|
||||
}
|
||||
|
||||
// isInitialEventsEndBookmarkRequired since there is no way to directly set
|
||||
// opts.ProgressNotify from the API and the etcd3 impl doesn't support
|
||||
// notification for external clients we simply return initialEventsEndBookmarkRequired
|
||||
// to only send the bookmark event after the initial list call.
|
||||
//
|
||||
// see: https://github.com/kubernetes/kubernetes/issues/120348
|
||||
func isInitialEventsEndBookmarkRequired(opts storage.ListOptions) bool {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
||||
return false
|
||||
}
|
||||
return opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks
|
||||
}
|
||||
|
||||
// areInitialEventsRequired returns true if all events from the etcd should be returned.
|
||||
func areInitialEventsRequired(resourceVersion int64, opts storage.ListOptions) bool {
|
||||
if opts.SendInitialEvents == nil && resourceVersion == 0 {
|
||||
return true // legacy case
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
||||
return false
|
||||
}
|
||||
return opts.SendInitialEvents != nil && *opts.SendInitialEvents
|
||||
}
|
||||
|
||||
type etcdError interface {
|
||||
Code() grpccodes.Code
|
||||
Error() string
|
||||
}
|
||||
|
||||
type grpcError interface {
|
||||
GRPCStatus() *grpcstatus.Status
|
||||
}
|
||||
|
||||
func isCancelError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if err == context.Canceled {
|
||||
return true
|
||||
}
|
||||
if etcdErr, ok := err.(etcdError); ok && etcdErr.Code() == grpccodes.Canceled {
|
||||
return true
|
||||
}
|
||||
if grpcErr, ok := err.(grpcError); ok && grpcErr.GRPCStatus().Code() == grpccodes.Canceled {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) {
|
||||
watchClosedCh := make(chan struct{})
|
||||
go wc.startWatching(watchClosedCh)
|
||||
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
|
||||
|
||||
var resultChanWG sync.WaitGroup
|
||||
resultChanWG.Add(1)
|
||||
@@ -162,7 +235,7 @@ func (wc *watchChan) run() {
|
||||
|
||||
select {
|
||||
case err := <-wc.errChan:
|
||||
if err == context.Canceled {
|
||||
if isCancelError(err) {
|
||||
break
|
||||
}
|
||||
errResult := transformErrorToEvent(err)
|
||||
@@ -194,45 +267,123 @@ func (wc *watchChan) ResultChan() <-chan watch.Event {
|
||||
return wc.resultChan
|
||||
}
|
||||
|
||||
func (wc *watchChan) RequestWatchProgress() error {
|
||||
return wc.watcher.client.RequestProgress(wc.ctx)
|
||||
}
|
||||
|
||||
// sync tries to retrieve existing data and send them to process.
|
||||
// The revision to watch will be set to the revision in response.
|
||||
// All events sent will have isCreated=true
|
||||
func (wc *watchChan) sync() error {
|
||||
opts := []clientv3.OpOption{}
|
||||
if wc.recursive {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
opts = append(opts, clientv3.WithLimit(defaultWatcherMaxLimit))
|
||||
rangeEnd := clientv3.GetPrefixRangeEnd(wc.key)
|
||||
opts = append(opts, clientv3.WithRange(rangeEnd))
|
||||
}
|
||||
getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
var err error
|
||||
var lastKey []byte
|
||||
var withRev int64
|
||||
var getResp *clientv3.GetResponse
|
||||
|
||||
metricsOp := "get"
|
||||
if wc.recursive {
|
||||
metricsOp = "list"
|
||||
}
|
||||
wc.initialRev = getResp.Header.Revision
|
||||
for _, kv := range getResp.Kvs {
|
||||
wc.sendEvent(parseKV(kv))
|
||||
|
||||
preparedKey := wc.key
|
||||
|
||||
for {
|
||||
startTime := time.Now()
|
||||
getResp, err = wc.watcher.client.KV.Get(wc.ctx, preparedKey, opts...)
|
||||
metrics.RecordEtcdRequest(metricsOp, wc.watcher.groupResource.String(), err, startTime)
|
||||
if err != nil {
|
||||
return interpretListError(err, true, preparedKey, wc.key)
|
||||
}
|
||||
|
||||
if len(getResp.Kvs) == 0 && getResp.More {
|
||||
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
|
||||
}
|
||||
|
||||
// send items from the response until no more results
|
||||
for i, kv := range getResp.Kvs {
|
||||
lastKey = kv.Key
|
||||
wc.sendEvent(parseKV(kv))
|
||||
// free kv early. Long lists can take O(seconds) to decode.
|
||||
getResp.Kvs[i] = nil
|
||||
}
|
||||
|
||||
if withRev == 0 {
|
||||
wc.initialRev = getResp.Header.Revision
|
||||
}
|
||||
|
||||
// no more results remain
|
||||
if !getResp.More {
|
||||
return nil
|
||||
}
|
||||
|
||||
preparedKey = string(lastKey) + "\x00"
|
||||
if withRev == 0 {
|
||||
withRev = getResp.Header.Revision
|
||||
opts = append(opts, clientv3.WithRev(withRev))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// logWatchChannelErr checks whether the error is about mvcc revision compaction which is regarded as warning
|
||||
func logWatchChannelErr(err error) {
|
||||
if !strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
|
||||
klog.Errorf("watch chan error: %v", err)
|
||||
} else {
|
||||
switch {
|
||||
case strings.Contains(err.Error(), "mvcc: required revision has been compacted"):
|
||||
// mvcc revision compaction which is regarded as warning, not error
|
||||
klog.Warningf("watch chan error: %v", err)
|
||||
case isCancelError(err):
|
||||
// expected when watches close, no need to log
|
||||
default:
|
||||
klog.Errorf("watch chan error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// startWatching does:
|
||||
// - get current objects if initialRev=0; set initialRev to current rev
|
||||
// - watch on given key and send events to process.
|
||||
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
|
||||
if wc.initialRev == 0 {
|
||||
//
|
||||
// initialEventsEndBookmarkSent helps us keep track
|
||||
// of whether we have sent an annotated bookmark event.
|
||||
//
|
||||
// it's important to note that we don't
|
||||
// need to track the actual RV because
|
||||
// we only send the bookmark event
|
||||
// after the initial list call.
|
||||
//
|
||||
// when this variable is set to false,
|
||||
// it means we don't have any specific
|
||||
// preferences for delivering bookmark events.
|
||||
func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEndBookmarkRequired, forceInitialEvents bool) {
|
||||
if wc.initialRev > 0 && forceInitialEvents {
|
||||
currentStorageRV, err := wc.watcher.getCurrentStorageRV(wc.ctx)
|
||||
if err != nil {
|
||||
wc.sendError(err)
|
||||
return
|
||||
}
|
||||
if uint64(wc.initialRev) > currentStorageRV {
|
||||
wc.sendError(storage.NewTooLargeResourceVersionError(uint64(wc.initialRev), currentStorageRV, int(wait.Jitter(1*time.Second, 3).Seconds())))
|
||||
return
|
||||
}
|
||||
}
|
||||
if forceInitialEvents {
|
||||
if err := wc.sync(); err != nil {
|
||||
klog.Errorf("failed to sync with latest state: %v", err)
|
||||
wc.sendError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if initialEventsEndBookmarkRequired {
|
||||
wc.sendEvent(func() *event {
|
||||
e := progressNotifyEvent(wc.initialRev)
|
||||
e.isInitialEventsEndBookmark = true
|
||||
return e
|
||||
}())
|
||||
}
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
|
||||
if wc.recursive {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
@@ -256,6 +407,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
|
||||
}
|
||||
|
||||
for _, e := range wres.Events {
|
||||
metrics.RecordEtcdEvent(wc.watcher.groupResource.String())
|
||||
parsedEvent, err := parseEvent(e)
|
||||
if err != nil {
|
||||
logWatchChannelErr(err)
|
||||
@@ -323,14 +475,17 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
||||
|
||||
switch {
|
||||
case e.isProgressNotify:
|
||||
if wc.watcher.newFunc == nil {
|
||||
return nil
|
||||
}
|
||||
object := wc.watcher.newFunc()
|
||||
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
|
||||
klog.Errorf("failed to propagate object version: %v", err)
|
||||
return nil
|
||||
}
|
||||
if e.isInitialEventsEndBookmark {
|
||||
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
|
||||
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
res = &watch.Event{
|
||||
Type: watch.Bookmark,
|
||||
Object: object,
|
||||
@@ -418,7 +573,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
|
||||
}
|
||||
|
||||
if !e.isDeleted {
|
||||
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -433,7 +588,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
|
||||
// we need the object only to compute whether it was filtered out
|
||||
// before).
|
||||
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
||||
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user