feat: add crd gen

Signed-off-by: runzexia <runzexia@yunify.com>
This commit is contained in:
runzexia
2019-07-30 13:36:54 +08:00
parent 9ce52aeb00
commit 1c10391faf
817 changed files with 203429 additions and 27953 deletions

30
vendor/k8s.io/apiserver/pkg/storage/OWNERS generated vendored Normal file
View File

@@ -0,0 +1,30 @@
approvers:
- lavalamp
- liggitt
- timothysc
- wojtek-t
- xiang90
reviewers:
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
- caesarxuchao
- mikedanese
- liggitt
- ncdc
- tallclair
- timothysc
- hongchaodeng
- krousey
- fgrzadkowski
- xiang90
- mml
- ingvagabund
- resouer
- mbohlool
- lixiaobing10051267
- mqliang
- feihujiang
- rrati
- enj

1004
vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"sync"
"time"
)
const (
refreshPerSecond = 50 * time.Millisecond
maxBudget = 100 * time.Millisecond
)
// timeBudget implements a budget of time that you can use and is
// periodically being refreshed. The pattern to use it is:
// budget := newTimeBudget(...)
// ...
// timeout := budget.takeAvailable()
// // Now you can spend at most timeout on doing stuff
// ...
// // If you didn't use all timeout, return what you didn't use
// budget.returnUnused(<unused part of timeout>)
//
// NOTE: It's not recommended to be used concurrently from multiple threads -
// if first user takes the whole timeout, the second one will get 0 timeout
// even though the first one may return something later.
type timeBudget struct {
sync.Mutex
budget time.Duration
refresh time.Duration
maxBudget time.Duration
}
func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
result := &timeBudget{
budget: time.Duration(0),
refresh: refreshPerSecond,
maxBudget: maxBudget,
}
go result.periodicallyRefresh(stopCh)
return result
}
func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.Lock()
if t.budget = t.budget + t.refresh; t.budget > t.maxBudget {
t.budget = t.maxBudget
}
t.Unlock()
case <-stopCh:
return
}
}
}
func (t *timeBudget) takeAvailable() time.Duration {
t.Lock()
defer t.Unlock()
result := t.budget
t.budget = time.Duration(0)
return result
}
func (t *timeBudget) returnUnused(unused time.Duration) {
t.Lock()
defer t.Unlock()
if unused < 0 {
// We used more than allowed.
return
}
if t.budget = t.budget + unused; t.budget > t.maxBudget {
t.budget = t.maxBudget
}
}

46
vendor/k8s.io/apiserver/pkg/storage/cacher/util.go generated vendored Normal file
View File

@@ -0,0 +1,46 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"strings"
)
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}

View File

@@ -0,0 +1,495 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"fmt"
"sort"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/tools/cache"
)
const (
// blockTimeout determines how long we're willing to block the request
// to wait for a given resource version to be propagated to cache,
// before terminating request and returning Timeout error with retry
// after suggestion.
blockTimeout = 3 * time.Second
)
// watchCacheEvent is a single "watch event" that is send to users of
// watchCache. Additionally to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the
// upper layers.
type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
ObjLabels labels.Set
ObjFields fields.Set
ObjUninitialized bool
PrevObject runtime.Object
PrevObjLabels labels.Set
PrevObjFields fields.Set
PrevObjUninitialized bool
Key string
ResourceVersion uint64
}
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). Similarly computing object fields and
// labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), in the underlying store we are
// keeping structs (key, object, labels, fields, uninitialized).
type storeElement struct {
Key string
Object runtime.Object
Labels labels.Set
Fields fields.Set
Uninitialized bool
}
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}
// watchCacheElement is a single "watch event" stored in a cache.
// It contains the resource version of the object and the object
// itself.
type watchCacheElement struct {
resourceVersion uint64
watchCacheEvent *watchCacheEvent
}
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
// watchCache is a "sliding window" (with a limited capacity) of objects
// observed from a watch.
type watchCache struct {
sync.RWMutex
// Condition on which lists are waiting for the fresh enough
// resource version.
cond *sync.Cond
// Maximum size of history window.
capacity int
// keyFunc is used to get a key in the underlying storage for a given object.
keyFunc func(runtime.Object) (string, error)
// getAttrsFunc is used to get labels and fields of an object.
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)
// cache is used a cyclic buffer - its first element (with the smallest
// resourceVersion) is defined by startIndex, its last element is defined
// by endIndex (if cache is full it will be startIndex + capacity).
// Both startIndex and endIndex can be greater than buffer capacity -
// you should always apply modulo capacity to get an index in cache array.
cache []watchCacheElement
startIndex int
endIndex int
// store will effectively support LIST operation from the "end of cache
// history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now.
// NOTE: We assume that <store> is thread-safe.
store cache.Store
// ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64
// ResourceVersion of the last list result (populated via Replace() method).
listResourceVersion uint64
// This handler is run at the end of every successful Replace() method.
onReplace func()
// This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object.
onEvent func(*watchCacheEvent)
// for testing timeouts.
clock clock.Clock
// An underlying storage.Versioner.
versioner storage.Versioner
}
func newWatchCache(
capacity int,
keyFunc func(runtime.Object) (string, error),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
versioner storage.Versioner) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]watchCacheElement, capacity),
startIndex: 0,
endIndex: 0,
store: cache.NewStore(storeElementKey),
resourceVersion: 0,
listResourceVersion: 0,
clock: clock.RealClock{},
versioner: versioner,
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
}
// Add takes runtime.Object as an argument.
func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Added, Object: object}
f := func(elem *storeElement) error { return w.store.Add(elem) }
return w.processEvent(event, resourceVersion, f)
}
// Update takes runtime.Object as an argument.
func (w *watchCache) Update(obj interface{}) error {
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Modified, Object: object}
f := func(elem *storeElement) error { return w.store.Update(elem) }
return w.processEvent(event, resourceVersion, f)
}
// Delete takes runtime.Object as an argument.
func (w *watchCache) Delete(obj interface{}) error {
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
event := watch.Event{Type: watch.Deleted, Object: object}
f := func(elem *storeElement) error { return w.store.Delete(elem) }
return w.processEvent(event, resourceVersion, f)
}
func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
object, ok := obj.(runtime.Object)
if !ok {
return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
}
resourceVersion, err := w.versioner.ObjectResourceVersion(object)
if err != nil {
return nil, 0, err
}
return object, resourceVersion, nil
}
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
key, err := w.keyFunc(event.Object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
}
elem := &storeElement{Key: key, Object: event.Object}
elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object)
if err != nil {
return err
}
watchCacheEvent := &watchCacheEvent{
Type: event.Type,
Object: elem.Object,
ObjLabels: elem.Labels,
ObjFields: elem.Fields,
ObjUninitialized: elem.Uninitialized,
Key: key,
ResourceVersion: resourceVersion,
}
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()
previous, exists, err := w.store.Get(elem)
if err != nil {
return err
}
if exists {
previousElem := previous.(*storeElement)
watchCacheEvent.PrevObject = previousElem.Object
watchCacheEvent.PrevObjLabels = previousElem.Labels
watchCacheEvent.PrevObjFields = previousElem.Fields
watchCacheEvent.PrevObjUninitialized = previousElem.Uninitialized
}
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
}
w.updateCache(resourceVersion, watchCacheEvent)
w.resourceVersion = resourceVersion
w.cond.Broadcast()
return updateFunc(elem)
}
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity {
// Cache is full - remove the oldest element.
w.startIndex++
}
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
w.endIndex++
}
// List returns list of pointers to <storeElement> objects.
func (w *watchCache) List() []interface{} {
return w.store.List()
}
// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
// NOTE: This function acquired lock and doesn't release it.
// You HAVE TO explicitly call w.RUnlock() after this function.
func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error {
startTime := w.clock.Now()
go func() {
// Wake us up when the time limit has expired. The docs
// promise that time.After (well, NewTimer, which it calls)
// will wait *at least* the duration given. Since this go
// routine starts sometime after we record the start time, and
// it will wake up the loop below sometime after the broadcast,
// we don't need to worry about waking it up before the time
// has expired accidentally.
<-w.clock.After(blockTimeout)
w.cond.Broadcast()
}()
w.RLock()
if trace != nil {
trace.Step("watchCache locked acquired")
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= blockTimeout {
// Timeout with retry after 1 second.
return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
}
w.cond.Wait()
}
if trace != nil {
trace.Step("watchCache fresh enough")
}
return nil
}
// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, 0, err
}
return w.store.List(), w.resourceVersion, nil
}
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, false, 0, err
}
value, exists, err := w.store.GetByKey(key)
return value, exists, w.resourceVersion, err
}
func (w *watchCache) ListKeys() []string {
return w.store.ListKeys()
}
// Get takes runtime.Object as a parameter. However, it returns
// pointer to <storeElement>.
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
object, ok := obj.(runtime.Object)
if !ok {
return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
}
key, err := w.keyFunc(object)
if err != nil {
return nil, false, fmt.Errorf("couldn't compute key: %v", err)
}
return w.store.Get(&storeElement{Key: key, Object: object})
}
// GetByKey returns pointer to <storeElement>.
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
return w.store.GetByKey(key)
}
// Replace takes slice of runtime.Object as a parameter.
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
version, err := w.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}
toReplace := make([]interface{}, 0, len(objs))
for _, obj := range objs {
object, ok := obj.(runtime.Object)
if !ok {
return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
}
key, err := w.keyFunc(object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
}
objLabels, objFields, objUninitialized, err := w.getAttrsFunc(object)
if err != nil {
return err
}
toReplace = append(toReplace, &storeElement{
Key: key,
Object: object,
Labels: objLabels,
Fields: objFields,
Uninitialized: objUninitialized,
})
}
w.Lock()
defer w.Unlock()
w.startIndex = 0
w.endIndex = 0
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
w.listResourceVersion = version
w.resourceVersion = version
if w.onReplace != nil {
w.onReplace()
}
w.cond.Broadcast()
return nil
}
func (w *watchCache) SetOnReplace(onReplace func()) {
w.Lock()
defer w.Unlock()
w.onReplace = onReplace
}
func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
w.Lock()
defer w.Unlock()
w.onEvent = onEvent
}
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
size := w.endIndex - w.startIndex
var oldest uint64
switch {
case size >= w.capacity:
// Once the watch event buffer is full, the oldest watch event we can deliver
// is the first one in the buffer.
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
case w.listResourceVersion > 0:
// If the watch event buffer isn't full, the oldest watch event we can deliver
// is one greater than the resource version of the last full list.
oldest = w.listResourceVersion + 1
case size > 0:
// If we've never completed a list, use the resourceVersion of the oldest event
// in the buffer.
// This should only happen in unit tests that populate the buffer without
// performing list/replace operations.
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
default:
return nil, fmt.Errorf("watch cache isn't correctly initialized")
}
if resourceVersion == 0 {
// resourceVersion = 0 means that we don't require any specific starting point
// and we would like to start watching from ~now.
// However, to keep backward compatibility, we additionally need to return the
// current state and only then start watching from that point.
//
// TODO: In v2 api, we should stop returning the current state - #13969.
allItems := w.store.List()
result := make([]*watchCacheEvent, len(allItems))
for i, item := range allItems {
elem, ok := item.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", elem)
}
objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object)
if err != nil {
return nil, err
}
result[i] = &watchCacheEvent{
Type: watch.Added,
Object: elem.Object,
ObjLabels: objLabels,
ObjFields: objFields,
ObjUninitialized: objUninitialized,
Key: elem.Key,
ResourceVersion: w.resourceVersion,
}
}
return result, nil
}
if resourceVersion < oldest-1 {
return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
}
// Binary search the smallest index at which resourceVersion is greater than the given one.
f := func(i int) bool {
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
}
first := sort.Search(size, f)
result := make([]*watchCacheEvent, size-first)
for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
}
return result, nil
}
func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
w.RLock()
defer w.RUnlock()
return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
}
func (w *watchCache) Resync() error {
// Nothing to do
return nil
}

