network: support network isolate

Add new crd to convert kubesphere network policy to k8s network policy, and then other network
plugin will do the rest work.

Use  cache.go from calico project's kube-controller,  it aim to  sync nsnp with k8s np, delete unused np, and relieve the pressure on k8s restful client.

If you want higher performance, you can implement interface  NsNetworkPolicyProvider in pkg/controller/provider/namespace_np.go.

Signed-off-by: Duan Jiong <djduanjiong@gmail.com>
This commit is contained in:
Duan Jiong
2020-04-15 21:42:29 +08:00
parent fc373b18e3
commit d3bdcd0465
85 changed files with 4130 additions and 6254 deletions

View File

@@ -1,3 +0,0 @@
package provider
// +kubebuilder:rbac:groups="crd.projectcalico.org",resources=globalfelixconfigs;felixconfigurations;ippools;ipamblocks;globalnetworkpolicies;globalnetworksets;networkpolicies;networksets;clusterinformations;hostendpoints,verbs=get;list;watch;create;patch;update;delete

View File

@@ -0,0 +1,49 @@
package provider
import (
"fmt"
"github.com/projectcalico/kube-controllers/pkg/converter"
api "github.com/projectcalico/libcalico-go/lib/apis/v3"
constants "github.com/projectcalico/libcalico-go/lib/backend/k8s/conversion"
v1 "k8s.io/api/networking/v1"
)
func NewFakeNetworkProvider() *FakeNetworkProvider {
f := new(FakeNetworkProvider)
f.NSNPData = make(map[string]*api.NetworkPolicy)
f.policyConverter = converter.NewPolicyConverter()
return f
}
type FakeNetworkProvider struct {
NSNPData map[string]*api.NetworkPolicy
policyConverter converter.Converter
}
func (f *FakeNetworkProvider) Delete(key string) {
delete(f.NSNPData, key)
}
func (f *FakeNetworkProvider) Start(stopCh <-chan struct{}) {
}
func (f *FakeNetworkProvider) Set(np *v1.NetworkPolicy) error {
policy, err := f.policyConverter.Convert(np)
if err != nil {
return err
}
// Add to cache.
k := f.policyConverter.GetKey(policy)
tmp := policy.(api.NetworkPolicy)
f.NSNPData[k] = &tmp
return nil
}
func (f *FakeNetworkProvider) GetKey(name, nsname string) string {
policyName := fmt.Sprintf(constants.K8sNetworkPolicyNamePrefix + name)
return fmt.Sprintf("%s/%s", nsname, policyName)
}

View File

@@ -1,66 +0,0 @@
package provider
import (
"reflect"
"github.com/projectcalico/libcalico-go/lib/errors"
"k8s.io/client-go/tools/cache"
api "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
)
func NewFakeCalicoNetworkProvider() *FakeCalicoNetworkProvider {
f := new(FakeCalicoNetworkProvider)
f.NSNPData = make(map[string]*api.NamespaceNetworkPolicy)
return f
}
type FakeCalicoNetworkProvider struct {
NSNPData map[string]*api.NamespaceNetworkPolicy
}
func (f *FakeCalicoNetworkProvider) Get(o *api.NamespaceNetworkPolicy) (interface{}, error) {
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
obj, ok := f.NSNPData[namespacename]
if !ok {
return nil, errors.ErrorResourceDoesNotExist{}
}
return obj, nil
}
func (f *FakeCalicoNetworkProvider) Add(o *api.NamespaceNetworkPolicy) error {
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
if _, ok := f.NSNPData[namespacename]; ok {
return errors.ErrorResourceAlreadyExists{}
}
f.NSNPData[namespacename] = o
return nil
}
func (f *FakeCalicoNetworkProvider) CheckExist(o *api.NamespaceNetworkPolicy) (bool, error) {
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
if _, ok := f.NSNPData[namespacename]; ok {
return true, nil
}
return false, nil
}
func (f *FakeCalicoNetworkProvider) NeedUpdate(o *api.NamespaceNetworkPolicy) (bool, error) {
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
store := f.NSNPData[namespacename]
if !reflect.DeepEqual(store, o) {
return true, nil
}
return false, nil
}
func (f *FakeCalicoNetworkProvider) Update(o *api.NamespaceNetworkPolicy) error {
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
f.NSNPData[namespacename] = o
return nil
}
func (f *FakeCalicoNetworkProvider) Delete(o *api.NamespaceNetworkPolicy) error {
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
delete(f.NSNPData, namespacename)
return nil
}

View File

@@ -1 +0,0 @@
package provider

View File

