Initial commit

This commit is contained in:
jeff
2019-03-07 17:08:54 +08:00
commit 47bf8820f4
2817 changed files with 960937 additions and 0 deletions

201
vendor/sigs.k8s.io/controller-runtime/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -0,0 +1,121 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)
var log = logf.KBLog.WithName("object-cache")
// Cache implements CacheReader by reading objects from a cache populated by InformersMap
type Cache interface {
// Cache implements the client CacheReader
client.Reader
// Cache implements InformersMap
Informers
}
// Informers knows how to create or fetch informers for different group-version-kinds.
// It's safe to call GetInformer from multiple threads.
type Informers interface {
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformer(obj runtime.Object) (toolscache.SharedIndexInformer, error)
// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(gvk schema.GroupVersionKind) (toolscache.SharedIndexInformer, error)
// Start runs all the informers known to this cache until the given channel is closed.
// It blocks.
Start(stopCh <-chan struct{}) error
// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
WaitForCacheSync(stop <-chan struct{}) bool
// IndexField adds an index with the given field name on the given object type
// by using the given function to extract the value for that field. If you want
// compatibility with the Kubernetes API server, only return one key, and only use
// fields that the API server supports. Otherwise, you can return multiple keys,
// and "equality" in the field selector means that at least one key matches the value.
IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error
}
// Options are the optional arguments for creating a new InformersMap object
type Options struct {
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
Scheme *runtime.Scheme
// Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources
Mapper meta.RESTMapper
// Resync is the resync period. Defaults to defaultResyncTime.
Resync *time.Duration
// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string
}
var defaultResyncTime = 10 * time.Hour
// New initializes and returns a new Cache
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Use the default Kubernetes Scheme if unset
if opts.Scheme == nil {
opts.Scheme = scheme.Scheme
}
// Construct a new Mapper if unset
if opts.Mapper == nil {
var err error
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
if err != nil {
log.WithName("setup").Error(err, "Failed to get API Group-Resources")
return opts, fmt.Errorf("could not create RESTMapper from config")
}
}
// Default the resync period to 10 hours if unset
if opts.Resync == nil {
opts.Resync = &defaultResyncTime
}
return opts, nil
}

View File

@@ -0,0 +1,178 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"fmt"
"reflect"
"strings"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
var (
_ Informers = &informerCache{}
_ client.Reader = &informerCache{}
_ Cache = &informerCache{}
)
// informerCache is a Kubernetes Object cache populated from InformersMap. informerCache wraps an InformersMap.
type informerCache struct {
*internal.InformersMap
}
// Get implements Reader
func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runtime.Object) error {
gvk, err := apiutil.GVKForObject(out, ip.Scheme)
if err != nil {
return err
}
cache, err := ip.InformersMap.Get(gvk, out)
if err != nil {
return err
}
return cache.Reader.Get(ctx, key, out)
}
// List implements Reader
func (ip *informerCache) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
gvk, err := apiutil.GVKForObject(out, ip.Scheme)
if err != nil {
return err
}
if !strings.HasSuffix(gvk.Kind, "List") {
return fmt.Errorf("non-list type %T (kind %q) passed as output", out, gvk)
}
// we need the non-list GVK, so chop off the "List" from the end of the kind
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
_, isUnstructured := out.(*unstructured.UnstructuredList)
var cacheTypeObj runtime.Object
if isUnstructured {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
cacheTypeObj = u
} else {
itemsPtr, err := apimeta.GetItemsPtr(out)
if err != nil {
return nil
}
// http://knowyourmeme.com/memes/this-is-fine
elemType := reflect.Indirect(reflect.ValueOf(itemsPtr)).Type().Elem()
cacheTypeValue := reflect.Zero(reflect.PtrTo(elemType))
var ok bool
cacheTypeObj, ok = cacheTypeValue.Interface().(runtime.Object)
if !ok {
return fmt.Errorf("cannot get cache for %T, its element %T is not a runtime.Object", out, cacheTypeValue.Interface())
}
}
cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
if err != nil {
return err
}
return cache.Reader.List(ctx, opts, out)
}
// GetInformerForKind returns the informer for the GroupVersionKind
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (cache.SharedIndexInformer, error) {
// Map the gvk to an object
obj, err := ip.Scheme.New(gvk)
if err != nil {
return nil, err
}
i, err := ip.InformersMap.Get(gvk, obj)
if err != nil {
return nil, err
}
return i.Informer, err
}
// GetInformer returns the informer for the obj
func (ip *informerCache) GetInformer(obj runtime.Object) (cache.SharedIndexInformer, error) {
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
if err != nil {
return nil, err
}
i, err := ip.InformersMap.Get(gvk, obj)
if err != nil {
return nil, err
}
return i.Informer, err
}
// IndexField adds an indexer to the underlying cache, using extraction function to get
// value(s) from the given field. This index can then be used by passing a field selector
// to List. For one-to-one compatibility with "normal" field selectors, only return one value.
// The values may be anything. They will automatically be prefixed with the namespace of the
// given object, if present. The objects passed are guaranteed to be objects of the correct type.
func (ip *informerCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
informer, err := ip.GetInformer(obj)
if err != nil {
return err
}
return indexByField(informer.GetIndexer(), field, extractValue)
}
func indexByField(indexer cache.Indexer, field string, extractor client.IndexerFunc) error {
indexFunc := func(objRaw interface{}) ([]string, error) {
// TODO(directxman12): check if this is the correct type?
obj, isObj := objRaw.(runtime.Object)
if !isObj {
return nil, fmt.Errorf("object of type %T is not an Object", objRaw)
}
meta, err := apimeta.Accessor(obj)
if err != nil {
return nil, err
}
ns := meta.GetNamespace()
rawVals := extractor(obj)
var vals []string
if ns == "" {
// if we're not doubling the keys for the namespaced case, just re-use what was returned to us
vals = rawVals
} else {
// if we need to add non-namespaced versions too, double the length
vals = make([]string, len(rawVals)*2)
}
for i, rawVal := range rawVals {
// save a namespaced variant, so that we can ask
// "what are all the object matching a given index *in a given namespace*"
vals[i] = internal.KeyToNamespacedKey(ns, rawVal)
if ns != "" {
// if we have a namespace, also inject a special index key for listing
// regardless of the object namespace
vals[i+len(rawVals)] = internal.KeyToNamespacedKey("", rawVal)
}
}
return vals, nil
}
return indexer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
}

View File

@@ -0,0 +1,187 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"context"
"fmt"
"reflect"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// CacheReader is a CacheReader
var _ client.Reader = &CacheReader{}
// CacheReader wraps a cache.Index to implement the client.CacheReader interface for a single type
type CacheReader struct {
// indexer is the underlying indexer wrapped by this cache.
indexer cache.Indexer
// groupVersionKind is the group-version-kind of the resource.
groupVersionKind schema.GroupVersionKind
}
// Get checks the indexer for the object and writes a copy of it if found
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out runtime.Object) error {
storeKey := objectKeyToStoreKey(key)
// Lookup the object from the indexer cache
obj, exists, err := c.indexer.GetByKey(storeKey)
if err != nil {
return err
}
// Not found, return an error
if !exists {
// Resource gets transformed into Kind in the error anyway, so this is fine
return errors.NewNotFound(schema.GroupResource{
Group: c.groupVersionKind.Group,
Resource: c.groupVersionKind.Kind,
}, key.Name)
}
// Verify the result is a runtime.Object
if _, isObj := obj.(runtime.Object); !isObj {
// This should never happen
return fmt.Errorf("cache contained %T, which is not an Object", obj)
}
// deep copy to avoid mutating cache
// TODO(directxman12): revisit the decision to always deepcopy
obj = obj.(runtime.Object).DeepCopyObject()
// Copy the value of the item in the cache to the returned value
// TODO(directxman12): this is a terrible hack, pls fix (we should have deepcopyinto)
outVal := reflect.ValueOf(out)
objVal := reflect.ValueOf(obj)
if !objVal.Type().AssignableTo(outVal.Type()) {
return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
}
reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
return nil
}
// List lists items out of the indexer and writes them to out
func (c *CacheReader) List(_ context.Context, opts *client.ListOptions, out runtime.Object) error {
var objs []interface{}
var err error
if opts != nil && opts.FieldSelector != nil {
// TODO(directxman12): support more complicated field selectors by
// combining multiple indicies, GetIndexers, etc
field, val, requiresExact := requiresExactMatch(opts.FieldSelector)
if !requiresExact {
return fmt.Errorf("non-exact field matches are not supported by the cache")
}
// list all objects by the field selector. If this is namespaced and we have one, ask for the
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
// namespace.
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(opts.Namespace, val))
} else if opts != nil && opts.Namespace != "" {
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, opts.Namespace)
} else {
objs = c.indexer.List()
}
if err != nil {
return err
}
var labelSel labels.Selector
if opts != nil && opts.LabelSelector != nil {
labelSel = opts.LabelSelector
}
outItems, err := c.getListItems(objs, labelSel)
if err != nil {
return err
}
return apimeta.SetList(out, outItems)
}
func (c *CacheReader) getListItems(objs []interface{}, labelSel labels.Selector) ([]runtime.Object, error) {
outItems := make([]runtime.Object, 0, len(objs))
for _, item := range objs {
obj, isObj := item.(runtime.Object)
if !isObj {
return nil, fmt.Errorf("cache contained %T, which is not an Object", obj)
}
meta, err := apimeta.Accessor(obj)
if err != nil {
return nil, err
}
if labelSel != nil {
lbls := labels.Set(meta.GetLabels())
if !labelSel.Matches(lbls) {
continue
}
}
outItems = append(outItems, obj.DeepCopyObject())
}
return outItems, nil
}
// objectKeyToStorageKey converts an object key to store key.
// It's akin to MetaNamespaceKeyFunc. It's separate from
// String to allow keeping the key format easily in sync with
// MetaNamespaceKeyFunc.
func objectKeyToStoreKey(k client.ObjectKey) string {
if k.Namespace == "" {
return k.Name
}
return k.Namespace + "/" + k.Name
}
// requiresExactMatch checks if the given field selector is of the form `k=v` or `k==v`.
func requiresExactMatch(sel fields.Selector) (field, val string, required bool) {
reqs := sel.Requirements()
if len(reqs) != 1 {
return "", "", false
}
req := reqs[0]
if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
return "", "", false
}
return req.Field, req.Value, true
}
// FieldIndexName constructs the name of the index over the given field,
// for use with an indexer.
func FieldIndexName(field string) string {
return "field:" + field
}
// noNamespaceNamespace is used as the "namespace" when we want to list across all namespaces
const allNamespacesNamespace = "__all_namespaces"
// KeyToNamespacedKey prefixes the given index key with a namespace
// for use in field selector indexes.
func KeyToNamespacedKey(ns string, baseKey string) string {
if ns != "" {
return ns + "/" + baseKey
}
return allNamespacesNamespace + "/" + baseKey
}

View File