18
vendor/k8s.io/apiserver/pkg/storage/doc.go generated vendored Normal file
View File

@@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Interfaces for database-related operations.
package storage // import "k8s.io/apiserver/pkg/storage"

170
vendor/k8s.io/apiserver/pkg/storage/errors.go generated vendored Normal file
View File

@@ -0,0 +1,170 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"fmt"
"k8s.io/apimachinery/pkg/util/validation/field"
)
const (
ErrCodeKeyNotFound int = iota + 1
ErrCodeKeyExists
ErrCodeResourceVersionConflicts
ErrCodeInvalidObj
ErrCodeUnreachable
)
var errCodeToMessage = map[int]string{
ErrCodeKeyNotFound: "key not found",
ErrCodeKeyExists: "key exists",
ErrCodeResourceVersionConflicts: "resource version conflicts",
ErrCodeInvalidObj: "invalid object",
ErrCodeUnreachable: "server unreachable",
}
func NewKeyNotFoundError(key string, rv int64) *StorageError {
return &StorageError{
Code: ErrCodeKeyNotFound,
Key: key,
ResourceVersion: rv,
}
}
func NewKeyExistsError(key string, rv int64) *StorageError {
return &StorageError{
Code: ErrCodeKeyExists,
Key: key,
ResourceVersion: rv,
}
}
func NewResourceVersionConflictsError(key string, rv int64) *StorageError {
return &StorageError{
Code: ErrCodeResourceVersionConflicts,
Key: key,
ResourceVersion: rv,
}
}
func NewUnreachableError(key string, rv int64) *StorageError {
return &StorageError{
Code: ErrCodeUnreachable,
Key: key,
ResourceVersion: rv,
}
}
func NewInvalidObjError(key, msg string) *StorageError {
return &StorageError{
Code: ErrCodeInvalidObj,
Key: key,
AdditionalErrorMsg: msg,
}
}
type StorageError struct {
Code int
Key string
ResourceVersion int64
AdditionalErrorMsg string
}
func (e *StorageError) Error() string {
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s",
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg)
}
// IsNotFound returns true if and only if err is "key" not found error.
func IsNotFound(err error) bool {
return isErrCode(err, ErrCodeKeyNotFound)
}
// IsNodeExist returns true if and only if err is an node already exist error.
func IsNodeExist(err error) bool {
return isErrCode(err, ErrCodeKeyExists)
}
// IsUnreachable returns true if and only if err indicates the server could not be reached.
func IsUnreachable(err error) bool {
return isErrCode(err, ErrCodeUnreachable)
}
// IsConflict returns true if and only if err is a write conflict.
func IsConflict(err error) bool {
return isErrCode(err, ErrCodeResourceVersionConflicts)
}
// IsInvalidObj returns true if and only if err is invalid error
func IsInvalidObj(err error) bool {
return isErrCode(err, ErrCodeInvalidObj)
}
func isErrCode(err error, code int) bool {
if err == nil {
return false
}
if e, ok := err.(*StorageError); ok {
return e.Code == code
}
return false
}
// InvalidError is generated when an error caused by invalid API object occurs
// in the storage package.
type InvalidError struct {
Errs field.ErrorList
}
func (e InvalidError) Error() string {
return e.Errs.ToAggregate().Error()
}
// IsInvalidError returns true if and only if err is an InvalidError.
func IsInvalidError(err error) bool {
_, ok := err.(InvalidError)
return ok
}
func NewInvalidError(errors field.ErrorList) InvalidError {
return InvalidError{errors}
}
// InternalError is generated when an error occurs in the storage package, i.e.,
// not from the underlying storage backend (e.g., etcd).
type InternalError struct {
Reason string
}
func (e InternalError) Error() string {
return e.Reason
}
// IsInternalError returns true if and only if err is an InternalError.
func IsInternalError(err error) bool {
_, ok := err.(InternalError)
return ok
}
func NewInternalError(reason string) InternalError {
return InternalError{reason}
}
func NewInternalErrorf(format string, a ...interface{}) InternalError {
return InternalError{fmt.Sprintf(format, a...)}
}

18
vendor/k8s.io/apiserver/pkg/storage/errors/doc.go generated vendored Normal file
View File

@@ -0,0 +1,18 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package etcd provides conversion of etcd errors to API errors.
package storage // import "k8s.io/apiserver/pkg/storage/errors"

116
vendor/k8s.io/apiserver/pkg/storage/errors/storage.go generated vendored Normal file
View File

@@ -0,0 +1,116 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/storage"
)
// InterpretListError converts a generic error on a retrieval
// operation into the appropriate API error.
func InterpretListError(err error, qualifiedResource schema.GroupResource) error {
switch {
case storage.IsNotFound(err):
return errors.NewNotFound(qualifiedResource, "")
case storage.IsUnreachable(err):
return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default:
return err
}
}
// InterpretGetError converts a generic error on a retrieval
// operation into the appropriate API error.
func InterpretGetError(err error, qualifiedResource schema.GroupResource, name string) error {
switch {
case storage.IsNotFound(err):
return errors.NewNotFound(qualifiedResource, name)
case storage.IsUnreachable(err):
return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default:
return err
}
}
// InterpretCreateError converts a generic error on a create
// operation into the appropriate API error.
func InterpretCreateError(err error, qualifiedResource schema.GroupResource, name string) error {
switch {
case storage.IsNodeExist(err):
return errors.NewAlreadyExists(qualifiedResource, name)
case storage.IsUnreachable(err):
return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default:
return err
}
}
// InterpretUpdateError converts a generic error on an update
// operation into the appropriate API error.
func InterpretUpdateError(err error, qualifiedResource schema.GroupResource, name string) error {
switch {
case storage.IsConflict(err), storage.IsNodeExist(err), storage.IsInvalidObj(err):
return errors.NewConflict(qualifiedResource, name, err)
case storage.IsUnreachable(err):
return errors.NewServerTimeout(qualifiedResource, "update", 2) // TODO: make configurable or handled at a higher level
case storage.IsNotFound(err):
return errors.NewNotFound(qualifiedResource, name)
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default:
return err
}
}
// InterpretDeleteError converts a generic error on a delete
// operation into the appropriate API error.
func InterpretDeleteError(err error, qualifiedResource schema.GroupResource, name string) error {
switch {
case storage.IsNotFound(err):
return errors.NewNotFound(qualifiedResource, name)
case storage.IsUnreachable(err):
return errors.NewServerTimeout(qualifiedResource, "delete", 2) // TODO: make configurable or handled at a higher level
case storage.IsConflict(err), storage.IsNodeExist(err), storage.IsInvalidObj(err):
return errors.NewConflict(qualifiedResource, name, err)
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default:
return err
}
}
// InterpretWatchError converts a generic error on a watch
// operation into the appropriate API error.
func InterpretWatchError(err error, resource schema.GroupResource, name string) error {
switch {
case storage.IsInvalidError(err):
invalidError, _ := err.(storage.InvalidError)
return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs)
case storage.IsInternalError(err):
return errors.NewInternalError(err)
default:
return err
}
}

