update modules kustomize add helm

Signed-off-by: LiHui <andrewli@yunify.com>
This commit is contained in:
LiHui
2021-04-14 14:06:59 +08:00
parent e587887aac
commit ce4cfbee51
98 changed files with 108932 additions and 14 deletions

View File

@@ -0,0 +1,598 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/scheme"
cachetools "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)
// ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found.
var ErrNoObjectsVisited = errors.New("no objects visited")
var metadataAccessor = meta.NewAccessor()
// Client represents a client capable of communicating with the Kubernetes API.
type Client struct {
Factory Factory
Log func(string, ...interface{})
// Namespace allows to bypass the kubeconfig file for the choice of the namespace
Namespace string
}
var addToScheme sync.Once
// New creates a new Client.
func New(getter genericclioptions.RESTClientGetter) *Client {
if getter == nil {
getter = genericclioptions.NewConfigFlags(true)
}
// Add CRDs to the scheme. They are missing by default.
addToScheme.Do(func() {
if err := apiextv1.AddToScheme(scheme.Scheme); err != nil {
// This should never happen.
panic(err)
}
if err := apiextv1beta1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
})
return &Client{
Factory: cmdutil.NewFactory(getter),
Log: nopLogger,
}
}
var nopLogger = func(_ string, _ ...interface{}) {}
// IsReachable tests connectivity to the cluster
func (c *Client) IsReachable() error {
client, err := c.Factory.KubernetesClientSet()
if err == genericclioptions.ErrEmptyConfig {
// re-replace kubernetes ErrEmptyConfig error with a friendy error
// moar workarounds for Kubernetes API breaking.
return errors.New("Kubernetes cluster unreachable")
}
if err != nil {
return errors.Wrap(err, "Kubernetes cluster unreachable")
}
if _, err := client.ServerVersion(); err != nil {
return errors.Wrap(err, "Kubernetes cluster unreachable")
}
return nil
}
// Create creates Kubernetes resources specified in the resource list.
func (c *Client) Create(resources ResourceList) (*Result, error) {
c.Log("creating %d resource(s)", len(resources))
if err := perform(resources, createResource); err != nil {
return nil, err
}
return &Result{Created: resources}, nil
}
// Wait up to the given timeout for the specified resources to be ready
func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
cs, err := c.Factory.KubernetesClientSet()
if err != nil {
return err
}
w := waiter{
c: cs,
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
}
func (c *Client) namespace() string {
if c.Namespace != "" {
return c.Namespace
}
if ns, _, err := c.Factory.ToRawKubeConfigLoader().Namespace(); err == nil {
return ns
}
return v1.NamespaceDefault
}
// newBuilder returns a new resource builder for structured api objects.
func (c *Client) newBuilder() *resource.Builder {
return c.Factory.NewBuilder().
ContinueOnError().
NamespaceParam(c.namespace()).
DefaultNamespace().
Flatten()
}
// Build validates for Kubernetes objects and returns unstructured infos.
func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) {
schema, err := c.Factory.Validator(validate)
if err != nil {
return nil, err
}
result, err := c.newBuilder().
Unstructured().
Schema(schema).
Stream(reader, "").
Do().Infos()
return result, scrubValidationError(err)
}
// Update takes the current list of objects and target list of objects and
// creates resources that don't already exists, updates resources that have been
// modified in the target configuration, and deletes resources from the current
// configuration that are not present in the target configuration. If an error
// occurs, a Result will still be returned with the error, containing all
// resource updates, creations, and deletions that were attempted. These can be
// used for cleanup or other logging purposes.
func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
updateErrors := []string{}
res := &Result{}
c.Log("checking %d resources for changes", len(target))
err := target.Visit(func(info *resource.Info, err error) error {
if err != nil {
return err
}
helper := resource.NewHelper(info.Client, info.Mapping)
if _, err := helper.Get(info.Namespace, info.Name, info.Export); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrap(err, "could not get information about the resource")
}
// Append the created resource to the results, even if something fails
res.Created = append(res.Created, info)
// Since the resource does not exist, create it.
if err := createResource(info); err != nil {
return errors.Wrap(err, "failed to create resource")
}
kind := info.Mapping.GroupVersionKind.Kind
c.Log("Created a new %s called %q in %s\n", kind, info.Name, info.Namespace)
return nil
}
originalInfo := original.Get(info)
if originalInfo == nil {
kind := info.Mapping.GroupVersionKind.Kind
return errors.Errorf("no %s with the name %q found", kind, info.Name)
}
if err := updateResource(c, info, originalInfo.Object, force); err != nil {
c.Log("error updating the resource %q:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error())
}
// Because we check for errors later, append the info regardless
res.Updated = append(res.Updated, info)
return nil
})
switch {
case err != nil:
return res, err
case len(updateErrors) != 0:
return res, errors.Errorf(strings.Join(updateErrors, " && "))
}
for _, info := range original.Difference(target) {
c.Log("Deleting %q in %s...", info.Name, info.Namespace)
if err := info.Get(); err != nil {
c.Log("Unable to get obj %q, err: %s", info.Name, err)
continue
}
annotations, err := metadataAccessor.Annotations(info.Object)
if err != nil {
c.Log("Unable to get annotations on %q, err: %s", info.Name, err)
}
if annotations != nil && annotations[ResourcePolicyAnno] == KeepPolicy {
c.Log("Skipping delete of %q due to annotation [%s=%s]", info.Name, ResourcePolicyAnno, KeepPolicy)
continue
}
if err := deleteResource(info); err != nil {
c.Log("Failed to delete %q, err: %s", info.ObjectName(), err)
continue
}
res.Deleted = append(res.Deleted, info)
}
return res, nil
}
// Delete deletes Kubernetes resources specified in the resources list. It will
// attempt to delete all resources even if one or more fail and collect any
// errors. All successfully deleted items will be returned in the `Deleted`
// ResourceList that is part of the result.
func (c *Client) Delete(resources ResourceList) (*Result, []error) {
var errs []error
res := &Result{}
mtx := sync.Mutex{}
err := perform(resources, func(info *resource.Info) error {
c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind)
if err := c.skipIfNotFound(deleteResource(info)); err != nil {
mtx.Lock()
defer mtx.Unlock()
// Collect the error and continue on
errs = append(errs, err)
} else {
mtx.Lock()
defer mtx.Unlock()
res.Deleted = append(res.Deleted, info)
}
return nil
})
if err != nil {
// Rewrite the message from "no objects visited" if that is what we got
// back
if err == ErrNoObjectsVisited {
err = errors.New("object not found, skipping delete")
}
errs = append(errs, err)
}
if errs != nil {
return nil, errs
}
return res, nil
}
func (c *Client) skipIfNotFound(err error) error {
if apierrors.IsNotFound(err) {
c.Log("%v", err)
return nil
}
return err
}
func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
return func(info *resource.Info) error {
return c.watchUntilReady(t, info)
}
}
// WatchUntilReady watches the resources given and waits until it is ready.
//
// This function is mainly for hook implementations. It watches for a resource to
// hit a particular milestone. The milestone depends on the Kind.
//
// For most kinds, it checks to see if the resource is marked as Added or Modified
// by the Kubernetes event stream. For some kinds, it does more:
//
// - Jobs: A job is marked "Ready" when it has successfully completed. This is
// ascertained by watching the Status fields in a job's output.
// - Pods: A pod is marked "Ready" when it has successfully completed. This is
// ascertained by watching the status.phase field in a pod's output.
//
// Handling for other kinds will be added as necessary.
func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
// For jobs, there's also the option to do poll c.Jobs(namespace).Get():
// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
return perform(resources, c.watchTimeout(timeout))
}
func perform(infos ResourceList, fn func(*resource.Info) error) error {
if len(infos) == 0 {
return ErrNoObjectsVisited
}
errs := make(chan error)
go batchPerform(infos, fn, errs)
for range infos {
err := <-errs
if err != nil {
return err
}
}
return nil
}
func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<- error) {
var kind string
var wg sync.WaitGroup
for _, info := range infos {
currentKind := info.Object.GetObjectKind().GroupVersionKind().Kind
if kind != currentKind {
wg.Wait()
kind = currentKind
}
wg.Add(1)
go func(i *resource.Info) {
errs <- fn(i)
wg.Done()
}(info)
}
}
func createResource(info *resource.Info) error {
obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object)
if err != nil {
return err
}
return info.Refresh(obj, true)
}
func deleteResource(info *resource.Info) error {
policy := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &policy}
_, err := resource.NewHelper(info.Client, info.Mapping).DeleteWithOptions(info.Namespace, info.Name, opts)
return err
}
func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.PatchType, error) {
oldData, err := json.Marshal(current)
if err != nil {
return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing current configuration")
}
newData, err := json.Marshal(target.Object)
if err != nil {
return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing target configuration")
}
// Fetch the current object for the three way merge
helper := resource.NewHelper(target.Client, target.Mapping)
currentObj, err := helper.Get(target.Namespace, target.Name, target.Export)
if err != nil && !apierrors.IsNotFound(err) {
return nil, types.StrategicMergePatchType, errors.Wrapf(err, "unable to get data for current object %s/%s", target.Namespace, target.Name)
}
// Even if currentObj is nil (because it was not found), it will marshal just fine
currentData, err := json.Marshal(currentObj)
if err != nil {
return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing live configuration")
}
// Get a versioned object
versionedObject := AsVersioned(target)
// Unstructured objects, such as CRDs, may not have an not registered error
// returned from ConvertToVersion. Anything that's unstructured should
// use the jsonpatch.CreateMergePatch. Strategic Merge Patch is not supported
// on objects like CRDs.
_, isUnstructured := versionedObject.(runtime.Unstructured)
// On newer K8s versions, CRDs aren't unstructured but has this dedicated type
_, isCRD := versionedObject.(*apiextv1beta1.CustomResourceDefinition)
if isUnstructured || isCRD {
// fall back to generic JSON merge patch
patch, err := jsonpatch.CreateMergePatch(oldData, newData)
return patch, types.MergePatchType, err
}
patchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
return nil, types.StrategicMergePatchType, errors.Wrap(err, "unable to create patch metadata from object")
}
patch, err := strategicpatch.CreateThreeWayMergePatch(oldData, newData, currentData, patchMeta, true)
return patch, types.StrategicMergePatchType, err
}
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
var (
obj runtime.Object
helper = resource.NewHelper(target.Client, target.Mapping)
kind = target.Mapping.GroupVersionKind.Kind
)
// if --force is applied, attempt to replace the existing resource with the new object.
if force {
var err error
obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object)
if err != nil {
return errors.Wrap(err, "failed to replace object")
}
c.Log("Replaced %q with kind %s for kind %s", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind)
} else {
patch, patchType, err := createPatch(target, currentObj)
if err != nil {
return errors.Wrap(err, "failed to create patch")
}
if patch == nil || string(patch) == "{}" {
c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name)
// This needs to happen to make sure that Helm has the latest info from the API
// Otherwise there will be no labels and other functions that use labels will panic
if err := target.Get(); err != nil {
return errors.Wrap(err, "failed to refresh resource information")
}
return nil
}
// send patch to server
obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
if err != nil {
return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind)
}
}
target.Refresh(obj, true)
return nil
}
func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
kind := info.Mapping.GroupVersionKind.Kind
switch kind {
case "Job", "Pod":
default:
return nil
}
c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
// Use a selector on the name of the resource. This should be unique for the
// given version and kind
selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", info.Name))
if err != nil {
return err
}
lw := cachetools.NewListWatchFromClient(info.Client, info.Mapping.Resource.Resource, info.Namespace, selector)
// What we watch for depends on the Kind.
// - For a Job, we watch for completion.
// - For all else, we watch until Ready.
// In the future, we might want to add some special logic for types
// like Ingress, Volume, etc.
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
_, err = watchtools.ListWatchUntil(ctx, lw, func(e watch.Event) (bool, error) {
// Make sure the incoming object is versioned as we use unstructured
// objects when we build manifests
obj := convertWithMapper(e.Object, info.Mapping)
switch e.Type {
case watch.Added, watch.Modified:
// For things like a secret or a config map, this is the best indicator
// we get. We care mostly about jobs, where what we want to see is
// the status go into a good state. For other types, like ReplicaSet
// we don't really do anything to support these as hooks.
c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
switch kind {
case "Job":
return c.waitForJob(obj, info.Name)
case "Pod":
return c.waitForPodSuccess(obj, info.Name)
}
return true, nil
case watch.Deleted:
c.Log("Deleted event for %s", info.Name)
return true, nil
case watch.Error:
// Handle error and return with an error.
c.Log("Error event for %s", info.Name)
return true, errors.Errorf("failed to deploy %s", info.Name)
default:
return false, nil
}
})
return err
}
// waitForJob is a helper that waits for a job to complete.
//
// This operates on an event returned from a watcher.
func (c *Client) waitForJob(obj runtime.Object, name string) (bool, error) {
o, ok := obj.(*batch.Job)
if !ok {
return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, obj)
}
for _, c := range o.Status.Conditions {
if c.Type == batch.JobComplete && c.Status == "True" {
return true, nil
} else if c.Type == batch.JobFailed && c.Status == "True" {
return true, errors.Errorf("job failed: %s", c.Reason)
}
}
c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
return false, nil
}
// waitForPodSuccess is a helper that waits for a pod to complete.
//
// This operates on an event returned from a watcher.
func (c *Client) waitForPodSuccess(obj runtime.Object, name string) (bool, error) {
o, ok := obj.(*v1.Pod)
if !ok {
return true, errors.Errorf("expected %s to be a *v1.Pod, got %T", name, obj)
}
switch o.Status.Phase {
case v1.PodSucceeded:
fmt.Printf("Pod %s succeeded\n", o.Name)
return true, nil
case v1.PodFailed:
return true, errors.Errorf("pod %s failed", o.Name)
case v1.PodPending:
fmt.Printf("Pod %s pending\n", o.Name)
case v1.PodRunning:
fmt.Printf("Pod %s running\n", o.Name)
}
return false, nil
}
// scrubValidationError removes kubectl info from the message.
func scrubValidationError(err error) error {
if err == nil {
return nil
}
const stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false"
if strings.Contains(err.Error(), stopValidateMessage) {
return errors.New(strings.ReplaceAll(err.Error(), "; "+stopValidateMessage, ""))
}
return err
}
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify).
func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
client, err := c.Factory.KubernetesClientSet()
if err != nil {
return v1.PodUnknown, err
}
to := int64(timeout)
watcher, err := client.CoreV1().Pods(c.namespace()).Watch(context.Background(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
TimeoutSeconds: &to,
})
for event := range watcher.ResultChan() {
p, ok := event.Object.(*v1.Pod)
if !ok {
return v1.PodUnknown, fmt.Errorf("%s not a pod", name)
}
switch p.Status.Phase {
case v1.PodFailed:
return v1.PodFailed, nil
case v1.PodSucceeded:
return v1.PodSucceeded, nil
}
}
return v1.PodUnknown, err
}

