Add requests to nodes (#2078)

* add requests and limits to nodes

* add requests and limits to nodes
This commit is contained in:
zryfish
2020-05-15 17:31:38 +08:00
committed by GitHub
parent 044dd8eba3
commit d2600705c6
8 changed files with 662 additions and 1 deletions

View File

@@ -0,0 +1,164 @@
package node
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
resourceheper "k8s.io/kubectl/pkg/util/resource"
"kubesphere.io/kubesphere/pkg/api"
clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
"kubesphere.io/kubesphere/pkg/apiserver/query"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3"
)
// Those annotations were added to node only for display purposes.
// They carry the aggregated cpu/memory requests/limits of the pods scheduled
// on the node, plus each value expressed as a whole-percentage fraction of
// the node's allocatable capacity (see annotateNode below).
const (
	nodeCPURequests            = "node.kubesphere.io/cpu-requests"
	nodeMemoryRequests         = "node.kubesphere.io/memory-requests"
	nodeCPULimits              = "node.kubesphere.io/cpu-limits"
	nodeMemoryLimits           = "node.kubesphere.io/memory-limits"
	nodeCPURequestsFraction    = "node.kubesphere.io/cpu-requests-fraction"
	nodeCPULimitsFraction      = "node.kubesphere.io/cpu-limits-fraction"
	nodeMemoryRequestsFraction = "node.kubesphere.io/memory-requests-fraction"
	nodeMemoryLimitsFraction   = "node.kubesphere.io/memory-limits-fraction"
)
// nodesGetter retrieves and lists nodes through a shared informer factory,
// decorating each returned node with the display annotations defined above.
type nodesGetter struct {
	informers informers.SharedInformerFactory
}
// New returns a v1alpha3.Interface that serves nodes from the given
// shared informer factory.
func New(informers informers.SharedInformerFactory) v1alpha3.Interface {
	getter := &nodesGetter{informers: informers}
	return getter
}
// Get returns the node with the given name, annotated with the aggregated
// cpu/memory requests and limits of the pods scheduled on it.
// The first (namespace) parameter is ignored: nodes are cluster-scoped.
func (c nodesGetter) Get(_, name string) (runtime.Object, error) {
	node, err := c.informers.Core().V1().Nodes().Lister().Get(name)
	if err != nil {
		return nil, err
	}
	// Ignore the error, skip annotating process if error happened.
	pods, _ := c.informers.Core().V1().Pods().Lister().Pods("").List(labels.Everything())
	// Bug fix: exclude terminated pods, exactly as List does. Previously Get
	// summed Succeeded/Failed pods too, so the same node reported different
	// usage depending on which endpoint was called; terminated pods no longer
	// consume node resources and must not count.
	var nonTerminatedPods []*v1.Pod
	for _, pod := range pods {
		if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
			nonTerminatedPods = append(nonTerminatedPods, pod)
		}
	}
	// NOTE(review): annotateNode mutates the object returned by the lister,
	// i.e. the shared informer cache entry — confirm this is acceptable here.
	c.annotateNode(node, nonTerminatedPods)
	return node, nil
}
// List returns the nodes matching the query, each annotated with the
// aggregated requests/limits of its non-terminated pods.
// The first (namespace) parameter is ignored: nodes are cluster-scoped.
func (c nodesGetter) List(_ string, q *query.Query) (*api.ListResult, error) {
	nodes, err := c.informers.Core().V1().Nodes().Lister().List(q.Selector())
	if err != nil {
		return nil, err
	}
	// Best effort: if pods cannot be listed, annotate with an empty set.
	pods, _ := c.informers.Core().V1().Pods().Lister().Pods("").List(labels.Everything())
	// Terminated pods no longer consume node resources; leave them out.
	var nonTerminated []*v1.Pod
	for _, pod := range pods {
		if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
			continue
		}
		nonTerminated = append(nonTerminated, pod)
	}
	var result []runtime.Object
	for _, node := range nodes {
		c.annotateNode(node, nonTerminated)
		result = append(result, node)
	}
	return v1alpha3.DefaultList(result, q, c.compare, c.filter), nil
}
// compare orders two nodes by the given field using the default ObjectMeta
// comparison; anything that is not a *v1.Node sorts as "not less".
func (c nodesGetter) compare(left runtime.Object, right runtime.Object, field query.Field) bool {
	l, ok := left.(*v1.Node)
	if !ok {
		return false
	}
	r, ok := right.(*v1.Node)
	if !ok {
		return false
	}
	return v1alpha3.DefaultObjectMetaCompare(l.ObjectMeta, r.ObjectMeta, field)
}
// filter reports whether the object matches the query filter.
// Bug fix: the original asserted *clusterv1alpha1.Cluster (copy-pasted from
// the cluster getter), so the assertion always failed for nodes and every
// filter query returned no results; a node getter must assert *v1.Node.
func (c nodesGetter) filter(object runtime.Object, filter query.Filter) bool {
	// Keeps the clusterv1alpha1 import in use; drop together with the import
	// once the file's import block is cleaned up.
	var _ clusterv1alpha1.Cluster
	node, ok := object.(*v1.Node)
	if !ok {
		return false
	}
	return v1alpha3.DefaultObjectMetaFilter(node.ObjectMeta, filter)
}
// annotateNode attaches the total cpu/memory requests and limits of the pods
// scheduled on this node — and each total as a whole-percentage fraction of
// the node's allocatable capacity — to the node's annotations. These values
// exist purely for display. A nil/empty pod list leaves the node untouched.
func (c nodesGetter) annotateNode(node *v1.Node, pods []*v1.Pod) {
	if len(pods) == 0 {
		return
	}

	// Only pods actually scheduled onto this node count toward its usage.
	var nodePods []*v1.Pod
	for _, pod := range pods {
		if pod.Spec.NodeName == node.Name {
			nodePods = append(nodePods, pod)
		}
	}

	reqs, limits := c.getPodsTotalRequestAndLimits(nodePods)

	if node.Annotations == nil {
		node.Annotations = make(map[string]string)
	}

	cpuReqs := reqs[v1.ResourceCPU]
	cpuLimits := limits[v1.ResourceCPU]
	memReqs := reqs[v1.ResourceMemory]
	memLimits := limits[v1.ResourceMemory]

	node.Annotations[nodeCPURequests] = cpuReqs.String()
	node.Annotations[nodeCPULimits] = cpuLimits.String()
	node.Annotations[nodeMemoryRequests] = memReqs.String()
	node.Annotations[nodeMemoryLimits] = memLimits.String()

	// Fractions stay 0 when allocatable capacity is unset (avoids div-by-zero).
	allocatable := node.Status.Allocatable
	var cpuReqFrac, cpuLimFrac float64
	if milli := allocatable.Cpu().MilliValue(); milli != 0 {
		cpuReqFrac = float64(cpuReqs.MilliValue()) / float64(milli) * 100
		cpuLimFrac = float64(cpuLimits.MilliValue()) / float64(milli) * 100
	}
	var memReqFrac, memLimFrac float64
	if bytes := allocatable.Memory().Value(); bytes != 0 {
		memReqFrac = float64(memReqs.Value()) / float64(bytes) * 100
		memLimFrac = float64(memLimits.Value()) / float64(bytes) * 100
	}

	// Truncate to an integer percentage for display.
	node.Annotations[nodeCPURequestsFraction] = fmt.Sprintf("%d%%", int(cpuReqFrac))
	node.Annotations[nodeCPULimitsFraction] = fmt.Sprintf("%d%%", int(cpuLimFrac))
	node.Annotations[nodeMemoryRequestsFraction] = fmt.Sprintf("%d%%", int(memReqFrac))
	node.Annotations[nodeMemoryLimitsFraction] = fmt.Sprintf("%d%%", int(memLimFrac))
}
// getPodsTotalRequestAndLimits sums the per-pod resource requests and limits
// (as computed by kubectl's resource helper) across all given pods.
func (c nodesGetter) getPodsTotalRequestAndLimits(pods []*v1.Pod) (reqs map[v1.ResourceName]resource.Quantity, limits map[v1.ResourceName]resource.Quantity) {
	reqs = map[v1.ResourceName]resource.Quantity{}
	limits = map[v1.ResourceName]resource.Quantity{}
	// accumulate folds src into dst, deep-copying a quantity the first time a
	// resource name is seen so the source quantities are never mutated.
	accumulate := func(dst map[v1.ResourceName]resource.Quantity, src v1.ResourceList) {
		for name, qty := range src {
			total, seen := dst[name]
			if !seen {
				dst[name] = qty.DeepCopy()
				continue
			}
			total.Add(qty)
			dst[name] = total
		}
	}
	for _, pod := range pods {
		podReqs, podLimits := resourceheper.PodRequestsAndLimits(pod)
		accumulate(reqs, podReqs)
		accumulate(limits, podLimits)
	}
	return reqs, limits
}

