diff --git a/pkg/apis/v1alpha/nodes/nodes_handler.go b/pkg/apis/v1alpha/nodes/nodes_handler.go index 62ed76c90..dd4aa5fc3 100644 --- a/pkg/apis/v1alpha/nodes/nodes_handler.go +++ b/pkg/apis/v1alpha/nodes/nodes_handler.go @@ -38,6 +38,10 @@ func Register(ws *restful.WebService, subPath string) { ws.Route(ws.POST(subPath+"/{nodename}/drainage").To(handleDrainNode).Filter(route.RouteLogging)). Consumes(restful.MIME_JSON, restful.MIME_XML). Produces(restful.MIME_JSON) + + ws.Route(ws.GET(subPath+"/{nodename}/drainage").To(handleDrainStatus).Filter(route.RouteLogging)). + Consumes(restful.MIME_JSON, restful.MIME_XML). + Produces(restful.MIME_JSON) } func handleNodes(request *restful.Request, response *restful.Response) { @@ -84,3 +88,20 @@ func handleDrainNode(request *restful.Request, response *restful.Response) { } } + +func handleDrainStatus(request *restful.Request, response *restful.Response) { + + nodeName := request.PathParameter("nodename") + + result, err := models.DrainStatus(nodeName) + + if err != nil { + + response.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()}) + + } else { + + response.WriteAsJson(result) + + } +} diff --git a/pkg/models/components.go b/pkg/models/components.go index a4ae72e3f..ddf55fa6b 100644 --- a/pkg/models/components.go +++ b/pkg/models/components.go @@ -43,7 +43,7 @@ type Components struct { SelfLink string `json:"selfLink"` Label interface{} `json:"label"` HealthStatus string `json:"healthStatus"` - CreateTime time.Time `json:"updateTime"` + CreateTime time.Time `json:"createTime"` } /*** @@ -173,7 +173,6 @@ func GetComponentsByNamespace(ns string) ([]Components, error) { if ns != KUBESYSTEM { option.LabelSelector = "" } - servicelists, err := k8sClient.CoreV1().Services(ns).List(option) if err != nil { diff --git a/pkg/models/nodes.go b/pkg/models/nodes.go index cfe006e80..2f99153a9 100644 --- a/pkg/models/nodes.go +++ b/pkg/models/nodes.go @@ -19,15 +19,19 @@ package models import ( "encoding/json" "fmt" + "math" "strconv" "strings" + "time" "github.com/golang/glog" - v1beta2 "k8s.io/api/apps/v1beta2" + "k8s.io/api/apps/v1beta2" "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - types "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "kubesphere.io/kubesphere/pkg/client" kubeclient "kubesphere.io/kubesphere/pkg/client" @@ -48,6 +52,8 @@ const ( KubeletReady = "Ready" ) +const GracePeriods = 900 + type ResultNode struct { NodeName string `json:"node_name"` NodeStatus string `json:"node_status"` @@ -221,6 +227,33 @@ func getNodeFileSystemStatus(node *v1.Node) (string, string, string) { func DrainNode(nodename string) (msg constants.MessageResponse, err error) { + k8sclient := kubeclient.NewK8sClient() + node, err := k8sclient.CoreV1().Nodes().Get(nodename, metav1.GetOptions{}) + if err != nil { + glog.Fatal(err) + return msg, err + } + + if node.Spec.Unschedulable { + glog.Info(node.Spec.Unschedulable) + msg.Message = fmt.Sprintf("node %s have been drained", nodename) + return msg, nil + } + + data := []byte(" {\"spec\":{\"unschedulable\":true}}") + nodestatus, err := k8sclient.CoreV1().Nodes().Patch(nodename, types.StrategicMergePatchType, data) + glog.Info(nodestatus) + + if err != nil { + glog.Fatal(err) + return msg, err + } + msg.Message = "success" + return msg, nil +} + +func DrainStatus(nodename string) (msg constants.MessageResponse, err error) { + k8sclient := kubeclient.NewK8sClient() var options metav1.ListOptions pods := make([]v1.Pod, 0) @@ -241,56 +274,74 @@ func DrainNode(nodename string) (msg constants.MessageResponse, err error) { return msg, err } + // remove mirror pod static pod if len(podList.Items) > 0 { for _, pod := range podList.Items { if !containDaemonset(pod, *daemonsetList) { + //static or mirror pod + if isStaticPod(&pod) || isMirrorPod(&pod) { + + continue + + } else { + + pods = append(pods, pod) + + } - pods = append(pods, pod) } + } } - //create eviction - var eviction policy.Eviction - eviction.Kind = "Eviction" - eviction.APIVersion = "policy/v1beta1" - if len(pods) > 0 { - - for _, pod := range pods { - - eviction.Namespace = pod.Namespace - eviction.Name = pod.Name - err := k8sclient.CoreV1().Pods(pod.Namespace).Evict(&eviction) - if err != nil { - return msg, err - } - } - } - - data := []byte(" {\"spec\":{\"unschedulable\":true}}") - nodestatus, err := k8sclient.CoreV1().Nodes().Patch(nodename, types.StrategicMergePatchType, data) - - if err != nil { - - glog.Fatal(err) - return msg, err - - } - - if nodestatus.Spec.Unschedulable { + if len(pods) == 0 { msg.Message = fmt.Sprintf("success") + return msg, nil } else { - glog.Fatal(err) - return msg, err + //create eviction + getPodFn := func(namespace, name string) (*v1.Pod, error) { + k8sclient := kubeclient.NewK8sClient() + return k8sclient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) + } + evicerr := evictPods(pods, 900, getPodFn) + + if evicerr == nil { + + msg.Message = fmt.Sprintf("success") + return msg, nil + + } else { + + glog.Info(evicerr) + msg.Message = evicerr.Error() + return msg, nil + } } - return msg, nil +} +func getPodSource(pod *v1.Pod) (string, error) { + if pod.Annotations != nil { + if source, ok := pod.Annotations["kubernetes.io/config.source"]; ok { + return source, nil + } + } + return "", fmt.Errorf("cannot get source of pod %q", pod.UID) +} + +func isStaticPod(pod *v1.Pod) bool { + source, err := getPodSource(pod) + return err == nil && source != "api" +} + +func isMirrorPod(pod *v1.Pod) bool { + _, ok := pod.Annotations[v1.MirrorPodAnnotationKey] + return ok } func containDaemonset(pod v1.Pod, daemonsetList v1beta2.DaemonSetList) bool { @@ -307,3 +358,119 @@ func containDaemonset(pod v1.Pod, daemonsetList v1beta2.DaemonSetList) bool { return flag } + +func evictPod(pod v1.Pod, GracePeriodSeconds int) error { + + k8sclient := kubeclient.NewK8sClient() + deleteOptions := &metav1.DeleteOptions{} + if GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + + var eviction policy.Eviction + eviction.Kind = "Eviction" + eviction.APIVersion = "policy/v1beta1" + eviction.Namespace = pod.Namespace + eviction.Name = pod.Name + eviction.DeleteOptions = deleteOptions + err := k8sclient.CoreV1().Pods(pod.Namespace).Evict(&eviction) + if err != nil { + return err + } + + return nil +} + +func evictPods(pods []v1.Pod, GracePeriodSeconds int, getPodFn func(namespace, name string) (*v1.Pod, error)) error { + doneCh := make(chan bool, len(pods)) + errCh := make(chan error, 1) + + for _, pod := range pods { + go func(pod v1.Pod, doneCh chan bool, errCh chan error) { + var err error + for { + err = evictPod(pod, GracePeriodSeconds) + if err == nil { + break + } else if apierrors.IsNotFound(err) { + doneCh <- true + glog.Info(fmt.Sprintf("pod %s evict", pod.Name)) + return + } else if apierrors.IsTooManyRequests(err) { + time.Sleep(5 * time.Second) + } else { + errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + return + } + } + + podArray := []v1.Pod{pod} + _, err = waitForDelete(podArray, time.Second, time.Duration(math.MaxInt64), getPodFn) + if err == nil { + doneCh <- true + glog.Info(fmt.Sprintf("pod %s delete", pod.Name)) + } else { + errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, doneCh, errCh) + } + + Timeout := GracePeriods * power(10, 9) + doneCount := 0 + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = time.Duration(Timeout) + } + for { + select { + case err := <-errCh: + return err + case <-doneCh: + doneCount++ + if doneCount == len(pods) { + return nil + } + case <-time.After(globalTimeout): + return fmt.Errorf("Drain did not complete within %v, please check node status in a few minutes", globalTimeout) + } + } +} + +func waitForDelete(pods []v1.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*v1.Pod, error)) ([]v1.Pod, error) { + + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []v1.Pod{} + for i, pod := range pods { + p, err := getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + continue + } else if err != nil { + return false, err + } else { + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + return false, nil + } + return true, nil + }) + return pods, err +} + +func power(x int64, n int) int64 { + + var res int64 = 1 + for n != 0 { + res *= x + n-- + } + + return res + +}