25
vendor/k8s.io/apiserver/pkg/storage/etcd/OWNERS generated vendored Normal file
View File

@@ -0,0 +1,25 @@
reviewers:
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
- derekwaynecarr
- caesarxuchao
- mikedanese
- liggitt
- davidopp
- pmorie
- luxas
- janetkuo
- roberthbailey
- tallclair
- timothysc
- dims
- hongchaodeng
- krousey
- fgrzadkowski
- resouer
- pweil-
- mqliang
- feihujiang
- enj

View File

@@ -0,0 +1,129 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"strconv"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/storage"
)
// APIObjectVersioner implements versioning and extracting etcd node information
// for objects that have an embedded ObjectMeta or ListMeta field.
type APIObjectVersioner struct{}
// UpdateObject implements Versioner
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
accessor, err := meta.Accessor(obj)
if err != nil {
return err
}
versionString := ""
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
accessor.SetResourceVersion(versionString)
return nil
}
// UpdateList implements Versioner
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string) error {
listAccessor, err := meta.ListAccessor(obj)
if err != nil || listAccessor == nil {
return err
}
versionString := ""
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
listAccessor.SetResourceVersion(versionString)
listAccessor.SetContinue(nextKey)
return nil
}
// PrepareObjectForStorage clears resource version and self link prior to writing to etcd.
func (a APIObjectVersioner) PrepareObjectForStorage(obj runtime.Object) error {
accessor, err := meta.Accessor(obj)
if err != nil {
return err
}
accessor.SetResourceVersion("")
accessor.SetSelfLink("")
return nil
}
// ObjectResourceVersion implements Versioner
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return 0, err
}
version := accessor.GetResourceVersion()
if len(version) == 0 {
return 0, nil
}
return strconv.ParseUint(version, 10, 64)
}
// ParseResourceVersion takes a resource version argument and converts it to
// the etcd version. For watch we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func (a APIObjectVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, storage.NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
})
}
return version, nil
}
// APIObjectVersioner implements Versioner
var Versioner storage.Versioner = APIObjectVersioner{}
// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
// but etcd resource versions are special, they're actually ints, so we can easily compare them.
func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
lhsVersion, err := Versioner.ObjectResourceVersion(lhs)
if err != nil {
// coder error
panic(err)
}
rhsVersion, err := Versioner.ObjectResourceVersion(rhs)
if err != nil {
// coder error
panic(err)
}
if lhsVersion == rhsVersion {
return 0
}
if lhsVersion < rhsVersion {
return -1
}
return 1
}

17
vendor/k8s.io/apiserver/pkg/storage/etcd/doc.go generated vendored Normal file
View File

@@ -0,0 +1,17 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd // import "k8s.io/apiserver/pkg/storage/etcd"

View File

@@ -0,0 +1,122 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
cacheHitCounterOpts = prometheus.CounterOpts{
Name: "etcd_helper_cache_hit_count",
Help: "Counter of etcd helper cache hits.",
}
cacheHitCounter = prometheus.NewCounter(cacheHitCounterOpts)
cacheMissCounterOpts = prometheus.CounterOpts{
Name: "etcd_helper_cache_miss_count",
Help: "Counter of etcd helper cache miss.",
}
cacheMissCounter = prometheus.NewCounter(cacheMissCounterOpts)
cacheEntryCounterOpts = prometheus.CounterOpts{
Name: "etcd_helper_cache_entry_count",
Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " +
"because two concurrent threads can miss the cache and generate the same entry twice.",
}
cacheEntryCounter = prometheus.NewCounter(cacheEntryCounterOpts)
cacheGetLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "etcd_request_cache_get_latencies_summary",
Help: "Latency in microseconds of getting an object from etcd cache",
},
)
cacheAddLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "etcd_request_cache_add_latencies_summary",
Help: "Latency in microseconds of adding an object to etcd cache",
},
)
etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "etcd_request_latencies_summary",
Help: "Etcd request latency summary in microseconds for each operation and object type.",
},
[]string{"operation", "type"},
)
objectCounts = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "etcd_object_counts",
Help: "Number of stored objects at the time of last check split by kind.",
},
[]string{"resource"},
)
)
var registerMetrics sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(cacheHitCounter)
prometheus.MustRegister(cacheMissCounter)
prometheus.MustRegister(cacheEntryCounter)
prometheus.MustRegister(cacheAddLatency)
prometheus.MustRegister(cacheGetLatency)
prometheus.MustRegister(etcdRequestLatenciesSummary)
prometheus.MustRegister(objectCounts)
})
}
func UpdateObjectCount(resourcePrefix string, count int64) {
objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
}
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveGetCache(startTime time.Time) {
cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveAddCache(startTime time.Time) {
cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveCacheHit() {
cacheHitCounter.Inc()
}
func ObserveCacheMiss() {
cacheMissCounter.Inc()
}
func ObserveNewEntry() {
cacheEntryCounter.Inc()
}
func Reset() {
cacheHitCounter = prometheus.NewCounter(cacheHitCounterOpts)
cacheMissCounter = prometheus.NewCounter(cacheMissCounterOpts)
cacheEntryCounter = prometheus.NewCounter(cacheEntryCounterOpts)
// TODO: Reset cacheAddLatency.
// TODO: Reset cacheGetLatency.
etcdRequestLatenciesSummary.Reset()
}

5
vendor/k8s.io/apiserver/pkg/storage/etcd3/OWNERS generated vendored Normal file
View File

@@ -0,0 +1,5 @@
reviewers:
- wojtek-t
- timothysc
- madhusudancs
- hongchaodeng

162
vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go generated vendored Normal file
View File

@@ -0,0 +1,162 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"strconv"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"k8s.io/klog"
)
const (
compactRevKey = "compact_rev_key"
)
var (
endpointsMapMu sync.Mutex
endpointsMap map[string]struct{}
)
func init() {
endpointsMap = make(map[string]struct{})
}
// StartCompactor starts a compactor in the background to compact old version of keys that's not needed.
// By default, we save the most recent 10 minutes data and compact versions > 10minutes ago.
// It should be enough for slow watchers and to tolerate burst.
// TODO: We might keep a longer history (12h) in the future once storage API can take advantage of past version of keys.
func StartCompactor(ctx context.Context, client *clientv3.Client, compactInterval time.Duration) {
endpointsMapMu.Lock()
defer endpointsMapMu.Unlock()
// In one process, we can have only one compactor for one cluster.
// Currently we rely on endpoints to differentiate clusters.
for _, ep := range client.Endpoints() {
if _, ok := endpointsMap[ep]; ok {
klog.V(4).Infof("compactor already exists for endpoints %v", client.Endpoints())
return
}
}
for _, ep := range client.Endpoints() {
endpointsMap[ep] = struct{}{}
}
if compactInterval != 0 {
go compactor(ctx, client, compactInterval)
}
}
// compactor periodically compacts historical versions of keys in etcd.
// It will compact keys with versions older than given interval.
// In other words, after compaction, it will only contain keys set during last interval.
// Any API call for the older versions of keys will return error.
// Interval is the time interval between each compaction. The first compaction happens after "interval".
func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
// Technical definitions:
// We have a special key in etcd defined as *compactRevKey*.
// compactRevKey's value will be set to the string of last compacted revision.
// compactRevKey's version will be used as logical time for comparison. THe version is referred as compact time.
// Initially, because the key doesn't exist, the compact time (version) is 0.
//
// Algorithm:
// - Compare to see if (local compact_time) = (remote compact_time).
// - If yes, increment both local and remote compact_time, and do a compaction.
// - If not, set local to remote compact_time.
//
// Technical details/insights:
//
// The protocol here is lease based. If one compactor CAS successfully, the others would know it when they fail in
// CAS later and would try again in 10 minutes. If an APIServer crashed, another one would "take over" the lease.
//
// For example, in the following diagram, we have a compactor C1 doing compaction in t1, t2. Another compactor C2
// at t1' (t1 < t1' < t2) would CAS fail, set its known oldRev to rev at t1', and try again in t2' (t2' > t2).
// If C1 crashed and wouldn't compact at t2, C2 would CAS successfully at t2'.
//
// oldRev(t2) curRev(t2)
// +
// oldRev curRev |
// + + |
// | | |
// | | t1' | t2'
// +---v-------------v----^---------v------^---->
// t0 t1 t2
//
// We have the guarantees:
// - in normal cases, the interval is 10 minutes.
// - in failover, the interval is >10m and <20m
//
// FAQ:
// - What if time is not accurate? We don't care as long as someone did the compaction. Atomicity is ensured using
// etcd API.
// - What happened under heavy load scenarios? Initially, each apiserver will do only one compaction
// every 10 minutes. This is very unlikely affecting or affected w.r.t. server load.
var compactTime int64
var rev int64
var err error
for {
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
compactTime, rev, err = compact(ctx, client, compactTime, rev)
if err != nil {
klog.Errorf("etcd: endpoint (%v) compact failed: %v", client.Endpoints(), err)
continue
}
}
}
// compact compacts etcd store and returns current rev.
// It will return the current compact time and global revision if no error occurred.
// Note that CAS fail will not incur any error.
func compact(ctx context.Context, client *clientv3.Client, t, rev int64) (int64, int64, error) {
resp, err := client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.Version(compactRevKey), "=", t),
).Then(
clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10)), // Expect side effect: increment Version
).Else(
clientv3.OpGet(compactRevKey),
).Commit()
if err != nil {
return t, rev, err
}
curRev := resp.Header.Revision
if !resp.Succeeded {
curTime := resp.Responses[0].GetResponseRange().Kvs[0].Version
return curTime, curRev, nil
}
curTime := t + 1
if rev == 0 {
// We don't compact on bootstrap.
return curTime, curRev, nil
}
if _, err = client.Compact(ctx, rev); err != nil {
return curTime, curRev, err
}
klog.V(4).Infof("etcd: compacted rev (%d), endpoints (%v)", rev, client.Endpoints())
return curTime, curRev, nil
}