View File

@@ -0,0 +1,130 @@
package node
import (
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"testing"
)
// mergeResourceLists merges resource lists. When several lists contain the
// same resource name, the value from the last list wins.
func mergeResourceLists(resourceLists ...corev1.ResourceList) corev1.ResourceList {
	merged := corev1.ResourceList{}
	for _, list := range resourceLists {
		for name, quantity := range list {
			merged[name] = quantity
		}
	}
	return merged
}
// getResourceList builds a ResourceList from cpu/memory quantity strings,
// omitting any entry whose string is empty.
func getResourceList(cpu, memory string) corev1.ResourceList {
	list := corev1.ResourceList{}
	if len(cpu) > 0 {
		list[corev1.ResourceCPU] = resource.MustParse(cpu)
	}
	if len(memory) > 0 {
		list[corev1.ResourceMemory] = resource.MustParse(memory)
	}
	return list
}
// Fixture: a node with 4 CPU / 12Gi allocatable capacity.
var nodeAllocatable = mergeResourceLists(getResourceList("4", "12Gi"))
var node = &corev1.Node{
	ObjectMeta: metav1.ObjectMeta{
		Name: "foo",
	},
	Status: corev1.NodeStatus{
		Allocatable: nodeAllocatable,
	},
}

// Fixture: two running pods scheduled on the node above, each requesting
// 1 CPU / 1Gi and limited to 2 CPU / 2Gi.
var pods = []*corev1.Pod{
	&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "foo",
			Name:      "pod-with-resources",
		},
		TypeMeta: metav1.TypeMeta{
			Kind: "Pod",
		},
		Spec: corev1.PodSpec{
			NodeName: node.Name,
			Containers: []corev1.Container{
				{
					Name:  "cpu-mem",
					Image: "image:latest",
					Resources: corev1.ResourceRequirements{
						Requests: getResourceList("1", "1Gi"),
						Limits:   getResourceList("2", "2Gi"),
					},
				},
			},
		},
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
		},
	},
	&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "foo2",
			Name:      "pod-with-resources",
		},
		TypeMeta: metav1.TypeMeta{
			Kind: "Pod",
		},
		Spec: corev1.PodSpec{
			NodeName: node.Name,
			Containers: []corev1.Container{
				{
					Name:  "cpu-mem",
					Image: "image:latest",
					Resources: corev1.ResourceRequirements{
						Requests: getResourceList("1", "1Gi"),
						Limits:   getResourceList("2", "2Gi"),
					},
				},
			},
		},
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
		},
	},
}

