Add GET drain status API

This commit is contained in:
yanmingfan
2018-06-19 16:37:10 +08:00
parent 0c7cf180fc
commit d42cbd8258
3 changed files with 223 additions and 36 deletions

View File

@@ -38,6 +38,10 @@ func Register(ws *restful.WebService, subPath string) {
ws.Route(ws.POST(subPath+"/{nodename}/drainage").To(handleDrainNode).Filter(route.RouteLogging)).
Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON)
ws.Route(ws.GET(subPath+"/{nodename}/drainage").To(handleDrainStatus).Filter(route.RouteLogging)).
Consumes(restful.MIME_JSON, restful.MIME_XML).
Produces(restful.MIME_JSON)
}
func handleNodes(request *restful.Request, response *restful.Response) {
@@ -84,3 +88,20 @@ func handleDrainNode(request *restful.Request, response *restful.Response) {
}
}
// handleDrainStatus serves GET {subPath}/{nodename}/drainage: it looks up the
// drain status for the node named in the path and writes it as JSON, or a 500
// with the error message if the lookup fails.
func handleDrainStatus(request *restful.Request, response *restful.Response) {
	node := request.PathParameter("nodename")
	status, err := models.DrainStatus(node)
	if err != nil {
		response.WriteHeaderAndEntity(http.StatusInternalServerError, constants.MessageResponse{Message: err.Error()})
		return
	}
	response.WriteAsJson(status)
}

View File

