add ippool resource api

add ippool webhook and fix some bugs

Signed-off-by: Duan Jiong <djduanjiong@gmail.com>
This commit is contained in:
Duan Jiong
2020-12-02 18:34:13 +08:00
parent 8a6ce2d7ac
commit 24e3ac865f
30 changed files with 3057 additions and 390 deletions

View File

@@ -17,31 +17,64 @@ limitations under the License.
package calico
import (
"encoding/json"
"errors"
"fmt"
"net"
"time"
v3 "github.com/projectcalico/libcalico-go/lib/apis/v3"
"github.com/projectcalico/libcalico-go/lib/backend/model"
cnet "github.com/projectcalico/libcalico-go/lib/net"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informercorev1 "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apis/network/calicov3"
"kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
kubesphereclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
calicoset "kubesphere.io/kubesphere/pkg/simple/client/network/ippool/calico/client/clientset/versioned"
calicoInformer "kubesphere.io/kubesphere/pkg/simple/client/network/ippool/calico/client/informers/externalversions"
blockInformer "kubesphere.io/kubesphere/pkg/simple/client/network/ippool/calico/client/informers/externalversions/network/calicov3"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
const (
CalicoNamespaceAnnotationIPPoolV4 = "cni.projectcalico.org/ipv4pools"
CalicoNamespaceAnnotationIPPoolV6 = "cni.projectcalico.org/ipv6pools"
CalicoPodAnnotationIPAddr = "cni.projectcalico.org/ipAddrs"
CalicoAnnotationIPPoolV4 = "cni.projectcalico.org/ipv4pools"
CalicoAnnotationIPPoolV6 = "cni.projectcalico.org/ipv6pools"
CalicoPodAnnotationIPAddr = "cni.projectcalico.org/ipAddrs"
CalicoPodAnnotationPodIP = "cni.projectcalico.org/podIP"
// Common attributes which may be set on allocations by clients.
IPAMBlockAttributePod = "pod"
IPAMBlockAttributeNamespace = "namespace"
IPAMBlockAttributeNode = "node"
IPAMBlockAttributeType = "type"
IPAMBlockAttributeTypeIPIP = "ipipTunnelAddress"
IPAMBlockAttributeTypeVXLAN = "vxlanTunnelAddress"
CALICO_IPV4POOL_IPIP = "CALICO_IPV4POOL_IPIP"
CALICO_IPV4POOL_VXLAN = "CALICO_IPV4POOL_VXLAN"
CALICO_IPV4POOL_NAT_OUTGOING = "CALICO_IPV4POOL_NAT_OUTGOING"
CalicoNodeDaemonset = "calico-node"
CalicoNodeNamespace = "kube-system"
DefaultBlockSize = 25
// default re-sync period for all informer factories
defaultResync = 600 * time.Second
)
var (
@@ -49,9 +82,15 @@ var (
)
type provider struct {
client calicoset.Interface
ksclient kubesphereclient.Interface
options Options
client calicoset.Interface
ksclient kubesphereclient.Interface
k8sclient clientset.Interface
pods informercorev1.PodInformer
block blockInformer.IPAMBlockInformer
queue workqueue.RateLimitingInterface
poolQueue workqueue.RateLimitingInterface
options Options
}
func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error {
@@ -70,6 +109,12 @@ func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error {
},
}
_, cidr, _ := net.ParseCIDR(pool.Spec.CIDR)
size, _ := cidr.Mask.Size()
if size > DefaultBlockSize {
calicoPool.Spec.BlockSize = size
}
err := controllerutil.SetControllerReference(pool, calicoPool, scheme.Scheme)
if err != nil {
klog.Warningf("cannot set reference for calico ippool %s, err=%v", pool.Name, err)
@@ -88,7 +133,7 @@ func (c provider) UpdateIPPool(pool *v1alpha1.IPPool) error {
}
func (c provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error) {
stats := &v1alpha1.IPPool{}
stats := pool.DeepCopy()
calicoPool, err := c.client.CrdCalicov3().IPPools().Get(pool.Name, v1.GetOptions{})
if err != nil {
@@ -100,24 +145,46 @@ func (c provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error
return nil, err
}
stats.Status.Capacity = pool.NumAddresses()
stats.Status.Reserved = 0
stats.Status.Unallocated = 0
if stats.Status.Capacity == 0 {
stats.Status.Capacity = pool.NumAddresses()
}
stats.Status.Synced = true
stats.Status.Allocations = 0
stats.Status.Reserved = 0
if stats.Status.Workspaces == nil {
stats.Status.Workspaces = make(map[string]v1alpha1.WorkspaceStatus)
}
if len(blocks) <= 0 {
stats.Status.Unallocated = pool.NumAddresses()
stats.Status.Allocations = 0
return stats, nil
} else {
for _, block := range blocks {
stats.Status.Allocations += block.NumAddresses() - block.NumFreeAddresses() - block.NumReservedAddresses()
stats.Status.Reserved += block.NumReservedAddresses()
}
stats.Status.Unallocated = stats.Status.Capacity - stats.Status.Allocations - stats.Status.Reserved
}
for _, block := range blocks {
stats.Status.Allocations += block.NumAddresses() - block.NumFreeAddresses() - block.NumReservedAddresses()
stats.Status.Reserved += block.NumReservedAddresses()
wks, err := c.getAssociatedWorkspaces(pool)
if err != nil {
return nil, err
}
stats.Status.Unallocated = stats.Status.Capacity - stats.Status.Allocations - stats.Status.Reserved
for _, wk := range wks {
status, err := c.getWorkspaceStatus(wk, pool.GetName())
if err != nil {
return nil, err
}
stats.Status.Workspaces[wk] = *status
}
for name, wk := range stats.Status.Workspaces {
if wk.Allocations == 0 {
delete(stats.Status.Workspaces, name)
}
}
return stats, nil
}
@@ -240,6 +307,9 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) {
// Get the pool so that we can find the CIDR associated with it.
calicoPool, err := c.client.CrdCalicov3().IPPools().Get(pool.Name, v1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return true, nil
}
return false, err
}
@@ -318,6 +388,9 @@ func (c provider) syncIPPools() error {
TypeMeta: v1.TypeMeta{},
ObjectMeta: v1.ObjectMeta{
Name: calicoPool.Name,
Labels: map[string]string{
v1alpha1.IPPoolDefaultLabel: "",
},
},
Spec: v1alpha1.IPPoolSpec{
Type: v1alpha1.Calico,
@@ -339,57 +412,233 @@ func (c provider) syncIPPools() error {
return nil
}
func (c provider) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInterface) error {
blockWatch, err := c.client.CrdCalicov3().IPAMBlocks().Watch(v1.ListOptions{})
if err != nil {
return err
func (p provider) getAssociatedWorkspaces(pool *v1alpha1.IPPool) ([]string, error) {
var result []string
poolLabel := constants.WorkspaceLabelKey
if pool.GetLabels() == nil || pool.GetLabels()[poolLabel] == "" {
wks, err := p.ksclient.TenantV1alpha1().Workspaces().List(v1.ListOptions{})
if err != nil {
return nil, err
}
for _, wk := range wks.Items {
result = append(result, wk.GetName())
}
return result, nil
}
ch := blockWatch.ResultChan()
defer blockWatch.Stop()
return append(result, pool.GetLabels()[poolLabel]), nil
}
for {
select {
case <-stopCh:
return nil
case event, ok := <-ch:
if !ok {
// End of results.
return fmt.Errorf("calico ipamblock watch closed")
}
func (p provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.WorkspaceStatus, error) {
var result v1alpha1.WorkspaceStatus
if event.Type == watch.Added || event.Type == watch.Deleted || event.Type == watch.Modified {
block := event.Object.(*calicov3.IPAMBlock)
_, blockCIDR, _ := cnet.ParseCIDR(block.Spec.CIDR)
namespaces, err := p.k8sclient.CoreV1().Namespaces().List(v1.ListOptions{
LabelSelector: labels.SelectorFromSet(
map[string]string{
constants.WorkspaceLabelKey: name,
},
).String(),
})
if err != nil {
return nil, err
}
if block.Labels[v1alpha1.IPPoolNameLabel] != "" {
q.Add(block.Labels[v1alpha1.IPPoolNameLabel])
continue
}
pools, err := c.ksclient.NetworkV1alpha1().IPPools().List(v1.ListOptions{})
if err != nil {
continue
}
for _, pool := range pools.Items {
_, poolCIDR, _ := cnet.ParseCIDR(pool.Spec.CIDR)
if poolCIDR.IsNetOverlap(blockCIDR.IPNet) {
q.Add(pool.Name)
block.Labels = map[string]string{
v1alpha1.IPPoolNameLabel: pool.Name,
}
c.client.CrdCalicov3().IPAMBlocks().Update(block)
break
}
}
for _, ns := range namespaces.Items {
pods, err := p.k8sclient.CoreV1().Pods(ns.GetName()).List(v1.ListOptions{})
if err != nil {
return nil, err
}
for _, pod := range pods.Items {
if pod.GetLabels() != nil && pod.GetLabels()[v1alpha1.IPPoolNameLabel] == poolName {
result.Allocations++
}
}
}
return &result, nil
}
func NewProvider(ksclient kubesphereclient.Interface, options Options, k8sOptions *k8s.KubernetesOptions) provider {
func (p provider) Type() string {
return v1alpha1.IPPoolTypeCalico
}
func (p provider) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInterface) error {
defer utilruntime.HandleCrash()
defer p.queue.ShutDown()
klog.Info("starting calico block controller")
defer klog.Info("shutting down calico block controller")
p.poolQueue = q
go p.block.Informer().Run(stopCh)
if !cache.WaitForCacheSync(stopCh, p.pods.Informer().HasSynced, p.block.Informer().HasSynced) {
klog.Fatal("failed to wait for caches to sync")
}
for i := 0; i < 5; i++ {
go wait.Until(p.runWorker, time.Second, stopCh)
}
<-stopCh
return nil
}
func (p provider) processBlock(name string) error {
block, err := p.block.Lister().Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
_, blockCIDR, _ := cnet.ParseCIDR(block.Spec.CIDR)
poolName := block.Labels[v1alpha1.IPPoolNameLabel]
if poolName == "" {
pools, err := p.ksclient.NetworkV1alpha1().IPPools().List(v1.ListOptions{})
if err != nil {
return err
}
for _, pool := range pools.Items {
_, poolCIDR, _ := cnet.ParseCIDR(pool.Spec.CIDR)
if poolCIDR.IsNetOverlap(blockCIDR.IPNet) {
poolName = pool.Name
block.Labels = map[string]string{
v1alpha1.IPPoolNameLabel: pool.Name,
}
p.client.CrdCalicov3().IPAMBlocks().Update(block)
break
}
}
}
for _, podAttr := range block.Spec.Attributes {
name := podAttr.AttrSecondary[IPAMBlockAttributePod]
namespace := podAttr.AttrSecondary[IPAMBlockAttributeNamespace]
if name == "" || namespace == "" {
continue
}
pod, err := p.pods.Lister().Pods(namespace).Get(name)
if err != nil {
continue
}
labels := pod.GetLabels()
if labels != nil {
poolLabel := labels[v1alpha1.IPPoolNameLabel]
if poolLabel != "" {
continue
}
}
retry.RetryOnConflict(retry.DefaultBackoff, func() error {
pod, err = p.k8sclient.CoreV1().Pods(namespace).Get(name, v1.GetOptions{})
if err != nil {
return err
}
labels := pod.GetLabels()
if labels != nil {
poolLabel := labels[v1alpha1.IPPoolNameLabel]
if poolLabel != "" {
return nil
}
} else {
pod.Labels = make(map[string]string)
}
if pod.GetAnnotations() == nil {
pod.Annotations = make(map[string]string)
}
annostrs, _ := json.Marshal([]string{poolName})
pod.GetAnnotations()[CalicoAnnotationIPPoolV4] = string(annostrs)
pod.Labels[v1alpha1.IPPoolNameLabel] = poolName
_, err = p.k8sclient.CoreV1().Pods(namespace).Update(pod)
return err
})
}
p.poolQueue.Add(poolName)
return nil
}
func (p provider) processBlockItem() bool {
key, quit := p.queue.Get()
if quit {
return false
}
defer p.queue.Done(key)
err := p.processBlock(key.(string))
if err == nil {
p.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("error processing calico block %v (will retry): %v", key, err))
p.queue.AddRateLimited(key)
return true
}
func (p provider) runWorker() {
for p.processBlockItem() {
}
}
func (p provider) addBlock(obj interface{}) {
block, ok := obj.(*calicov3.IPAMBlock)
if !ok {
return
}
p.queue.Add(block.Name)
}
func (p provider) Default(obj runtime.Object) error {
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil
}
annos := pod.GetAnnotations()
if annos == nil {
pod.Annotations = make(map[string]string)
}
if annos[CalicoAnnotationIPPoolV4] == "" {
pools, err := p.ksclient.NetworkV1alpha1().IPPools().List(v1.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
v1alpha1.IPPoolDefaultLabel: "",
}).String(),
})
if err != nil {
return err
}
var poolNames []string
for _, pool := range pools.Items {
poolNames = append(poolNames, pool.Name)
}
if len(poolNames) > 0 {
annostrs, _ := json.Marshal(poolNames)
pod.Annotations[CalicoAnnotationIPPoolV4] = string(annostrs)
}
}
return nil
}
func NewProvider(podInformer informercorev1.PodInformer, ksclient kubesphereclient.Interface, k8sClient clientset.Interface, k8sOptions *k8s.KubernetesOptions) provider {
config, err := clientcmd.BuildConfigFromFlags("", k8sOptions.KubeConfig)
if err != nil {
klog.Fatalf("failed to build k8s config , err=%v", err)
@@ -401,11 +650,49 @@ func NewProvider(ksclient kubesphereclient.Interface, options Options, k8sOption
klog.Fatalf("failed to new calico client , err=%v", err)
}
p := provider{
client: client,
ksclient: ksclient,
options: options,
ds, err := k8sClient.AppsV1().DaemonSets(CalicoNodeNamespace).Get(CalicoNodeDaemonset, v1.GetOptions{})
if err != nil {
klog.Fatalf("failed to get calico-node deployment in kube-system, err=%v", err)
}
opts := Options{
IPIPMode: "Always",
VXLANMode: "Never",
NATOutgoing: true,
}
envs := ds.Spec.Template.Spec.Containers[0].Env
for _, env := range envs {
if env.Name == CALICO_IPV4POOL_IPIP {
opts.IPIPMode = env.Value
}
if env.Name == CALICO_IPV4POOL_VXLAN {
opts.VXLANMode = env.Value
}
if env.Name == CALICO_IPV4POOL_NAT_OUTGOING {
if env.Value != "true" {
opts.NATOutgoing = false
}
}
}
p := provider{
client: client,
ksclient: ksclient,
k8sclient: k8sClient,
pods: podInformer,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "calicoBlock"),
options: opts,
}
blockI := calicoInformer.NewSharedInformerFactory(client, defaultResync).Crd().Calicov3().IPAMBlocks()
blockI.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.addBlock,
UpdateFunc: func(old, new interface{}) {
p.addBlock(new)
},
})
p.block = blockI
if err := p.syncIPPools(); err != nil {
klog.Fatalf("failed to sync calico ippool to kubesphere ippool, err=%v", err)