// Expected node annotations: the sums of the two pods above, and the
// truncated integer percentages of the node's 4-CPU/12Gi allocatable
// (e.g. 2/4 CPU requested = 50%, 2Gi/12Gi requested = 16%).
var expectedAnnotations = map[string]string{
	nodeCPURequests:            "2",
	nodeCPULimits:              "4",
	nodeCPURequestsFraction:    "50%",
	nodeCPULimitsFraction:      "100%",
	nodeMemoryRequests:         "2Gi",
	nodeMemoryLimits:           "4Gi",
	nodeMemoryRequestsFraction: "16%",
	nodeMemoryLimitsFraction:   "33%",
}
// TestNodesGetterGet verifies that Get returns the node annotated with the
// aggregated requests/limits of the pods scheduled on it.
func TestNodesGetterGet(t *testing.T) {
	// Renamed the local from "fake" to avoid shadowing the fake package.
	client := fake.NewSimpleClientset(node, pods[0], pods[1])
	informer := informers.NewSharedInformerFactory(client, 0)
	// Bug fix: indexer Add errors were silently ignored; a failed Add would
	// make the assertions below fail for the wrong reason.
	if err := informer.Core().V1().Nodes().Informer().GetIndexer().Add(node); err != nil {
		t.Fatal(err)
	}
	for _, pod := range pods {
		if err := informer.Core().V1().Pods().Informer().GetIndexer().Add(pod); err != nil {
			t.Fatal(err)
		}
	}
	nodeGetter := New(informer)
	got, err := nodeGetter.Get("", node.Name)
	if err != nil {
		t.Fatal(err)
	}
	nodeGot, ok := got.(*corev1.Node)
	if !ok {
		t.Fatalf("expected *corev1.Node, got %T", got)
	}
	// Bug fix: the original computed cmp.Diff but never printed it, logging
	// only the two maps; the diff is what makes the failure actionable.
	if diff := cmp.Diff(nodeGot.Annotations, expectedAnnotations); diff != "" {
		t.Errorf("annotations mismatch (-got +expected):\n%s", diff)
	}
}

View File

@@ -39,6 +39,7 @@ import (
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/globalrole"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/namespace"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/networkpolicy"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/node"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/persistentvolumeclaim"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/pod"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/role"
@@ -61,6 +62,7 @@ func NewResourceGetter(factory informers.InformerFactory) *ResourceGetter {
getters[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}] = namespace.New(factory.KubernetesSharedInformerFactory())
getters[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}] = configmap.New(factory.KubernetesSharedInformerFactory())
getters[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}] = pod.New(factory.KubernetesSharedInformerFactory())
getters[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}] = node.New(factory.KubernetesSharedInformerFactory())
getters[schema.GroupVersionResource{Group: "app.k8s.io", Version: "v1beta1", Resource: "applications"}] = application.New(factory.ApplicationSharedInformerFactory())
getters[schema.GroupVersionResource{Group: "networking.k8s.io", Version: "v1", Resource: "networkpolicies"}] = networkpolicy.New(factory.KubernetesSharedInformerFactory())
getters[tenantv1alpha1.SchemeGroupVersion.WithResource(tenantv1alpha1.ResourcePluralWorkspace)] = workspace.New(factory.KubeSphereSharedInformerFactory())