change cluster schema (#2026)

* change cluster schema

* change cluster schema
This commit is contained in:
zryfish
2020-04-27 17:34:02 +08:00
committed by GitHub
parent 794f388306
commit 5a3eb651f3
123 changed files with 13582 additions and 1032 deletions

View File

@@ -0,0 +1,36 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
"k8s.io/client-go/util/flowcontrol"
)
func StartBackoffGC(backoff *flowcontrol.Backoff, stopCh <-chan struct{}) {
go func() {
for {
select {
case <-time.After(time.Minute):
backoff.GC()
case <-stopCh:
return
}
}
}()
}

View File

@@ -0,0 +1,221 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"net/http"
"net/url"
"time"
"github.com/pkg/errors"
apiv1 "k8s.io/api/core/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/transport"
"k8s.io/klog"
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
"sigs.k8s.io/kubefed/pkg/client/generic"
)
const (
DefaultKubeFedSystemNamespace = "kube-federation-system"
KubeAPIQPS = 20.0
KubeAPIBurst = 30
TokenKey = "token"
KubeFedConfigName = "kubefed"
)
// BuildClusterConfig returns a restclient.Config that can be used to configure
// a client for the given KubeFedCluster or an error. The client is used to
// access kubernetes secrets in the kubefed namespace.
func BuildClusterConfig(fedCluster *fedv1b1.KubeFedCluster, client generic.Client, fedNamespace string) (*restclient.Config, error) {
clusterName := fedCluster.Name
apiEndpoint := fedCluster.Spec.APIEndpoint
// TODO(marun) Remove when validation ensures a non-empty value.
if apiEndpoint == "" {
return nil, errors.Errorf("The api endpoint of cluster %s is empty", clusterName)
}
secretName := fedCluster.Spec.SecretRef.Name
if secretName == "" {
return nil, errors.Errorf("Cluster %s does not have a secret name", clusterName)
}
secret := &apiv1.Secret{}
err := client.Get(context.TODO(), secret, fedNamespace, secretName)
if err != nil {
return nil, err
}
token, tokenFound := secret.Data[TokenKey]
if !tokenFound || len(token) == 0 {
return nil, errors.Errorf("The secret for cluster %s is missing a non-empty value for %q", clusterName, TokenKey)
}
clusterConfig, err := clientcmd.BuildConfigFromFlags(apiEndpoint, "")
if err != nil {
return nil, err
}
clusterConfig.CAData = fedCluster.Spec.CABundle
clusterConfig.BearerToken = string(token)
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
if len(fedCluster.Spec.DisabledTLSValidations) != 0 {
klog.V(1).Infof("Cluster %s will use a custom transport for TLS certificate validation", fedCluster.Name)
if err = CustomizeTLSTransport(fedCluster, clusterConfig); err != nil {
return nil, err
}
}
return clusterConfig, nil
}
// IsPrimaryCluster checks if the caller is working with objects for the
// primary cluster by checking if the UIDs match for both ObjectMetas passed
// in.
// TODO (font): Need to revisit this when cluster ID is available.
func IsPrimaryCluster(obj, clusterObj pkgruntime.Object) bool {
meta := MetaAccessor(obj)
clusterMeta := MetaAccessor(clusterObj)
return meta.GetUID() == clusterMeta.GetUID()
}
// CustomizeTLSTransport replaces the restclient.Config.Transport with one that
// implements the desired TLS certificate validations
func CustomizeTLSTransport(fedCluster *fedv1b1.KubeFedCluster, clientConfig *restclient.Config) error {
clientTransportConfig, err := clientConfig.TransportConfig()
if err != nil {
return errors.Errorf("Cluster %s client transport config error: %s", fedCluster.Name, err)
}
transportConfig, err := transport.TLSConfigFor(clientTransportConfig)
if err != nil {
return errors.Errorf("Cluster %s transport error: %s", fedCluster.Name, err)
}
err = CustomizeCertificateValidation(fedCluster, transportConfig)
if err != nil {
return errors.Errorf("Cluster %s custom certificate validation error: %s", fedCluster.Name, err)
}
// using the same defaults as http.DefaultTransport
clientConfig.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: transportConfig,
}
clientConfig.TLSClientConfig = restclient.TLSClientConfig{}
return nil
}
// CustomizeCertificateValidation modifies an existing tls.Config to disable the
// desired TLS checks in KubeFedCluster config
func CustomizeCertificateValidation(fedCluster *fedv1b1.KubeFedCluster, tlsConfig *tls.Config) error {
// InsecureSkipVerify must be enabled to prevent early validation errors from
// returning before VerifyPeerCertificate is run
tlsConfig.InsecureSkipVerify = true
var ignoreSubjectName, ignoreValidityPeriod bool
for _, validation := range fedCluster.Spec.DisabledTLSValidations {
switch fedv1b1.TLSValidation(validation) {
case fedv1b1.TLSAll:
klog.V(1).Infof("Cluster %s will not perform TLS certificate validation", fedCluster.Name)
return nil
case fedv1b1.TLSSubjectName:
ignoreSubjectName = true
case fedv1b1.TLSValidityPeriod:
ignoreValidityPeriod = true
}
}
// Normal TLS SubjectName validation uses the conn dnsname for validation,
// but this is not available when using a VerifyPeerCertificate functions.
// As a workaround, we will fill the tls.Config.ServerName with the URL host
// specified as the KubeFedCluster API target
if !ignoreSubjectName && tlsConfig.ServerName == "" {
apiURL, err := url.Parse(fedCluster.Spec.APIEndpoint)
if err != nil {
return errors.Errorf("failed to identify a valid host from APIEndpoint for use in SubjectName validation")
}
tlsConfig.ServerName = apiURL.Hostname()
}
// VerifyPeerCertificate uses the same logic as crypto/tls Conn.verifyServerCertificate
// but uses a modified set of options to ignore specific validations
tlsConfig.VerifyPeerCertificate = func(certificates [][]byte, verifiedChains [][]*x509.Certificate) error {
opts := x509.VerifyOptions{
Roots: tlsConfig.RootCAs,
CurrentTime: time.Now(),
Intermediates: x509.NewCertPool(),
DNSName: tlsConfig.ServerName,
}
if tlsConfig.Time != nil {
opts.CurrentTime = tlsConfig.Time()
}
certs := make([]*x509.Certificate, len(certificates))
for i, asn1Data := range certificates {
cert, err := x509.ParseCertificate(asn1Data)
if err != nil {
return errors.New("tls: failed to parse certificate from server: " + err.Error())
}
certs[i] = cert
}
for i, cert := range certs {
if i == 0 {
continue
}
opts.Intermediates.AddCert(cert)
}
if ignoreSubjectName {
// set the DNSName to nil to ignore the name validation
opts.DNSName = ""
klog.V(1).Infof("Cluster %s will not perform tls certificate SubjectName validation", fedCluster.Name)
}
if ignoreValidityPeriod {
// set the CurrentTime to immediately after the certificate start time
// this will ensure that certificate passes the validity period check
opts.CurrentTime = certs[0].NotBefore.Add(time.Second)
klog.V(1).Infof("Cluster %s will not perform tls certificate ValidityPeriod validation", fedCluster.Name)
}
_, err := certs[0].Verify(opts)
return err
}
return nil
}

View File

@@ -0,0 +1,76 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
)
// Providing 0 duration to an informer indicates that resync should be delayed as long as possible
const (
NoResyncPeriod time.Duration = 0 * time.Second
NamespaceName = "namespaces"
NamespaceKind = "Namespace"
ServiceKind = "Service"
ServiceAccountKind = "ServiceAccount"
// The following fields are used to interact with unstructured
// resources.
// Common fields
SpecField = "spec"
StatusField = "status"
MetadataField = "metadata"
// ServiceAccount fields
SecretsField = "secrets"
// Scale types
ReplicasField = "replicas"
RetainReplicasField = "retainReplicas"
// Template fields
TemplateField = "template"
// Placement fields
PlacementField = "placement"
ClusterSelectorField = "clusterSelector"
MatchLabelsField = "matchLabels"
// Override fields
OverridesField = "overrides"
ClusterNameField = "clusterName"
ClusterOverridesField = "clusterOverrides"
PathField = "path"
ValueField = "value"
// Cluster reference
ClustersField = "clusters"
NameField = "name"
)
type ReconciliationStatus int
const (
StatusAllOK ReconciliationStatus = iota
StatusNeedsRecheck
StatusError
StatusNotSynced
)