@@ -0,0 +1,96 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type InformersMap struct {
// we abstract over the details of structured vs unstructured with the specificInformerMaps
structured *specificInformersMap
unstructured *specificInformersMap
// Scheme maps runtime.Objects to GroupVersionKinds
Scheme *runtime.Scheme
}
// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
Scheme: scheme,
}
}
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
func (m *InformersMap) Start(stop <-chan struct{}) error {
go m.structured.Start(stop)
go m.unstructured.Start(stop)
<-stop
return nil
}
// WaitForCacheSync waits until all the caches have been synced.
func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...)
syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...)
return cache.WaitForCacheSync(stop, syncedFuncs...)
}
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList
if isUnstructured {
return m.unstructured.Get(gvk, obj)
}
return m.structured.Get(gvk, obj)
}
// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
}
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
}

View File

@@ -0,0 +1,281 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
// clientListWatcherFunc knows how to create a ListWatcher
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
func newSpecificInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
createListWatcher: createListWatcher,
namespace: namespace,
}
return ip
}
// MapEntry contains the cached data for an Informer
type MapEntry struct {
// Informer is the cached informer
Informer cache.SharedIndexInformer
// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader
}
// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type specificInformersMap struct {
// Scheme maps runtime.Objects to GroupVersionKinds
Scheme *runtime.Scheme
// config is used to talk to the apiserver
config *rest.Config
// mapper maps GroupVersionKinds to Resources
mapper meta.RESTMapper
// informersByGVK is the cache of informers keyed by groupVersionKind
informersByGVK map[schema.GroupVersionKind]*MapEntry
// codecs is used to create a new REST client
codecs serializer.CodecFactory
// paramCodec is used by list and watch
paramCodec runtime.ParameterCodec
// stop is the stop channel to stop informers
stop <-chan struct{}
// resync is the frequency the informers are resynced
resync time.Duration
// mu guards access to the map
mu sync.RWMutex
// start is true if the informers have been started
started bool
// createClient knows how to create a client and a list object,
// and allows for abstracting over the particulars of structured vs
// unstructured objects.
createListWatcher createListWatcherFunc
// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string
}
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
func() {
ip.mu.Lock()
defer ip.mu.Unlock()
// Set the stop channel so it can be passed to informers that are added later
ip.stop = stop
// Start each informer
for _, informer := range ip.informersByGVK {
go informer.Informer.Run(stop)
}
// Set started to true so we immediately start any informers added later.
ip.started = true
}()
<-stop
}
// HasSyncedFuncs returns all the HasSynced functions for the informers in this map.
func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
ip.mu.RLock()
defer ip.mu.RUnlock()
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
for _, informer := range ip.informersByGVK {
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
}
return syncedFuncs
}
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
// Return the informer if it is found
i, ok := func() (*MapEntry, bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByGVK[gvk]
return i, ok
}()
if ok {
return i, nil
}
// Do the mutex part in its own function so we can use defer without blocking pieces that don't
// need to be locked
var sync bool
i, err := func() (*MapEntry, error) {
ip.mu.Lock()
defer ip.mu.Unlock()
// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
var ok bool
i, ok := ip.informersByGVK[gvk]
if ok {
return i, nil
}
// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
return nil, err
}
ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i = &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
}
ip.informersByGVK[gvk] = i
// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
sync = true
go i.Informer.Run(ip.stop)
}
return i, nil
}()
if err != nil {
return nil, err
}
if sync {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
}
}
return i, err
}
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
if err != nil {
return nil, err
}
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
listObj, err := ip.Scheme.New(listGVK)
if err != nil {
return nil, err
}
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
},
}, nil
}
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(ip.config)
if err != nil {
return nil, err
}
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
}
return dynamicClient.Resource(mapping.Resource).List(opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
}
return dynamicClient.Resource(mapping.Resource).Watch(opts)
},
}, nil
}

View File

@@ -0,0 +1,88 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiutil
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
)
// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery
// information fetched by a new client with the given config.
func NewDiscoveryRESTMapper(c *rest.Config) (meta.RESTMapper, error) {
// Get a mapper
dc := discovery.NewDiscoveryClientForConfigOrDie(c)
gr, err := restmapper.GetAPIGroupResources(dc)
if err != nil {
return nil, err
}
return restmapper.NewDiscoveryRESTMapper(gr), nil
}
// GVKForObject finds the GroupVersionKind associated with the given object, if there is only a single such GVK.
func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersionKind, error) {
gvks, isUnversioned, err := scheme.ObjectKinds(obj)
if err != nil {
return schema.GroupVersionKind{}, err
}
if isUnversioned {
return schema.GroupVersionKind{}, fmt.Errorf("cannot create a new informer for the unversioned type %T", obj)
}
if len(gvks) < 1 {
return schema.GroupVersionKind{}, fmt.Errorf("no group-version-kinds associated with type %T", obj)
}
if len(gvks) > 1 {
// this should only trigger for things like metav1.XYZ --
// normal versioned types should be fine
return schema.GroupVersionKind{}, fmt.Errorf(
"multiple group-version-kinds associated with type %T, refusing to guess at one", obj)
}
return gvks[0], nil
}
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
// with the given GroupVersionKind.
func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
cfg := createRestConfig(gvk, baseConfig)
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: codecs}
return rest.RESTClientFor(cfg)
}
//createRestConfig copies the base config and updates needed fields for a new rest config
func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *rest.Config {
gv := gvk.GroupVersion()
cfg := rest.CopyConfig(baseConfig)
cfg.GroupVersion = &gv
if gvk.Group == "" {
cfg.APIPath = "/api"
} else {
cfg.APIPath = "/apis"
}
if cfg.UserAgent == "" {
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
}
return cfg
}

View File

@@ -0,0 +1,162 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"fmt"
"reflect"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
// Options are creation options for a Client
type Options struct {
// Scheme, if provided, will be used to map go structs to GroupVersionKinds
Scheme *runtime.Scheme
// Mapper, if provided, will be used to map GroupVersionKinds to Resources
Mapper meta.RESTMapper
}
// New returns a new Client using the provided config and Options.
func New(config *rest.Config, options Options) (Client, error) {
if config == nil {
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
}
// Init a scheme if none provided
if options.Scheme == nil {
options.Scheme = scheme.Scheme
}
// Init a Mapper if none provided
if options.Mapper == nil {
var err error
options.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
if err != nil {
return nil, err
}
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
c := &client{
typedClient: typedClient{
cache: clientCache{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
resourceByType: make(map[reflect.Type]*resourceMeta),
},
paramCodec: runtime.NewParameterCodec(options.Scheme),
},
unstructuredClient: unstructuredClient{
client: dynamicClient,
restMapper: options.Mapper,
},
}
return c, nil
}
var _ Client = &client{}
// client is a client.Client that reads and writes directly from/to an API server. It lazily initializes
// new clients at the time they are used, and caches the client.
type client struct {
typedClient typedClient
unstructuredClient unstructuredClient
}
// Create implements client.Client
func (c *client) Create(ctx context.Context, obj runtime.Object) error {
_, ok := obj.(*unstructured.Unstructured)
if ok {
return c.unstructuredClient.Create(ctx, obj)
}
return c.typedClient.Create(ctx, obj)
}
// Update implements client.Client
func (c *client) Update(ctx context.Context, obj runtime.Object) error {
_, ok := obj.(*unstructured.Unstructured)
if ok {
return c.unstructuredClient.Update(ctx, obj)
}
return c.typedClient.Update(ctx, obj)
}
// Delete implements client.Client
func (c *client) Delete(ctx context.Context, obj runtime.Object, opts ...DeleteOptionFunc) error {
_, ok := obj.(*unstructured.Unstructured)
if ok {
return c.unstructuredClient.Delete(ctx, obj, opts...)
}
return c.typedClient.Delete(ctx, obj, opts...)
}
// Get implements client.Client
func (c *client) Get(ctx context.Context, key ObjectKey, obj runtime.Object) error {
_, ok := obj.(*unstructured.Unstructured)
if ok {
return c.unstructuredClient.Get(ctx, key, obj)
}
return c.typedClient.Get(ctx, key, obj)
}
// List implements client.Client
func (c *client) List(ctx context.Context, opts *ListOptions, obj runtime.Object) error {
_, ok := obj.(*unstructured.UnstructuredList)
if ok {
return c.unstructuredClient.List(ctx, opts, obj)
}
return c.typedClient.List(ctx, opts, obj)
}
// Status implements client.StatusClient
func (c *client) Status() StatusWriter {
return &statusWriter{client: c}
}
// statusWriter is client.StatusWriter that writes status subresource
type statusWriter struct {
client *client
}
// ensure statusWriter implements client.StatusWriter
var _ StatusWriter = &statusWriter{}
// Update implements client.StatusWriter
func (sw *statusWriter) Update(ctx context.Context, obj runtime.Object) error {
_, ok := obj.(*unstructured.Unstructured)
if ok {
return sw.client.unstructuredClient.UpdateStatus(ctx, obj)
}
return sw.client.typedClient.UpdateStatus(ctx, obj)
}

View File

@@ -0,0 +1,145 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"reflect"
"strings"
"sync"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
// clientCache creates and caches rest clients and metadata for Kubernetes types
type clientCache struct {
// config is the rest.Config to talk to an apiserver
config *rest.Config
// scheme maps go structs to GroupVersionKinds
scheme *runtime.Scheme
// mapper maps GroupVersionKinds to Resources
mapper meta.RESTMapper
// codecs are used to create a REST client for a gvk
codecs serializer.CodecFactory
// resourceByType caches type metadata
resourceByType map[reflect.Type]*resourceMeta
mu sync.RWMutex
}
// newResource maps obj to a Kubernetes Resource and constructs a client for that Resource.
// If the object is a list, the resource represents the item's type instead.
func (c *clientCache) newResource(obj runtime.Object) (*resourceMeta, error) {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return nil, err
}
if strings.HasSuffix(gvk.Kind, "List") && meta.IsListType(obj) {
// if this was a list, treat it as a request for the item's resource
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}
client, err := apiutil.RESTClientForGVK(gvk, c.config, c.codecs)
if err != nil {
return nil, err
}
mapping, err := c.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
return &resourceMeta{Interface: client, mapping: mapping, gvk: gvk}, nil
}
// getResource returns the resource meta information for the given type of object.
// If the object is a list, the resource represents the item's type instead.
func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
typ := reflect.TypeOf(obj)
// It's better to do creation work twice than to not let multiple
// people make requests at once
c.mu.RLock()
r, known := c.resourceByType[typ]
c.mu.RUnlock()
if known {
return r, nil
}
// Initialize a new Client
c.mu.Lock()
defer c.mu.Unlock()
r, err := c.newResource(obj)
if err != nil {
return nil, err
}
c.resourceByType[typ] = r
return r, err
}
// getObjMeta returns objMeta containing both type and object metadata and state
func (c *clientCache) getObjMeta(obj runtime.Object) (*objMeta, error) {
r, err := c.getResource(obj)
if err != nil {
return nil, err
}
m, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
return &objMeta{resourceMeta: r, Object: m}, err
}
// resourceMeta caches state for a Kubernetes type.
type resourceMeta struct {
// client is the rest client used to talk to the apiserver
rest.Interface
// gvk is the GroupVersionKind of the resourceMeta
gvk schema.GroupVersionKind
// mapping is the rest mapping
mapping *meta.RESTMapping
}
// isNamespaced returns true if the type is namespaced
func (r *resourceMeta) isNamespaced() bool {
if r.mapping.Scope.Name() == meta.RESTScopeNameRoot {
return false
}
return true
}
// resource returns the resource name of the type
func (r *resourceMeta) resource() string {
return r.mapping.Resource.Resource
}
// objMeta stores type and object information about a Kubernetes type
type objMeta struct {
// resourceMeta contains type information for the object
*resourceMeta
// Object contains meta data for the object instance
v1.Object
}