71
vendor/k8s.io/apiserver/pkg/storage/etcd3/errors.go generated vendored Normal file
View File

@@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"k8s.io/apimachinery/pkg/api/errors"
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)
func interpretWatchError(err error) error {
switch {
case err == etcdrpc.ErrCompacted:
return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
}
return err
}
const (
expired string = "The resourceVersion for the provided list is too old."
continueExpired string = "The provided continue parameter is too old " +
"to display a consistent list result. You can start a new list without " +
"the continue parameter."
inconsistentContinue string = "The provided continue parameter is too old " +
"to display a consistent list result. You can start a new list without " +
"the continue parameter, or use the continue token in this response to " +
"retrieve the remainder of the results. Continuing with the provided " +
"token results in an inconsistent list - objects that were created, " +
"modified, or deleted between the time the first chunk was returned " +
"and now may show up in the list."
)
func interpretListError(err error, paging bool, continueKey, keyPrefix string) error {
switch {
case err == etcdrpc.ErrCompacted:
if paging {
return handleCompactedErrorForPaging(continueKey, keyPrefix)
}
return errors.NewResourceExpired(expired)
}
return err
}
func handleCompactedErrorForPaging(continueKey, keyPrefix string) error {
// continueToken.ResoureVersion=-1 means that the apiserver can
// continue the list at the latest resource version. We don't use rv=0
// for this purpose to distinguish from a bad token that has empty rv.
newToken, err := encodeContinue(continueKey, keyPrefix, -1)
if err != nil {
utilruntime.HandleError(err)
return errors.NewResourceExpired(continueExpired)
}
statusError := errors.NewResourceExpired(inconsistentContinue)
statusError.ErrStatus.ListMeta.Continue = newToken
return statusError
}

63
vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go generated vendored Normal file
View File

@@ -0,0 +1,63 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
)
type event struct {
key string
value []byte
prevValue []byte
rev int64
isDeleted bool
isCreated bool
}
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
func parseKV(kv *mvccpb.KeyValue) *event {
return &event{
key: string(kv.Key),
value: kv.Value,
prevValue: nil,
rev: kv.ModRevision,
isDeleted: false,
isCreated: true,
}
}
func parseEvent(e *clientv3.Event) (*event, error) {
if !e.IsCreate() && e.PrevKv == nil {
// If the previous value is nil, error. One example of how this is possible is if the previous value has been compacted already.
return nil, fmt.Errorf("etcd event received with PrevKv=nil (key=%q, modRevision=%d, type=%s)", string(e.Kv.Key), e.Kv.ModRevision, e.Type.String())
}
ret := &event{
key: string(e.Kv.Key),
value: e.Kv.Value,
rev: e.Kv.ModRevision,
isDeleted: e.Type == clientv3.EventTypeDelete,
isCreated: e.IsCreate(),
}
if e.PrevKv != nil {
ret.prevValue = e.PrevKv.Value
}
return ret, nil
}

View File

@@ -0,0 +1,102 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
)
// leaseManager is used to manage leases requested from etcd. If a new write
// needs a lease that has similar expiration time to the previous one, the old
// lease will be reused to reduce the overhead of etcd, since lease operations
// are expensive. In the implementation, we only store one previous lease,
// since all the events have the same ttl.
type leaseManager struct {
client *clientv3.Client // etcd client used to grant leases
leaseMu sync.Mutex
prevLeaseID clientv3.LeaseID
prevLeaseExpirationTime time.Time
// The period of time in seconds and percent of TTL that each lease is
// reused. The minimum of them is used to avoid unreasonably large
// numbers. We use var instead of const for testing purposes.
leaseReuseDurationSeconds int64
leaseReuseDurationPercent float64
}
// newDefaultLeaseManager creates a new lease manager using default setting.
func newDefaultLeaseManager(client *clientv3.Client) *leaseManager {
return newLeaseManager(client, 60, 0.05)
}
// newLeaseManager creates a new lease manager with the number of buffered
// leases, lease reuse duration in seconds and percentage. The percentage
// value x means x*100%.
func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64) *leaseManager {
return &leaseManager{
client: client,
leaseReuseDurationSeconds: leaseReuseDurationSeconds,
leaseReuseDurationPercent: leaseReuseDurationPercent,
}
}
// setLeaseReuseDurationSeconds is used for testing purpose. It is used to
// reduce the extra lease duration to avoid unnecessary timeout in testing.
func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) {
l.leaseMu.Lock()
defer l.leaseMu.Unlock()
l.leaseReuseDurationSeconds = duration
}
// GetLease returns a lease based on requested ttl: if the cached previous
// lease can be reused, reuse it; otherwise request a new one from etcd.
func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
now := time.Now()
l.leaseMu.Lock()
defer l.leaseMu.Unlock()
// check if previous lease can be reused
reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl)
valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime)
sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime)
if valid && sufficient {
return l.prevLeaseID, nil
}
// request a lease with a little extra ttl from etcd
ttl += reuseDurationSeconds
lcr, err := l.client.Lease.Grant(ctx, ttl)
if err != nil {
return clientv3.LeaseID(0), err
}
// cache the new lease id
l.prevLeaseID = lcr.ID
l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second)
return lcr.ID, nil
}
// getReuseDurationSecondsLocked returns the reusable duration in seconds
// based on the configuration. Lock has to be acquired before calling this
// function.
func (l *leaseManager) getReuseDurationSecondsLocked(ttl int64) int64 {
reuseDurationSeconds := int64(l.leaseReuseDurationPercent * float64(ttl))
if reuseDurationSeconds > l.leaseReuseDurationSeconds {
reuseDurationSeconds = l.leaseReuseDurationSeconds
}
return reuseDurationSeconds
}

788
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go generated vendored Normal file
View File