View File

@@ -0,0 +1,80 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
restclient "k8s.io/client-go/rest"
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
)
// LeaderElectionConfiguration defines the configuration of leader election
// clients for controller that can run with leader election enabled.
type LeaderElectionConfiguration struct {
// leaseDuration is the duration that non-leader candidates will wait
// after observing a leadership renewal until attempting to acquire
// leadership of a led but unrenewed leader slot. This is effectively the
// maximum duration that a leader can be stopped before it is replaced
// by another candidate. This is only applicable if leader election is
// enabled.
LeaseDuration time.Duration
// renewDeadline is the interval between attempts by the acting master to
// renew a leadership slot before it stops leading. This must be less
// than or equal to the lease duration. This is only applicable if leader
// election is enabled.
RenewDeadline time.Duration
// retryPeriod is the duration the clients should wait between attempting
// acquisition and renewal of a leadership. This is only applicable if
// leader election is enabled.
RetryPeriod time.Duration
// resourceLock indicates the resource object type that will be used to lock
// during leader election cycles.
ResourceLock fedv1b1.ResourceLockType
}
// KubeFedNamespaces defines the namespace configuration shared by
// most kubefed controllers.
type KubeFedNamespaces struct {
KubeFedNamespace string
TargetNamespace string
}
// ClusterHealthCheckConfig defines the configurable parameters for cluster health check
type ClusterHealthCheckConfig struct {
Period time.Duration
FailureThreshold int64
SuccessThreshold int64
Timeout time.Duration
}
// ControllerConfig defines the configuration common to KubeFed
// controllers.
type ControllerConfig struct {
KubeFedNamespaces
KubeConfig *restclient.Config
ClusterAvailableDelay time.Duration
ClusterUnavailableDelay time.Duration
MinimizeLatency bool
SkipAdoptingResources bool
}
func (c *ControllerConfig) LimitedScope() bool {
return c.KubeFedNamespaces.TargetNamespace != metav1.NamespaceAll
}

View File

@@ -0,0 +1,183 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// TODO: consider moving it to a more generic package.
package util
import (
"container/heap"
"time"
)
const (
// TODO: Investigate what capacity is right.
delayingDelivererUpdateChanCapacity = 1000
)
// DelayingDelivererItem is structure delivered by DelayingDeliverer to the
// target channel.
type DelayingDelivererItem struct {
// Key under which the value was added to deliverer.
Key string
// Value of the item.
Value interface{}
// When the item should be delivered.
DeliveryTime time.Time
}
type delivererHeap struct {
keyPosition map[string]int
data []*DelayingDelivererItem
}
// Functions required by container.Heap.
func (dh *delivererHeap) Len() int { return len(dh.data) }
func (dh *delivererHeap) Less(i, j int) bool {
return dh.data[i].DeliveryTime.Before(dh.data[j].DeliveryTime)
}
func (dh *delivererHeap) Swap(i, j int) {
dh.keyPosition[dh.data[i].Key] = j
dh.keyPosition[dh.data[j].Key] = i
dh.data[i], dh.data[j] = dh.data[j], dh.data[i]
}
func (dh *delivererHeap) Push(x interface{}) {
item := x.(*DelayingDelivererItem)
dh.data = append(dh.data, item)
dh.keyPosition[item.Key] = len(dh.data) - 1
}
func (dh *delivererHeap) Pop() interface{} {
n := len(dh.data)
item := dh.data[n-1]
dh.data = dh.data[:n-1]
delete(dh.keyPosition, item.Key)
return item
}
// A structure that pushes the items to the target channel at a given time.
type DelayingDeliverer struct {
// Channel to deliver the data when their time comes.
targetChannel chan *DelayingDelivererItem
// Store for data
heap *delivererHeap
// Channel to feed the main goroutine with updates.
updateChannel chan *DelayingDelivererItem
// To stop the main goroutine.
stopChannel chan struct{}
}
func NewDelayingDeliverer() *DelayingDeliverer {
return NewDelayingDelivererWithChannel(make(chan *DelayingDelivererItem, 100))
}
func NewDelayingDelivererWithChannel(targetChannel chan *DelayingDelivererItem) *DelayingDeliverer {
return &DelayingDeliverer{
targetChannel: targetChannel,
heap: &delivererHeap{
keyPosition: make(map[string]int),
data: make([]*DelayingDelivererItem, 0),
},
updateChannel: make(chan *DelayingDelivererItem, delayingDelivererUpdateChanCapacity),
stopChannel: make(chan struct{}),
}
}
// Deliver all items due before or equal to timestamp.
func (d *DelayingDeliverer) deliver(timestamp time.Time) {
for d.heap.Len() > 0 {
if timestamp.Before(d.heap.data[0].DeliveryTime) {
return
}
item := heap.Pop(d.heap).(*DelayingDelivererItem)
d.targetChannel <- item
}
}
func (d *DelayingDeliverer) run() {
for {
now := time.Now()
d.deliver(now)
nextWakeUp := now.Add(time.Hour)
if d.heap.Len() > 0 {
nextWakeUp = d.heap.data[0].DeliveryTime
}
sleepTime := nextWakeUp.Sub(now)
select {
case <-time.After(sleepTime):
break // just wake up and process the data
case item := <-d.updateChannel:
if position, found := d.heap.keyPosition[item.Key]; found {
if item.DeliveryTime.Before(d.heap.data[position].DeliveryTime) {
d.heap.data[position] = item
heap.Fix(d.heap, position)
}
// Ignore if later.
} else {
heap.Push(d.heap, item)
}
case <-d.stopChannel:
return
}
}
}
// Starts the DelayingDeliverer.
func (d *DelayingDeliverer) Start() {
go d.run()
}
// Stops the DelayingDeliverer. Undelivered items are discarded.
func (d *DelayingDeliverer) Stop() {
close(d.stopChannel)
}
// Delivers value at the given time.
func (d *DelayingDeliverer) DeliverAt(key string, value interface{}, deliveryTime time.Time) {
d.updateChannel <- &DelayingDelivererItem{
Key: key,
Value: value,
DeliveryTime: deliveryTime,
}
}
// Delivers value after the given delay.
func (d *DelayingDeliverer) DeliverAfter(key string, value interface{}, delay time.Duration) {
d.DeliverAt(key, value, time.Now().Add(delay))
}
// Gets target channel of the deliverer.
func (d *DelayingDeliverer) GetTargetChannel() chan *DelayingDelivererItem {
return d.targetChannel
}
// Starts Delaying deliverer with a handler listening on the target channel.
func (d *DelayingDeliverer) StartWithHandler(handler func(*DelayingDelivererItem)) {
go func() {
for {
select {
case item := <-d.targetChannel:
handler(item)
case <-d.stopChannel:
return
}
}
}()
d.Start()
}

View File