View File

@@ -0,0 +1,94 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"flag"
"fmt"
"os"
"os/user"
"path/filepath"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)
var (
kubeconfig, masterURL string
log = logf.KBLog.WithName("client").WithName("config")
)
func init() {
// TODO: Fix this to allow double vendoring this library but still register flags on behalf of users
flag.StringVar(&kubeconfig, "kubeconfig", "",
"Paths to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "",
"The address of the Kubernetes API server. Overrides any value in kubeconfig. "+
"Only required if out-of-cluster.")
}
// GetConfig creates a *rest.Config for talking to a Kubernetes apiserver.
// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running
// in cluster and use the cluster provided kubeconfig.
//
// Config precedence
//
// * --kubeconfig flag pointing at a file
//
// * KUBECONFIG environment variable pointing at a file
//
// * In-cluster config if running in cluster
//
// * $HOME/.kube/config if exists
func GetConfig() (*rest.Config, error) {
// If a flag is specified with the config location, use that
if len(kubeconfig) > 0 {
return clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
}
// If an env variable is specified with the config locaiton, use that
if len(os.Getenv("KUBECONFIG")) > 0 {
return clientcmd.BuildConfigFromFlags(masterURL, os.Getenv("KUBECONFIG"))
}
// If no explicit location, try the in-cluster config
if c, err := rest.InClusterConfig(); err == nil {
return c, nil
}
// If no in-cluster config, try the default location in the user's home directory
if usr, err := user.Current(); err == nil {
if c, err := clientcmd.BuildConfigFromFlags(
"", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil {
return c, nil
}
}
return nil, fmt.Errorf("could not locate a kubeconfig")
}
// GetConfigOrDie creates a *rest.Config for talking to a Kubernetes apiserver.
// If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running
// in cluster and use the cluster provided kubeconfig.
//
// Will log an error and exit if there is an error creating the rest.Config.
func GetConfigOrDie() *rest.Config {
config, err := GetConfig()
if err != nil {
log.Error(err, "unable to get kubeconfig")
}
return config
}

View File

@@ -0,0 +1,18 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package config contains libraries for initializing rest configs for talking to the Kubernetes API
package config

View File

@@ -0,0 +1,292 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
)
// ObjectKey identifies a Kubernetes Object.
type ObjectKey = types.NamespacedName
// ObjectKeyFromObject returns the ObjectKey given a runtime.Object
func ObjectKeyFromObject(obj runtime.Object) (ObjectKey, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return ObjectKey{}, err
}
return ObjectKey{Namespace: accessor.GetNamespace(), Name: accessor.GetName()}, nil
}
// TODO(directxman12): is there a sane way to deal with get/delete options?
// Reader knows how to read and list Kubernetes objects.
type Reader interface {
// Get retrieves an obj for the given object key from the Kubernetes Cluster.
// obj must be a struct pointer so that obj can be updated with the response
// returned by the Server.
Get(ctx context.Context, key ObjectKey, obj runtime.Object) error
// List retrieves list of objects for a given namespace and list options. On a
// successful call, Items field in the list will be populated with the
// result returned from the server.
List(ctx context.Context, opts *ListOptions, list runtime.Object) error
}
// Writer knows how to create, delete, and update Kubernetes objects.
type Writer interface {
// Create saves the object obj in the Kubernetes cluster.
Create(ctx context.Context, obj runtime.Object) error
// Delete deletes the given obj from Kubernetes cluster.
Delete(ctx context.Context, obj runtime.Object, opts ...DeleteOptionFunc) error
// Update updates the given obj in the Kubernetes cluster. obj must be a
// struct pointer so that obj can be updated with the content returned by the Server.
Update(ctx context.Context, obj runtime.Object) error
}
// StatusClient knows how to create a client which can update status subresource
// for kubernetes objects.
type StatusClient interface {
Status() StatusWriter
}
// StatusWriter knows how to update status subresource of a Kubernetes object.
type StatusWriter interface {
// Update updates the fields corresponding to the status subresource for the
// given obj. obj must be a struct pointer so that obj can be updated
// with the content returned by the Server.
Update(ctx context.Context, obj runtime.Object) error
}
// Client knows how to perform CRUD operations on Kubernetes objects.
type Client interface {
Reader
Writer
StatusClient
}
// IndexerFunc knows how to take an object and turn it into a series
// of (non-namespaced) keys for that object.
type IndexerFunc func(runtime.Object) []string
// FieldIndexer knows how to index over a particular "field" such that it
// can later be used by a field selector.
type FieldIndexer interface {
// IndexFields adds an index with the given field name on the given object type
// by using the given function to extract the value for that field. If you want
// compatibility with the Kubernetes API server, only return one key, and only use
// fields that the API server supports. Otherwise, you can return multiple keys,
// and "equality" in the field selector means that at least one key matches the value.
IndexField(obj runtime.Object, field string, extractValue IndexerFunc) error
}
// DeleteOptions contains options for delete requests. It's generally a subset
// of metav1.DeleteOptions.
type DeleteOptions struct {
// GracePeriodSeconds is the duration in seconds before the object should be
// deleted. Value must be non-negative integer. The value zero indicates
// delete immediately. If this value is nil, the default grace period for the
// specified type will be used.
GracePeriodSeconds *int64
// Preconditions must be fulfilled before a deletion is carried out. If not
// possible, a 409 Conflict status will be returned.
Preconditions *metav1.Preconditions
// PropagationPolicy determined whether and how garbage collection will be
// performed. Either this field or OrphanDependents may be set, but not both.
// The default policy is decided by the existing finalizer set in the
// metadata.finalizers and the resource-specific default policy.
// Acceptable values are: 'Orphan' - orphan the dependents; 'Background' -
// allow the garbage collector to delete the dependents in the background;
// 'Foreground' - a cascading policy that deletes all dependents in the
// foreground.
PropagationPolicy *metav1.DeletionPropagation
// Raw represents raw DeleteOptions, as passed to the API server.
Raw *metav1.DeleteOptions
}
// AsDeleteOptions returns these options as a metav1.DeleteOptions.
// This may mutate the Raw field.
func (o *DeleteOptions) AsDeleteOptions() *metav1.DeleteOptions {
if o == nil {
return &metav1.DeleteOptions{}
}
if o.Raw == nil {
o.Raw = &metav1.DeleteOptions{}
}
o.Raw.GracePeriodSeconds = o.GracePeriodSeconds
o.Raw.Preconditions = o.Preconditions
o.Raw.PropagationPolicy = o.PropagationPolicy
return o.Raw
}
// ApplyOptions executes the given DeleteOptionFuncs and returns the mutated
// DeleteOptions.
func (o *DeleteOptions) ApplyOptions(optFuncs []DeleteOptionFunc) *DeleteOptions {
for _, optFunc := range optFuncs {
optFunc(o)
}
return o
}
// DeleteOptionFunc is a function that mutates a DeleteOptions struct. It implements
// the functional options pattern. See
// https://github.com/tmrts/go-patterns/blob/master/idiom/functional-options.md.
type DeleteOptionFunc func(*DeleteOptions)
// GracePeriodSeconds is a functional option that sets the GracePeriodSeconds
// field of a DeleteOptions struct.
func GracePeriodSeconds(gp int64) DeleteOptionFunc {
return func(opts *DeleteOptions) {
opts.GracePeriodSeconds = &gp
}
}
// Preconditions is a functional option that sets the Preconditions field of a
// DeleteOptions struct.
func Preconditions(p *metav1.Preconditions) DeleteOptionFunc {
return func(opts *DeleteOptions) {
opts.Preconditions = p
}
}
// PropagationPolicy is a functional option that sets the PropagationPolicy
// field of a DeleteOptions struct.
func PropagationPolicy(p metav1.DeletionPropagation) DeleteOptionFunc {
return func(opts *DeleteOptions) {
opts.PropagationPolicy = &p
}
}
// ListOptions contains options for limitting or filtering results.
// It's generally a subset of metav1.ListOptions, with support for
// pre-parsed selectors (since generally, selectors will be executed
// against the cache).
type ListOptions struct {
// LabelSelector filters results by label. Use SetLabelSelector to
// set from raw string form.
LabelSelector labels.Selector
// FieldSelector filters results by a particular field. In order
// to use this with cache-based implementations, restrict usage to
// a single field-value pair that's been added to the indexers.
FieldSelector fields.Selector
// Namespace represents the namespace to list for, or empty for
// non-namespaced objects, or to list across all namespaces.
Namespace string
// Raw represents raw ListOptions, as passed to the API server. Note
// that these may not be respected by all implementations of interface,
// and the LabelSelector and FieldSelector fields are ignored.
Raw *metav1.ListOptions
}
// SetLabelSelector sets this the label selector of these options
// from a string form of the selector.
func (o *ListOptions) SetLabelSelector(selRaw string) error {
sel, err := labels.Parse(selRaw)
if err != nil {
return err
}
o.LabelSelector = sel
return nil
}
// SetFieldSelector sets this the label selector of these options
// from a string form of the selector.
func (o *ListOptions) SetFieldSelector(selRaw string) error {
sel, err := fields.ParseSelector(selRaw)
if err != nil {
return err
}
o.FieldSelector = sel
return nil
}
// AsListOptions returns these options as a flattened metav1.ListOptions.
// This may mutate the Raw field.
func (o *ListOptions) AsListOptions() *metav1.ListOptions {
if o == nil {
return &metav1.ListOptions{}
}
if o.Raw == nil {
o.Raw = &metav1.ListOptions{}
}
if o.LabelSelector != nil {
o.Raw.LabelSelector = o.LabelSelector.String()
}
if o.FieldSelector != nil {
o.Raw.FieldSelector = o.FieldSelector.String()
}
return o.Raw
}
// MatchingLabels is a convenience function that sets the label selector
// to match the given labels, and then returns the options.
// It mutates the list options.
func (o *ListOptions) MatchingLabels(lbls map[string]string) *ListOptions {
sel := labels.SelectorFromSet(lbls)
o.LabelSelector = sel
return o
}
// MatchingField is a convenience function that sets the field selector
// to match the given field, and then returns the options.
// It mutates the list options.
func (o *ListOptions) MatchingField(name, val string) *ListOptions {
sel := fields.SelectorFromSet(fields.Set{name: val})
o.FieldSelector = sel
return o
}
// InNamespace is a convenience function that sets the namespace,
// and then returns the options. It mutates the list options.
func (o *ListOptions) InNamespace(ns string) *ListOptions {
o.Namespace = ns
return o
}
// MatchingLabels is a convenience function that constructs list options
// to match the given labels.
func MatchingLabels(lbls map[string]string) *ListOptions {
return (&ListOptions{}).MatchingLabels(lbls)
}
// MatchingField is a convenience function that constructs list options
// to match the given field.
func MatchingField(name, val string) *ListOptions {
return (&ListOptions{}).MatchingField(name, val)
}
// InNamespace is a convenience function that constructs list
// options to list in the given namespace.
func InNamespace(ns string) *ListOptions {
return (&ListOptions{}).InNamespace(ns)
}