@@ -0,0 +1,788 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"path"
"reflect"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"k8s.io/klog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/value"
utiltrace "k8s.io/apiserver/pkg/util/trace"
)
// authenticatedDataString satisfies the value.Context interface. It uses the key to
// authenticate the stored data. This does not defend against reuse of previously
// encrypted values under the same key, but will prevent an attacker from using an
// encrypted value from a different key. A stronger authenticated data segment would
// include the etcd3 Version field (which is incremented on each write to a key and
// reset when the key is deleted), but an attacker with write access to etcd can
// force deletion and recreation of keys to weaken that angle.
type authenticatedDataString string
// AuthenticatedData implements the value.Context interface.
func (d authenticatedDataString) AuthenticatedData() []byte {
return []byte(string(d))
}
var _ value.Context = authenticatedDataString("")
type store struct {
client *clientv3.Client
// getOpts contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
pathPrefix string
watcher *watcher
pagingEnabled bool
leaseManager *leaseManager
}
type objState struct {
obj runtime.Object
meta *storage.ResponseMeta
rev int64
data []byte
stale bool
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, pagingEnabled, codec, prefix, transformer)
}
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := etcd.APIObjectVersioner{}
result := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, versioner, transformer),
leaseManager: newDefaultLeaseManager(c),
}
return result
}
// Versioner implements storage.Interface.Versioner.
func (s *store) Versioner() storage.Versioner {
return s.versioner
}
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
key = path.Join(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if ignoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
key = path.Join(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
// Delete implements storage.Interface.Delete.
func (s *store) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
key = path.Join(s.pathPrefix, key)
if preconditions == nil {
return s.unconditionalDelete(ctx, key, out)
}
return s.conditionalDelete(ctx, key, out, v, preconditions)
}
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
// We need to do get and delete in single transaction in order to
// know the value and revision before deleting it.
txnResp, err := s.client.KV.Txn(ctx).If().Then(
clientv3.OpGet(key),
clientv3.OpDelete(key),
).Commit()
if err != nil {
return err
}
getResp := txnResp.Responses[0].GetResponseRange()
if len(getResp.Kvs) == 0 {
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error {
getResp, err := s.client.KV.Get(ctx, key)
if err != nil {
return err
}
for {
origState, err := s.getState(getResp, key, v, false)
if err != nil {
return err
}
if err := preconditions.Check(key, origState.obj); err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpDelete(key),
).Else(
clientv3.OpGet(key),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
continue
}
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
func (s *store) GuaranteedUpdate(
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String()))
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
key = path.Join(s.pathPrefix, key)
getCurrentState := func() (*objState, error) {
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return nil, err
}
return s.getState(getResp, key, v, ignoreNotFound)
}
var origState *objState
var mustCheckData bool
if len(suggestion) == 1 && suggestion[0] != nil {
origState, err = s.getStateFromObject(suggestion[0])
if err != nil {
return err
}
mustCheckData = true
} else {
origState, err = getCurrentState()
if err != nil {
return err
}
}
trace.Step("initial value restored")
transformContext := authenticatedDataString(key)
for {
if err := preconditions.Check(key, origState.obj); err != nil {
return err
}
ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
// It's possible we were working with stale data
if mustCheckData && apierrors.IsConflict(err) {
// Actually fetch
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}
return err
}
data, err := runtime.Encode(s.codec, ret)
if err != nil {
return err
}
if !origState.stale && bytes.Equal(data, origState.data) {
// if we skipped the original Get in this loop, we must refresh from
// etcd in order to be sure the data in the store is equivalent to
// our desired serialization
if mustCheckData {
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
if !bytes.Equal(data, origState.data) {
// original data changed, restart loop
continue
}
}
// recheck that the data from etcd is not stale before short-circuiting a write
if !origState.stale {
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
newData, err := s.transformer.TransformToStorage(data, transformContext)
if err != nil {
return storage.NewInternalError(err.Error())
}
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
trace.Step("Transaction prepared")
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
if err != nil {
return err
}
trace.Step("Transaction committed")
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
return err
}
trace.Step("Retry value restored")
mustCheckData = false
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
}
// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
key = path.Join(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
if len(getResp.Kvs) > 0 {
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil {
return err
}
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "")
}
func (s *store) Count(key string) (int64, error) {
key = path.Join(s.pathPrefix, key)
getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
if err != nil {
return 0, err
}
return getResp.Count, nil
}
// continueToken is a simple structured object for encoding the state of a continue token.
// TODO: if we change the version of the encoded from, we can't start encoding the new version
// until all other servers are upgraded (i.e. we need to support rolling schema)
// This is a public API struct and cannot change.
type continueToken struct {
APIVersion string `json:"v"`
ResourceVersion int64 `json:"rv"`
StartKey string `json:"start"`
}
// parseFrom transforms an encoded predicate from into a versioned struct.
// TODO: return a typed error that instructs clients that they must relist
func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
data, err := base64.RawURLEncoding.DecodeString(continueValue)
if err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
var c continueToken
if err := json.Unmarshal(data, &c); err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
switch c.APIVersion {
case "meta.k8s.io/v1":
if c.ResourceVersion == 0 {
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
}
if len(c.StartKey) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
}
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
// continue start key that is fully qualified and cannot range over anything less specific than
// keyPrefix.
key := c.StartKey
if !strings.HasPrefix(key, "/") {
key = "/" + key
}
cleaned := path.Clean(key)
if cleaned != key {
return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
}
return keyPrefix + cleaned[1:], c.ResourceVersion, nil
default:
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
}
}
// encodeContinue returns a string representing the encoded continuation of the current query.
func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
nextKey := strings.TrimPrefix(key, keyPrefix)
if nextKey == key {
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
}
out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(out), nil
}
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
if s.pathPrefix != "" {
key = path.Join(s.pathPrefix, key)
}
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(key, "/") {
key += "/"
}
keyPrefix := key
// set the appropriate clientv3 options to filter the returned data set
var paging bool
options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && pred.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(pred.Limit))
}
var returnedRV, continueRV int64
var continueKey string
switch {
case s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if len(resourceVersion) > 0 && resourceVersion != "0" {
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
key = continueKey
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
options = append(options, clientv3.WithRev(continueRV))
returnedRV = continueRV
}
case s.pagingEnabled && pred.Limit > 0:
if len(resourceVersion) > 0 {
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(int64(fromRV)))
}
returnedRV = int64(fromRV)
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
default:
if len(resourceVersion) > 0 {
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(int64(fromRV)))
}
returnedRV = int64(fromRV)
}
options = append(options, clientv3.WithPrefix())
}
// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
var hasMore bool
for {
getResp, err := s.client.KV.Get(ctx, key, options...)
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
}
hasMore = getResp.More
if len(getResp.Kvs) == 0 && getResp.More {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
}
// avoid small allocations for the result slice, since this can be called in many
// different contexts and we don't know how significantly the result will be filtered
if pred.Empty() {
growSlice(v, len(getResp.Kvs))
} else {
growSlice(v, 2048, len(getResp.Kvs))
}
// take items from the response until the bucket is full, filtering as we go
for _, kv := range getResp.Kvs {
if paging && int64(v.Len()) >= pred.Limit {
hasMore = true
break
}
lastKey = kv.Key
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
if err != nil {
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
}
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil {
return err
}
}
// indicate to the client which resource version was returned
if returnedRV == 0 {
returnedRV = getResp.Header.Revision
}
// no more results remain or we didn't request paging
if !hasMore || !paging {
break
}
// we're paging but we have filled our bucket
if int64(v.Len()) >= pred.Limit {
break
}
key = string(lastKey) + "\x00"
}
// instruct the client to begin querying from immediately after the last key we returned
// we never return a key that the client wouldn't be allowed to see
if hasMore {
// we want to start immediately after the last key
next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
if err != nil {
return err
}
return s.versioner.UpdateList(listObj, uint64(returnedRV), next)
}
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "")
}
// growSlice takes a slice value and grows its capacity up
// to the maximum of the passed sizes or maxCapacity, whichever
// is smaller. Above maxCapacity decisions about allocation are left
// to the Go runtime on append. This allows a caller to make an
// educated guess about the potential size of the total list while
// still avoiding overly aggressive initial allocation. If sizes
// is empty maxCapacity will be used as the size to grow.
func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
cap := v.Cap()
max := cap
for _, size := range sizes {
if size > max {
max = size
}
}
if len(sizes) == 0 || max > maxCapacity {
max = maxCapacity
}
if max <= cap {
return
}
if v.Len() > 0 {
extra := reflect.MakeSlice(v.Type(), 0, max)
reflect.Copy(extra, v)
v.Set(extra)
} else {
extra := reflect.MakeSlice(v.Type(), 0, max)
v.Set(extra)
}
}
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, false)
}
// WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, true)
}
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
rev, err := s.versioner.ParseResourceVersion(rv)
if err != nil {
return nil, err
}
key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
state := &objState{
obj: reflect.New(v.Type()).Interface().(runtime.Object),
meta: &storage.ResponseMeta{},
}
if len(getResp.Kvs) == 0 {
if !ignoreNotFound {
return nil, storage.NewKeyNotFoundError(key, 0)
}
if err := runtime.SetZeroValue(state.obj); err != nil {
return nil, err
}
} else {
data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return nil, storage.NewInternalError(err.Error())
}
state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev)
state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
return nil, err
}
}
return state, nil
}
func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
state := &objState{
obj: obj,
meta: &storage.ResponseMeta{},
}
rv, err := s.versioner.ObjectResourceVersion(obj)
if err != nil {
return nil, fmt.Errorf("couldn't get resource version: %v", err)
}
state.rev = int64(rv)
state.meta.ResourceVersion = uint64(state.rev)
// Compute the serialized form - for that we need to temporarily clean
// its resource version field (those are not stored in etcd).
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return nil, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
state.data, err = runtime.Encode(s.codec, obj)
if err != nil {
return nil, err
}
s.versioner.UpdateObject(state.obj, uint64(rv))
return state, nil
}
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
if err != nil {
return nil, 0, err
}
if err := s.versioner.PrepareObjectForStorage(ret); err != nil {
return nil, 0, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
var ttl uint64
if ttlPtr != nil {
ttl = *ttlPtr
}
return ret, ttl, nil
}
// ttlOpts returns client options based on given ttl.
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
if ttl == 0 {
return nil, nil
}
id, err := s.leaseManager.GetLease(ctx, ttl)
if err != nil {
return nil, err
}
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err := codec.Decode(value, nil, objPtr)
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(objPtr, uint64(rev))
return nil
}
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error {
obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, rev)
if matched, err := pred.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
return nil
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}

408
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go generated vendored Normal file
View File