@@ -0,0 +1,570 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"reflect"
"sync"
"time"
"github.com/pkg/errors"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
fedcommon "sigs.k8s.io/kubefed/pkg/apis/core/common"
fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
"sigs.k8s.io/kubefed/pkg/client/generic"
)
const (
clusterSyncPeriod = 10 * time.Minute
userAgentName = "kubefed-controller"
)
// An object with an origin information.
type FederatedObject struct {
Object interface{}
ClusterName string
}
// FederatedReadOnlyStore is an overlay over multiple stores created in federated clusters.
type FederatedReadOnlyStore interface {
// Returns all items in the store.
List() ([]FederatedObject, error)
// Returns all items from a cluster.
ListFromCluster(clusterName string) ([]interface{}, error)
// GetKeyFor returns the key under which the item would be put in the store.
GetKeyFor(item interface{}) string
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
GetByKey(clusterName string, key string) (interface{}, bool, error)
// Returns the items stored under the given key in all clusters.
GetFromAllClusters(key string) ([]FederatedObject, error)
// Checks whether stores for all clusters form the lists (and only these) are there and
// are synced. This is only a basic check whether the data inside of the store is usable.
// It is not a full synchronization/locking mechanism it only tries to ensure that out-of-sync
// issues occur less often. All users of the interface should assume
// that there may be significant delays in content updates of all kinds and write their
// code that it doesn't break if something is slightly out-of-sync.
ClustersSynced(clusters []*fedv1b1.KubeFedCluster) bool
}
// An interface to retrieve both KubeFedCluster resources and clients
// to access the clusters they represent.
type RegisteredClustersView interface {
// GetClientForCluster returns a client for the cluster, if present.
GetClientForCluster(clusterName string) (generic.Client, error)
// GetUnreadyClusters returns a list of all clusters that are not ready yet.
GetUnreadyClusters() ([]*fedv1b1.KubeFedCluster, error)
// GetReadyClusters returns all clusters for which the sub-informers are run.
GetReadyClusters() ([]*fedv1b1.KubeFedCluster, error)
// GetClusters returns a list of all clusters.
GetClusters() ([]*fedv1b1.KubeFedCluster, error)
// GetReadyCluster returns the cluster with the given name, if found.
GetReadyCluster(name string) (*fedv1b1.KubeFedCluster, bool, error)
// ClustersSynced returns true if the view is synced (for the first time).
ClustersSynced() bool
}
// FederatedInformer provides access to clusters registered with a
// KubeFed control plane and watches a given resource type in
// registered clusters.
//
// Whenever a new cluster is registered with KubeFed, an informer is
// created for it using TargetInformerFactory. Informers are stopped
// when a cluster is either put offline of deleted. It is assumed that
// some controller keeps an eye on the cluster list and thus the
// clusters in ETCD are up to date.
type FederatedInformer interface {
RegisteredClustersView
// Returns a store created over all stores from target informers.
GetTargetStore() FederatedReadOnlyStore
// Starts all the processes.
Start()
// Stops all the processes inside the informer.
Stop()
}
// A function that should be used to create an informer on the target object. Store should use
// cache.DeletionHandlingMetaNamespaceKeyFunc as a keying function.
type TargetInformerFactory func(*fedv1b1.KubeFedCluster, *restclient.Config) (cache.Store, cache.Controller, error)
// A structure with cluster lifecycle handler functions. Cluster is available (and ClusterAvailable is fired)
// when it is created in federated etcd and ready. Cluster becomes unavailable (and ClusterUnavailable is fired)
// when it is either deleted or becomes not ready. When cluster spec (IP)is modified both ClusterAvailable
// and ClusterUnavailable are fired.
type ClusterLifecycleHandlerFuncs struct {
// Fired when the cluster becomes available.
ClusterAvailable func(*fedv1b1.KubeFedCluster)
// Fired when the cluster becomes unavailable. The second arg contains data that was present
// in the cluster before deletion.
ClusterUnavailable func(*fedv1b1.KubeFedCluster, []interface{})
}
// Builds a FederatedInformer for the given configuration.
func NewFederatedInformer(
config *ControllerConfig,
client generic.Client,
apiResource *metav1.APIResource,
triggerFunc func(pkgruntime.Object),
clusterLifecycle *ClusterLifecycleHandlerFuncs) (FederatedInformer, error) {
targetInformerFactory := func(cluster *fedv1b1.KubeFedCluster, clusterConfig *restclient.Config) (cache.Store, cache.Controller, error) {
resourceClient, err := NewResourceClient(clusterConfig, apiResource)
if err != nil {
return nil, nil, err
}
targetNamespace := NamespaceForCluster(cluster.Name, config.TargetNamespace)
store, controller := NewManagedResourceInformer(resourceClient, targetNamespace, apiResource, triggerFunc)
return store, controller, nil
}
federatedInformer := &federatedInformerImpl{
targetInformerFactory: targetInformerFactory,
configFactory: func(cluster *fedv1b1.KubeFedCluster) (*restclient.Config, error) {
clusterConfig, err := BuildClusterConfig(cluster, client, config.KubeFedNamespace)
if err != nil {
return nil, err
}
if clusterConfig == nil {
return nil, errors.Errorf("Unable to load configuration for cluster %q", cluster.Name)
}
restclient.AddUserAgent(clusterConfig, userAgentName)
return clusterConfig, nil
},
targetInformers: make(map[string]informer),
fedNamespace: config.KubeFedNamespace,
clusterClients: make(map[string]generic.Client),
}
getClusterData := func(name string) []interface{} {
data, err := federatedInformer.GetTargetStore().ListFromCluster(name)
if err != nil {
klog.Errorf("Failed to list %s content: %v", name, err)
return make([]interface{}, 0)
}
return data
}
var err error
federatedInformer.clusterInformer.store, federatedInformer.clusterInformer.controller, err = NewGenericInformerWithEventHandler(
config.KubeConfig,
config.KubeFedNamespace,
&fedv1b1.KubeFedCluster{},
clusterSyncPeriod,
&cache.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) {
oldCluster, ok := old.(*fedv1b1.KubeFedCluster)
if ok {
var data []interface{}
if clusterLifecycle.ClusterUnavailable != nil {
data = getClusterData(oldCluster.Name)
}
federatedInformer.deleteCluster(oldCluster)
if clusterLifecycle.ClusterUnavailable != nil {
clusterLifecycle.ClusterUnavailable(oldCluster, data)
}
}
},
AddFunc: func(cur interface{}) {
curCluster, ok := cur.(*fedv1b1.KubeFedCluster)
if !ok {
klog.Errorf("Cluster %v/%v not added; incorrect type", curCluster.Namespace, curCluster.Name)
} else if IsClusterReady(&curCluster.Status) {
federatedInformer.addCluster(curCluster)
klog.Infof("Cluster %v/%v is ready", curCluster.Namespace, curCluster.Name)
if clusterLifecycle.ClusterAvailable != nil {
clusterLifecycle.ClusterAvailable(curCluster)
}
} else {
klog.Infof("Cluster %v/%v not added; it is not ready.", curCluster.Namespace, curCluster.Name)
}
},
UpdateFunc: func(old, cur interface{}) {
oldCluster, ok := old.(*fedv1b1.KubeFedCluster)
if !ok {
klog.Errorf("Internal error: Cluster %v not updated. Old cluster not of correct type.", old)
return
}
curCluster, ok := cur.(*fedv1b1.KubeFedCluster)
if !ok {
klog.Errorf("Internal error: Cluster %v not updated. New cluster not of correct type.", cur)
return
}
if IsClusterReady(&oldCluster.Status) != IsClusterReady(&curCluster.Status) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) || !reflect.DeepEqual(oldCluster.ObjectMeta.Labels, curCluster.ObjectMeta.Labels) || !reflect.DeepEqual(oldCluster.ObjectMeta.Annotations, curCluster.ObjectMeta.Annotations) {
var data []interface{}
if clusterLifecycle.ClusterUnavailable != nil {
data = getClusterData(oldCluster.Name)
}
federatedInformer.deleteCluster(oldCluster)
if clusterLifecycle.ClusterUnavailable != nil {
clusterLifecycle.ClusterUnavailable(oldCluster, data)
}
if IsClusterReady(&curCluster.Status) {
federatedInformer.addCluster(curCluster)
if clusterLifecycle.ClusterAvailable != nil {
clusterLifecycle.ClusterAvailable(curCluster)
}
}
} else {
klog.V(7).Infof("Cluster %v not updated to %v as ready status and specs are identical", oldCluster, curCluster)
}
},
},
)
return federatedInformer, err
}
func IsClusterReady(clusterStatus *fedv1b1.KubeFedClusterStatus) bool {
for _, condition := range clusterStatus.Conditions {
if condition.Type == fedcommon.ClusterReady {
if condition.Status == apiv1.ConditionTrue {
return true
}
}
}
return false
}
type informer struct {
controller cache.Controller
store cache.Store
stopChan chan struct{}
}
type federatedInformerImpl struct {
sync.Mutex
// Informer on federated clusters.
clusterInformer informer
// Target informers factory
targetInformerFactory TargetInformerFactory
// Structures returned by targetInformerFactory
targetInformers map[string]informer
// Retrieves configuration to access a cluster.
configFactory func(*fedv1b1.KubeFedCluster) (*restclient.Config, error)
// Caches cluster clients (reduces client discovery and secret retrieval)
clusterClients map[string]generic.Client
// Namespace from which to source KubeFedCluster resources
fedNamespace string
}
// *federatedInformerImpl implements FederatedInformer interface.
var _ FederatedInformer = &federatedInformerImpl{}
type federatedStoreImpl struct {
federatedInformer *federatedInformerImpl
}
func (f *federatedInformerImpl) Stop() {
klog.V(4).Infof("Stopping federated informer.")
f.Lock()
defer f.Unlock()
klog.V(4).Infof("... Closing cluster informer channel.")
close(f.clusterInformer.stopChan)
for key, informer := range f.targetInformers {
klog.V(4).Infof("... Closing informer channel for %q.", key)
close(informer.stopChan)
// Remove each informer after it has been stopped to prevent
// subsequent cluster deletion from attempting to double close
// an informer's stop channel.
delete(f.targetInformers, key)
}
}
func (f *federatedInformerImpl) Start() {
f.Lock()
defer f.Unlock()
f.clusterInformer.stopChan = make(chan struct{})
go f.clusterInformer.controller.Run(f.clusterInformer.stopChan)
}
// GetClientForCluster returns a client for the cluster, if present.
func (f *federatedInformerImpl) GetClientForCluster(clusterName string) (generic.Client, error) {
f.Lock()
defer f.Unlock()
// return cached client if one exists (to prevent frequent secret retrieval and rest discovery)
if client, ok := f.clusterClients[clusterName]; ok {
return client, nil
}
config, err := f.getConfigForClusterUnlocked(clusterName)
if err != nil {
return nil, errors.Wrap(err, "Client creation failed")
}
client, err := generic.New(config)
if err != nil {
return client, err
}
f.clusterClients[clusterName] = client
return client, nil
}
func (f *federatedInformerImpl) getConfigForClusterUnlocked(clusterName string) (*restclient.Config, error) {
// No locking needed. Will happen in f.GetCluster.
klog.V(4).Infof("Getting config for cluster %q", clusterName)
if cluster, found, err := f.getReadyClusterUnlocked(clusterName); found && err == nil {
return f.configFactory(cluster)
} else {
if err != nil {
return nil, err
}
}
return nil, errors.Errorf("cluster %q not found", clusterName)
}
func (f *federatedInformerImpl) GetUnreadyClusters() ([]*fedv1b1.KubeFedCluster, error) {
f.Lock()
defer f.Unlock()
items := f.clusterInformer.store.List()
result := make([]*fedv1b1.KubeFedCluster, 0, len(items))
for _, item := range items {
if cluster, ok := item.(*fedv1b1.KubeFedCluster); ok {
if !IsClusterReady(&cluster.Status) {
result = append(result, cluster)
}
} else {
return nil, errors.Errorf("wrong data in FederatedInformerImpl cluster store: %v", item)
}
}
return result, nil
}
// GetReadyClusters returns all clusters for which the sub-informers are run.
func (f *federatedInformerImpl) GetReadyClusters() ([]*fedv1b1.KubeFedCluster, error) {
return f.getClusters(true)
}
// GetClusters returns all clusters regardless of ready state.
func (f *federatedInformerImpl) GetClusters() ([]*fedv1b1.KubeFedCluster, error) {
return f.getClusters(false)
}
// GetReadyClusters returns only ready clusters if onlyReady is true and all clusters otherwise.
func (f *federatedInformerImpl) getClusters(onlyReady bool) ([]*fedv1b1.KubeFedCluster, error) {
f.Lock()
defer f.Unlock()
items := f.clusterInformer.store.List()
result := make([]*fedv1b1.KubeFedCluster, 0, len(items))
for _, item := range items {
if cluster, ok := item.(*fedv1b1.KubeFedCluster); ok {
if !onlyReady || IsClusterReady(&cluster.Status) {
result = append(result, cluster)
}
} else {
return nil, errors.Errorf("wrong data in FederatedInformerImpl cluster store: %v", item)
}
}
return result, nil
}
// GetCluster returns the cluster with the given name, if found.
func (f *federatedInformerImpl) GetReadyCluster(name string) (*fedv1b1.KubeFedCluster, bool, error) {
f.Lock()
defer f.Unlock()
return f.getReadyClusterUnlocked(name)
}
func (f *federatedInformerImpl) getReadyClusterUnlocked(name string) (*fedv1b1.KubeFedCluster, bool, error) {
key := fmt.Sprintf("%s/%s", f.fedNamespace, name)
if obj, exist, err := f.clusterInformer.store.GetByKey(key); exist && err == nil {
if cluster, ok := obj.(*fedv1b1.KubeFedCluster); ok {
if IsClusterReady(&cluster.Status) {
return cluster, true, nil
}
return nil, false, nil
}
return nil, false, errors.Errorf("wrong data in FederatedInformerImpl cluster store: %v", obj)
} else {
return nil, false, err
}
}
// Synced returns true if the view is synced (for the first time)
func (f *federatedInformerImpl) ClustersSynced() bool {
return f.clusterInformer.controller.HasSynced()
}
// Adds the given cluster to federated informer.
func (f *federatedInformerImpl) addCluster(cluster *fedv1b1.KubeFedCluster) {
f.Lock()
defer f.Unlock()
name := cluster.Name
if config, err := f.getConfigForClusterUnlocked(name); err == nil {
store, controller, err := f.targetInformerFactory(cluster, config)
if err != nil {
// TODO: create also an event for cluster.
klog.Errorf("Failed to create an informer for cluster %q: %v", cluster.Name, err)
return
}
targetInformer := informer{
controller: controller,
store: store,
stopChan: make(chan struct{}),
}
f.targetInformers[name] = targetInformer
go targetInformer.controller.Run(targetInformer.stopChan)
} else {
// TODO: create also an event for cluster.
klog.Errorf("Failed to create a client for cluster: %v", err)
}
}
// Removes the cluster from federated informer.
func (f *federatedInformerImpl) deleteCluster(cluster *fedv1b1.KubeFedCluster) {
f.Lock()
defer f.Unlock()
name := cluster.Name
if targetInformer, found := f.targetInformers[name]; found {
close(targetInformer.stopChan)
}
delete(f.targetInformers, name)
delete(f.clusterClients, name)
}
// Returns a store created over all stores from target informers.
func (f *federatedInformerImpl) GetTargetStore() FederatedReadOnlyStore {
return &federatedStoreImpl{
federatedInformer: f,
}
}
// Returns all items in the store.
func (fs *federatedStoreImpl) List() ([]FederatedObject, error) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
result := make([]FederatedObject, 0)
for clusterName, targetInformer := range fs.federatedInformer.targetInformers {
for _, value := range targetInformer.store.List() {
result = append(result, FederatedObject{ClusterName: clusterName, Object: value})
}
}
return result, nil
}
// Returns all items in the given cluster.
func (fs *federatedStoreImpl) ListFromCluster(clusterName string) ([]interface{}, error) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
result := make([]interface{}, 0)
if targetInformer, found := fs.federatedInformer.targetInformers[clusterName]; found {
values := targetInformer.store.List()
result = append(result, values...)
}
return result, nil
}
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
func (fs *federatedStoreImpl) GetByKey(clusterName string, key string) (interface{}, bool, error) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
if targetInformer, found := fs.federatedInformer.targetInformers[clusterName]; found {
return targetInformer.store.GetByKey(key)
}
return nil, false, nil
}
// Returns the items stored under the given key in all clusters.
func (fs *federatedStoreImpl) GetFromAllClusters(key string) ([]FederatedObject, error) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
result := make([]FederatedObject, 0)
for clusterName, targetInformer := range fs.federatedInformer.targetInformers {
value, exist, err := targetInformer.store.GetByKey(key)
if err != nil {
return nil, err
}
if exist {
result = append(result, FederatedObject{ClusterName: clusterName, Object: value})
}
}
return result, nil
}
// GetKeyFor returns the key under which the item would be put in the store.
func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string {
// TODO: support other keying functions.
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(item)
return key
}
// Checks whether stores for all clusters form the lists (and only these) are there and
// are synced.
func (fs *federatedStoreImpl) ClustersSynced(clusters []*fedv1b1.KubeFedCluster) bool {
// Get the list of informers to check under a lock and check it outside.
okSoFar, informersToCheck := func() (bool, []informer) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
if len(fs.federatedInformer.targetInformers) != len(clusters) {
return false, []informer{}
}
informersToCheck := make([]informer, 0, len(clusters))
for _, cluster := range clusters {
if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
informersToCheck = append(informersToCheck, targetInformer)
} else {
return false, []informer{}
}
}
return true, informersToCheck
}()
if !okSoFar {
return false
}
for _, informerToCheck := range informersToCheck {
if !informerToCheck.controller.HasSynced() {
return false
}
}
return true
}