View File

@@ -0,0 +1,59 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)
// DelegatingClient forms an interface Client by composing separate
// reader, writer and statusclient interfaces. This way, you can have an Client that
// reads from a cache and writes to the API server.
type DelegatingClient struct {
Reader
Writer
StatusClient
}
// DelegatingReader forms a interface Reader that will cause Get and List
// requests for unstructured types to use the ClientReader while
// requests for any other type of object with use the CacheReader.
type DelegatingReader struct {
CacheReader Reader
ClientReader Reader
}
// Get retrieves an obj for a given object key from the Kubernetes Cluster.
func (d *DelegatingReader) Get(ctx context.Context, key ObjectKey, obj runtime.Object) error {
_, isUnstructured := obj.(*unstructured.Unstructured)
if isUnstructured {
return d.ClientReader.Get(ctx, key, obj)
}
return d.CacheReader.Get(ctx, key, obj)
}
// List retrieves list of objects for a given namespace and list options.
func (d *DelegatingReader) List(ctx context.Context, opts *ListOptions, list runtime.Object) error {
_, isUnstructured := list.(*unstructured.UnstructuredList)
if isUnstructured {
return d.ClientReader.List(ctx, opts, list)
}
return d.CacheReader.List(ctx, opts, list)
}

View File

@@ -0,0 +1,133 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
)
// client is a client.Client that reads and writes directly from/to an API server. It lazily initializes
// new clients at the time they are used, and caches the client.
type typedClient struct {
cache clientCache
paramCodec runtime.ParameterCodec
}
// Create implements client.Client
func (c *typedClient) Create(ctx context.Context, obj runtime.Object) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
}
return o.Post().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Body(obj).
Context(ctx).
Do().
Into(obj)
}
// Update implements client.Client
func (c *typedClient) Update(ctx context.Context, obj runtime.Object) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
}
return o.Put().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Name(o.GetName()).
Body(obj).
Context(ctx).
Do().
Into(obj)
}
// Delete implements client.Client
func (c *typedClient) Delete(ctx context.Context, obj runtime.Object, opts ...DeleteOptionFunc) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
}
deleteOpts := DeleteOptions{}
return o.Delete().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Name(o.GetName()).
Body(deleteOpts.ApplyOptions(opts).AsDeleteOptions()).
Context(ctx).
Do().
Error()
}
// Get implements client.Client
func (c *typedClient) Get(ctx context.Context, key ObjectKey, obj runtime.Object) error {
r, err := c.cache.getResource(obj)
if err != nil {
return err
}
return r.Get().
NamespaceIfScoped(key.Namespace, r.isNamespaced()).
Resource(r.resource()).
Context(ctx).
Name(key.Name).Do().Into(obj)
}
// List implements client.Client
func (c *typedClient) List(ctx context.Context, opts *ListOptions, obj runtime.Object) error {
r, err := c.cache.getResource(obj)
if err != nil {
return err
}
namespace := ""
if opts != nil {
namespace = opts.Namespace
}
return r.Get().
NamespaceIfScoped(namespace, r.isNamespaced()).
Resource(r.resource()).
Body(obj).
VersionedParams(opts.AsListOptions(), c.paramCodec).
Context(ctx).
Do().
Into(obj)
}
// UpdateStatus used by StatusWriter to write status.
func (c *typedClient) UpdateStatus(ctx context.Context, obj runtime.Object) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
}
// TODO(droot): examine the returned error and check if it error needs to be
// wrapped to improve the UX ?
// It will be nice to receive an error saying the object doesn't implement
// status subresource and check CRD definition
return o.Put().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Name(o.GetName()).
SubResource("status").
Body(obj).
Context(ctx).
Do().
Into(obj)
}

View File

@@ -0,0 +1,162 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"fmt"
"strings"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)
// client is a client.Client that reads and writes directly from/to an API server. It lazily initializes
// new clients at the time they are used, and caches the client.
type unstructuredClient struct {
client dynamic.Interface
restMapper meta.RESTMapper
}
// Create implements client.Client
func (uc *unstructuredClient) Create(_ context.Context, obj runtime.Object) error {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("unstructured client did not understand object: %T", obj)
}
r, err := uc.getResourceInterface(u.GroupVersionKind(), u.GetNamespace())
if err != nil {
return err
}
i, err := r.Create(u, metav1.CreateOptions{})
if err != nil {
return err
}
u.Object = i.Object
return nil
}
// Update implements client.Client
func (uc *unstructuredClient) Update(_ context.Context, obj runtime.Object) error {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("unstructured client did not understand object: %T", obj)
}
r, err := uc.getResourceInterface(u.GroupVersionKind(), u.GetNamespace())
if err != nil {
return err
}
i, err := r.Update(u, metav1.UpdateOptions{})
if err != nil {
return err
}
u.Object = i.Object
return nil
}
// Delete implements client.Client
func (uc *unstructuredClient) Delete(_ context.Context, obj runtime.Object, opts ...DeleteOptionFunc) error {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("unstructured client did not understand object: %T", obj)
}
r, err := uc.getResourceInterface(u.GroupVersionKind(), u.GetNamespace())
if err != nil {
return err
}
deleteOpts := DeleteOptions{}
err = r.Delete(u.GetName(), deleteOpts.ApplyOptions(opts).AsDeleteOptions())
return err
}
// Get implements client.Client
func (uc *unstructuredClient) Get(_ context.Context, key ObjectKey, obj runtime.Object) error {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("unstructured client did not understand object: %T", obj)
}
r, err := uc.getResourceInterface(u.GroupVersionKind(), key.Namespace)
if err != nil {
return err
}
i, err := r.Get(key.Name, metav1.GetOptions{})
if err != nil {
return err
}
u.Object = i.Object
return nil
}
// List implements client.Client
func (uc *unstructuredClient) List(_ context.Context, opts *ListOptions, obj runtime.Object) error {
u, ok := obj.(*unstructured.UnstructuredList)
if !ok {
return fmt.Errorf("unstructured client did not understand object: %T", obj)
}
gvk := u.GroupVersionKind()
if strings.HasSuffix(gvk.Kind, "List") {
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}
namespace := ""
if opts != nil {
namespace = opts.Namespace
}
r, err := uc.getResourceInterface(gvk, namespace)
if err != nil {
return err
}
i, err := r.List(*opts.AsListOptions())
if err != nil {
return err
}
u.Items = i.Items
u.Object = i.Object
return nil
}
func (uc *unstructuredClient) UpdateStatus(_ context.Context, obj runtime.Object) error {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("unstructured client did not understand object: %T", obj)
}
r, err := uc.getResourceInterface(u.GroupVersionKind(), u.GetNamespace())
if err != nil {
return err
}
i, err := r.UpdateStatus(u, metav1.UpdateOptions{})
if err != nil {
return err
}
u.Object = i.Object
return nil
}
func (uc *unstructuredClient) getResourceInterface(gvk schema.GroupVersionKind, ns string) (dynamic.ResourceInterface, error) {
mapping, err := uc.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
if mapping.Scope.Name() == meta.RESTScopeNameRoot {
return uc.client.Resource(mapping.Resource), nil
}
return uc.client.Resource(mapping.Resource).Namespace(ns), nil
}

View File

@@ -0,0 +1,61 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package recorder
import (
"fmt"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/recorder"
)
type provider struct {
// scheme to specify when creating a recorder
scheme *runtime.Scheme
// eventBroadcaster to create new recorder instance
eventBroadcaster record.EventBroadcaster
// logger is the logger to use when logging diagnostic event info
logger logr.Logger
}
// NewProvider create a new Provider instance.
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to init clientSet: %v", err)
}
p := &provider{scheme: scheme, logger: logger}
p.eventBroadcaster = record.NewBroadcaster()
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
p.eventBroadcaster.StartEventWatcher(
func(e *corev1.Event) {
p.logger.V(1).Info(e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
})
return p, nil
}
func (p *provider) GetEventRecorderFor(name string) record.EventRecorder {
return p.eventBroadcaster.NewRecorder(p.scheme, corev1.EventSource{Component: name})
}

View File

@@ -0,0 +1,20 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package leaderelection contains a constructors for a leader election resource lock
*/
package leaderelection

View File

@@ -0,0 +1,109 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leaderelection
import (
"fmt"
"io/ioutil"
"os"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"sigs.k8s.io/controller-runtime/pkg/recorder"
)
const inClusterNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
// Options provides the required configuration to create a new resource lock
type Options struct {
// LeaderElection determines whether or not to use leader election when
// starting the manager.
LeaderElection bool
// LeaderElectionNamespace determines the namespace in which the leader
// election configmap will be created.
LeaderElectionNamespace string
// LeaderElectionID determines the name of the configmap that leader election
// will use for holding the leader lock.
LeaderElectionID string
}
// NewResourceLock creates a new config map resource lock for use in a leader
// election loop
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options Options) (resourcelock.Interface, error) {
if !options.LeaderElection {
return nil, nil
}
// Default the LeaderElectionID
if options.LeaderElectionID == "" {
options.LeaderElectionID = "controller-leader-election-helper"
}
// Default the namespace (if running in cluster)
if options.LeaderElectionNamespace == "" {
var err error
options.LeaderElectionNamespace, err = getInClusterNamespace()
if err != nil {
return nil, fmt.Errorf("unable to find leader election namespace: %v", err)
}
}
// Leader id, needs to be unique
id, err := os.Hostname()
if err != nil {
return nil, err
}
id = id + "_" + string(uuid.NewUUID())
// Construct client for leader election
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
// TODO(JoelSpeed): switch to leaderelection object in 1.12
return resourcelock.New(resourcelock.ConfigMapsResourceLock,
options.LeaderElectionNamespace,
options.LeaderElectionID,
client.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorderProvider.GetEventRecorderFor(id),
})
}
func getInClusterNamespace() (string, error) {
// Check whether the namespace file exists.
// If not, we are not running in cluster so can't guess the namespace.
_, err := os.Stat(inClusterNamespacePath)
if os.IsNotExist(err) {
return "", fmt.Errorf("not running in-cluster, please specify LeaderElectionNamespace")
} else if err != nil {
return "", fmt.Errorf("error checking namespace file: %v", err)
}
// Load the namespace file and return itss content
namespace, err := ioutil.ReadFile(inClusterNamespacePath)
if err != nil {
return "", fmt.Errorf("error reading namespace file: %v", err)
}
return string(namespace), nil
}