@@ -0,0 +1,408 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/value"
"github.com/coreos/etcd/clientv3"
"k8s.io/klog"
)
const (
// We have set a buffer in order to reduce times of context switches.
incomingBufSize = 100
outgoingBufSize = 100
)
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
var fatalOnDecodeError = false
// errTestingDecode is the only error that testingDeferOnDecodeError catches during a panic
var errTestingDecode = errors.New("sentinel error only used during testing to indicate watch decoding error")
// testingDeferOnDecodeError is used during testing to recover from a panic caused by errTestingDecode, all other values continue to panic
func testingDeferOnDecodeError() {
if r := recover(); r != nil && r != errTestingDecode {
panic(r)
}
}
func init() {
// check to see if we are running in a test environment
fatalOnDecodeError, _ = strconv.ParseBool(os.Getenv("KUBE_PANIC_WATCH_DECODE_ERROR"))
}
type watcher struct {
client *clientv3.Client
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
}
// watchChan implements watch.Interface.
type watchChan struct {
watcher *watcher
key string
initialRev int64
recursive bool
internalPred storage.SelectionPredicate
ctx context.Context
cancel context.CancelFunc
incomingEventChan chan *event
resultChan chan watch.Event
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
return &watcher{
client: client,
codec: codec,
versioner: versioner,
transformer: transformer,
}
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
// If rev is zero, it will return the existing object(s) and then start watching from
// the maximum revision+1 from returned objects.
// If rev is non-zero, it will watch events happened after given revision.
// If recursive is false, it watches on given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only if pred matches the change, it will be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, pred)
go wc.run()
return wc, nil
}
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
wc := &watchChan{
watcher: w,
key: key,
initialRev: rev,
recursive: recursive,
internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
errChan: make(chan error, 1),
}
if pred.Empty() {
// The filter doesn't filter out any object.
wc.internalPred = storage.Everything
}
wc.ctx, wc.cancel = context.WithCancel(ctx)
return wc
}
func (wc *watchChan) run() {
watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
go wc.processEvent(&resultChanWG)
select {
case err := <-wc.errChan:
if err == context.Canceled {
break
}
errResult := transformErrorToEvent(err)
if errResult != nil {
// error result is guaranteed to be received by user before closing ResultChan.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
case <-watchClosedCh:
case <-wc.ctx.Done(): // user cancel
}
// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
// It's fine to double cancel.
wc.cancel()
// we need to wait until resultChan wouldn't be used anymore
resultChanWG.Wait()
close(wc.resultChan)
}
func (wc *watchChan) Stop() {
wc.cancel()
}
func (wc *watchChan) ResultChan() <-chan watch.Event {
return wc.resultChan
}
// sync tries to retrieve existing data and send them to process.
// The revision to watch will be set to the revision in response.
// All events sent will have isCreated=true
func (wc *watchChan) sync() error {
opts := []clientv3.OpOption{}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
if err != nil {
return err
}
wc.initialRev = getResp.Header.Revision
for _, kv := range getResp.Kvs {
wc.sendEvent(parseKV(kv))
}
return nil
}
// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.initialRev == 0 {
if err := wc.sync(); err != nil {
klog.Errorf("failed to sync with latest state: %v", err)
wc.sendError(err)
return
}
}
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
for wres := range wch {
if wres.Err() != nil {
err := wres.Err()
// If there is an error on server (e.g. compaction), the channel will return it before closed.
klog.Errorf("watch chan error: %v", err)
wc.sendError(err)
return
}
for _, e := range wres.Events {
parsedEvent, err := parseEvent(e)
if err != nil {
klog.Errorf("watch chan error: %v", err)
wc.sendError(err)
return
}
wc.sendEvent(parsedEvent)
}
}
// When we come to this point, it's only possible that client side ends the watch.
// e.g. cancel the context, close the client.
// If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
// We should notify the main thread that this goroutine has exited.
close(watchClosedCh)
}
// processEvent processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case e := <-wc.incomingEventChan:
res := wc.transform(e)
if res == nil {
continue
}
if len(wc.resultChan) == outgoingBufSize {
klog.V(3).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow dispatching events to watchers", outgoingBufSize)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case wc.resultChan <- *res:
case <-wc.ctx.Done():
return
}
case <-wc.ctx.Done():
return
}
}
}
func (wc *watchChan) filter(obj runtime.Object) bool {
if wc.internalPred.Empty() {
return true
}
matched, err := wc.internalPred.Matches(obj)
return err == nil && matched
}
func (wc *watchChan) acceptAll() bool {
return wc.internalPred.Empty()
}
// transform transforms an event into a result for user if not filtered.
func (wc *watchChan) transform(e *event) (res *watch.Event) {
curObj, oldObj, err := wc.prepareObjs(e)
if err != nil {
klog.Errorf("failed to prepare current and previous objects: %v", err)
wc.sendError(err)
return nil
}
switch {
case e.isDeleted:
if !wc.filter(oldObj) {
return nil
}
res = &watch.Event{
Type: watch.Deleted,
Object: oldObj,
}
case e.isCreated:
if !wc.filter(curObj) {
return nil
}
res = &watch.Event{
Type: watch.Added,
Object: curObj,
}
default:
if wc.acceptAll() {
res = &watch.Event{
Type: watch.Modified,
Object: curObj,
}
return res
}
curObjPasses := wc.filter(curObj)
oldObjPasses := wc.filter(oldObj)
switch {
case curObjPasses && oldObjPasses:
res = &watch.Event{
Type: watch.Modified,
Object: curObj,
}
case curObjPasses && !oldObjPasses:
res = &watch.Event{
Type: watch.Added,
Object: curObj,
}
case !curObjPasses && oldObjPasses:
res = &watch.Event{
Type: watch.Deleted,
Object: oldObj,
}
}
}
return res
}
func transformErrorToEvent(err error) *watch.Event {
err = interpretWatchError(err)
if _, ok := err.(apierrs.APIStatus); !ok {
err = apierrs.NewInternalError(err)
}
status := err.(apierrs.APIStatus).Status()
return &watch.Event{
Type: watch.Error,
Object: &status,
}
}
func (wc *watchChan) sendError(err error) {
select {
case wc.errChan <- err:
case <-wc.ctx.Done():
}
}
func (wc *watchChan) sendEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
klog.V(3).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow decoding, user not receiving fast, or other processing logic",
incomingBufSize)
}
select {
case wc.incomingEventChan <- e:
case <-wc.ctx.Done():
}
}
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
if !e.isDeleted {
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
}
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
if err != nil {
return nil, nil, err
}
}
// We need to decode prevValue, only if this is deletion event or
// the underlying filter doesn't accept all objects (otherwise we
// know that the filter for previous object will return true and
// we need the object only to compute whether it was filtered out
// before).
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
}
// Note that this sends the *old* object with the etcd revision for the time at
// which it gets deleted.
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
if err != nil {
return nil, nil, err
}
}
return curObj, oldObj, nil
}
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {
obj, err := runtime.Decode(codec, []byte(data))
if err != nil {
if fatalOnDecodeError {
// catch watch decode error iff we caused it on
// purpose during a unit test
defer testingDeferOnDecodeError()
// we are running in a test environment and thus an
// error here is due to a coder mistake if the defer
// does not catch it
panic(err)
}
return nil, err
}
// ensure resource version is set on the object we load from etcd
if err := versioner.UpdateObject(obj, uint64(rev)); err != nil {
return nil, fmt.Errorf("failure to version api object (%d) %#v: %v", rev, obj, err)
}
return obj, nil
}

223
vendor/k8s.io/apiserver/pkg/storage/interfaces.go generated vendored Normal file
View File

@@ -0,0 +1,223 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
)
// Versioner abstracts setting and retrieving metadata fields from database response
// onto the object ot list. It is required to maintain storage invariants - updating an
// object twice with the same data except for the ResourceVersion and SelfLink must be
// a no-op. A resourceVersion of type uint64 is a 'raw' resourceVersion,
// intended to be sent directly to or from the backend. A resourceVersion of
// type string is a 'safe' resourceVersion, intended for consumption by users.
type Versioner interface {
// UpdateObject sets storage metadata into an API object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from database.
UpdateObject(obj runtime.Object, resourceVersion uint64) error
// UpdateList sets the resource version into an API list object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from database. continueValue is optional and indicates that more results are available if
// the client passes that value to the server in a subsequent call.
UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error
// PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should
// return an error if the specified object cannot be updated.
PrepareObjectForStorage(obj runtime.Object) error
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
// Should return an error if the specified object does not have a persistable version.
ObjectResourceVersion(obj runtime.Object) (uint64, error)
// ParseResourceVersion takes a resource version argument and
// converts it to the storage backend. For watch we should pass to helper.Watch().
// Because resourceVersion is an opaque value, the default watch
// behavior for non-zero watch is to watch the next value (if you pass
// "1", you will see updates from "2" onwards).
ParseResourceVersion(resourceVersion string) (uint64, error)
}
// ResponseMeta contains information about the database metadata that is associated with
// an object. It abstracts the actual underlying objects to prevent coupling with concrete
// database and to improve testability.
type ResponseMeta struct {
// TTL is the time to live of the node that contained the returned object. It may be
// zero or negative in some cases (objects may be expired after the requested
// expiration time due to server lag).
TTL int64
// The resource version of the node that contained the returned object.
ResourceVersion uint64
}
// MatchValue defines a pair (<index name>, <value for that index>).
type MatchValue struct {
IndexName string
Value string
}
// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs
// (<index name>, <index value for the given object>) for all indexes known
// to that function.
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
// Everything accepts all objects.
var Everything = SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
// TODO: split this into a new top level constant?
IncludeUninitialized: true,
}
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
// that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more details.
type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
type Preconditions struct {
// Specifies the target UID.
// +optional
UID *types.UID `json:"uid,omitempty"`
}
// NewUIDPreconditions returns a Preconditions with UID set.
func NewUIDPreconditions(uid string) *Preconditions {
u := types.UID(uid)
return &Preconditions{UID: &u}
}
func (p *Preconditions) Check(key string, obj runtime.Object) error {
if p == nil {
return nil
}
objMeta, err := meta.Accessor(obj)
if err != nil {
return NewInternalErrorf(
"can't enforce preconditions %v on un-introspectable object %v, got error: %v",
*p,
obj,
err)
}
if p.UID != nil && *p.UID != objMeta.GetUID() {
err := fmt.Sprintf(
"Precondition failed: UID in precondition: %v, UID in object meta: %v",
*p.UID,
objMeta.GetUID())
return NewInvalidObjError(key, err)
}
return nil
}
// Interface offers a common interface for object marshaling/unmarshaling operations and
// hides all the storage-related operations behind it.
type Interface interface {
// Returns Versioner associated with this interface.
Versioner() Versioner
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items selected by 'p' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item selected by 'p' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will list current objects directory defined by key
// and send them in "ADDED" events, before watch starts.
WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
// Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
// GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict.
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
// or zero value in 'ptrToType' parameter otherwise.
// If the object to update has the same value as previous, it won't do any update
// but will return the object in 'ptrToType' parameter.
// If 'suggestion' can contain zero or one element - in such case this can be used as
// a suggestion about the current version of the object to avoid read operation from
// storage to get it.
//
// Example:
//
// s := /* implementation of Interface */
// err := s.GuaranteedUpdate(
// "myKey", &MyType{}, true,
// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
// // Before each incovation of the user defined function, "input" is reset to
// // current contents for "myKey" in database.
// curr := input.(*MyType) // Guaranteed to succeed.
//
// // Make the modification
// curr.Counter++
//
// // Return the modified object - return an error to stop iterating. Return
// // a uint64 to alter the TTL on the object, or nil to keep it the same value.
// return cur, nil, nil
// }
// })
GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
// Count returns number of different entries under the key (generally being path prefix).
Count(key string) (int64, error)
}