View File

@@ -0,0 +1,35 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// FederatedResource is a generic representation of a federated type
type FederatedResource struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
ClusterStatus []ResourceClusterStatus `json:"clusterStatus,omitempty"`
}
// ResourceClusterStatus defines the status of federated resource within a cluster
type ResourceClusterStatus struct {
ClusterName string `json:"clusterName,omitempty"`
Status map[string]interface{} `json:"status,omitempty"`
}

View File

@@ -0,0 +1,86 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/kubefed/pkg/client/generic/scheme"
)
func NewGenericInformer(config *rest.Config, namespace string, obj pkgruntime.Object, resyncPeriod time.Duration, triggerFunc func(pkgruntime.Object)) (cache.Store, cache.Controller, error) {
return NewGenericInformerWithEventHandler(config, namespace, obj, resyncPeriod, NewTriggerOnAllChanges(triggerFunc))
}
func NewGenericInformerWithEventHandler(config *rest.Config, namespace string, obj pkgruntime.Object, resyncPeriod time.Duration, resourceEventHandlerFuncs *cache.ResourceEventHandlerFuncs) (cache.Store, cache.Controller, error) {
gvk, err := apiutil.GVKForObject(obj, scheme.Scheme)
if err != nil {
return nil, nil, err
}
mapper, err := apiutil.NewDiscoveryRESTMapper(config)
if err != nil {
return nil, nil, errors.Wrap(err, "Could not create RESTMapper from config")
}
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, nil, err
}
client, err := apiutil.RESTClientForGVK(gvk, config, scheme.Codecs)
if err != nil {
return nil, nil, err
}
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
listObj, err := scheme.Scheme.New(listGVK)
if err != nil {
return nil, nil, err
}
store, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (pkgruntime.Object, error) {
res := listObj.DeepCopyObject()
isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, scheme.ParameterCodec).Do().Into(res)
return res, err
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, scheme.ParameterCodec).Watch()
},
},
obj,
resyncPeriod,
resourceEventHandlerFuncs,
)
return store, controller, nil
}