View File

@@ -0,0 +1,30 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
import "k8s.io/cli-runtime/pkg/genericclioptions"
// GetConfig returns a Kubernetes client config.
//
// Deprecated
func GetConfig(kubeconfig, context, namespace string) *genericclioptions.ConfigFlags {
cf := genericclioptions.NewConfigFlags(true)
cf.Namespace = &namespace
cf.Context = &context
cf.KubeConfig = &kubeconfig
return cf
}

View File

@@ -0,0 +1,69 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
import (
"sync"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/scheme"
)
var k8sNativeScheme *runtime.Scheme
var k8sNativeSchemeOnce sync.Once
// AsVersioned converts the given info into a runtime.Object with the correct
// group and version set
func AsVersioned(info *resource.Info) runtime.Object {
return convertWithMapper(info.Object, info.Mapping)
}
// convertWithMapper converts the given object with the optional provided
// RESTMapping. If no mapping is provided, the default schema versioner is used
func convertWithMapper(obj runtime.Object, mapping *meta.RESTMapping) runtime.Object {
s := kubernetesNativeScheme()
var gv = runtime.GroupVersioner(schema.GroupVersions(s.PrioritizedVersionsAllGroups()))
if mapping != nil {
gv = mapping.GroupVersionKind.GroupVersion()
}
if obj, err := runtime.ObjectConvertor(s).ConvertToVersion(obj, gv); err == nil {
return obj
}
return obj
}
// kubernetesNativeScheme returns a clean *runtime.Scheme with _only_ Kubernetes
// native resources added to it. This is required to break free of custom resources
// that may have been added to scheme.Scheme due to Helm being used as a package in
// combination with e.g. a versioned kube client. If we would not do this, the client
// may attempt to perform e.g. a 3-way-merge strategy patch for custom resources.
func kubernetesNativeScheme() *runtime.Scheme {
k8sNativeSchemeOnce.Do(func() {
k8sNativeScheme = runtime.NewScheme()
scheme.AddToScheme(k8sNativeScheme)
// API extensions are not in the above scheme set,
// and must thus be added separately.
apiextensionsv1beta1.AddToScheme(k8sNativeScheme)
apiextensionsv1.AddToScheme(k8sNativeScheme)
})
return k8sNativeScheme
}

