pin dependencies

Signed-off-by: Roland.Ma <rolandma@yunify.com>
This commit is contained in:
Roland.Ma
2021-08-16 03:29:03 +00:00
parent eae248b3c9
commit 7bb8124a61
251 changed files with 40010 additions and 716 deletions

View File

@@ -28,339 +28,36 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
deploymentutil "helm.sh/helm/v3/internal/third_party/k8s.io/kubernetes/deployment/util"
"k8s.io/apimachinery/pkg/util/wait"
)
type waiter struct {
c kubernetes.Interface
c ReadyChecker
timeout time.Duration
log func(string, ...interface{})
}
// waitForResources polls to get the current status of all pods, PVCs, Services and
// Jobs(optional) until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList, waitForJobsEnabled bool) error {
func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()
return wait.PollImmediateUntil(2*time.Second, func() (bool, error) {
for _, v := range created {
var (
// This defaults to true, otherwise we get to a point where
// things will always return false unless one of the objects
// that manages pods has been hit
ok = true
err error
)
switch value := AsVersioned(v).(type) {
case *corev1.Pod:
pod, err := w.c.CoreV1().Pods(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil || !w.isPodReady(pod) {
return false, err
}
case *batchv1.Job:
if waitForJobsEnabled {
job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil || !w.jobReady(job) {
return false, err
}
}
case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
currentDeployment, err := w.c.AppsV1().Deployments(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// If paused deployment will never be ready
if currentDeployment.Spec.Paused {
continue
}
// Find RS associated with deployment
newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, w.c.AppsV1())
if err != nil || newReplicaSet == nil {
return false, err
}
if !w.deploymentReady(newReplicaSet, currentDeployment) {
return false, nil
}
case *corev1.PersistentVolumeClaim:
claim, err := w.c.CoreV1().PersistentVolumeClaims(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if !w.volumeReady(claim) {
return false, nil
}
case *corev1.Service:
svc, err := w.c.CoreV1().Services(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if !w.serviceReady(svc) {
return false, nil
}
case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
ds, err := w.c.AppsV1().DaemonSets(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if !w.daemonSetReady(ds) {
return false, nil
}
case *apiextv1beta1.CustomResourceDefinition:
if err := v.Get(); err != nil {
return false, err
}
crd := &apiextv1beta1.CustomResourceDefinition{}
if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
return false, err
}
if !w.crdBetaReady(*crd) {
return false, nil
}
case *apiextv1.CustomResourceDefinition:
if err := v.Get(); err != nil {
return false, err
}
crd := &apiextv1.CustomResourceDefinition{}
if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
return false, err
}
if !w.crdReady(*crd) {
return false, nil
}
case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
sts, err := w.c.AppsV1().StatefulSets(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if !w.statefulSetReady(sts) {
return false, nil
}
case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
ok, err = w.podsReadyForObject(v.Namespace, value)
}
if !ok || err != nil {
ready, err := w.c.IsReady(ctx, v)
if !ready || err != nil {
return false, err
}
}
return true, nil
})
}
func (w *waiter) podsReadyForObject(namespace string, obj runtime.Object) (bool, error) {
pods, err := w.podsforObject(namespace, obj)
if err != nil {
return false, err
}
for _, pod := range pods {
if !w.isPodReady(&pod) {
return false, nil
}
}
return true, nil
}
func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) {
selector, err := SelectorsForObject(obj)
if err != nil {
return nil, err
}
list, err := getPods(w.c, namespace, selector.String())
return list, err
}
// isPodReady returns true if a pod is ready; false otherwise.
func (w *waiter) isPodReady(pod *corev1.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
return true
}
}
w.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
return false
}
func (w *waiter) jobReady(job *batchv1.Job) bool {
if job.Status.Failed >= *job.Spec.BackoffLimit {
w.log("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
return false
}
if job.Status.Succeeded < *job.Spec.Completions {
w.log("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
return false
}
return true
}
func (w *waiter) serviceReady(s *corev1.Service) bool {
// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
if s.Spec.Type == corev1.ServiceTypeExternalName {
return true
}
// Ensure that the service cluster IP is not empty
if s.Spec.ClusterIP == "" {
w.log("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
return false
}
// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
// do not wait when at least 1 external IP is set
if len(s.Spec.ExternalIPs) > 0 {
w.log("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
return true
}
if s.Status.LoadBalancer.Ingress == nil {
w.log("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
return false
}
}
return true
}
func (w *waiter) volumeReady(v *corev1.PersistentVolumeClaim) bool {
if v.Status.Phase != corev1.ClaimBound {
w.log("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
return false
}
return true
}
func (w *waiter) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
expectedReady := *dep.Spec.Replicas - deploymentutil.MaxUnavailable(*dep)
if !(rs.Status.ReadyReplicas >= expectedReady) {
w.log("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
return false
}
return true
}
func (w *waiter) daemonSetReady(ds *appsv1.DaemonSet) bool {
// If the update strategy is not a rolling update, there will be nothing to wait for
if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
return true
}
// Make sure all the updated pods have been scheduled
if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
w.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
return false
}
maxUnavailable, err := intstr.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
if err != nil {
// If for some reason the value is invalid, set max unavailable to the
// number of desired replicas. This is the same behavior as the
// `MaxUnavailable` function in deploymentutil
maxUnavailable = int(ds.Status.DesiredNumberScheduled)
}
expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
if !(int(ds.Status.NumberReady) >= expectedReady) {
w.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
return false
}
return true
}
// Because the v1 extensions API is not available on all supported k8s versions
// yet and because Go doesn't support generics, we need to have a duplicate
// function to support the v1beta1 types
func (w *waiter) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
for _, cond := range crd.Status.Conditions {
switch cond.Type {
case apiextv1beta1.Established:
if cond.Status == apiextv1beta1.ConditionTrue {
return true
}
case apiextv1beta1.NamesAccepted:
if cond.Status == apiextv1beta1.ConditionFalse {
// This indicates a naming conflict, but it's probably not the
// job of this function to fail because of that. Instead,
// we treat it as a success, since the process should be able to
// continue.
return true
}
}
}
return false
}
func (w *waiter) crdReady(crd apiextv1.CustomResourceDefinition) bool {
for _, cond := range crd.Status.Conditions {
switch cond.Type {
case apiextv1.Established:
if cond.Status == apiextv1.ConditionTrue {
return true
}
case apiextv1.NamesAccepted:
if cond.Status == apiextv1.ConditionFalse {
// This indicates a naming conflict, but it's probably not the
// job of this function to fail because of that. Instead,
// we treat it as a success, since the process should be able to
// continue.
return true
}
}
}
return false
}
func (w *waiter) statefulSetReady(sts *appsv1.StatefulSet) bool {
// If the update strategy is not a rolling update, there will be nothing to wait for
if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
return true
}
// Dereference all the pointers because StatefulSets like them
var partition int
// 1 is the default for replicas if not set
var replicas = 1
// For some reason, even if the update strategy is a rolling update, the
// actual rollingUpdate field can be nil. If it is, we can safely assume
// there is no partition value
if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
}
if sts.Spec.Replicas != nil {
replicas = int(*sts.Spec.Replicas)
}
// Because an update strategy can use partitioning, we need to calculate the
// number of updated replicas we should have. For example, if the replicas
// is set to 3 and the partition is 2, we'd expect only one pod to be
// updated
expectedReplicas := replicas - partition
// Make sure all the updated pods have been scheduled
if int(sts.Status.UpdatedReplicas) != expectedReplicas {
w.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
return false
}
if int(sts.Status.ReadyReplicas) != replicas {
w.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
return false
}
return true
}
func getPods(client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
list, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: selector,
})
return list.Items, err
}, ctx.Done())
}
// SelectorsForObject returns the pod label selector for a given object