54
vendor/k8s.io/apiserver/pkg/storage/names/generate.go generated vendored Normal file
View File

@@ -0,0 +1,54 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package names
import (
"fmt"
utilrand "k8s.io/apimachinery/pkg/util/rand"
)
// NameGenerator generates names for objects. Some backends may have more information
// available to guide selection of new names and this interface hides those details.
type NameGenerator interface {
// GenerateName generates a valid name from the base name, adding a random suffix to the
// the base. If base is valid, the returned name must also be valid. The generator is
// responsible for knowing the maximum valid name length.
GenerateName(base string) string
}
// simpleNameGenerator generates random names.
type simpleNameGenerator struct{}
// SimpleNameGenerator is a generator that returns the name plus a random suffix of five alphanumerics
// when a name is requested. The string is guaranteed to not exceed the length of a standard Kubernetes
// name (63 characters)
var SimpleNameGenerator NameGenerator = simpleNameGenerator{}
const (
// TODO: make this flexible for non-core resources with alternate naming rules.
maxNameLength = 63
randomLength = 5
maxGeneratedNameLength = maxNameLength - randomLength
)
func (simpleNameGenerator) GenerateName(base string) string {
if len(base) > maxGeneratedNameLength {
base = base[:maxGeneratedNameLength]
}
return fmt.Sprintf("%s%s", base, utilrand.String(randomLength))
}

View File

@@ -0,0 +1,150 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)
// AttrFunc returns label and field sets and the uninitialized flag for List or Watch to match.
// In any failure to parse given object, it returns error.
type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, bool, error)
// FieldMutationFunc allows the mutation of the field selection fields. It is mutating to
// avoid the extra allocation on this common path
type FieldMutationFunc func(obj runtime.Object, fieldSet fields.Set) error
func DefaultClusterScopedAttr(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
return nil, nil, false, err
}
fieldSet := fields.Set{
"metadata.name": metadata.GetName(),
}
return labels.Set(metadata.GetLabels()), fieldSet, metadata.GetInitializers() != nil, nil
}
func DefaultNamespaceScopedAttr(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
return nil, nil, false, err
}
fieldSet := fields.Set{
"metadata.name": metadata.GetName(),
"metadata.namespace": metadata.GetNamespace(),
}
return labels.Set(metadata.GetLabels()), fieldSet, metadata.GetInitializers() != nil, nil
}
func (f AttrFunc) WithFieldMutation(fieldMutator FieldMutationFunc) AttrFunc {
return func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
labelSet, fieldSet, initialized, err := f(obj)
if err != nil {
return nil, nil, false, err
}
if err := fieldMutator(obj, fieldSet); err != nil {
return nil, nil, false, err
}
return labelSet, fieldSet, initialized, nil
}
}
// SelectionPredicate is used to represent the way to select objects from api storage.
type SelectionPredicate struct {
Label labels.Selector
Field fields.Selector
IncludeUninitialized bool
GetAttrs AttrFunc
IndexFields []string
Limit int64
Continue string
}
// Matches returns true if the given object's labels and fields (as
// returned by s.GetAttrs) match s.Label and s.Field. An error is
// returned if s.GetAttrs fails.
func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
if s.Empty() {
return true, nil
}
labels, fields, uninitialized, err := s.GetAttrs(obj)
if err != nil {
return false, err
}
if !s.IncludeUninitialized && uninitialized {
return false, nil
}
matched := s.Label.Matches(labels)
if matched && s.Field != nil {
matched = matched && s.Field.Matches(fields)
}
return matched, nil
}
// MatchesObjectAttributes returns true if the given labels and fields
// match s.Label and s.Field.
func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set, uninitialized bool) bool {
if !s.IncludeUninitialized && uninitialized {
return false
}
if s.Label.Empty() && s.Field.Empty() {
return true
}
matched := s.Label.Matches(l)
if matched && s.Field != nil {
matched = (matched && s.Field.Matches(f))
}
return matched
}
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
// name.
func (s *SelectionPredicate) MatchesSingle() (string, bool) {
if len(s.Continue) > 0 {
return "", false
}
// TODO: should be namespace.name
if name, ok := s.Field.RequiresExactMatch("metadata.name"); ok {
return name, true
}
return "", false
}
// For any index defined by IndexFields, if a matcher can match only (a subset)
// of objects that return <value> for a given index, a pair (<index name>, <value>)
// wil be returned.
// TODO: Consider supporting also labels.
func (s *SelectionPredicate) MatcherIndex() []MatchValue {
var result []MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, MatchValue{IndexName: field, Value: value})
}
}
return result
}
// Empty returns true if the predicate performs no filtering.
func (s *SelectionPredicate) Empty() bool {
return s.Label.Empty() && s.Field.Empty() && s.IncludeUninitialized
}

View File

@@ -0,0 +1,6 @@
reviewers:
- lavalamp
- smarterclayton
- wojtek-t
- timothysc
- hongchaodeng

View File

@@ -0,0 +1,69 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
import (
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage/value"
)
const (
StorageTypeUnset = ""
StorageTypeETCD3 = "etcd3"
DefaultCompactInterval = 5 * time.Minute
)
// Config is configuration for creating a storage backend.
type Config struct {
// Type defines the type of storage backend. Default ("") is "etcd3".
Type string
// Prefix is the prefix to all keys passed to storage.Interface methods.
Prefix string
// ServerList is the list of storage servers to connect with.
ServerList []string
// TLS credentials
KeyFile string
CertFile string
CAFile string
// Paging indicates whether the server implementation should allow paging (if it is
// supported). This is generally configured by feature gating, or by a specific
// resource type not wishing to allow paging, and is not intended for end users to
// set.
Paging bool
Codec runtime.Codec
// Transformer allows the value to be transformed prior to persisting into etcd.
Transformer value.Transformer
// CompactionInterval is an interval of requesting compaction from apiserver.
// If the value is 0, no compaction will be issued.
CompactionInterval time.Duration
// CountMetricPollPeriod specifies how often should count metric be updated
CountMetricPollPeriod time.Duration
}
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
return &Config{
Prefix: prefix,
Codec: codec,
CompactionInterval: DefaultCompactInterval,
}
}

View File

@@ -0,0 +1,127 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/value"
)
// The short keepalive timeout and interval have been chosen to aggressively
// detect a failed etcd server without introducing much overhead.
const keepaliveTime = 30 * time.Second
const keepaliveTimeout = 10 * time.Second
// dialTimeout is the timeout for failing to establish a connection.
// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
// on heavily loaded arm64 CPUs (issue #64649)
const dialTimeout = 20 * time.Second
func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
// constructing the etcd v3 client blocks and times out if etcd is not available.
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
clientValue := &atomic.Value{}
clientErrMsg := &atomic.Value{}
clientErrMsg.Store("etcd client connection not yet established")
go wait.PollUntil(time.Second, func() (bool, error) {
client, err := newETCD3Client(c)
if err != nil {
clientErrMsg.Store(err.Error())
return false, nil
}
clientValue.Store(client)
clientErrMsg.Store("")
return true, nil
}, wait.NeverStop)
return func() error {
if errMsg := clientErrMsg.Load().(string); len(errMsg) > 0 {
return fmt.Errorf(errMsg)
}
client := clientValue.Load().(*clientv3.Client)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := client.Cluster.MemberList(ctx); err != nil {
return fmt.Errorf("error listing etcd members: %v", err)
}
return nil
}, nil
}
func newETCD3Client(c storagebackend.Config) (*clientv3.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
// NOTE: Client relies on nil tlsConfig
// for non-secure connections, update the implicit variable
if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 {
tlsConfig = nil
}
cfg := clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
},
Endpoints: c.ServerList,
TLS: tlsConfig,
}
return clientv3.New(cfg)
}
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
client, err := newETCD3Client(c)
if err != nil {
return nil, nil, err
}
ctx, cancel := context.WithCancel(context.Background())
etcd3.StartCompactor(ctx, client, c.CompactionInterval)
destroyFunc := func() {
cancel()
client.Close()
}
transformer := c.Transformer
if transformer == nil {
transformer = value.IdentityTransformer
}
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}