View File

@@ -0,0 +1,21 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package manager is required to create Controllers and provides shared dependencies such as clients, caches, schemes,
etc. Controllers must be started by calling Manager.Start.
*/
package manager

View File

@@ -0,0 +1,291 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package manager
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/recorder"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
)
var log = logf.KBLog.WithName("manager")
type controllerManager struct {
// config is the rest.config used to talk to the apiserver. Required.
config *rest.Config
// scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults
// to scheme.scheme.
scheme *runtime.Scheme
// admissionDecoder is used to decode an admission.Request.
admissionDecoder types.Decoder
// runnables is the set of Controllers that the controllerManager injects deps into and Starts.
runnables []Runnable
cache cache.Cache
// TODO(directxman12): Provide an escape hatch to get individual indexers
// client is the client injected into Controllers (and EventHandlers, Sources and Predicates).
client client.Client
// fieldIndexes knows how to add field indexes over the Cache used by this controller,
// which can later be consumed via field selectors from the injected client.
fieldIndexes client.FieldIndexer
// recorderProvider is used to generate event recorders that will be injected into Controllers
// (and EventHandlers, Sources and Predicates).
recorderProvider recorder.Provider
// resourceLock forms the basis for leader election
resourceLock resourcelock.Interface
// mapper is used to map resources to kind, and map kind and version.
mapper meta.RESTMapper
// metricsListener is used to serve prometheus metrics
metricsListener net.Listener
mu sync.Mutex
started bool
errChan chan error
// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
// to things that need it off the bat (like the Channel source). It can
// be closed via `internalStopper` (by being the same underlying channel).
internalStop <-chan struct{}
// internalStopper is the write side of the internal stop channel, allowing us to close it.
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}
startCache func(stop <-chan struct{}) error
}
// Add sets dependencies on i, and adds it to the list of runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
// Set dependencies on the object
if err := cm.SetFields(r); err != nil {
return err
}
// Add the runnable to the list
cm.runnables = append(cm.runnables, r)
if cm.started {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.internalStop)
}()
}
return nil
}
func (cm *controllerManager) SetFields(i interface{}) error {
if _, err := inject.ConfigInto(cm.config, i); err != nil {
return err
}
if _, err := inject.ClientInto(cm.client, i); err != nil {
return err
}
if _, err := inject.SchemeInto(cm.scheme, i); err != nil {
return err
}
if _, err := inject.CacheInto(cm.cache, i); err != nil {
return err
}
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
return err
}
if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
return err
}
if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
return err
}
return nil
}
func (cm *controllerManager) GetConfig() *rest.Config {
return cm.config
}
func (cm *controllerManager) GetClient() client.Client {
return cm.client
}
func (cm *controllerManager) GetScheme() *runtime.Scheme {
return cm.scheme
}
func (cm *controllerManager) GetAdmissionDecoder() types.Decoder {
return cm.admissionDecoder
}
func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer {
return cm.fieldIndexes
}
func (cm *controllerManager) GetCache() cache.Cache {
return cm.cache
}
func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
return cm.recorderProvider.GetEventRecorderFor(name)
}
func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
return cm.mapper
}
func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
})
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
mux := http.NewServeMux()
mux.Handle("/metrics", handler)
server := http.Server{
Handler: mux,
}
// Run the server
go func() {
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
cm.errChan <- err
}
}()
// Shutdown the server when stop is closed
select {
case <-stop:
if err := server.Shutdown(context.Background()); err != nil {
cm.errChan <- err
}
}
}
func (cm *controllerManager) Start(stop <-chan struct{}) error {
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
defer close(cm.internalStopper)
// Metrics should be served whether the controller is leader or not.
// (If we don't serve metrics for non-leaders, prometheus will still scrape
// the pod but will get a connection refused)
if cm.metricsListener != nil {
go cm.serveMetrics(cm.internalStop)
}
if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
return err
}
} else {
go cm.start()
}
select {
case <-stop:
// We are done
return nil
case err := <-cm.errChan:
// Error starting a controller
return err
}
}
func (cm *controllerManager) start() {
cm.mu.Lock()
defer cm.mu.Unlock()
// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
}
}()
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)
// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}
cm.started = true
}
func (cm *controllerManager) startLeaderElection() (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
// TODO(joelspeed): These timings should be configurable
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
cm.start()
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errChan <- fmt.Errorf("leader election lost")
},
},
})
if err != nil {
return err
}
// Start the leader elector process
go l.Run(context.Background())
return nil
}

View File

@@ -0,0 +1,269 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package manager
import (
"fmt"
"net"
"time"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/recorder"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
)
// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
// A Manager is required to create Controllers.
type Manager interface {
// Add will set reqeusted dependencies on the component, and cause the component to be
// started when Start is called. Add will inject any dependencies for which the argument
// implements the inject interface - e.g. inject.Client
Add(Runnable) error
// SetFields will set any dependencies on an object for which the object has implemented the inject
// interface - e.g. inject.Client.
SetFields(interface{}) error
// Start starts all registered Controllers and blocks until the Stop channel is closed.
// Returns an error if there is an error starting any controller.
Start(<-chan struct{}) error
// GetConfig returns an initialized Config
GetConfig() *rest.Config
// GetScheme returns and initialized Scheme
GetScheme() *runtime.Scheme
// GetAdmissionDecoder returns the runtime.Decoder based on the scheme.
GetAdmissionDecoder() types.Decoder
// GetClient returns a client configured with the Config
GetClient() client.Client
// GetFieldIndexer returns a client.FieldIndexer configured with the client
GetFieldIndexer() client.FieldIndexer
// GetCache returns a cache.Cache
GetCache() cache.Cache
// GetRecorder returns a new EventRecorder for the provided name
GetRecorder(name string) record.EventRecorder
// GetRESTMapper returns a RESTMapper
GetRESTMapper() meta.RESTMapper
}
// Options are the arguments for creating a new Manager
type Options struct {
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
// Defaults to the kubernetes/client-go scheme.Scheme
Scheme *runtime.Scheme
// MapperProvider provides the rest mapper used to map go types to Kubernetes APIs
MapperProvider func(c *rest.Config) (meta.RESTMapper, error)
// SyncPeriod determines the minimum frequency at which watched resources are
// reconciled. A lower period will correct entropy more quickly, but reduce
// responsiveness to change if there are many watched resources. Change this
// value only if you know what you are doing. Defaults to 10 hours if unset.
SyncPeriod *time.Duration
// LeaderElection determines whether or not to use leader election when
// starting the manager.
LeaderElection bool
// LeaderElectionNamespace determines the namespace in which the leader
// election configmap will be created.
LeaderElectionNamespace string
// LeaderElectionID determines the name of the configmap that leader election
// will use for holding the leader lock.
LeaderElectionID string
// Namespace if specified restricts the manager's cache to watch objects in the desired namespace
// Defaults to all namespaces
// Note: If a namespace is specified then controllers can still Watch for a cluster-scoped resource e.g Node
// For namespaced resources the cache will only hold objects from the desired namespace.
Namespace string
// MetricsBindAddress is the TCP address that the controller should bind to
// for serving prometheus metrics
MetricsBindAddress string
// Dependency injection for testing
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
newClient func(config *rest.Config, options client.Options) (client.Client, error)
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error)
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
newAdmissionDecoder func(scheme *runtime.Scheme) (types.Decoder, error)
newMetricsListener func(addr string) (net.Listener, error)
}
// Runnable allows a component to be started.
type Runnable interface {
// Start starts running the component. The component will stop running when the channel is closed.
// Start blocks until the channel is closed or an error occurs.
Start(<-chan struct{}) error
}
// RunnableFunc implements Runnable
type RunnableFunc func(<-chan struct{}) error
// Start implements Runnable
func (r RunnableFunc) Start(s <-chan struct{}) error {
return r(s)
}
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Initialize a rest.config if none was specified
if config == nil {
return nil, fmt.Errorf("must specify Config")
}
// Set default values for options fields
options = setOptionsDefaults(options)
// Create the mapper provider
mapper, err := options.MapperProvider(config)
if err != nil {
log.Error(err, "Failed to get API Group-Resources")
return nil, err
}
// Create the Client for Write operations.
writeObj, err := options.newClient(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
if err != nil {
return nil, err
}
// Create the cache for the cached read client and registering informers
cache, err := options.newCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
if err != nil {
return nil, err
}
// Create the recorder provider to inject event recorders for the components.
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
// to the particular controller that it's being injected into, rather than a generic one like is here.
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"))
if err != nil {
return nil, err
}
// Create the resource lock to enable leader election)
resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{
LeaderElection: options.LeaderElection,
LeaderElectionID: options.LeaderElectionID,
LeaderElectionNamespace: options.LeaderElectionNamespace,
})
if err != nil {
return nil, err
}
admissionDecoder, err := options.newAdmissionDecoder(options.Scheme)
if err != nil {
return nil, err
}
// Create the mertics listener. This will throw an error if the metrics bind
// address is invalid or already in use.
metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
if err != nil {
return nil, err
}
stop := make(chan struct{})
return &controllerManager{
config: config,
scheme: options.Scheme,
admissionDecoder: admissionDecoder,
errChan: make(chan error),
cache: cache,
fieldIndexes: cache,
client: client.DelegatingClient{
Reader: &client.DelegatingReader{
CacheReader: cache,
ClientReader: writeObj,
},
Writer: writeObj,
StatusClient: writeObj,
},
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
}, nil
}
// setOptionsDefaults set default values for Options fields
func setOptionsDefaults(options Options) Options {
// Use the Kubernetes client-go scheme if none is specified
if options.Scheme == nil {
options.Scheme = scheme.Scheme
}
if options.MapperProvider == nil {
options.MapperProvider = apiutil.NewDiscoveryRESTMapper
}
// Allow newClient to be mocked
if options.newClient == nil {
options.newClient = client.New
}
// Allow newCache to be mocked
if options.newCache == nil {
options.newCache = cache.New
}
// Allow newRecorderProvider to be mocked
if options.newRecorderProvider == nil {
options.newRecorderProvider = internalrecorder.NewProvider
}
// Allow newResourceLock to be mocked
if options.newResourceLock == nil {
options.newResourceLock = leaderelection.NewResourceLock
}
if options.newAdmissionDecoder == nil {
options.newAdmissionDecoder = admission.NewDecoder
}
if options.newMetricsListener == nil {
options.newMetricsListener = metrics.NewListener
}
return options
}

View File

@@ -0,0 +1,25 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package manager
// func SetCacheForTest(options *Options, c func(config *rest.Config, opts cache.Options) (cache.Cache, error)) {
// options.newCache = c
// }
// func SetClientForTest(options *Options, c func(config *rest.Config, options client.Options) (client.Client, error)) {
// options.newClient = c
// }

View File

