fix ippool status statistics

and delete the ippool's workspace label when a workspace is deleted

sync default ippool to namespace annotation

Signed-off-by: Duan Jiong <djduanjiong@gmail.com>
This commit is contained in:
Duan Jiong
2021-02-25 18:53:08 +08:00
parent ee9c2d114c
commit 67cbff464f
6 changed files with 368 additions and 182 deletions

View File

@@ -19,32 +19,36 @@ package ippool
import (
"context"
"fmt"
"reflect"
"time"
cnet "github.com/projectcalico/libcalico-go/lib/net"
podv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
k8sinformers "k8s.io/client-go/informers"
coreinfomers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
networkv1alpha1 "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
tenantv1alpha1 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha1"
kubesphereclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
networkInformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/network/v1alpha1"
tenantv1alpha1informers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/tenant/v1alpha1"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/controller/network/utils"
"kubesphere.io/kubesphere/pkg/controller/network/webhooks"
"kubesphere.io/kubesphere/pkg/simple/client/network/ippool"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"
)
var (
@@ -61,6 +65,13 @@ type IPPoolController struct {
ippoolSynced cache.InformerSynced
ippoolQueue workqueue.RateLimitingInterface
wsInformer tenantv1alpha1informers.WorkspaceInformer
wsSynced cache.InformerSynced
nsInformer coreinfomers.NamespaceInformer
nsSynced cache.InformerSynced
nsQueue workqueue.RateLimitingInterface
ipamblockInformer networkInformer.IPAMBlockInformer
ipamblockSynced cache.InformerSynced
@@ -68,31 +79,25 @@ type IPPoolController struct {
kubesphereClient kubesphereclient.Interface
}
func (c *IPPoolController) ippoolHandle(obj interface{}) {
func (c *IPPoolController) enqueueIPPools(obj interface{}) {
pool, ok := obj.(*networkv1alpha1.IPPool)
if !ok {
utilruntime.HandleError(fmt.Errorf("IPPool informer returned non-ippool object: %#v", obj))
return
}
key, err := cache.MetaNamespaceKeyFunc(pool)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for ippool %#v: %v", pool, err))
return
}
if utils.NeedToAddFinalizer(pool, networkv1alpha1.IPPoolFinalizer) || utils.IsDeletionCandidate(pool, networkv1alpha1.IPPoolFinalizer) {
c.ippoolQueue.Add(key)
}
c.ippoolQueue.Add(pool.Name)
}
func (c *IPPoolController) addFinalizer(pool *networkv1alpha1.IPPool) error {
clone := pool.DeepCopy()
controllerutil.AddFinalizer(clone, networkv1alpha1.IPPoolFinalizer)
clone.Labels = map[string]string{
networkv1alpha1.IPPoolNameLabel: clone.Name,
networkv1alpha1.IPPoolTypeLabel: clone.Spec.Type,
networkv1alpha1.IPPoolIDLabel: fmt.Sprintf("%d", clone.ID()),
if clone.Labels == nil {
clone.Labels = make(map[string]string)
}
clone.Labels[networkv1alpha1.IPPoolNameLabel] = clone.Name
clone.Labels[networkv1alpha1.IPPoolTypeLabel] = clone.Spec.Type
clone.Labels[networkv1alpha1.IPPoolIDLabel] = fmt.Sprintf("%d", clone.ID())
pool, err := c.kubesphereClient.NetworkV1alpha1().IPPools().Update(context.TODO(), clone, metav1.UpdateOptions{})
if err != nil {
klog.V(3).Infof("Error adding finalizer to pool %s: %v", pool.Name, err)
@@ -116,12 +121,15 @@ func (c *IPPoolController) removeFinalizer(pool *networkv1alpha1.IPPool) error {
func (c *IPPoolController) ValidateCreate(obj runtime.Object) error {
b := obj.(*networkv1alpha1.IPPool)
_, cidr, err := cnet.ParseCIDR(b.Spec.CIDR)
ip, cidr, err := cnet.ParseCIDR(b.Spec.CIDR)
if err != nil {
return fmt.Errorf("invalid cidr")
}
size, _ := cidr.Mask.Size()
if ip.IP.To4() != nil && size == 32 {
return fmt.Errorf("the cidr mask must be less than 32")
}
if b.Spec.BlockSize > 0 && b.Spec.BlockSize < size {
return fmt.Errorf("the blocksize should be larger than the cidr mask")
}
@@ -163,6 +171,25 @@ func (c *IPPoolController) ValidateCreate(obj runtime.Object) error {
return nil
}
// validateDefaultIPPool rejects an operation that would leave the cluster
// without any default IPPool. It returns an error only when pool p is the
// one and only pool carrying the default label; in every other case (no
// default pool, several default pools, or p is not the default) the
// operation is allowed.
func (c *IPPoolController) validateDefaultIPPool(p *networkv1alpha1.IPPool) error {
	pools, err := c.kubesphereClient.NetworkV1alpha1().IPPools().List(context.TODO(), metav1.ListOptions{
		LabelSelector: labels.SelectorFromSet(
			labels.Set{
				networkv1alpha1.IPPoolDefaultLabel: "",
			}).String(),
	})
	if err != nil {
		return err
	}

	// Allowed unless p is the single remaining default pool.
	if len(pools.Items) != 1 || pools.Items[0].Name != p.Name {
		return nil
	}

	// Error strings are lowercase and unpunctuated per Go convention (ST1005).
	return fmt.Errorf("must ensure that there is at least one default ippool")
}
func (c *IPPoolController) ValidateUpdate(old runtime.Object, new runtime.Object) error {
oldP := old.(*networkv1alpha1.IPPool)
newP := new.(*networkv1alpha1.IPPool)
@@ -183,6 +210,15 @@ func (c *IPPoolController) ValidateUpdate(old runtime.Object, new runtime.Object
return fmt.Errorf("ippool rangeEnd/rangeStart cannot be modified")
}
_, defaultOld := oldP.Labels[networkv1alpha1.IPPoolDefaultLabel]
_, defaultNew := newP.Labels[networkv1alpha1.IPPoolDefaultLabel]
if !defaultNew && defaultOld != defaultNew {
err := c.validateDefaultIPPool(newP)
if err != nil {
return err
}
}
return nil
}
@@ -193,7 +229,7 @@ func (c *IPPoolController) ValidateDelete(obj runtime.Object) error {
return fmt.Errorf("ippool is in use, please remove the workload before deleting")
}
return nil
return c.validateDefaultIPPool(p)
}
func (c *IPPoolController) disableIPPool(old *networkv1alpha1.IPPool) error {
@@ -204,7 +240,7 @@ func (c *IPPoolController) disableIPPool(old *networkv1alpha1.IPPool) error {
clone := old.DeepCopy()
clone.Spec.Disabled = true
old, err := c.kubesphereClient.NetworkV1alpha1().IPPools().Update(context.TODO(), clone, metav1.UpdateOptions{})
_, err := c.kubesphereClient.NetworkV1alpha1().IPPools().Update(context.TODO(), clone, metav1.UpdateOptions{})
return err
}
@@ -305,19 +341,20 @@ func (c *IPPoolController) Run(workers int, stopCh <-chan struct{}) error {
klog.Info("starting ippool controller")
defer klog.Info("shutting down ippool controller")
if !cache.WaitForCacheSync(stopCh, c.ippoolSynced, c.ipamblockSynced) {
if !cache.WaitForCacheSync(stopCh, c.ippoolSynced, c.ipamblockSynced, c.wsSynced, c.nsSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runIPPoolWorker, time.Second, stopCh)
go wait.Until(c.runNSWorker, time.Second, stopCh)
}
<-stopCh
return nil
}
func (c *IPPoolController) runWorker() {
// runIPPoolWorker drains the ippool workqueue until it is shut down.
func (c *IPPoolController) runIPPoolWorker() {
	for c.processIPPoolItem() {
	}
}
@@ -329,13 +366,7 @@ func (c *IPPoolController) processIPPoolItem() bool {
}
defer c.ippoolQueue.Done(key)
_, name, err := cache.SplitMetaNamespaceKey(key.(string))
if err != nil {
utilruntime.HandleError(fmt.Errorf("error parsing ippool key %q: %v", key, err))
return true
}
delay, err := c.processIPPool(name)
delay, err := c.processIPPool(key.(string))
if err == nil {
c.ippoolQueue.Forget(key)
return true
@@ -350,7 +381,63 @@ func (c *IPPoolController) processIPPoolItem() bool {
return true
}
func (c *IPPoolController) ipamblockHandle(obj interface{}) {
// runNSWorker drains the namespace workqueue until it is shut down.
func (c *IPPoolController) runNSWorker() {
	for c.processNSItem() {
	}
}
// processNS reconciles the default-IPPool annotation on the named namespace.
// Namespaces bound to a workspace are annotated with the names of all default
// IPPools; namespaces outside any workspace have the annotation cleared.
func (c *IPPoolController) processNS(name string) error {
	ns, err := c.nsInformer.Lister().Get(name)
	if err != nil {
		// The namespace is gone; nothing to reconcile.
		if apierrors.IsNotFound(err) {
			return nil
		}
		// Fix: the original handled only NotFound and fell through to
		// dereference a nil namespace on any other lister error.
		return err
	}

	var poolsName []string
	// Only namespaces bound to a workspace inherit the default pools.
	if ns.Labels != nil && ns.Labels[constants.WorkspaceLabelKey] != "" {
		pools, err := c.ippoolInformer.Lister().List(labels.SelectorFromSet(labels.Set{
			networkv1alpha1.IPPoolDefaultLabel: "",
		}))
		if err != nil {
			return err
		}
		for _, pool := range pools {
			poolsName = append(poolsName, pool.Name)
		}
	}

	clone := ns.DeepCopy()
	if err := c.provider.UpdateNamespace(clone, poolsName); err != nil {
		return err
	}
	// Skip the API round trip when the provider made no changes.
	if reflect.DeepEqual(clone, ns) {
		return nil
	}

	_, err = c.client.CoreV1().Namespaces().Update(context.TODO(), clone, metav1.UpdateOptions{})
	return err
}
// processNSItem pops one namespace key off the queue and reconciles it.
// It returns false only when the queue has been shut down.
func (c *IPPoolController) processNSItem() bool {
	key, shutdown := c.nsQueue.Get()
	if shutdown {
		return false
	}
	defer c.nsQueue.Done(key)

	if err := c.processNS(key.(string)); err != nil {
		// Requeue with backoff and surface the failure.
		c.nsQueue.AddRateLimited(key)
		utilruntime.HandleError(fmt.Errorf("error processing ns %v (will retry): %v", key, err))
		return true
	}

	c.nsQueue.Forget(key)
	return true
}
func (c *IPPoolController) enqueueIPAMBlocks(obj interface{}) {
block, ok := obj.(*networkv1alpha1.IPAMBlock)
if !ok {
return
@@ -360,9 +447,47 @@ func (c *IPPoolController) ipamblockHandle(obj interface{}) {
c.ippoolQueue.Add(poolName)
}
// enqueueWorkspace requeues every IPPool labeled with the deleted workspace so
// that the ippool worker can drop the stale workspace label from those pools.
// Registered as a DeleteFunc handler.
func (c *IPPoolController) enqueueWorkspace(obj interface{}) {
	wk, ok := obj.(*tenantv1alpha1.Workspace)
	if !ok {
		// Fix: delete handlers can receive a tombstone when the watch missed
		// the final state; unwrap it instead of silently skipping cleanup.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		if wk, ok = tombstone.Obj.(*tenantv1alpha1.Workspace); !ok {
			return
		}
	}

	pools, err := c.ippoolInformer.Lister().List(labels.SelectorFromSet(labels.Set{
		constants.WorkspaceLabelKey: wk.Name,
	}))
	if err != nil {
		// Fix: typo "worksapce" -> "workspace"; return early rather than
		// ranging over the nil result of a failed list.
		klog.Errorf("failed to list ippools by workspace %s, err=%v", wk.Name, err)
		return
	}

	for _, pool := range pools {
		c.ippoolQueue.Add(pool.Name)
	}
}
// enqueueNamespace queues a namespace for reconciliation whenever its
// workspace binding changes. For Add events old is nil, so a freshly created
// namespace with a workspace label is always enqueued.
func (c *IPPoolController) enqueueNamespace(old interface{}, new interface{}) {
	wsFrom := ""
	if old != nil {
		if lbls := old.(*corev1.Namespace).Labels; lbls != nil {
			wsFrom = lbls[constants.WorkspaceLabelKey]
		}
	}

	nsNew := new.(*corev1.Namespace)
	wsTo := ""
	if nsNew.Labels != nil {
		wsTo = nsNew.Labels[constants.WorkspaceLabelKey]
	}

	// Only workspace-binding changes affect the default-pool annotation.
	if wsFrom != wsTo {
		c.nsQueue.Add(nsNew.Name)
	}
}
func NewIPPoolController(
ippoolInformer networkInformer.IPPoolInformer,
ipamblockInformer networkInformer.IPAMBlockInformer,
kubesphereInformers ksinformers.SharedInformerFactory,
kubernetesInformers k8sinformers.SharedInformerFactory,
client clientset.Interface,
kubesphereClient kubesphereclient.Interface,
provider ippool.Provider) *IPPoolController {
@@ -371,43 +496,71 @@ func NewIPPoolController(
broadcaster.StartLogging(func(format string, args ...interface{}) {
klog.Info(fmt.Sprintf(format, args))
})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ippool-controller"})
broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "ippool-controller"})
c := &IPPoolController{
eventBroadcaster: broadcaster,
eventRecorder: recorder,
ippoolInformer: ippoolInformer,
ippoolSynced: ippoolInformer.Informer().HasSynced,
ippoolQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ippool"),
ipamblockInformer: ipamblockInformer,
ipamblockSynced: ipamblockInformer.Informer().HasSynced,
client: client,
kubesphereClient: kubesphereClient,
provider: provider,
eventBroadcaster: broadcaster,
eventRecorder: recorder,
ippoolQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ippool"),
nsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ippool-ns"),
client: client,
kubesphereClient: kubesphereClient,
provider: provider,
}
c.ippoolInformer = kubesphereInformers.Network().V1alpha1().IPPools()
c.ippoolSynced = c.ippoolInformer.Informer().HasSynced
c.ipamblockInformer = kubesphereInformers.Network().V1alpha1().IPAMBlocks()
c.ipamblockSynced = c.ipamblockInformer.Informer().HasSynced
c.wsInformer = kubesphereInformers.Tenant().V1alpha1().Workspaces()
c.wsSynced = c.wsInformer.Informer().HasSynced
c.nsInformer = kubernetesInformers.Core().V1().Namespaces()
c.nsSynced = c.nsInformer.Informer().HasSynced
ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.ippoolHandle,
c.ippoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueIPPools,
UpdateFunc: func(old, new interface{}) {
c.ippoolHandle(new)
_, defaultOld := old.(*networkv1alpha1.IPPool).Labels[networkv1alpha1.IPPoolDefaultLabel]
_, defaultNew := new.(*networkv1alpha1.IPPool).Labels[networkv1alpha1.IPPoolDefaultLabel]
if defaultOld != defaultNew {
nss, err := c.nsInformer.Lister().List(labels.Everything())
if err != nil {
return
}
for _, ns := range nss {
c.enqueueNamespace(nil, ns)
}
}
c.enqueueIPPools(new)
},
})
//just for update ippool status
ipamblockInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.ipamblockHandle,
c.ipamblockInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueIPAMBlocks,
UpdateFunc: func(old, new interface{}) {
c.ipamblockHandle(new)
c.enqueueIPAMBlocks(new)
},
DeleteFunc: c.ipamblockHandle,
DeleteFunc: c.enqueueIPAMBlocks,
})
c.wsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.enqueueWorkspace,
})
c.nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
c.enqueueNamespace(nil, new)
},
UpdateFunc: c.enqueueNamespace,
})
//register ippool webhook
webhooks.RegisterValidator(networkv1alpha1.SchemeGroupVersion.WithKind(networkv1alpha1.ResourceKindIPPool).String(),
&webhooks.ValidatorWrap{Obj: &networkv1alpha1.IPPool{}, Helper: c})
webhooks.RegisterDefaulter(podv1.SchemeGroupVersion.WithKind("Pod").String(),
&webhooks.DefaulterWrap{Obj: &podv1.Pod{}, Helper: provider})
webhooks.RegisterDefaulter(corev1.SchemeGroupVersion.WithKind("Pod").String(),
&webhooks.DefaulterWrap{Obj: &corev1.Pod{}, Helper: provider})
return c
}

View File

@@ -25,6 +25,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sinformers "k8s.io/client-go/informers"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
@@ -45,6 +46,10 @@ func TestIPPoolSuit(t *testing.T) {
RunSpecs(t, "IPPool Suite")
}
var (
alwaysReady = func() bool { return true }
)
var _ = Describe("test ippool", func() {
pool := &v1alpha1.IPPool{
TypeMeta: v1.TypeMeta{},
@@ -60,16 +65,16 @@ var _ = Describe("test ippool", func() {
ksclient := ksfake.NewSimpleClientset()
k8sclinet := k8sfake.NewSimpleClientset()
p := ippool.NewProvider(nil, ksclient, k8sclinet, v1alpha1.IPPoolTypeLocal, nil)
ipamClient := ipam.NewIPAMClient(ksclient, v1alpha1.VLAN)
ksInformer := ksinformers.NewSharedInformerFactory(ksclient, 0)
ippoolInformer := ksInformer.Network().V1alpha1().IPPools()
ipamblockInformer := ksInformer.Network().V1alpha1().IPAMBlocks()
c := NewIPPoolController(ippoolInformer, ipamblockInformer, k8sclinet, ksclient, p)
k8sInformer := k8sinformers.NewSharedInformerFactory(k8sclinet, 0)
p := ippool.NewProvider(k8sInformer, ksclient, k8sclinet, v1alpha1.IPPoolTypeLocal, nil)
ipamClient := ipam.NewIPAMClient(ksclient, v1alpha1.VLAN)
c := NewIPPoolController(ksInformer, k8sInformer, k8sclinet, ksclient, p)
stopCh := make(chan struct{})
go ksInformer.Start(stopCh)
go k8sInformer.Start(stopCh)
go c.Start(stopCh)
It("test create ippool", func() {