@@ -43,7 +43,7 @@ type Components struct {
SelfLink string `json:"selfLink"`
Label interface{} `json:"label"`
HealthStatus string `json:"healthStatus"`
CreateTime time.Time `json:"updateTime"`
CreateTime time.Time `json:"createTime"`
}
/***
@@ -173,7 +173,6 @@ func GetComponentsByNamespace(ns string) ([]Components, error) {
if ns != KUBESYSTEM {
option.LabelSelector = ""
}
servicelists, err := k8sClient.CoreV1().Services(ns).List(option)
if err != nil {

View File

@@ -19,15 +19,19 @@ package models
import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"
"time"
"github.com/golang/glog"
v1beta2 "k8s.io/api/apps/v1beta2"
"k8s.io/api/apps/v1beta2"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"kubesphere.io/kubesphere/pkg/client"
kubeclient "kubesphere.io/kubesphere/pkg/client"
@@ -48,6 +52,8 @@ const (
KubeletReady = "Ready"
)
const GracePeriods = 900
type ResultNode struct {
NodeName string `json:"node_name"`
NodeStatus string `json:"node_status"`
@@ -221,6 +227,33 @@ func getNodeFileSystemStatus(node *v1.Node) (string, string, string) {
// DrainNode cordons the node named nodename by patching its spec to
// unschedulable. If the node is already unschedulable it returns a message
// saying so without patching again. It returns a MessageResponse describing
// the outcome plus any API error encountered.
func DrainNode(nodename string) (msg constants.MessageResponse, err error) {
	k8sclient := kubeclient.NewK8sClient()
	node, err := k8sclient.CoreV1().Nodes().Get(nodename, metav1.GetOptions{})
	if err != nil {
		// glog.Fatal calls os.Exit, which would kill the whole server and
		// make the return below unreachable; log and propagate instead.
		glog.Error(err)
		return msg, err
	}
	// Already cordoned: report and do nothing.
	if node.Spec.Unschedulable {
		glog.Info(node.Spec.Unschedulable)
		msg.Message = fmt.Sprintf("node %s have been drained", nodename)
		return msg, nil
	}
	// Strategic-merge patch that marks the node unschedulable (cordon).
	data := []byte(" {\"spec\":{\"unschedulable\":true}}")
	nodestatus, err := k8sclient.CoreV1().Nodes().Patch(nodename, types.StrategicMergePatchType, data)
	glog.Info(nodestatus)
	if err != nil {
		glog.Error(err)
		return msg, err
	}
	msg.Message = "success"
	return msg, nil
}
// DrainStatus handles GET drainage for the node named nodename.
// NOTE(review): despite the "status" name, this function is not read-only —
// it lists the pods remaining on the node, filters out DaemonSet-managed and
// static/mirror pods, evicts the rest, and re-patches the node unschedulable.
// Confirm this side effect is intended for a GET handler.
// NOTE(review): this span still contains diff residue (a hunk header and
// duplicated old/new lines); it is preserved verbatim below.
func DrainStatus(nodename string) (msg constants.MessageResponse, err error) {
	k8sclient := kubeclient.NewK8sClient()
	var options metav1.ListOptions
	pods := make([]v1.Pod, 0)
@@ -241,56 +274,74 @@ func DrainNode(nodename string) (msg constants.MessageResponse, err error) {
	return msg, err
	}
	// remove mirror pod static pod
	// Keep only pods that are not owned by a DaemonSet and are not
	// static/mirror pods: those cannot (or should not) be evicted.
	if len(podList.Items) > 0 {
	for _, pod := range podList.Items {
	if !containDaemonset(pod, *daemonsetList) {
	//static or mirror pod
	if isStaticPod(&pod) || isMirrorPod(&pod) {
	continue
	} else {
	pods = append(pods, pod)
	}
	pods = append(pods, pod)
	}
	}
	}
	//create eviction
	var eviction policy.Eviction
	eviction.Kind = "Eviction"
	eviction.APIVersion = "policy/v1beta1"
	// First pass: synchronously evict every remaining pod, reusing the
	// same Eviction object with namespace/name swapped per pod.
	if len(pods) > 0 {
	for _, pod := range pods {
	eviction.Namespace = pod.Namespace
	eviction.Name = pod.Name
	err := k8sclient.CoreV1().Pods(pod.Namespace).Evict(&eviction)
	if err != nil {
	return msg, err
	}
	}
	}
	// Re-apply the unschedulable patch (same cordon payload as DrainNode).
	data := []byte(" {\"spec\":{\"unschedulable\":true}}")
	nodestatus, err := k8sclient.CoreV1().Nodes().Patch(nodename, types.StrategicMergePatchType, data)
	if err != nil {
	glog.Fatal(err)
	return msg, err
	}
	if nodestatus.Spec.Unschedulable {
	if len(pods) == 0 {
	msg.Message = fmt.Sprintf("success")
	return msg, nil
	} else {
	glog.Fatal(err)
	return msg, err
	//create eviction
	// getPodFn re-fetches a pod so evictPods can poll for its deletion.
	getPodFn := func(namespace, name string) (*v1.Pod, error) {
	k8sclient := kubeclient.NewK8sClient()
	return k8sclient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
	}
	// 900-second grace period — presumably meant to match the
	// GracePeriods const; TODO confirm and deduplicate.
	evicerr := evictPods(pods, 900, getPodFn)
	if evicerr == nil {
	msg.Message = fmt.Sprintf("success")
	return msg, nil
	} else {
	glog.Info(evicerr)
	msg.Message = evicerr.Error()
	return msg, nil
	}
	}
	return msg, nil
}
// getPodSource returns the pod's "kubernetes.io/config.source" annotation,
// or an error naming the pod's UID when the annotation is absent.
func getPodSource(pod *v1.Pod) (string, error) {
	// Reading from a nil annotations map safely yields the zero value,
	// so no explicit nil check is required.
	source, ok := pod.Annotations["kubernetes.io/config.source"]
	if !ok {
		return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
	}
	return source, nil
}
// isStaticPod reports whether the pod's config source annotation exists and
// is anything other than "api" (i.e. the pod was not created via the API).
func isStaticPod(pod *v1.Pod) bool {
	if source, err := getPodSource(pod); err == nil {
		return source != "api"
	}
	return false
}
// isMirrorPod reports whether the pod carries the kubelet mirror-pod
// annotation.
func isMirrorPod(pod *v1.Pod) bool {
	if _, found := pod.Annotations[v1.MirrorPodAnnotationKey]; found {
		return true
	}
	return false
}
func containDaemonset(pod v1.Pod, daemonsetList v1beta2.DaemonSetList) bool {
@@ -307,3 +358,119 @@ func containDaemonset(pod v1.Pod, daemonsetList v1beta2.DaemonSetList) bool {
return flag
}
// evictPod issues a policy/v1beta1 Eviction for the given pod. A
// non-negative GracePeriodSeconds is forwarded in the delete options; a
// negative value leaves the grace period unset. Returns the API error, if any.
func evictPod(pod v1.Pod, GracePeriodSeconds int) error {
	k8s := kubeclient.NewK8sClient()
	opts := &metav1.DeleteOptions{}
	if GracePeriodSeconds >= 0 {
		secs := int64(GracePeriodSeconds)
		opts.GracePeriodSeconds = &secs
	}
	eviction := policy.Eviction{}
	eviction.Kind = "Eviction"
	eviction.APIVersion = "policy/v1beta1"
	eviction.Namespace = pod.Namespace
	eviction.Name = pod.Name
	eviction.DeleteOptions = opts
	return k8s.CoreV1().Pods(pod.Namespace).Evict(&eviction)
}
// evictPods evicts the given pods concurrently, retrying on 429 (PDB
// contention) and then waiting for each pod to actually disappear. It returns
// nil when every pod is gone, the first eviction/wait error otherwise, or a
// timeout error if the whole drain exceeds GracePeriods seconds.
func evictPods(pods []v1.Pod, GracePeriodSeconds int, getPodFn func(namespace, name string) (*v1.Pod, error)) error {
	doneCh := make(chan bool, len(pods))
	// One slot per pod: previously errCh had capacity 1, so any second
	// failing goroutine blocked forever on its send and leaked.
	errCh := make(chan error, len(pods))
	for _, pod := range pods {
		go func(pod v1.Pod, doneCh chan bool, errCh chan error) {
			var err error
			for {
				err = evictPod(pod, GracePeriodSeconds)
				if err == nil {
					break
				} else if apierrors.IsNotFound(err) {
					// Pod already gone — count it as done.
					doneCh <- true
					glog.Info(fmt.Sprintf("pod %s evict", pod.Name))
					return
				} else if apierrors.IsTooManyRequests(err) {
					// Eviction blocked (e.g. PodDisruptionBudget); retry.
					time.Sleep(5 * time.Second)
				} else {
					errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
					return
				}
			}
			// Eviction accepted: wait until the pod object is deleted
			// (or replaced by one with a different UID).
			podArray := []v1.Pod{pod}
			_, err = waitForDelete(podArray, time.Second, time.Duration(math.MaxInt64), getPodFn)
			if err == nil {
				doneCh <- true
				glog.Info(fmt.Sprintf("pod %s delete", pod.Name))
			} else {
				errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
			}
		}(pod, doneCh, errCh)
	}
	// GracePeriods is in seconds; express it directly as a Duration instead
	// of the previous hand-rolled power(10, 9) nanosecond arithmetic.
	// 0 means infinite, represented by MaxInt64.
	globalTimeout := time.Duration(GracePeriods) * time.Second
	if globalTimeout == 0 {
		globalTimeout = time.Duration(math.MaxInt64)
	}
	doneCount := 0
	for {
		select {
		case err := <-errCh:
			return err
		case <-doneCh:
			doneCount++
			if doneCount == len(pods) {
				return nil
			}
		case <-time.After(globalTimeout):
			return fmt.Errorf("Drain did not complete within %v, please check node status in a few minutes", globalTimeout)
		}
	}
}
// waitForDelete polls until every pod in pods has been deleted (or replaced
// by a pod with a different UID), checking each one via getPodFn every
// interval, up to timeout. It returns the pods still pending plus any poll
// error.
func waitForDelete(pods []v1.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*v1.Pod, error)) ([]v1.Pod, error) {
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
		remaining := []v1.Pod{}
		for i := range pods {
			current, getErr := getPodFn(pods[i].Namespace, pods[i].Name)
			switch {
			case apierrors.IsNotFound(getErr) || (current != nil && current.ObjectMeta.UID != pods[i].ObjectMeta.UID):
				// Deleted, or recreated under a new UID — no longer pending.
				continue
			case getErr != nil:
				return false, getErr
			default:
				remaining = append(remaining, pods[i])
			}
		}
		pods = remaining
		return len(remaining) == 0, nil
	})
	return pods, err
}
// power returns x raised to the exponent n for n >= 0; any n <= 0 yields 1.
// The previous loop condition (n != 0 with n--) never terminated for a
// negative n, spinning until the int counter wrapped around.
func power(x int64, n int) int64 {
	var res int64 = 1
	for ; n > 0; n-- {
		res *= x
	}
	return res
}