@@ -0,0 +1,20 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package metrics contains controller related metrics utilities
*/
package metrics

View File

@@ -0,0 +1,47 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"net"
)
// DefaultBindAddress sets the default bind address for the metrics
// listener
// The metrics is off by default.
// TODO: Flip the default by changing DefaultBindAddress back to ":8080" in the v0.2.0.
var DefaultBindAddress = "0"
// NewListener creates a new TCP listener bound to the given address.
func NewListener(addr string) (net.Listener, error) {
if addr == "" {
// If the metrics bind address is empty, default to ":8080"
addr = DefaultBindAddress
}
// Add a case to disable metrics altogether
if addr == "0" {
return nil, nil
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error listening on %s: %v", addr, err)
}
return ln, nil
}

View File

@@ -0,0 +1,23 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import "github.com/prometheus/client_golang/prometheus"
// Registry is a prometheus registry for storing metrics within the
// controller-runtime
var Registry = prometheus.NewRegistry()

33
vendor/sigs.k8s.io/controller-runtime/pkg/patch/doc.go generated vendored Normal file
View File

@@ -0,0 +1,33 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package patch provides method to calculate JSON patch between 2 k8s objects.
Calculate JSON patch
oldDeployment := appsv1.Deployment{
// some fields
}
newDeployment := appsv1.Deployment{
// some different fields
}
patch, err := NewJSONPatch(oldDeployment, newDeployment)
if err != nil {
// handle error
}
*/
package patch

View File

@@ -0,0 +1,45 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package patch
import (
"encoding/json"
"fmt"
"reflect"
"github.com/mattbaird/jsonpatch"
"k8s.io/apimachinery/pkg/runtime"
)
// NewJSONPatch calculates the JSON patch between original and current objects.
func NewJSONPatch(original, current runtime.Object) ([]jsonpatch.JsonPatchOperation, error) {
originalGVK := original.GetObjectKind().GroupVersionKind()
currentGVK := current.GetObjectKind().GroupVersionKind()
if !reflect.DeepEqual(originalGVK, currentGVK) {
return nil, fmt.Errorf("GroupVersionKind %#v is expected to match %#v", originalGVK, currentGVK)
}
ori, err := json.Marshal(original)
if err != nil {
return nil, err
}
cur, err := json.Marshal(current)
if err != nil {
return nil, err
}
return jsonpatch.CreatePatch(ori, cur)
}

View File

@@ -0,0 +1,27 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package recorder
import (
"k8s.io/client-go/tools/record"
)
// Provider knows how to generate new event recorders with given name.
type Provider interface {
// NewRecorder returns an EventRecorder with given name.
GetEventRecorderFor(name string) record.EventRecorder
}

View File

@@ -0,0 +1,22 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package inject defines interfaces and functions for propagating dependencies from a ControllerManager to
the components registered with it. Dependencies are propagated to Reconciler, Source, EventHandler and Predicate
objects which implement the Injectable interfaces.
*/
package inject

View File

@@ -0,0 +1,131 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package inject
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
)
// Cache is used by the ControllerManager to inject Cache into Sources, EventHandlers, Predicates, and
// Reconciles
type Cache interface {
InjectCache(cache cache.Cache) error
}
// CacheInto will set informers on i and return the result if it implements Cache. Returns
//// false if i does not implement Cache.
func CacheInto(c cache.Cache, i interface{}) (bool, error) {
if s, ok := i.(Cache); ok {
return true, s.InjectCache(c)
}
return false, nil
}
// Config is used by the ControllerManager to inject Config into Sources, EventHandlers, Predicates, and
// Reconciles
type Config interface {
InjectConfig(*rest.Config) error
}
// ConfigInto will set config on i and return the result if it implements Config. Returns
//// false if i does not implement Config.
func ConfigInto(config *rest.Config, i interface{}) (bool, error) {
if s, ok := i.(Config); ok {
return true, s.InjectConfig(config)
}
return false, nil
}
// Client is used by the ControllerManager to inject client into Sources, EventHandlers, Predicates, and
// Reconciles
type Client interface {
InjectClient(client.Client) error
}
// ClientInto will set client on i and return the result if it implements Client. Returns
// false if i does not implement Client.
func ClientInto(client client.Client, i interface{}) (bool, error) {
if s, ok := i.(Client); ok {
return true, s.InjectClient(client)
}
return false, nil
}
// Decoder is used by the ControllerManager to inject decoder into webhook handlers.
type Decoder interface {
InjectDecoder(types.Decoder) error
}
// DecoderInto will set decoder on i and return the result if it implements Decoder. Returns
// false if i does not implement Decoder.
func DecoderInto(decoder types.Decoder, i interface{}) (bool, error) {
if s, ok := i.(Decoder); ok {
return true, s.InjectDecoder(decoder)
}
return false, nil
}
// Scheme is used by the ControllerManager to inject Scheme into Sources, EventHandlers, Predicates, and
// Reconciles
type Scheme interface {
InjectScheme(scheme *runtime.Scheme) error
}
// SchemeInto will set scheme and return the result on i if it implements Scheme. Returns
// false if i does not implement Scheme.
func SchemeInto(scheme *runtime.Scheme, i interface{}) (bool, error) {
if is, ok := i.(Scheme); ok {
return true, is.InjectScheme(scheme)
}
return false, nil
}
// Stoppable is used by the ControllerManager to inject stop channel into Sources,
// EventHandlers, Predicates, and Reconciles.
type Stoppable interface {
InjectStopChannel(<-chan struct{}) error
}
// StopChannelInto will set stop channel on i and return the result if it implements Stoppable.
// Returns false if i does not implement Stoppable.
func StopChannelInto(stop <-chan struct{}, i interface{}) (bool, error) {
if s, ok := i.(Stoppable); ok {
return true, s.InjectStopChannel(stop)
}
return false, nil
}
// Func injects dependencies into i.
type Func func(i interface{}) error
// Injector is used by the ControllerManager to inject Func into Controllers
type Injector interface {
InjectFunc(f Func) error
}
// InjectorInto will set f and return the result on i if it implements Injector. Returns
// false if i does not implement Injector.
func InjectorInto(f Func, i interface{}) (bool, error) {
if ii, ok := i.(Injector); ok {
return true, ii.InjectFunc(f)
}
return false, nil
}

View File

@@ -0,0 +1,126 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"github.com/go-logr/logr"
)
// loggerPromise knows how to populate a concrete logr.Logger
// with options, given an actual base logger later on down the line.
type loggerPromise struct {
logger *DelegatingLogger
childPromises []*loggerPromise
name *string
tags []interface{}
}
// WithName provides a new Logger with the name appended
func (p *loggerPromise) WithName(l *DelegatingLogger, name string) *loggerPromise {
res := &loggerPromise{
logger: l,
name: &name,
}
p.childPromises = append(p.childPromises, res)
return res
}
// WithValues provides a new Logger with the tags appended
func (p *loggerPromise) WithValues(l *DelegatingLogger, tags ...interface{}) *loggerPromise {
res := &loggerPromise{
logger: l,
tags: tags,
}
p.childPromises = append(p.childPromises, res)
return res
}
// Fulfill instantiates the Logger with the provided logger
func (p *loggerPromise) Fulfill(parentLogger logr.Logger) {
var logger = parentLogger
if p.name != nil {
logger = logger.WithName(*p.name)
}
if p.tags != nil {
logger = logger.WithValues(p.tags...)
}
p.logger.Logger = logger
p.logger.promise = nil
for _, childPromise := range p.childPromises {
childPromise.Fulfill(logger)
}
}
// DelegatingLogger is a logr.Logger that delegates to another logr.Logger.
// If the underlying promise is not nil, it registers calls to sub-loggers with
// the logging factory to be populated later, and returns a new delegating
// logger. It expects to have *some* logr.Logger set at all times (generally
// a no-op logger before the promises are fulfilled).
type DelegatingLogger struct {
logr.Logger
promise *loggerPromise
}
// WithName provides a new Logger with the name appended
func (l *DelegatingLogger) WithName(name string) logr.Logger {
if l.promise == nil {
return l.Logger.WithName(name)
}
res := &DelegatingLogger{Logger: l.Logger}
promise := l.promise.WithName(res, name)
res.promise = promise
return res
}
// WithValues provides a new Logger with the tags appended
func (l *DelegatingLogger) WithValues(tags ...interface{}) logr.Logger {
if l.promise == nil {
return l.Logger.WithValues(tags...)
}
res := &DelegatingLogger{Logger: l.Logger}
promise := l.promise.WithValues(res, tags...)
res.promise = promise
return res
}
// Fulfill switches the logger over to use the actual logger
// provided, instead of the temporary initial one, if this method
// has not been previously called.
func (l *DelegatingLogger) Fulfill(actual logr.Logger) {
if l.promise != nil {
l.promise.Fulfill(actual)
}
}
// NewDelegatingLogger constructs a new DelegatingLogger which uses
// the given logger before it's promise is fulfilled.
func NewDelegatingLogger(initial logr.Logger) *DelegatingLogger {
l := &DelegatingLogger{
Logger: initial,
promise: &loggerPromise{},
}
l.promise.logger = l
return l
}

View File

@@ -0,0 +1,129 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package log contains utilities for fetching a new logger
// when one is not already available.
package log
import (
"fmt"
"go.uber.org/zap/buffer"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
)
// KubeAwareEncoder is a Kubernetes-aware Zap Encoder.
// Instead of trying to force Kubernetes objects to implement
// ObjectMarshaller, we just implement a wrapper around a normal
// ObjectMarshaller that checks for Kubernetes objects.
type KubeAwareEncoder struct {
// Encoder is the zapcore.Encoder that this encoder delegates to
zapcore.Encoder
// Verbose controls whether or not the full object is printed.
// If false, only name, namespace, api version, and kind are printed.
// Otherwise, the full object is logged.
Verbose bool
}
// namespacedNameWrapper is a zapcore.ObjectMarshaler for Kubernetes NamespacedName
type namespacedNameWrapper struct {
types.NamespacedName
}
func (w namespacedNameWrapper) MarshalLogObject(enc zapcore.ObjectEncoder) error {
if w.Namespace != "" {
enc.AddString("namespace", w.Namespace)
}
enc.AddString("name", w.Name)
return nil
}
// kubeObjectWrapper is a zapcore.ObjectMarshaler for Kubernetes objects.
type kubeObjectWrapper struct {
obj runtime.Object
}
// MarshalLogObject implements zapcore.ObjectMarshaler
func (w kubeObjectWrapper) MarshalLogObject(enc zapcore.ObjectEncoder) error {
// TODO(directxman12): log kind and apiversion if not set explicitly (common case)
// -- needs an a scheme to convert to the GVK.
gvk := w.obj.GetObjectKind().GroupVersionKind()
if gvk.Version != "" {
enc.AddString("apiVersion", gvk.GroupVersion().String())
enc.AddString("kind", gvk.Kind)
}
objMeta, err := meta.Accessor(w.obj)
if err != nil {
return fmt.Errorf("got runtime.Object without object metadata: %v", w.obj)
}
ns := objMeta.GetNamespace()
if ns != "" {
enc.AddString("namespace", ns)
}
enc.AddString("name", objMeta.GetName())
return nil
}
// NB(directxman12): can't just override AddReflected, since the encoder calls AddReflected on itself directly
// Clone implements zapcore.Encoder
func (k *KubeAwareEncoder) Clone() zapcore.Encoder {
return &KubeAwareEncoder{
Encoder: k.Encoder.Clone(),
}
}
// EncodeEntry implements zapcore.Encoder
func (k *KubeAwareEncoder) EncodeEntry(entry zapcore.Entry, fields []zapcore.Field) (*buffer.Buffer, error) {
if k.Verbose {
// Kubernetes objects implement fmt.Stringer, so if we
// want verbose output, just delegate to that.
return k.Encoder.EncodeEntry(entry, fields)
}
for i, field := range fields {
// intercept stringer fields that happen to be Kubernetes runtime.Object or
// types.NamespacedName values (Kubernetes runtime.Objects commonly
// implement String, apparently).
if field.Type == zapcore.StringerType {
switch val := field.Interface.(type) {
case runtime.Object:
fields[i] = zapcore.Field{
Type: zapcore.ObjectMarshalerType,
Key: field.Key,
Interface: kubeObjectWrapper{obj: val},
}
case types.NamespacedName:
fields[i] = zapcore.Field{
Type: zapcore.ObjectMarshalerType,
Key: field.Key,
Interface: namespacedNameWrapper{NamespacedName: val},
}
}
}
}
return k.Encoder.EncodeEntry(entry, fields)
}