View File

@@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"reflect"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
// Returns cache.ResourceEventHandlerFuncs that trigger the given function
// on all object changes.
func NewTriggerOnAllChanges(triggerFunc func(pkgruntime.Object)) *cache.ResourceEventHandlerFuncs {
return &cache.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) {
if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok {
// This object might be stale but ok for our current usage.
old = deleted.Obj
if old == nil {
return
}
}
oldObj := old.(pkgruntime.Object)
triggerFunc(oldObj)
},
AddFunc: func(cur interface{}) {
curObj := cur.(pkgruntime.Object)
triggerFunc(curObj)
},
UpdateFunc: func(old, cur interface{}) {
curObj := cur.(pkgruntime.Object)
if !reflect.DeepEqual(old, cur) {
triggerFunc(curObj)
}
},
}
}

View File

@@ -0,0 +1,69 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
const (
ManagedByKubeFedLabelKey = "kubefed.io/managed"
ManagedByKubeFedLabelValue = "true"
UnmanagedByKubeFedLabelValue = "false"
)
// HasManagedLabel indicates whether the given object has the managed
// label.
func HasManagedLabel(obj *unstructured.Unstructured) bool {
labels := obj.GetLabels()
if labels == nil {
return false
}
return labels[ManagedByKubeFedLabelKey] == ManagedByKubeFedLabelValue
}
// IsExplicitlyUnmanaged indicates whether the given object has the managed
// label with value false.
func IsExplicitlyUnmanaged(obj *unstructured.Unstructured) bool {
labels := obj.GetLabels()
if labels == nil {
return false
}
return labels[ManagedByKubeFedLabelKey] == UnmanagedByKubeFedLabelValue
}
// AddManagedLabel ensures that the given object has the managed
// label.
func AddManagedLabel(obj *unstructured.Unstructured) {
labels := obj.GetLabels()
if labels == nil {
labels = make(map[string]string)
}
labels[ManagedByKubeFedLabelKey] = ManagedByKubeFedLabelValue
obj.SetLabels(labels)
}
// RemoveManagedLabel ensures that the given object does not have the
// managed label.
func RemoveManagedLabel(obj *unstructured.Unstructured) {
labels := obj.GetLabels()
if labels == nil || labels[ManagedByKubeFedLabelKey] != ManagedByKubeFedLabelValue {
return
}
delete(labels, ManagedByKubeFedLabelKey)
obj.SetLabels(labels)
}

139
vendor/sigs.k8s.io/kubefed/pkg/controller/util/meta.go generated vendored Normal file
View File