View File

@@ -0,0 +1,38 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
import (
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubectl/pkg/validation"
)
// Factory provides abstractions that allow the Kubectl command to be extended across multiple types
// of resources and different API sets.
type Factory interface {
// ToRawKubeConfigLoader return kubeconfig loader as-is
ToRawKubeConfigLoader() clientcmd.ClientConfig
// KubernetesClientSet gives you back an external clientset
KubernetesClientSet() (*kubernetes.Clientset, error)
// NewBuilder returns an object that assists in loading objects from both disk and the server
// and which implements the common patterns for CLI interactions with generic resources.
NewBuilder() *resource.Builder
// Returns a schema that can validate objects stored on disk.
Validator(validate bool) (validation.Schema, error)
}

View File

@@ -0,0 +1,66 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"io"
"time"
v1 "k8s.io/api/core/v1"
)
// Interface represents a client capable of communicating with the Kubernetes API.
//
// A KubernetesClient must be concurrency safe.
type Interface interface {
// Create creates one or more resources.
Create(resources ResourceList) (*Result, error)
Wait(resources ResourceList, timeout time.Duration) error
// Delete destroys one or more resources.
Delete(resources ResourceList) (*Result, []error)
// Watch the resource in reader until it is "ready". This method
//
// For Jobs, "ready" means the Job ran to completion (exited without error).
// For Pods, "ready" means the Pod phase is marked "succeeded".
// For all other kinds, it means the kind was created or modified without
// error.
WatchUntilReady(resources ResourceList, timeout time.Duration) error
// Update updates one or more resources or creates the resource
// if it doesn't exist.
Update(original, target ResourceList, force bool) (*Result, error)
// Build creates a resource list from a Reader
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n")
//
// Validates against OpenAPI schema if validate is true.
Build(reader io.Reader, validate bool) (ResourceList, error)
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify).
WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error)
// isReachable checks whether the client is able to connect to the cluster
IsReachable() error
}
var _ Interface = (*Client)(nil)

