fix concurrent map writes (#3529)
Signed-off-by: Jeff <jeffzhang@yunify.com>
This commit is contained in:
@@ -28,10 +28,13 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/query"
|
||||
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Those annotations were added to node only for display purposes
|
||||
const (
|
||||
nodeAnnotatedAt = "node.kubesphere.io/last-annotated-at"
|
||||
nodeCPURequests = "node.kubesphere.io/cpu-requests"
|
||||
nodeMemoryRequests = "node.kubesphere.io/memory-requests"
|
||||
nodeCPULimits = "node.kubesphere.io/cpu-limits"
|
||||
@@ -49,15 +52,17 @@ const (
|
||||
|
||||
type nodesGetter struct {
|
||||
informers informers.SharedInformerFactory
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func New(informers informers.SharedInformerFactory) v1alpha3.Interface {
|
||||
return &nodesGetter{
|
||||
informers: informers,
|
||||
mutex: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (c nodesGetter) Get(_, name string) (runtime.Object, error) {
|
||||
func (c *nodesGetter) Get(_, name string) (runtime.Object, error) {
|
||||
node, err := c.informers.Core().V1().Nodes().Lister().Get(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -70,7 +75,7 @@ func (c nodesGetter) Get(_, name string) (runtime.Object, error) {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (c nodesGetter) List(_ string, q *query.Query) (*api.ListResult, error) {
|
||||
func (c *nodesGetter) List(_ string, q *query.Query) (*api.ListResult, error) {
|
||||
nodes, err := c.informers.Core().V1().Nodes().Lister().List(q.Selector())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -127,7 +132,7 @@ func (c nodesGetter) List(_ string, q *query.Query) (*api.ListResult, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c nodesGetter) compare(left runtime.Object, right runtime.Object, field query.Field) bool {
|
||||
func (c *nodesGetter) compare(left runtime.Object, right runtime.Object, field query.Field) bool {
|
||||
leftNode, ok := left.(*v1.Node)
|
||||
if !ok {
|
||||
return false
|
||||
@@ -141,7 +146,7 @@ func (c nodesGetter) compare(left runtime.Object, right runtime.Object, field qu
|
||||
return v1alpha3.DefaultObjectMetaCompare(leftNode.ObjectMeta, rightNode.ObjectMeta, field)
|
||||
}
|
||||
|
||||
func (c nodesGetter) filter(object runtime.Object, filter query.Filter) bool {
|
||||
func (c *nodesGetter) filter(object runtime.Object, filter query.Filter) bool {
|
||||
node, ok := object.(*v1.Node)
|
||||
if !ok {
|
||||
return false
|
||||
@@ -155,7 +160,24 @@ func (c nodesGetter) filter(object runtime.Object, filter query.Filter) bool {
|
||||
}
|
||||
|
||||
// annotateNode adds cpu/memory requests usage data to node's annotations
|
||||
func (c nodesGetter) annotateNode(node *v1.Node, pods []*v1.Pod) {
|
||||
// this operation is resource consuming, so avoid annotating on every query
|
||||
func (c *nodesGetter) annotateNode(node *v1.Node, pods []*v1.Pod) {
|
||||
// only annotate node when two consecutive annotating gap bigger than 30s
|
||||
c.mutex.Lock()
|
||||
if node.Annotations == nil {
|
||||
node.Annotations = make(map[string]string)
|
||||
}
|
||||
|
||||
if lastAnnotatedAt, ok := node.Annotations[nodeAnnotatedAt]; ok {
|
||||
if lastAnnotationTimeStamp, err := time.Parse(time.RFC3339, lastAnnotatedAt); err != nil {
|
||||
if lastAnnotationTimeStamp.Add(30 * time.Second).After(time.Now()) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
node.Annotations[nodeAnnotatedAt] = time.Now().Format(time.RFC3339)
|
||||
c.mutex.Unlock()
|
||||
|
||||
if len(pods) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -169,10 +191,6 @@ func (c nodesGetter) annotateNode(node *v1.Node, pods []*v1.Pod) {
|
||||
|
||||
reqs, limits := c.getPodsTotalRequestAndLimits(nodePods)
|
||||
|
||||
if node.Annotations == nil {
|
||||
node.Annotations = make(map[string]string)
|
||||
}
|
||||
|
||||
cpuReqs, cpuLimits, memoryReqs, memoryLimits := reqs[v1.ResourceCPU], limits[v1.ResourceCPU], reqs[v1.ResourceMemory], limits[v1.ResourceMemory]
|
||||
node.Annotations[nodeCPURequests] = cpuReqs.String()
|
||||
node.Annotations[nodeCPULimits] = cpuLimits.String()
|
||||
@@ -197,7 +215,7 @@ func (c nodesGetter) annotateNode(node *v1.Node, pods []*v1.Pod) {
|
||||
node.Annotations[nodeMemoryLimitsFraction] = fmt.Sprintf("%d%%", int(fractionMemoryLimits))
|
||||
}
|
||||
|
||||
func (c nodesGetter) getPodsTotalRequestAndLimits(pods []*v1.Pod) (reqs map[v1.ResourceName]resource.Quantity, limits map[v1.ResourceName]resource.Quantity) {
|
||||
func (c *nodesGetter) getPodsTotalRequestAndLimits(pods []*v1.Pod) (reqs map[v1.ResourceName]resource.Quantity, limits map[v1.ResourceName]resource.Quantity) {
|
||||
reqs, limits = map[v1.ResourceName]resource.Quantity{}, map[v1.ResourceName]resource.Quantity{}
|
||||
for _, pod := range pods {
|
||||
podReqs, podLimits := resourceheper.PodRequestsAndLimits(pod)
|
||||
|
||||
@@ -143,6 +143,8 @@ func TestNodesGetterGet(t *testing.T) {
|
||||
}
|
||||
nodeGot := got.(*corev1.Node)
|
||||
|
||||
// ignore last-annotated-at annotation
|
||||
delete(nodeGot.Annotations, nodeAnnotatedAt)
|
||||
if diff := cmp.Diff(nodeGot.Annotations, expectedAnnotations); len(diff) != 0 {
|
||||
t.Errorf("%T, diff(-got, +expected), %v", expectedAnnotations, nodeGot.Annotations)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user