View File

@@ -0,0 +1,85 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package log contains utilities for fetching a new logger
// when one is not already available.
package log
import (
"io"
"os"
"time"
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// ZapLogger is a Logger implementation.
// If development is true, a Zap development config will be used
// (stacktraces on warnings, no sampling), otherwise a Zap production
// config will be used (stacktraces on errors, sampling).
func ZapLogger(development bool) logr.Logger {
return ZapLoggerTo(os.Stderr, development)
}
// ZapLoggerTo returns a new Logger implementation using Zap which logs
// to the given destination, instead of stderr. It otherise behaves like
// ZapLogger.
func ZapLoggerTo(destWriter io.Writer, development bool) logr.Logger {
// this basically mimics New<type>Config, but with a custom sink
sink := zapcore.AddSync(destWriter)
var enc zapcore.Encoder
var lvl zap.AtomicLevel
var opts []zap.Option
if development {
encCfg := zap.NewDevelopmentEncoderConfig()
enc = zapcore.NewConsoleEncoder(encCfg)
lvl = zap.NewAtomicLevelAt(zap.DebugLevel)
opts = append(opts, zap.Development(), zap.AddStacktrace(zap.ErrorLevel))
} else {
encCfg := zap.NewProductionEncoderConfig()
enc = zapcore.NewJSONEncoder(encCfg)
lvl = zap.NewAtomicLevelAt(zap.InfoLevel)
opts = append(opts, zap.AddStacktrace(zap.WarnLevel),
zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSampler(core, time.Second, 100, 100)
}))
}
opts = append(opts, zap.AddCallerSkip(1), zap.ErrorOutput(sink))
log := zap.New(zapcore.NewCore(&KubeAwareEncoder{Encoder: enc, Verbose: development}, sink, lvl))
log = log.WithOptions(opts...)
return zapr.NewLogger(log)
}
// SetLogger sets a concrete logging implementation for all deferred Loggers.
func SetLogger(l logr.Logger) {
Log.Fulfill(l)
}
// Log is the base logger used by kubebuilder. It delegates
// to another logr.Logger. You *must* call SetLogger to
// get any actual logging.
var Log = NewDelegatingLogger(NullLogger{})
// KBLog is a base parent logger.
var KBLog logr.Logger
func init() {
KBLog = Log.WithName("kubebuilder")
}

View File

@@ -0,0 +1,60 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"github.com/go-logr/logr"
)
// NB: this is the same as the null logger logr/testing,
// but avoids accidentally adding the testing flags to
// all binaries.
// NullLogger is a logr.Logger that does nothing.
type NullLogger struct{}
var _ logr.Logger = NullLogger{}
// Info implements logr.InfoLogger
func (NullLogger) Info(_ string, _ ...interface{}) {
// Do nothing.
}
// Enabled implements logr.InfoLogger
func (NullLogger) Enabled() bool {
return false
}
// Error implements logr.Logger
func (NullLogger) Error(_ error, _ string, _ ...interface{}) {
// Do nothing.
}
// V implements logr.Logger
func (log NullLogger) V(_ int) logr.InfoLogger {
return log
}
// WithName implements logr.Logger
func (log NullLogger) WithName(_ string) logr.Logger {
return log
}
// WithValues implements logr.Logger
func (log NullLogger) WithValues(_ ...interface{}) logr.Logger {
return log
}

View File

@@ -0,0 +1,18 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package signals contains libraries for handling signals to shutdown the system.
package signals

View File

@@ -0,0 +1,43 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package signals
import (
"os"
"os/signal"
)
var onlyOneSignalHandler = make(chan struct{})
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() (stopCh <-chan struct{}) {
close(onlyOneSignalHandler) // panics when called twice
stop := make(chan struct{})
c := make(chan os.Signal, 2)
signal.Notify(c, shutdownSignals...)
go func() {
<-c
close(stop)
<-c
os.Exit(1) // second signal. Exit directly.
}()
return stop
}

View File

@@ -0,0 +1,26 @@
// +build !windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package signals
import (
"os"
"syscall"
)
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}

View File

@@ -0,0 +1,23 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package signals
import (
"os"
)
var shutdownSignals = []os.Signal{os.Interrupt}

View File

@@ -0,0 +1,48 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
)
// DecodeFunc is a function that implements the Decoder interface.
type DecodeFunc func(types.Request, runtime.Object) error
var _ types.Decoder = DecodeFunc(nil)
// Decode implements the Decoder interface.
func (f DecodeFunc) Decode(req types.Request, obj runtime.Object) error {
return f(req, obj)
}
type decoder struct {
codecs serializer.CodecFactory
}
// NewDecoder creates a Decoder given the runtime.Scheme
func NewDecoder(scheme *runtime.Scheme) (types.Decoder, error) {
return decoder{codecs: serializer.NewCodecFactory(scheme)}, nil
}
// Decode decodes the inlined object in the AdmissionRequest into the passed-in runtime.Object.
func (d decoder) Decode(req types.Request, into runtime.Object) error {
deserializer := d.codecs.UniversalDeserializer()
return runtime.DecodeInto(deserializer, req.AdmissionRequest.Object.Raw, into)
}

View File

@@ -0,0 +1,101 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package admission provides implementation for admission webhook and methods to implement admission webhook handlers.
The following snippet is an example implementation of mutating handler.
type Mutator struct {
client client.Client
decoder types.Decoder
}
func (m *Mutator) mutatePodsFn(ctx context.Context, pod *corev1.Pod) error {
// your logic to mutate the passed-in pod.
}
func (m *Mutator) Handle(ctx context.Context, req types.Request) types.Response {
pod := &corev1.Pod{}
err := m.decoder.Decode(req, pod)
if err != nil {
return admission.ErrorResponse(http.StatusBadRequest, err)
}
// Do deepcopy before actually mutate the object.
copy := pod.DeepCopy()
err = m.mutatePodsFn(ctx, copy)
if err != nil {
return admission.ErrorResponse(http.StatusInternalServerError, err)
}
return admission.PatchResponse(pod, copy)
}
// InjectClient is called by the Manager and provides a client.Client to the Mutator instance.
func (m *Mutator) InjectClient(c client.Client) error {
h.client = c
return nil
}
// InjectDecoder is called by the Manager and provides a types.Decoder to the Mutator instance.
func (m *Mutator) InjectDecoder(d types.Decoder) error {
h.decoder = d
return nil
}
The following snippet is an example implementation of validating handler.
type Handler struct {
client client.Client
decoder types.Decoder
}
func (v *Validator) validatePodsFn(ctx context.Context, pod *corev1.Pod) (bool, string, error) {
// your business logic
}
func (v *Validator) Handle(ctx context.Context, req types.Request) types.Response {
pod := &corev1.Pod{}
err := h.decoder.Decode(req, pod)
if err != nil {
return admission.ErrorResponse(http.StatusBadRequest, err)
}
allowed, reason, err := h.validatePodsFn(ctx, pod)
if err != nil {
return admission.ErrorResponse(http.StatusInternalServerError, err)
}
return admission.ValidationResponse(allowed, reason)
}
// InjectClient is called by the Manager and provides a client.Client to the Validator instance.
func (v *Validator) InjectClient(c client.Client) error {
h.client = c
return nil
}
// InjectDecoder is called by the Manager and provides a types.Decoder to the Validator instance.
func (v *Validator) InjectDecoder(d types.Decoder) error {
h.decoder = d
return nil
}
*/
package admission
import (
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)
var log = logf.KBLog.WithName("admission")

View File