View File

@@ -0,0 +1,85 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
import "k8s.io/cli-runtime/pkg/resource"
// ResourceList provides convenience methods for comparing collections of Infos.
type ResourceList []*resource.Info
// Append adds an Info to the Result.
func (r *ResourceList) Append(val *resource.Info) {
*r = append(*r, val)
}
// Visit implements resource.Visitor.
func (r ResourceList) Visit(fn resource.VisitorFunc) error {
for _, i := range r {
if err := fn(i, nil); err != nil {
return err
}
}
return nil
}
// Filter returns a new Result with Infos that satisfy the predicate fn.
func (r ResourceList) Filter(fn func(*resource.Info) bool) ResourceList {
var result ResourceList
for _, i := range r {
if fn(i) {
result.Append(i)
}
}
return result
}
// Get returns the Info from the result that matches the name and kind.
func (r ResourceList) Get(info *resource.Info) *resource.Info {
for _, i := range r {
if isMatchingInfo(i, info) {
return i
}
}
return nil
}
// Contains checks to see if an object exists.
func (r ResourceList) Contains(info *resource.Info) bool {
for _, i := range r {
if isMatchingInfo(i, info) {
return true
}
}
return false
}
// Difference will return a new Result with objects not contained in rs.
func (r ResourceList) Difference(rs ResourceList) ResourceList {
return r.Filter(func(info *resource.Info) bool {
return !rs.Contains(info)
})
}
// Intersect will return a new Result with objects contained in both Results.
func (r ResourceList) Intersect(rs ResourceList) ResourceList {
return r.Filter(rs.Contains)
}
// isMatchingInfo returns true if infos match on Name and GroupVersionKind.
func isMatchingInfo(a, b *resource.Info) bool {
return a.Name == b.Name && a.Namespace == b.Namespace && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind
}