@@ -0,0 +1,139 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"reflect"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
)
// Copies cluster-independent, user provided data from the given ObjectMeta struct. If in
// the future the ObjectMeta structure is expanded then any field that is not populated
// by the api server should be included here.
func copyObjectMeta(obj metav1.ObjectMeta) metav1.ObjectMeta {
return metav1.ObjectMeta{
Name: obj.Name,
Namespace: obj.Namespace,
Labels: obj.Labels,
Annotations: obj.Annotations,
ResourceVersion: obj.ResourceVersion,
}
}
// Deep copies cluster-independent, user provided data from the given ObjectMeta struct. If in
// the future the ObjectMeta structure is expanded then any field that is not populated
// by the api server should be included here.
func DeepCopyRelevantObjectMeta(obj metav1.ObjectMeta) metav1.ObjectMeta {
copyMeta := copyObjectMeta(obj)
if obj.Labels != nil {
copyMeta.Labels = make(map[string]string)
for key, val := range obj.Labels {
copyMeta.Labels[key] = val
}
}
if obj.Annotations != nil {
copyMeta.Annotations = make(map[string]string)
for key, val := range obj.Annotations {
copyMeta.Annotations[key] = val
}
}
return copyMeta
}
// Checks if cluster-independent, user provided data in two given ObjectMeta are equal. If in
// the future the ObjectMeta structure is expanded then any field that is not populated
// by the api server should be included here.
func ObjectMetaEquivalent(a, b metav1.ObjectMeta) bool {
if a.Name != b.Name {
return false
}
if a.Namespace != b.Namespace {
return false
}
if !reflect.DeepEqual(a.Labels, b.Labels) && (len(a.Labels) != 0 || len(b.Labels) != 0) {
return false
}
if !reflect.DeepEqual(a.Annotations, b.Annotations) && (len(a.Annotations) != 0 || len(b.Annotations) != 0) {
return false
}
return true
}
// Checks if cluster-independent, user provided data in two given ObjectMeta are equal. If in
// the future the ObjectMeta structure is expanded then any field that is not populated
// by the api server should be included here.
func ObjectMetaObjEquivalent(a, b metav1.Object) bool {
if a.GetName() != b.GetName() {
return false
}
if a.GetNamespace() != b.GetNamespace() {
return false
}
aLabels := a.GetLabels()
bLabels := b.GetLabels()
if !reflect.DeepEqual(aLabels, bLabels) && (len(aLabels) != 0 || len(bLabels) != 0) {
return false
}
aAnnotations := a.GetAnnotations()
bAnnotations := b.GetAnnotations()
if !reflect.DeepEqual(aAnnotations, bAnnotations) && (len(aAnnotations) != 0 || len(bAnnotations) != 0) {
return false
}
return true
}
// Checks if cluster-independent, user provided data in ObjectMeta and Spec in two given top
// level api objects are equivalent.
func ObjectMetaAndSpecEquivalent(a, b runtime.Object) bool {
objectMetaA := reflect.ValueOf(a).Elem().FieldByName("ObjectMeta").Interface().(metav1.ObjectMeta)
objectMetaB := reflect.ValueOf(b).Elem().FieldByName("ObjectMeta").Interface().(metav1.ObjectMeta)
specA := reflect.ValueOf(a).Elem().FieldByName("Spec").Interface()
specB := reflect.ValueOf(b).Elem().FieldByName("Spec").Interface()
return ObjectMetaEquivalent(objectMetaA, objectMetaB) && reflect.DeepEqual(specA, specB)
}
func MetaAccessor(obj pkgruntime.Object) metav1.Object {
accessor, err := meta.Accessor(obj)
if err != nil {
// This should always succeed if obj is not nil. Also,
// adapters are slated for replacement by unstructured.
return nil
}
return accessor
}
// GetUnstructured return Unstructured for any given kubernetes type
func GetUnstructured(resource interface{}) (*unstructured.Unstructured, error) {
content, err := json.Marshal(resource)
if err != nil {
return nil, errors.Wrap(err, "Failed to JSON Marshal")
}
unstructuredResource := &unstructured.Unstructured{}
err = unstructuredResource.UnmarshalJSON(content)
if err != nil {
return nil, errors.Wrap(err, "Failed to UnmarshalJSON into unstructured content")
}
return unstructuredResource, nil
}

View File

@@ -0,0 +1,47 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
// The functions in this file are exposed as variables to allow them
// to be overridden for testing purposes. Simulated scale testing
// requires being able to change the namespace of target resources
// (NamespaceForCluster and QualifiedNameForCluster) and ensure that
// the namespace of a federated resource will always be the kubefed
// system namespace (NamespaceForResource).
func namespaceForCluster(clusterName, namespace string) string {
return namespace
}
// NamespaceForCluster returns the namespace to use for the given cluster.
var NamespaceForCluster = namespaceForCluster
func namespaceForResource(resourceNamespace, fedNamespace string) string {
return resourceNamespace
}
// NamespaceForResource returns either the kubefed namespace or
// resource namespace.
var NamespaceForResource = namespaceForResource
func qualifiedNameForCluster(clusterName string, qualifiedName QualifiedName) QualifiedName {
return qualifiedName
}
// QualifiedNameForCluster returns the qualified name to use for the
// given cluster.
var QualifiedNameForCluster = qualifiedNameForCluster

View File

@@ -0,0 +1,58 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
const (
// If this annotation is present on a federated resource, resources in the
// member clusters managed by the federated resource should be orphaned.
// If the annotation is not present (the default), resources in member
// clusters will be deleted before the federated resource is deleted.
OrphanManagedResourcesAnnotation = "kubefed.io/orphan"
OrphanedManagedResourcesValue = "true"
)
// IsOrphaningEnabled checks status of "orphaning enable" (OrphanManagedResources: OrphanedManagedResourceslValue')
// annotation on a resource.
func IsOrphaningEnabled(obj *unstructured.Unstructured) bool {
annotations := obj.GetAnnotations()
if annotations == nil {
return false
}
return annotations[OrphanManagedResourcesAnnotation] == OrphanedManagedResourcesValue
}
// Enables the orphaning mode
func EnableOrphaning(obj *unstructured.Unstructured) {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[OrphanManagedResourcesAnnotation] = OrphanedManagedResourcesValue
obj.SetAnnotations(annotations)
}
// Disables the orphaning mode
func DisableOrphaning(obj *unstructured.Unstructured) {
annotations := obj.GetAnnotations()
if annotations == nil {
return
}
delete(annotations, OrphanManagedResourcesAnnotation)
obj.SetAnnotations(annotations)
}

View File

