Fix: deepcopy before mutating shared objects
Signed-off-by: dkeven <keven@kubesphere.io>
This commit is contained in:
@@ -19,8 +19,6 @@ package node
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
@@ -36,7 +34,6 @@ import (
|
||||
|
||||
// Those annotations were added to node only for display purposes
|
||||
const (
|
||||
nodeAnnotatedAt = "node.kubesphere.io/last-annotated-at"
|
||||
nodeCPURequests = "node.kubesphere.io/cpu-requests"
|
||||
nodeMemoryRequests = "node.kubesphere.io/memory-requests"
|
||||
nodeCPULimits = "node.kubesphere.io/cpu-limits"
|
||||
@@ -54,13 +51,11 @@ const (
|
||||
|
||||
type nodesGetter struct {
|
||||
informers informers.SharedInformerFactory
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func New(informers informers.SharedInformerFactory) v1alpha3.Interface {
|
||||
return &nodesGetter{
|
||||
informers: informers,
|
||||
mutex: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +67,20 @@ func (c *nodesGetter) Get(_, name string) (runtime.Object, error) {
|
||||
|
||||
// ignore the error, skip annotating process if error happened
|
||||
pods, _ := c.informers.Core().V1().Pods().Lister().Pods("").List(labels.Everything())
|
||||
|
||||
// Never mutate original objects!
|
||||
// Caches are shared across controllers,
|
||||
// this means that if you mutate your "copy" (actually a reference or shallow copy) of an object,
|
||||
// you'll mess up other controllers (not just your own).
|
||||
// Also, if the mutated field is a map,
|
||||
// a "concurrent map (read &) write" panic might occur,
|
||||
// causing the ks-apiserver to crash.
|
||||
// Refer:
|
||||
// https://github.com/kubesphere/kubesphere/issues/4357
|
||||
// https://github.com/kubesphere/kubesphere/issues/3469
|
||||
// https://github.com/kubesphere/kubesphere/pull/4599
|
||||
// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md
|
||||
node = node.DeepCopy()
|
||||
c.annotateNode(node, pods)
|
||||
|
||||
return node, nil
|
||||
@@ -124,6 +133,7 @@ func (c *nodesGetter) List(_ string, q *query.Query) (*api.ListResult, error) {
|
||||
|
||||
var result = make([]interface{}, 0)
|
||||
for _, node := range selectedNodes {
|
||||
node = node.DeepCopy()
|
||||
c.annotateNode(node, nonTerminatedPodsList)
|
||||
result = append(result, node)
|
||||
}
|
||||
@@ -162,25 +172,13 @@ func (c *nodesGetter) filter(object runtime.Object, filter query.Filter) bool {
|
||||
}
|
||||
|
||||
// annotateNode adds cpu/memory requests usage data to node's annotations
|
||||
// this operation is resource consuming, so avoid annotating on every query
|
||||
// this operation mutates the *v1.Node passed in
|
||||
// so DO A DEEPCOPY before calling
|
||||
func (c *nodesGetter) annotateNode(node *v1.Node, pods []*v1.Pod) {
|
||||
// only annotate node when two consecutive annotating gap bigger than 30s
|
||||
c.mutex.Lock()
|
||||
if node.Annotations == nil {
|
||||
node.Annotations = make(map[string]string)
|
||||
}
|
||||
|
||||
if lastAnnotatedAt, ok := node.Annotations[nodeAnnotatedAt]; ok {
|
||||
if lastAnnotationTimeStamp, err := time.Parse(time.RFC3339, lastAnnotatedAt); err != nil {
|
||||
if lastAnnotationTimeStamp.Add(30 * time.Second).After(time.Now()) {
|
||||
c.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
node.Annotations[nodeAnnotatedAt] = time.Now().Format(time.RFC3339)
|
||||
c.mutex.Unlock()
|
||||
|
||||
if len(pods) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -145,8 +145,6 @@ func TestNodesGetterGet(t *testing.T) {
|
||||
}
|
||||
nodeGot := got.(*corev1.Node)
|
||||
|
||||
// ignore last-annotated-at annotation
|
||||
delete(nodeGot.Annotations, nodeAnnotatedAt)
|
||||
if diff := cmp.Diff(nodeGot.Annotations, expectedAnnotations); len(diff) != 0 {
|
||||
t.Errorf("%T, diff(-got, +expected), %v", expectedAnnotations, nodeGot.Annotations)
|
||||
}
|
||||
@@ -170,7 +168,7 @@ func TestListNodes(t *testing.T) {
|
||||
},
|
||||
&api.ListResult{
|
||||
Items: []interface{}{
|
||||
node2,
|
||||
node2Expected,
|
||||
},
|
||||
TotalItems: 1,
|
||||
},
|
||||
@@ -187,7 +185,7 @@ func TestListNodes(t *testing.T) {
|
||||
},
|
||||
&api.ListResult{
|
||||
Items: []interface{}{
|
||||
node1,
|
||||
node1Expected,
|
||||
},
|
||||
TotalItems: 1,
|
||||
},
|
||||
@@ -204,7 +202,7 @@ func TestListNodes(t *testing.T) {
|
||||
},
|
||||
&api.ListResult{
|
||||
Items: []interface{}{
|
||||
node2,
|
||||
node2Expected,
|
||||
},
|
||||
TotalItems: 1,
|
||||
},
|
||||
@@ -239,6 +237,17 @@ var (
|
||||
Unschedulable: true,
|
||||
},
|
||||
}
|
||||
|
||||
node1Expected = &corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node1",
|
||||
Annotations: map[string]string{},
|
||||
},
|
||||
Spec: corev1.NodeSpec{
|
||||
Unschedulable: true,
|
||||
},
|
||||
}
|
||||
|
||||
node2 = &corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node2",
|
||||
@@ -247,6 +256,17 @@ var (
|
||||
Unschedulable: false,
|
||||
},
|
||||
}
|
||||
|
||||
node2Expected = &corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node2",
|
||||
Annotations: map[string]string{},
|
||||
},
|
||||
Spec: corev1.NodeSpec{
|
||||
Unschedulable: false,
|
||||
},
|
||||
}
|
||||
|
||||
nodes = []*corev1.Node{node1, node2}
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user