View File

@@ -0,0 +1,26 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
// ResourcePolicyAnno is the annotation name for a resource policy
const ResourcePolicyAnno = "helm.sh/resource-policy"
// KeepPolicy is the resource policy type for keep
//
// This resource policy type allows resources to skip being deleted
// during an uninstallRelease action.
const KeepPolicy = "keep"

View File

@@ -0,0 +1,28 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
// Result contains the information of created, updated, and deleted resources
// for various kube API calls along with helper methods for using those
// resources
type Result struct {
Created ResourceList
Updated ResourceList
Deleted ResourceList
}
// If needed, we can add methods to the Result type for things like diffing

393
vendor/helm.sh/helm/v3/pkg/kube/wait.go vendored Normal file
View File

@@ -0,0 +1,393 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
appsv1beta1 "k8s.io/api/apps/v1beta1"
appsv1beta2 "k8s.io/api/apps/v1beta2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
deploymentutil "helm.sh/helm/v3/internal/third_party/k8s.io/kubernetes/deployment/util"
)
type waiter struct {
c kubernetes.Interface
timeout time.Duration
log func(string, ...interface{})
}
// waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
for _, v := range created {
var (
// This defaults to true, otherwise we get to a point where
// things will always return false unless one of the objects
// that manages pods has been hit
ok = true
err error
)
switch value := AsVersioned(v).(type) {
case *corev1.Pod:
pod, err := w.c.CoreV1().Pods(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil || !w.isPodReady(pod) {
return false, err
}
case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
currentDeployment, err := w.c.AppsV1().Deployments(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// If paused deployment will never be ready
if currentDeployment.Spec.Paused {
continue
}
// Find RS associated with deployment
newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, w.c.AppsV1())
if err != nil || newReplicaSet == nil {
return false, err
}
if !w.deploymentReady(newReplicaSet, currentDeployment) {
return false, nil
}
case *corev1.PersistentVolumeClaim:
claim, err := w.c.CoreV1().PersistentVolumeClaims(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if !w.volumeReady(claim) {
return false, nil
}
case *corev1.Service:
svc, err := w.c.CoreV1().Services(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if !w.serviceReady(svc) {
return false, nil
}
case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
ds, err := w.c.AppsV1().DaemonSets(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if !w.daemonSetReady(ds) {
return false, nil
}
case *apiextv1beta1.CustomResourceDefinition:
if err := v.Get(); err != nil {
return false, err
}
crd := &apiextv1beta1.CustomResourceDefinition{}
if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
return false, err
}
if !w.crdBetaReady(*crd) {
return false, nil
}
case *apiextv1.CustomResourceDefinition:
if err := v.Get(); err != nil {
return false, err
}
crd := &apiextv1.CustomResourceDefinition{}
if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
return false, err
}
if !w.crdReady(*crd) {
return false, nil
}
case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
sts, err := w.c.AppsV1().StatefulSets(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if !w.statefulSetReady(sts) {
return false, nil
}
case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
ok, err = w.podsReadyForObject(v.Namespace, value)
}
if !ok || err != nil {
return false, err
}
}
return true, nil
})
}
func (w *waiter) podsReadyForObject(namespace string, obj runtime.Object) (bool, error) {
pods, err := w.podsforObject(namespace, obj)
if err != nil {
return false, err
}
for _, pod := range pods {
if !w.isPodReady(&pod) {
return false, nil
}
}
return true, nil
}
func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) {
selector, err := SelectorsForObject(obj)
if err != nil {
return nil, err
}
list, err := getPods(w.c, namespace, selector.String())
return list, err
}
// isPodReady returns true if a pod is ready; false otherwise.
func (w *waiter) isPodReady(pod *corev1.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
return true
}
}
w.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
return false
}
func (w *waiter) serviceReady(s *corev1.Service) bool {
// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
if s.Spec.Type == corev1.ServiceTypeExternalName {
return true
}
// Make sure the service is not explicitly set to "None" before checking the IP
if s.Spec.ClusterIP != corev1.ClusterIPNone && s.Spec.ClusterIP == "" {
w.log("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
return false
}
// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
// do not wait when at least 1 external IP is set
if len(s.Spec.ExternalIPs) > 0 {
w.log("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
return true
}
if s.Status.LoadBalancer.Ingress == nil {
w.log("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
return false
}
}
return true
}
func (w *waiter) volumeReady(v *corev1.PersistentVolumeClaim) bool {
if v.Status.Phase != corev1.ClaimBound {
w.log("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
return false
}
return true
}
func (w *waiter) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
expectedReady := *dep.Spec.Replicas - deploymentutil.MaxUnavailable(*dep)
if !(rs.Status.ReadyReplicas >= expectedReady) {
w.log("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
return false
}
return true
}
func (w *waiter) daemonSetReady(ds *appsv1.DaemonSet) bool {
// If the update strategy is not a rolling update, there will be nothing to wait for
if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
return true
}
// Make sure all the updated pods have been scheduled
if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
w.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
return false
}
maxUnavailable, err := intstr.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
if err != nil {
// If for some reason the value is invalid, set max unavailable to the
// number of desired replicas. This is the same behavior as the
// `MaxUnavailable` function in deploymentutil
maxUnavailable = int(ds.Status.DesiredNumberScheduled)
}
expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
if !(int(ds.Status.NumberReady) >= expectedReady) {
w.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
return false
}
return true
}
// Because the v1 extensions API is not available on all supported k8s versions
// yet and because Go doesn't support generics, we need to have a duplicate
// function to support the v1beta1 types
func (w *waiter) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
for _, cond := range crd.Status.Conditions {
switch cond.Type {
case apiextv1beta1.Established:
if cond.Status == apiextv1beta1.ConditionTrue {
return true
}
case apiextv1beta1.NamesAccepted:
if cond.Status == apiextv1beta1.ConditionFalse {
// This indicates a naming conflict, but it's probably not the
// job of this function to fail because of that. Instead,
// we treat it as a success, since the process should be able to
// continue.
return true
}
}
}
return false
}
func (w *waiter) crdReady(crd apiextv1.CustomResourceDefinition) bool {
for _, cond := range crd.Status.Conditions {
switch cond.Type {
case apiextv1.Established:
if cond.Status == apiextv1.ConditionTrue {
return true
}
case apiextv1.NamesAccepted:
if cond.Status == apiextv1.ConditionFalse {
// This indicates a naming conflict, but it's probably not the
// job of this function to fail because of that. Instead,
// we treat it as a success, since the process should be able to
// continue.
return true
}
}
}
return false
}
func (w *waiter) statefulSetReady(sts *appsv1.StatefulSet) bool {
// If the update strategy is not a rolling update, there will be nothing to wait for
if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
return true
}
// Dereference all the pointers because StatefulSets like them
var partition int
// 1 is the default for replicas if not set
var replicas = 1
// For some reason, even if the update strategy is a rolling update, the
// actual rollingUpdate field can be nil. If it is, we can safely assume
// there is no partition value
if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
}
if sts.Spec.Replicas != nil {
replicas = int(*sts.Spec.Replicas)
}
// Because an update strategy can use partitioning, we need to calculate the
// number of updated replicas we should have. For example, if the replicas
// is set to 3 and the partition is 2, we'd expect only one pod to be
// updated
expectedReplicas := replicas - partition
// Make sure all the updated pods have been scheduled
if int(sts.Status.UpdatedReplicas) != expectedReplicas {
w.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
return false
}
if int(sts.Status.ReadyReplicas) != replicas {
w.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
return false
}
return true
}
func getPods(client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
list, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: selector,
})
return list.Items, err
}
// SelectorsForObject returns the pod label selector for a given object
//
// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84
func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
switch t := object.(type) {
case *extensionsv1beta1.ReplicaSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1.ReplicaSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta2.ReplicaSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *corev1.ReplicationController:
selector = labels.SelectorFromSet(t.Spec.Selector)
case *appsv1.StatefulSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta1.StatefulSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta2.StatefulSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *extensionsv1beta1.DaemonSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1.DaemonSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta2.DaemonSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *extensionsv1beta1.Deployment:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1.Deployment:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta1.Deployment:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta2.Deployment:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *batchv1.Job:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *corev1.Service:
if t.Spec.Selector == nil || len(t.Spec.Selector) == 0 {
return nil, fmt.Errorf("invalid service '%s': Service is defined without a selector", t.Name)
}
selector = labels.SelectorFromSet(t.Spec.Selector)
default:
return nil, fmt.Errorf("selector for %T not implemented", object)
}
return selector, errors.Wrap(err, "invalid label selector")
}