@@ -0,0 +1,115 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"k8s.io/api/admission/v1beta1"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
"sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics"
)
var admissionv1beta1scheme = runtime.NewScheme()
var admissionv1beta1schemecodecs = serializer.NewCodecFactory(admissionv1beta1scheme)
func init() {
addToScheme(admissionv1beta1scheme)
}
func addToScheme(scheme *runtime.Scheme) {
utilruntime.Must(admissionv1beta1.AddToScheme(scheme))
}
var _ http.Handler = &Webhook{}
func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
startTS := time.Now()
defer metrics.RequestLatency.WithLabelValues(wh.Name).Observe(time.Now().Sub(startTS).Seconds())
var body []byte
var err error
var reviewResponse types.Response
if r.Body != nil {
if body, err = ioutil.ReadAll(r.Body); err != nil {
log.Error(err, "unable to read the body from the incoming request")
reviewResponse = ErrorResponse(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
return
}
} else {
err = errors.New("request body is empty")
log.Error(err, "bad request")
reviewResponse = ErrorResponse(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
return
}
// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
err = fmt.Errorf("contentType=%s, expect application/json", contentType)
log.Error(err, "unable to process a request with an unknown content type", "content type", contentType)
reviewResponse = ErrorResponse(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
return
}
ar := v1beta1.AdmissionReview{}
if _, _, err := admissionv1beta1schemecodecs.UniversalDeserializer().Decode(body, nil, &ar); err != nil {
log.Error(err, "unable to decode the request")
reviewResponse = ErrorResponse(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
return
}
// TODO: add panic-recovery for Handle
reviewResponse = wh.Handle(context.Background(), types.Request{AdmissionRequest: ar.Request})
wh.writeResponse(w, reviewResponse)
}
func (wh *Webhook) writeResponse(w io.Writer, response types.Response) {
if response.Response.Result.Code != 0 {
if response.Response.Result.Code == http.StatusOK {
metrics.TotalRequests.WithLabelValues(wh.Name, "true").Inc()
} else {
metrics.TotalRequests.WithLabelValues(wh.Name, "false").Inc()
}
}
encoder := json.NewEncoder(w)
responseAdmissionReview := v1beta1.AdmissionReview{
Response: response.Response,
}
err := encoder.Encode(responseAdmissionReview)
if err != nil {
log.Error(err, "unable to encode the response")
wh.writeResponse(w, ErrorResponse(http.StatusInternalServerError, err))
}
}

View File

@@ -0,0 +1,70 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"net/http"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/patch"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
)
// ErrorResponse creates a new Response for error-handling a request.
func ErrorResponse(code int32, err error) types.Response {
return types.Response{
Response: &admissionv1beta1.AdmissionResponse{
Allowed: false,
Result: &metav1.Status{
Code: code,
Message: err.Error(),
},
},
}
}
// ValidationResponse returns a response for admitting a request.
func ValidationResponse(allowed bool, reason string) types.Response {
resp := types.Response{
Response: &admissionv1beta1.AdmissionResponse{
Allowed: allowed,
},
}
if len(reason) > 0 {
resp.Response.Result = &metav1.Status{
Reason: metav1.StatusReason(reason),
}
}
return resp
}
// PatchResponse returns a new response with json patch.
func PatchResponse(original, current runtime.Object) types.Response {
patches, err := patch.NewJSONPatch(original, current)
if err != nil {
return ErrorResponse(http.StatusInternalServerError, err)
}
return types.Response{
Patches: patches,
Response: &admissionv1beta1.AdmissionResponse{
Allowed: true,
PatchType: func() *admissionv1beta1.PatchType { pt := admissionv1beta1.PatchTypeJSONPatch; return &pt }(),
},
}
}

View File

@@ -0,0 +1,44 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"github.com/mattbaird/jsonpatch"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
)
// Request is the input of Handler
type Request struct {
AdmissionRequest *admissionv1beta1.AdmissionRequest
}
// Response is the output of admission.Handler
type Response struct {
// Patches are the JSON patches for mutating webhooks.
// Using this instead of setting Response.Patch to minimize the overhead of serialization and deserialization.
Patches []jsonpatch.JsonPatchOperation
// Response is the admission response. Don't set the Patch field in it.
Response *admissionv1beta1.AdmissionResponse
}
// Decoder is used to decode AdmissionRequest.
type Decoder interface {
// Decode decodes the raw byte object from the AdmissionRequest to the passed-in runtime.Object.
Decode(Request, runtime.Object) error
}

View File

@@ -0,0 +1,259 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"github.com/mattbaird/jsonpatch"
admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
atypes "sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"
"sigs.k8s.io/controller-runtime/pkg/webhook/types"
)
// Handler can handle an AdmissionRequest.
type Handler interface {
Handle(context.Context, atypes.Request) atypes.Response
}
// HandlerFunc implements Handler interface using a single function.
type HandlerFunc func(context.Context, atypes.Request) atypes.Response
var _ Handler = HandlerFunc(nil)
// Handle process the AdmissionRequest by invoking the underlying function.
func (f HandlerFunc) Handle(ctx context.Context, req atypes.Request) atypes.Response {
return f(ctx, req)
}
// Webhook represents each individual webhook.
type Webhook struct {
// Name is the name of the webhook
Name string
// Type is the webhook type, i.e. mutating, validating
Type types.WebhookType
// Path is the path this webhook will serve.
Path string
// Rules maps to the Rules field in admissionregistrationv1beta1.Webhook
Rules []admissionregistrationv1beta1.RuleWithOperations
// FailurePolicy maps to the FailurePolicy field in admissionregistrationv1beta1.Webhook
// This optional. If not set, will be defaulted to Ignore (fail-open) by the server.
// More details: https://github.com/kubernetes/api/blob/f5c295feaba2cbc946f0bbb8b535fc5f6a0345ee/admissionregistration/v1beta1/types.go#L144-L147
FailurePolicy *admissionregistrationv1beta1.FailurePolicyType
// NamespaceSelector maps to the NamespaceSelector field in admissionregistrationv1beta1.Webhook
// This optional.
NamespaceSelector *metav1.LabelSelector
// Handlers contains a list of handlers. Each handler may only contains the business logic for its own feature.
// For example, feature foo and bar can be in the same webhook if all the other configurations are the same.
// The handler will be invoked sequentially as the order in the list.
// Note: if you are using mutating webhook with multiple handlers, it's your responsibility to
// ensure the handlers are not generating conflicting JSON patches.
Handlers []Handler
once sync.Once
}
func (w *Webhook) setDefaults() {
if len(w.Path) == 0 {
if len(w.Rules) == 0 || len(w.Rules[0].Resources) == 0 {
// can't do defaulting, skip it.
return
}
if w.Type == types.WebhookTypeMutating {
w.Path = "/mutate-" + w.Rules[0].Resources[0]
} else if w.Type == types.WebhookTypeValidating {
w.Path = "/validate-" + w.Rules[0].Resources[0]
}
}
if len(w.Name) == 0 {
reg := regexp.MustCompile("[^a-zA-Z0-9]+")
processedPath := strings.ToLower(reg.ReplaceAllString(w.Path, ""))
w.Name = processedPath + ".example.com"
}
}
// Add adds additional handler(s) in the webhook
func (w *Webhook) Add(handlers ...Handler) {
w.Handlers = append(w.Handlers, handlers...)
}
// Webhook implements Handler interface.
var _ Handler = &Webhook{}
// Handle processes AdmissionRequest.
// If the webhook is mutating type, it delegates the AdmissionRequest to each handler and merge the patches.
// If the webhook is validating type, it delegates the AdmissionRequest to each handler and
// deny the request if anyone denies.
func (w *Webhook) Handle(ctx context.Context, req atypes.Request) atypes.Response {
if req.AdmissionRequest == nil {
return ErrorResponse(http.StatusBadRequest, errors.New("got an empty AdmissionRequest"))
}
var resp atypes.Response
switch w.Type {
case types.WebhookTypeMutating:
resp = w.handleMutating(ctx, req)
case types.WebhookTypeValidating:
resp = w.handleValidating(ctx, req)
default:
return ErrorResponse(http.StatusInternalServerError, errors.New("you must specify your webhook type"))
}
resp.Response.UID = req.AdmissionRequest.UID
return resp
}
func (w *Webhook) handleMutating(ctx context.Context, req atypes.Request) atypes.Response {
patches := []jsonpatch.JsonPatchOperation{}
for _, handler := range w.Handlers {
resp := handler.Handle(ctx, req)
if !resp.Response.Allowed {
setStatusOKInAdmissionResponse(resp.Response)
return resp
}
if resp.Response.PatchType != nil && *resp.Response.PatchType != admissionv1beta1.PatchTypeJSONPatch {
return ErrorResponse(http.StatusInternalServerError,
fmt.Errorf("unexpected patch type returned by the handler: %v, only allow: %v",
resp.Response.PatchType, admissionv1beta1.PatchTypeJSONPatch))
}
patches = append(patches, resp.Patches...)
}
var err error
marshaledPatch, err := json.Marshal(patches)
if err != nil {
return ErrorResponse(http.StatusBadRequest, fmt.Errorf("error when marshaling the patch: %v", err))
}
return atypes.Response{
Response: &admissionv1beta1.AdmissionResponse{
Allowed: true,
Result: &metav1.Status{
Code: http.StatusOK,
},
Patch: marshaledPatch,
PatchType: func() *admissionv1beta1.PatchType { pt := admissionv1beta1.PatchTypeJSONPatch; return &pt }(),
},
}
}
func (w *Webhook) handleValidating(ctx context.Context, req atypes.Request) atypes.Response {
for _, handler := range w.Handlers {
resp := handler.Handle(ctx, req)
if !resp.Response.Allowed {
setStatusOKInAdmissionResponse(resp.Response)
return resp
}
}
return atypes.Response{
Response: &admissionv1beta1.AdmissionResponse{
Allowed: true,
Result: &metav1.Status{
Code: http.StatusOK,
},
},
}
}
func setStatusOKInAdmissionResponse(resp *admissionv1beta1.AdmissionResponse) {
if resp == nil {
return
}
if resp.Result == nil {
resp.Result = &metav1.Status{}
}
if resp.Result.Code == 0 {
resp.Result.Code = http.StatusOK
}
}
// GetName returns the name of the webhook.
func (w *Webhook) GetName() string {
w.once.Do(w.setDefaults)
return w.Name
}
// GetPath returns the path that the webhook registered.
func (w *Webhook) GetPath() string {
w.once.Do(w.setDefaults)
return w.Path
}
// GetType returns the type of the webhook.
func (w *Webhook) GetType() types.WebhookType {
w.once.Do(w.setDefaults)
return w.Type
}
// Handler returns a http.Handler for the webhook
func (w *Webhook) Handler() http.Handler {
w.once.Do(w.setDefaults)
return w
}
// Validate validates if the webhook is valid.
func (w *Webhook) Validate() error {
w.once.Do(w.setDefaults)
if len(w.Rules) == 0 {
return errors.New("field Rules should not be empty")
}
if len(w.Name) == 0 {
return errors.New("field Name should not be empty")
}
if w.Type != types.WebhookTypeMutating && w.Type != types.WebhookTypeValidating {
return fmt.Errorf("unsupported Type: %v, only WebhookTypeMutating and WebhookTypeValidating are supported", w.Type)
}
if len(w.Path) == 0 {
return errors.New("field Path should not be empty")
}
if len(w.Handlers) == 0 {
return errors.New("field Handler should not be empty")
}
return nil
}
var _ inject.Client = &Webhook{}
// InjectClient injects the client into the handlers
func (w *Webhook) InjectClient(c client.Client) error {
for _, handler := range w.Handlers {
if _, err := inject.ClientInto(c, handler); err != nil {
return err
}
}
return nil
}
var _ inject.Decoder = &Webhook{}
// InjectDecoder injects the decoder into the handlers
func (w *Webhook) InjectDecoder(d atypes.Decoder) error {
for _, handler := range w.Handlers {
if _, err := inject.DecoderInto(d, handler); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,51 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
var (
// TotalRequests is a prometheus metric which counts the total number of requests that
// the webhook server has received.
TotalRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "controller_runtime_webhook_requests_total",
Help: "Total number of admission requests",
},
[]string{"webhook", "succeeded"},
)
// RequestLatency is a prometheus metric which is a histogram of the latency
// of processing admission requests.
RequestLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "controller_runtime_webhook_latency_seconds",
Help: "Histogram of the latency of processing admission requests",
},
[]string{"webhook"},
)
)
func init() {
metrics.Registry.MustRegister(
TotalRequests,
RequestLatency)
}

View File

@@ -0,0 +1,28 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
// WebhookType defines the type of a webhook
type WebhookType int
const (
_ = iota
// WebhookTypeMutating represents mutating type webhook
WebhookTypeMutating WebhookType = iota
// WebhookTypeValidating represents validating type webhook
WebhookTypeValidating
)