@@ -1,35 +1,11 @@
package provider
import (
k8snetworkinformer "k8s.io/client-go/informers/networking/v1"
k8snetworklister "k8s.io/client-go/listers/networking/v1"
api "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
)
import netv1 "k8s.io/api/networking/v1"
// NsNetworkPolicyProvider is a interface to let different cnis to implement our api
type NsNetworkPolicyProvider interface {
Add(*api.NamespaceNetworkPolicy) error
CheckExist(*api.NamespaceNetworkPolicy) (bool, error)
NeedUpdate(*api.NamespaceNetworkPolicy) (bool, error)
Update(*api.NamespaceNetworkPolicy) error
Delete(*api.NamespaceNetworkPolicy) error
Get(*api.NamespaceNetworkPolicy) (interface{}, error)
}
// TODO: support no-calico CNI
type k8sNetworkProvider struct {
networkPolicyInformer k8snetworkinformer.NetworkPolicyInformer
networkPolicyLister k8snetworklister.NetworkPolicyLister
}
func (k *k8sNetworkProvider) Add(o *api.NamespaceNetworkPolicy) error {
return nil
}
func (k *k8sNetworkProvider) CheckExist(o *api.NamespaceNetworkPolicy) (bool, error) {
return false, nil
}
func (k *k8sNetworkProvider) Delete(o *api.NamespaceNetworkPolicy) error {
return nil
Delete(key string)
Set(policy *netv1.NetworkPolicy) error
Start(stopCh <-chan struct{})
GetKey(name, nsname string) string
}

View File

@@ -1,144 +0,0 @@
package provider
import (
"context"
"encoding/json"
"reflect"
"time"
v3 "github.com/projectcalico/libcalico-go/lib/apis/v3"
"github.com/projectcalico/libcalico-go/lib/clientv3"
"github.com/projectcalico/libcalico-go/lib/errors"
"github.com/projectcalico/libcalico-go/lib/options"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/klogr"
api "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
)
var log = klogr.New().WithName("calico-client")
var defaultBackoff = wait.Backoff{
Steps: 4,
Duration: 10 * time.Millisecond,
Factor: 5.0,
Jitter: 0.1,
}
type calicoNetworkProvider struct {
np clientv3.NetworkPolicyInterface
}
func NewCalicoNetworkProvider(np clientv3.NetworkPolicyInterface) NsNetworkPolicyProvider {
return &calicoNetworkProvider{
np: np,
}
}
func convertSpec(n *api.NamespaceNetworkPolicySpec) *v3.NetworkPolicySpec {
bytes, err := json.Marshal(&n)
if err != nil {
panic(err)
}
m := new(v3.NetworkPolicySpec)
err = json.Unmarshal(bytes, m)
if err != nil {
panic(err)
}
return m
}
// ConvertAPIToCalico convert our api to calico api
func ConvertAPIToCalico(n *api.NamespaceNetworkPolicy) *v3.NetworkPolicy {
output := v3.NewNetworkPolicy()
//Object Metadata
output.ObjectMeta.Name = n.Name
output.Namespace = n.Namespace
output.Annotations = n.Annotations
output.Labels = n.Labels
//spec
output.Spec = *(convertSpec(&n.Spec))
return output
}
func (k *calicoNetworkProvider) Get(o *api.NamespaceNetworkPolicy) (interface{}, error) {
return k.np.Get(context.TODO(), o.Namespace, o.Name, options.GetOptions{})
}
func (k *calicoNetworkProvider) Add(o *api.NamespaceNetworkPolicy) error {
log.V(3).Info("Creating network policy", "name", o.Name, "namespace", o.Namespace)
obj := ConvertAPIToCalico(o)
log.V(4).Info("Show object spe detail", "name", o.Name, "namespace", o.Namespace, "Spec", obj.Spec)
_, err := k.np.Create(context.TODO(), obj, options.SetOptions{})
return err
}
func (k *calicoNetworkProvider) CheckExist(o *api.NamespaceNetworkPolicy) (bool, error) {
log.V(3).Info("Checking network policy whether exsits or not", "name", o.Name, "namespace", o.Namespace)
out, err := k.np.Get(context.Background(), o.Namespace, o.Name, options.GetOptions{})
if err != nil {
if _, ok := err.(errors.ErrorResourceDoesNotExist); ok {
return false, nil
}
return false, err
}
if out != nil {
return true, nil
}
return false, nil
}
func (k *calicoNetworkProvider) Delete(o *api.NamespaceNetworkPolicy) error {
log.V(3).Info("Deleting network policy", "name", o.Name, "namespace", o.Namespace)
_, err := k.np.Delete(context.Background(), o.Namespace, o.Name, options.DeleteOptions{})
return err
}
func (k *calicoNetworkProvider) NeedUpdate(o *api.NamespaceNetworkPolicy) (bool, error) {
store, err := k.np.Get(context.Background(), o.Namespace, o.Name, options.GetOptions{})
if err != nil {
log.Error(err, "Failed to get resource", "name", o.Name, "namespace", o.Namespace)
}
expected := ConvertAPIToCalico(o)
log.V(4).Info("Comparing Spec", "store", store.Spec, "current", expected.Spec)
if !reflect.DeepEqual(store.Spec, expected.Spec) {
return true, nil
}
return false, nil
}
func (k *calicoNetworkProvider) Update(o *api.NamespaceNetworkPolicy) error {
log.V(3).Info("Updating network policy", "name", o.Name, "namespace", o.Namespace)
updateObject, err := k.Get(o)
if err != nil {
log.Error(err, "Failed to get resource in store")
return err
}
up := updateObject.(*v3.NetworkPolicy)
up.Spec = *convertSpec(&o.Spec)
err = RetryOnConflict(defaultBackoff, func() error {
_, err := k.np.Update(context.Background(), up, options.SetOptions{})
return err
})
if err != nil {
log.Error(err, "Failed to update resource", "name", o.Name, "namespace", o.Namespace)
}
return err
}
// RetryOnConflict is same as the function in k8s, but replaced with error in calico
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
var lastConflictErr error
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := fn()
if err == nil {
return true, nil
}
if _, ok := err.(errors.ErrorResourceUpdateConflict); ok {
lastConflictErr = err
return false, nil
}
return false, err
})
if err == wait.ErrWaitTimeout {
err = lastConflictErr
}
return err
}