@@ -0,0 +1,192 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"github.com/evanphx/json-patch"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/sets"
)
type ClusterOverride struct {
Op string `json:"op,omitempty"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}
type GenericOverrideItem struct {
ClusterName string `json:"clusterName"`
ClusterOverrides []ClusterOverride `json:"clusterOverrides,omitempty"`
}
type GenericOverrideSpec struct {
Overrides []GenericOverrideItem `json:"overrides,omitempty"`
}
type GenericOverride struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec *GenericOverrideSpec `json:"spec,omitempty"`
}
// Namespace and name may not be overridden since these fields are the
// primary mechanism of association between a federated resource in
// the host cluster and the target resources in the member clusters.
//
// Kind should always be sourced from the FTC and not vary across
// member clusters.
//
// apiVersion can be overridden to support managing resources like
// Ingress which can exist in different groups at different
// versions. Users will need to take care not to abuse this
// capability.
var invalidPaths = sets.NewString(
"/metadata/namespace",
"/metadata/name",
"/metadata/generateName",
"/kind",
)
// Slice of ClusterOverride
type ClusterOverrides []ClusterOverride
// Mapping of clusterName to overrides for the cluster
type OverridesMap map[string]ClusterOverrides
// ToUnstructuredSlice converts the map of overrides to a slice of
// interfaces that can be set in an unstructured object.
func (m OverridesMap) ToUnstructuredSlice() []interface{} {
overrides := []interface{}{}
for clusterName, clusterOverrides := range m {
overridesItem := map[string]interface{}{
ClusterNameField: clusterName,
ClusterOverridesField: clusterOverrides,
}
overrides = append(overrides, overridesItem)
}
return overrides
}
// GetOverrides returns a map of overrides populated from the given
// unstructured object.
func GetOverrides(rawObj *unstructured.Unstructured) (OverridesMap, error) {
overridesMap := make(OverridesMap)
if rawObj == nil {
return overridesMap, nil
}
genericFedObject := GenericOverride{}
err := UnstructuredToInterface(rawObj, &genericFedObject)
if err != nil {
return nil, err
}
if genericFedObject.Spec == nil || genericFedObject.Spec.Overrides == nil {
// No overrides defined for the federated type
return overridesMap, nil
}
for _, overrideItem := range genericFedObject.Spec.Overrides {
clusterName := overrideItem.ClusterName
if _, ok := overridesMap[clusterName]; ok {
return nil, errors.Errorf("cluster %q appears more than once", clusterName)
}
clusterOverrides := overrideItem.ClusterOverrides
paths := sets.NewString()
for i, clusterOverride := range clusterOverrides {
path := clusterOverride.Path
if invalidPaths.Has(path) {
return nil, errors.Errorf("override[%d] for cluster %q has an invalid path: %s", i, clusterName, path)
}
if paths.Has(path) {
return nil, errors.Errorf("path %q appears more than once for cluster %q", path, clusterName)
}
paths.Insert(path)
}
overridesMap[clusterName] = clusterOverrides
}
return overridesMap, nil
}
// SetOverrides sets the spec.overrides field of the unstructured
// object from the provided overrides map.
func SetOverrides(fedObject *unstructured.Unstructured, overridesMap OverridesMap) error {
rawSpec := fedObject.Object[SpecField]
if rawSpec == nil {
rawSpec = map[string]interface{}{}
fedObject.Object[SpecField] = rawSpec
}
spec, ok := rawSpec.(map[string]interface{})
if !ok {
return errors.Errorf("Unable to set overrides since %q is not an object: %T", SpecField, rawSpec)
}
spec[OverridesField] = overridesMap.ToUnstructuredSlice()
return nil
}
// UnstructuredToInterface converts an unstructured object to the
// provided interface by json marshalling/unmarshalling.
func UnstructuredToInterface(rawObj *unstructured.Unstructured, obj interface{}) error {
content, err := rawObj.MarshalJSON()
if err != nil {
return err
}
return json.Unmarshal(content, obj)
}
// ApplyJsonPatch applies the override on to the given unstructured object.
func ApplyJsonPatch(obj *unstructured.Unstructured, overrides ClusterOverrides) error {
// TODO: Do the defaulting of "op" field to "replace" in API defaulting
for i, overrideItem := range overrides {
if overrideItem.Op == "" {
overrides[i].Op = "replace"
}
}
jsonPatchBytes, err := json.Marshal(overrides)
if err != nil {
return err
}
patch, err := jsonpatch.DecodePatch(jsonPatchBytes)
if err != nil {
return err
}
ObjectJSONBytes, err := obj.MarshalJSON()
if err != nil {
return err
}
patchedObjectJSONBytes, err := patch.Apply(ObjectJSONBytes)
if err != nil {
return err
}
err = obj.UnmarshalJSON(patchedObjectJSONBytes)
return err
}

View File

@@ -0,0 +1,88 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
)
type GenericClusterReference struct {
Name string `json:"name"`
}
type GenericPlacementFields struct {
Clusters []GenericClusterReference `json:"clusters,omitempty"`
ClusterSelector *metav1.LabelSelector `json:"clusterSelector,omitempty"`
}
type GenericPlacementSpec struct {
Placement GenericPlacementFields `json:"placement,omitempty"`
}
type GenericPlacement struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec GenericPlacementSpec `json:"spec,omitempty"`
}
func UnmarshalGenericPlacement(obj *unstructured.Unstructured) (*GenericPlacement, error) {
placement := &GenericPlacement{}
err := UnstructuredToInterface(obj, placement)
if err != nil {
return nil, err
}
return placement, nil
}
func (p *GenericPlacement) ClusterNames() []string {
if p.Spec.Placement.Clusters == nil {
return nil
}
clusterNames := []string{}
for _, cluster := range p.Spec.Placement.Clusters {
clusterNames = append(clusterNames, cluster.Name)
}
return clusterNames
}
func (p *GenericPlacement) ClusterSelector() (labels.Selector, error) {
return metav1.LabelSelectorAsSelector(p.Spec.Placement.ClusterSelector)
}
func GetClusterNames(obj *unstructured.Unstructured) ([]string, error) {
placement, err := UnmarshalGenericPlacement(obj)
if err != nil {
return nil, err
}
return placement.ClusterNames(), nil
}
func SetClusterNames(obj *unstructured.Unstructured, clusterNames []string) error {
var clusters []interface{}
if clusterNames != nil {
clusters = []interface{}{}
for _, clusterName := range clusterNames {
clusters = append(clusters, map[string]interface{}{
NameField: clusterName,
})
}
}
return unstructured.SetNestedSlice(obj.Object, clusters, SpecField, PlacementField, ClustersField)
}

View File

@@ -0,0 +1,76 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"reflect"
"sort"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
fedv1a1 "sigs.k8s.io/kubefed/pkg/apis/core/v1alpha1"
)
const (
generationPrefix = "gen:"
resourceVersionPrefix = "rv:"
)
// ObjectVersion retrieves the field type-prefixed value used for
// determining currency of the given cluster object.
func ObjectVersion(clusterObj *unstructured.Unstructured) string {
generation := clusterObj.GetGeneration()
if generation != 0 {
return fmt.Sprintf("%s%d", generationPrefix, generation)
}
return fmt.Sprintf("%s%s", resourceVersionPrefix, clusterObj.GetResourceVersion())
}
// ObjectNeedsUpdate determines whether the 2 objects provided cluster
// object needs to be updated according to the desired object and the
// recorded version.
func ObjectNeedsUpdate(desiredObj, clusterObj *unstructured.Unstructured, recordedVersion string) bool {
targetVersion := ObjectVersion(clusterObj)
if recordedVersion != targetVersion {
return true
}
// If versions match and the version is sourced from the
// generation field, a further check of metadata equivalency is
// required.
return strings.HasPrefix(targetVersion, generationPrefix) && !ObjectMetaObjEquivalent(desiredObj, clusterObj)
}
// SortClusterVersions ASCII sorts the given cluster versions slice
// based on cluster name.
func SortClusterVersions(versions []fedv1a1.ClusterObjectVersion) {
sort.Slice(versions, func(i, j int) bool {
return versions[i].ClusterName < versions[j].ClusterName
})
}
// PropagatedVersionStatusEquivalent returns true if both statuses are equal by
// comparing Template and Override version, and their ClusterVersion slices;
// false otherwise.
func PropagatedVersionStatusEquivalent(pvs1, pvs2 *fedv1a1.PropagatedVersionStatus) bool {
return pvs1.TemplateVersion == pvs2.TemplateVersion &&
pvs1.OverrideVersion == pvs2.OverrideVersion &&
reflect.DeepEqual(pvs1.ClusterVersions, pvs2.ClusterVersions)
}

View File

@@ -0,0 +1,57 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
meta "k8s.io/apimachinery/pkg/api/meta"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
)
// QualifiedName comprises a resource name with an optional namespace.
// If namespace is provided, a QualifiedName will be rendered as
// "<namespace>/<name>". If not, it will be rendered as "name". This
// is intended to allow the FederatedTypeAdapter interface and its
// consumers to operate on both namespaces and namespace-qualified
// resources.
type QualifiedName struct {
Namespace string
Name string
}
func NewQualifiedName(obj pkgruntime.Object) QualifiedName {
accessor, err := meta.Accessor(obj)
if err != nil {
// TODO(marun) This should never happen, but if it does, the
// resulting empty name.
return QualifiedName{}
}
return QualifiedName{
Namespace: accessor.GetNamespace(),
Name: accessor.GetName(),
}
}
// String returns the general purpose string representation
func (n QualifiedName) String() string {
if len(n.Namespace) == 0 {
return n.Name
}
return fmt.Sprintf("%s/%s", n.Namespace, n.Name)
}

View File

@@ -0,0 +1,70 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
type ResourceClient interface {
Resources(namespace string) dynamic.ResourceInterface
Kind() string
}
type resourceClient struct {
client dynamic.Interface
apiResource schema.GroupVersionResource
namespaced bool
kind string
}
func NewResourceClient(config *rest.Config, apiResource *metav1.APIResource) (ResourceClient, error) {
resource := schema.GroupVersionResource{
Group: apiResource.Group,
Version: apiResource.Version,
Resource: apiResource.Name,
}
client, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
return &resourceClient{
client: client,
apiResource: resource,
namespaced: apiResource.Namespaced,
kind: apiResource.Kind,
}, nil
}
func (c *resourceClient) Resources(namespace string) dynamic.ResourceInterface {
// TODO(marun) Consider returning Interface instead of
// ResourceInterface to allow callers to decide if they want to
// invoke Namespace(). Either that, or replace the use of
// ResourceClient with the controller-runtime generic client.
if c.namespaced {
return c.client.Resource(c.apiResource).Namespace(namespace)
}
return c.client.Resource(c.apiResource)
}
func (c *resourceClient) Kind() string {
return c.kind
}

View File

@@ -0,0 +1,90 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
// NewResourceInformer returns an unfiltered informer.
func NewResourceInformer(client ResourceClient, namespace string, apiResource *metav1.APIResource, triggerFunc func(pkgruntime.Object)) (cache.Store, cache.Controller) {
return newResourceInformer(client, namespace, apiResource, triggerFunc, "")
}
// NewManagedResourceInformer returns an informer limited to resources
// managed by KubeFed as indicated by labeling.
func NewManagedResourceInformer(client ResourceClient, namespace string, apiResource *metav1.APIResource, triggerFunc func(pkgruntime.Object)) (cache.Store, cache.Controller) {
labelSelector := labels.Set(map[string]string{ManagedByKubeFedLabelKey: ManagedByKubeFedLabelValue}).AsSelector().String()
return newResourceInformer(client, namespace, apiResource, triggerFunc, labelSelector)
}
func newResourceInformer(client ResourceClient, namespace string, apiResource *metav1.APIResource, triggerFunc func(pkgruntime.Object), labelSelector string) (cache.Store, cache.Controller) {
obj := &unstructured.Unstructured{}
if apiResource != nil {
gvk := schema.GroupVersionKind{Group: apiResource.Group, Version: apiResource.Version, Kind: apiResource.Kind}
obj.SetGroupVersionKind(gvk)
}
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
options.LabelSelector = labelSelector
return client.Resources(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelSelector
return client.Resources(namespace).Watch(options)
},
},
obj, // use an unstructured type with apiVersion / kind populated for informer logging purposes
NoResyncPeriod,
NewTriggerOnAllChanges(triggerFunc),
)
}
func ObjFromCache(store cache.Store, kind, key string) (*unstructured.Unstructured, error) {
obj, err := rawObjFromCache(store, kind, key)
if err != nil {
return nil, err
}
if obj == nil {
return nil, nil
}
return obj.(*unstructured.Unstructured), nil
}
func rawObjFromCache(store cache.Store, kind, key string) (pkgruntime.Object, error) {
cachedObj, exist, err := store.GetByKey(key)
if err != nil {
wrappedErr := errors.Wrapf(err, "Failed to query %s store for %q", kind, key)
runtime.HandleError(wrappedErr)
return nil, err
}
if !exist {
return nil, nil
}
return cachedObj.(pkgruntime.Object).DeepCopyObject(), nil
}

View File

@@ -0,0 +1,75 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"sync"
)
type SafeMap struct {
sync.RWMutex
m map[string]interface{}
}
func NewSafeMap() *SafeMap {
return &SafeMap{
m: make(map[string]interface{}),
}
}
func (s *SafeMap) Store(key string, value interface{}) {
s.Lock()
defer s.Unlock()
s.m[key] = value
}
func (s *SafeMap) Get(key string) (interface{}, bool) {
s.RLock()
defer s.RUnlock()
value, ok := s.m[key]
return value, ok
}
func (s *SafeMap) GetAll() []interface{} {
s.RLock()
defer s.RUnlock()
vals := []interface{}{}
for _, val := range s.m {
vals = append(vals, val)
}
return vals
}
func (s *SafeMap) Delete(key string) {
s.Lock()
defer s.Unlock()
delete(s.m, key)
}
func (s *SafeMap) DeleteAll() {
s.Lock()
defer s.Unlock()
for key := range s.m {
delete(s.m, key)
}
}
func (s *SafeMap) Size() int {
s.Lock()
defer s.Unlock()
return len(s.m)
}

View File

@@ -0,0 +1,170 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
)
type ReconcileFunc func(qualifiedName QualifiedName) ReconciliationStatus
type ReconcileWorker interface {
Enqueue(qualifiedName QualifiedName)
EnqueueForClusterSync(qualifiedName QualifiedName)
EnqueueForError(qualifiedName QualifiedName)
EnqueueForRetry(qualifiedName QualifiedName)
EnqueueObject(obj pkgruntime.Object)
EnqueueWithDelay(qualifiedName QualifiedName, delay time.Duration)
Run(stopChan <-chan struct{})
SetDelay(retryDelay, clusterSyncDelay time.Duration)
}
type WorkerTiming struct {
Interval time.Duration
RetryDelay time.Duration
ClusterSyncDelay time.Duration
InitialBackoff time.Duration
MaxBackoff time.Duration
}
type asyncWorker struct {
reconcile ReconcileFunc
timing WorkerTiming
// For triggering reconciliation of a single resource. This is
// used when there is an add/update/delete operation on a resource
// in either the API of the cluster hosting KubeFed or in the API
// of a member cluster.
deliverer *DelayingDeliverer
// Work queue allowing parallel processing of resources
queue workqueue.Interface
// Backoff manager
backoff *flowcontrol.Backoff
}
func NewReconcileWorker(reconcile ReconcileFunc, timing WorkerTiming) ReconcileWorker {
if timing.Interval == 0 {
timing.Interval = time.Second * 1
}
if timing.RetryDelay == 0 {
timing.RetryDelay = time.Second * 10
}
if timing.InitialBackoff == 0 {
timing.InitialBackoff = time.Second * 5
}
if timing.MaxBackoff == 0 {
timing.MaxBackoff = time.Minute
}
return &asyncWorker{
reconcile: reconcile,
timing: timing,
deliverer: NewDelayingDeliverer(),
queue: workqueue.New(),
backoff: flowcontrol.NewBackOff(timing.InitialBackoff, timing.MaxBackoff),
}
}
func (w *asyncWorker) Enqueue(qualifiedName QualifiedName) {
w.deliver(qualifiedName, 0, false)
}
func (w *asyncWorker) EnqueueForError(qualifiedName QualifiedName) {
w.deliver(qualifiedName, 0, true)
}
func (w *asyncWorker) EnqueueForRetry(qualifiedName QualifiedName) {
w.deliver(qualifiedName, w.timing.RetryDelay, false)
}
func (w *asyncWorker) EnqueueForClusterSync(qualifiedName QualifiedName) {
w.deliver(qualifiedName, w.timing.ClusterSyncDelay, false)
}
func (w *asyncWorker) EnqueueObject(obj pkgruntime.Object) {
qualifiedName := NewQualifiedName(obj)
w.Enqueue(qualifiedName)
}
func (w *asyncWorker) EnqueueWithDelay(qualifiedName QualifiedName, delay time.Duration) {
w.deliver(qualifiedName, delay, false)
}
func (w *asyncWorker) Run(stopChan <-chan struct{}) {
StartBackoffGC(w.backoff, stopChan)
w.deliverer.StartWithHandler(func(item *DelayingDelivererItem) {
w.queue.Add(item)
})
go wait.Until(w.worker, w.timing.Interval, stopChan)
// Ensure all goroutines are cleaned up when the stop channel closes
go func() {
<-stopChan
w.queue.ShutDown()
w.deliverer.Stop()
}()
}
func (w *asyncWorker) SetDelay(retryDelay, clusterSyncDelay time.Duration) {
w.timing.RetryDelay = retryDelay
w.timing.ClusterSyncDelay = clusterSyncDelay
}
// deliver adds backoff to delay if this delivery is related to some
// failure. Resets backoff if there was no failure.
func (w *asyncWorker) deliver(qualifiedName QualifiedName, delay time.Duration, failed bool) {
key := qualifiedName.String()
if failed {
w.backoff.Next(key, time.Now())
delay = delay + w.backoff.Get(key)
} else {
w.backoff.Reset(key)
}
w.deliverer.DeliverAfter(key, &qualifiedName, delay)
}
func (w *asyncWorker) worker() {
for {
obj, quit := w.queue.Get()
if quit {
return
}
item := obj.(*DelayingDelivererItem)
qualifiedName := item.Value.(*QualifiedName)
status := w.reconcile(*qualifiedName)
w.queue.Done(item)
switch status {
case StatusAllOK:
break
case StatusError:
w.EnqueueForError(*qualifiedName)
case StatusNeedsRecheck:
w.EnqueueForRetry(*qualifiedName)
case StatusNotSynced:
w.EnqueueForClusterSync(*qualifiedName)
}
}
}