View File

@@ -0,0 +1,51 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"fmt"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
// DestroyFunc is to destroy any resources used by the storage returned in Create() together.
type DestroyFunc func()
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
// CreateHealthCheck creates a healthcheck function based on given config.
func CreateHealthCheck(c storagebackend.Config) (func() error, error) {
switch c.Type {
case "etcd2":
return nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3HealthCheck(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

89
vendor/k8s.io/apiserver/pkg/storage/util.go generated vendored Normal file
View File

@@ -0,0 +1,89 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"fmt"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation/path"
"k8s.io/apimachinery/pkg/runtime"
)
type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
// SimpleUpdateFunc converts SimpleUpdateFunc into UpdateFunc
func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
out, err := fn(input)
return out, nil, err
}
}
func EverythingFunc(runtime.Object) bool {
return true
}
func NoTriggerFunc() []MatchValue {
return nil
}
func NoTriggerPublisher(runtime.Object) []MatchValue {
return nil
}
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", err
}
name := meta.GetName()
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
return "", fmt.Errorf("invalid name: %v", msgs)
}
return prefix + "/" + meta.GetNamespace() + "/" + name, nil
}
func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", err
}
name := meta.GetName()
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
return "", fmt.Errorf("invalid name: %v", msgs)
}
return prefix + "/" + name, nil
}
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Update returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}

124
vendor/k8s.io/apiserver/pkg/storage/value/metrics.go generated vendored Normal file
View File

@@ -0,0 +1,124 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package value
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
const (
namespace = "apiserver"
subsystem = "storage"
)
var (
transformerLatencies = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "transformation_latencies_microseconds",
Help: "Latencies in microseconds of value transformation operations.",
// In-process transformations (ex. AES CBC) complete on the order of 20 microseconds. However, when
// external KMS is involved latencies may climb into milliseconds.
Buckets: prometheus.ExponentialBuckets(5, 2, 14),
},
[]string{"transformation_type"},
)
transformerFailuresTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "transformation_failures_total",
Help: "Total number of failed transformation operations.",
},
[]string{"transformation_type"},
)
envelopeTransformationCacheMissTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "envelope_transformation_cache_misses_total",
Help: "Total number of cache misses while accessing key decryption key(KEK).",
},
)
dataKeyGenerationLatencies = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "data_key_generation_latencies_microseconds",
Help: "Latencies in microseconds of data encryption key(DEK) generation operations.",
Buckets: prometheus.ExponentialBuckets(5, 2, 14),
},
)
dataKeyGenerationFailuresTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "data_key_generation_failures_total",
Help: "Total number of failed data encryption key(DEK) generation operations.",
},
)
)
var registerMetrics sync.Once
func RegisterMetrics() {
registerMetrics.Do(func() {
prometheus.MustRegister(transformerLatencies)
prometheus.MustRegister(transformerFailuresTotal)
prometheus.MustRegister(envelopeTransformationCacheMissTotal)
prometheus.MustRegister(dataKeyGenerationLatencies)
prometheus.MustRegister(dataKeyGenerationFailuresTotal)
})
}
// RecordTransformation records latencies and count of TransformFromStorage and TransformToStorage operations.
func RecordTransformation(transformationType string, start time.Time, err error) {
if err != nil {
transformerFailuresTotal.WithLabelValues(transformationType).Inc()
return
}
since := sinceInMicroseconds(start)
transformerLatencies.WithLabelValues(transformationType).Observe(float64(since))
}
// RecordCacheMiss records a miss on Key Encryption Key(KEK) - call to KMS was required to decrypt KEK.
func RecordCacheMiss() {
envelopeTransformationCacheMissTotal.Inc()
}
// RecordDataKeyGeneration records latencies and count of Data Encryption Key generation operations.
func RecordDataKeyGeneration(start time.Time, err error) {
if err != nil {
dataKeyGenerationFailuresTotal.Inc()
return
}
since := sinceInMicroseconds(start)
dataKeyGenerationLatencies.Observe(float64(since))
}
func sinceInMicroseconds(start time.Time) int64 {
elapsedNanoseconds := time.Since(start).Nanoseconds()
return elapsedNanoseconds / int64(time.Microsecond)
}

View File

@@ -0,0 +1,164 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package value contains methods for assisting with transformation of values in storage.
package value
import (
"bytes"
"fmt"
"sync"
"time"
)
func init() {
RegisterMetrics()
}
// Context is additional information that a storage transformation may need to verify the data at rest.
type Context interface {
// AuthenticatedData should return an array of bytes that describes the current value. If the value changes,
// the transformer may report the value as unreadable or tampered. This may be nil if no such description exists
// or is needed. For additional verification, set this to data that strongly identifies the value, such as
// the key and creation version of the stored data.
AuthenticatedData() []byte
}
// Transformer allows a value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other.
type Transformer interface {
// TransformFromStorage may transform the provided data from its underlying storage representation or return an error.
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
// have not changed.
TransformFromStorage(data []byte, context Context) (out []byte, stale bool, err error)
// TransformToStorage may transform the provided data into the appropriate form in storage or return an error.
TransformToStorage(data []byte, context Context) (out []byte, err error)
}
type identityTransformer struct{}
// IdentityTransformer performs no transformation of the provided data.
var IdentityTransformer Transformer = identityTransformer{}
func (identityTransformer) TransformFromStorage(b []byte, ctx Context) ([]byte, bool, error) {
return b, false, nil
}
func (identityTransformer) TransformToStorage(b []byte, ctx Context) ([]byte, error) {
return b, nil
}
// DefaultContext is a simple implementation of Context for a slice of bytes.
type DefaultContext []byte
// AuthenticatedData returns itself.
func (c DefaultContext) AuthenticatedData() []byte { return []byte(c) }
// MutableTransformer allows a transformer to be changed safely at runtime.
type MutableTransformer struct {
lock sync.RWMutex
transformer Transformer
}
// NewMutableTransformer creates a transformer that can be updated at any time by calling Set()
func NewMutableTransformer(transformer Transformer) *MutableTransformer {
return &MutableTransformer{transformer: transformer}
}
// Set updates the nested transformer.
func (t *MutableTransformer) Set(transformer Transformer) {
t.lock.Lock()
t.transformer = transformer
t.lock.Unlock()
}
func (t *MutableTransformer) TransformFromStorage(data []byte, context Context) (out []byte, stale bool, err error) {
defer func(start time.Time) {
RecordTransformation("from_storage", start, err)
}(time.Now())
t.lock.RLock()
transformer := t.transformer
t.lock.RUnlock()
return transformer.TransformFromStorage(data, context)
}
func (t *MutableTransformer) TransformToStorage(data []byte, context Context) (out []byte, err error) {
defer func(start time.Time) {
RecordTransformation("to_storage", start, err)
}(time.Now())
t.lock.RLock()
transformer := t.transformer
t.lock.RUnlock()
return transformer.TransformToStorage(data, context)
}
// PrefixTransformer holds a transformer interface and the prefix that the transformation is located under.
type PrefixTransformer struct {
Prefix []byte
Transformer Transformer
}
type prefixTransformers struct {
transformers []PrefixTransformer
err error
}
var _ Transformer = &prefixTransformers{}
// NewPrefixTransformers supports the Transformer interface by checking the incoming data against the provided
// prefixes in order. The first matching prefix will be used to transform the value (the prefix is stripped
// before the Transformer interface is invoked). The first provided transformer will be used when writing to
// the store.
func NewPrefixTransformers(err error, transformers ...PrefixTransformer) Transformer {
if err == nil {
err = fmt.Errorf("the provided value does not match any of the supported transformers")
}
return &prefixTransformers{
transformers: transformers,
err: err,
}
}
// TransformFromStorage finds the first transformer with a prefix matching the provided data and returns
// the result of transforming the value. It will always mark any transformation as stale that is not using
// the first transformer.
func (t *prefixTransformers) TransformFromStorage(data []byte, context Context) ([]byte, bool, error) {
for i, transformer := range t.transformers {
if bytes.HasPrefix(data, transformer.Prefix) {
result, stale, err := transformer.Transformer.TransformFromStorage(data[len(transformer.Prefix):], context)
// To migrate away from encryption, user can specify an identity transformer higher up
// (in the config file) than the encryption transformer. In that scenario, the identity transformer needs to
// identify (during reads from disk) whether the data being read is encrypted or not. If the data is encrypted,
// it shall throw an error, but that error should not prevent subsequent transformers from being tried.
if len(transformer.Prefix) == 0 && err != nil {
continue
}
return result, stale || i != 0, err
}
}
return nil, false, t.err
}
// TransformToStorage uses the first transformer and adds its prefix to the data.
func (t *prefixTransformers) TransformToStorage(data []byte, context Context) ([]byte, error) {
transformer := t.transformers[0]
prefixedData := make([]byte, len(transformer.Prefix), len(data)+len(transformer.Prefix))
copy(prefixedData, transformer.Prefix)
result, err := transformer.Transformer.TransformToStorage(data, context)
if err != nil {
return nil, err
}
prefixedData = append(prefixedData, result...)
return prefixedData, nil
}