View File

@@ -0,0 +1,250 @@
package provider
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
rcache "github.com/projectcalico/kube-controllers/pkg/cache"
netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
uruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informerv1 "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)
const (
defaultSyncTime = 5 * time.Minute
)
func (c *k8sPolicyController) GetKey(name, nsname string) string {
return fmt.Sprintf("%s/%s", nsname, name)
}
func getkey(key string) (string, string) {
strs := strings.Split(key, "/")
return strs[0], strs[1]
}
// policyController implements the Controller interface for managing Kubernetes network policies
// and syncing them to the k8s datastore as NetworkPolicies.
type k8sPolicyController struct {
client kubernetes.Interface
informer informerv1.NetworkPolicyInformer
ctx context.Context
resourceCache rcache.ResourceCache
hasSynced cache.InformerSynced
}
func (c *k8sPolicyController) Start(stopCh <-chan struct{}) {
c.run(5, "5m", stopCh)
}
func (c *k8sPolicyController) Set(np *netv1.NetworkPolicy) error {
klog.V(4).Infof("Set NetworkPolicy %s/%s %+v", np.Namespace, np.Name, np)
// Add to cache.
k := c.GetKey(np.Name, np.Namespace)
c.resourceCache.Set(k, *np)
return nil
}
func (c *k8sPolicyController) Delete(key string) {
klog.V(4).Infof("Delete NetworkPolicy %s", key)
c.resourceCache.Delete(key)
}
// Run starts the controller.
func (c *k8sPolicyController) run(threadiness int, reconcilerPeriod string, stopCh <-chan struct{}) {
defer uruntime.HandleCrash()
// Let the workers stop when we are done
workqueue := c.resourceCache.GetQueue()
defer workqueue.ShutDown()
// Wait until we are in sync with the Kubernetes API before starting the
// resource cache.
klog.Info("Waiting to sync with Kubernetes API (NetworkPolicy)")
if ok := cache.WaitForCacheSync(stopCh, c.hasSynced); !ok {
}
klog.Infof("Finished syncing with Kubernetes API (NetworkPolicy)")
// Start the resource cache - this will trigger the queueing of any keys
// that are out of sync onto the resource cache event queue.
c.resourceCache.Run(reconcilerPeriod)
// Start a number of worker threads to read from the queue. Each worker
// will pull keys off the resource cache event queue and sync them to the
// k8s datastore.
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("NetworkPolicy controller is now running")
<-stopCh
klog.Info("Stopping NetworkPolicy controller")
}
func (c *k8sPolicyController) runWorker() {
for c.processNextItem() {
}
}
// processNextItem waits for an event on the output queue from the resource cache and syncs
// any received keys to the datastore.
func (c *k8sPolicyController) processNextItem() bool {
// Wait until there is a new item in the work queue.
workqueue := c.resourceCache.GetQueue()
key, quit := workqueue.Get()
if quit {
return false
}
// Sync the object to the k8s datastore.
if err := c.syncToDatastore(key.(string)); err != nil {
c.handleErr(err, key.(string))
}
// Indicate that we're done processing this key, allowing for safe parallel processing such that
// two objects with the same key are never processed in parallel.
workqueue.Done(key)
return true
}
// syncToDatastore syncs the given update to the k8s datastore. The provided key can be used to
// find the corresponding resource within the resource cache. If the resource for the provided key
// exists in the cache, then the value should be written to the datastore. If it does not exist
// in the cache, then it should be deleted from the datastore.
func (c *k8sPolicyController) syncToDatastore(key string) error {
// Check if it exists in the controller's cache.
obj, exists := c.resourceCache.Get(key)
if !exists {
// The object no longer exists - delete from the datastore.
klog.Infof("Deleting NetworkPolicy %s from k8s datastore", key)
ns, name := getkey(key)
err := c.client.NetworkingV1().NetworkPolicies(ns).Delete(name, nil)
if errors.IsNotFound(err) {
return nil
}
return err
} else {
// The object exists - update the datastore to reflect.
klog.Infof("Create/Update NetworkPolicy %s in k8s datastore", key)
p := obj.(netv1.NetworkPolicy)
// Lookup to see if this object already exists in the datastore.
gp, err := c.informer.Lister().NetworkPolicies(p.Namespace).Get(p.Name)
if err != nil {
if !errors.IsNotFound(err) {
klog.Warningf("Failed to get NetworkPolicy %s from datastore", key)
return err
}
// Doesn't exist - create it.
_, err := c.client.NetworkingV1().NetworkPolicies(p.Namespace).Create(&p)
if err != nil {
klog.Warningf("Failed to create NetworkPolicy %s", key)
return err
}
klog.Infof("Successfully created NetworkPolicy %s", key)
return nil
}
klog.V(4).Infof("New NetworkPolicy %s/%s %+v\n", p.Namespace, p.Name, p.Spec)
klog.V(4).Infof("Old NetworkPolicy %s/%s %+v\n", gp.Namespace, gp.Name, gp.Spec)
// The policy already exists, update it and write it back to the datastore.
gp.Spec = p.Spec
_, err = c.client.NetworkingV1().NetworkPolicies(p.Namespace).Update(gp)
if err != nil {
klog.Warningf("Failed to update NetworkPolicy %s", key)
return err
}
klog.Infof("Successfully updated NetworkPolicy %s", key)
return nil
}
}
// handleErr handles errors which occur while processing a key received from the resource cache.
// For a given error, we will re-queue the key in order to retry the datastore sync up to 5 times,
// at which point the update is dropped.
func (c *k8sPolicyController) handleErr(err error, key string) {
workqueue := c.resourceCache.GetQueue()
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
workqueue.Forget(key)
return
}
// This controller retries 5 times if something goes wrong. After that, it stops trying.
if workqueue.NumRequeues(key) < 5 {
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
klog.Errorf("Error syncing NetworkPolicy %v: %v", key, err)
workqueue.AddRateLimited(key)
return
}
workqueue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
uruntime.HandleError(err)
klog.Errorf("Dropping NetworkPolicy %q out of the queue: %v", key, err)
}
//NewNsNetworkPolicyProvider sync k8s NetworkPolicy
func NewNsNetworkPolicyProvider(client kubernetes.Interface, npInformer informerv1.NetworkPolicyInformer) (NsNetworkPolicyProvider, error) {
var once sync.Once
c := &k8sPolicyController{
client: client,
informer: npInformer,
ctx: context.Background(),
hasSynced: npInformer.Informer().HasSynced,
}
// Function returns map of policyName:policy stored by policy controller
// in datastore.
listFunc := func() (map[string]interface{}, error) {
//Wait cache be set by NSNP Controller, otherwise NetworkPolicy will be delete
//by mistake
once.Do(func() {
time.Sleep(defaultSyncTime)
})
// Get all policies from datastore
//TODO filter np not belong to kubesphere
policies, err := npInformer.Lister().List(labels.Everything())
if err != nil {
return nil, err
}
// Filter in only objects that are written by policy controller.
m := make(map[string]interface{})
for _, policy := range policies {
policy.ObjectMeta = metav1.ObjectMeta{Name: policy.Name, Namespace: policy.Namespace}
k := c.GetKey(policy.Name, policy.Namespace)
m[k] = *policy
}
klog.Infof("Found %d policies in k8s datastore:", len(m))
return m, nil
}
cacheArgs := rcache.ResourceCacheArgs{
ListFunc: listFunc,
ObjectType: reflect.TypeOf(netv1.NetworkPolicy{}),
}
c.resourceCache = rcache.NewResourceCache(cacheArgs)
return c, nil
}