add ns networkpolicy
This commit is contained in:
6
pkg/controller/network/controllerapi/interface.go
Normal file
6
pkg/controller/network/controllerapi/interface.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package controllerapi
|
||||
|
||||
// Controller expose Run method
|
||||
type Controller interface {
|
||||
Run(threadiness int, stopCh <-chan struct{}) error
|
||||
}
|
||||
5
pkg/controller/network/doc.go
Normal file
5
pkg/controller/network/doc.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package network
|
||||
|
||||
// +kubebuilder:rbac:groups=network.kubesphere.io,resources=workspacenetworkpolicies;namespacenetworkpolicies,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups:core,resource=namespaces,verbs=get;list;watch;create;update;patch
|
||||
// +kubebuilder:rbac:groups=tenant.kubesphere.io,resources=workspaces,verbs=get;list;watch;create;update;patch;delete
|
||||
177
pkg/controller/network/nsnetworkpolicy/controller.go
Normal file
177
pkg/controller/network/nsnetworkpolicy/controller.go
Normal file
@@ -0,0 +1,177 @@
|
||||
package nsnetworkpolicy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/klog/klogr"
|
||||
kubesphereclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
|
||||
kubespherescheme "kubesphere.io/kubesphere/pkg/client/clientset/versioned/scheme"
|
||||
networkinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/network/v1alpha1"
|
||||
networklister "kubesphere.io/kubesphere/pkg/client/listers/network/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/controller/network/controllerapi"
|
||||
"kubesphere.io/kubesphere/pkg/controller/network/provider"
|
||||
)
|
||||
|
||||
const controllerAgentName = "nsnp-controller"
|
||||
|
||||
type controller struct {
|
||||
kubeClientset kubernetes.Interface
|
||||
kubesphereClientset kubesphereclient.Interface
|
||||
|
||||
nsnpInformer networkinformer.NamespaceNetworkPolicyInformer
|
||||
nsnpLister networklister.NamespaceNetworkPolicyLister
|
||||
nsnpSynced cache.InformerSynced
|
||||
// workqueue is a rate limited work queue. This is used to queue work to be
|
||||
// processed instead of performing it as soon as a change happens. This
|
||||
// means we can ensure we only process a fixed amount of resources at a
|
||||
// time, and makes it easy to ensure we are never processing the same item
|
||||
// simultaneously in two different workers.
|
||||
workqueue workqueue.RateLimitingInterface
|
||||
// recorder is an event recorder for recording Event resources to the
|
||||
// Kubernetes API.
|
||||
recorder record.EventRecorder
|
||||
nsNetworkPolicyProvider provider.NsNetworkPolicyProvider
|
||||
}
|
||||
|
||||
var (
|
||||
log = klogr.New().WithName("Controller").WithValues("Component", controllerAgentName)
|
||||
errCount = 0
|
||||
)
|
||||
|
||||
func NewController(kubeclientset kubernetes.Interface,
|
||||
kubesphereclientset kubesphereclient.Interface,
|
||||
nsnpInformer networkinformer.NamespaceNetworkPolicyInformer,
|
||||
nsNetworkPolicyProvider provider.NsNetworkPolicyProvider) controllerapi.Controller {
|
||||
utilruntime.Must(kubespherescheme.AddToScheme(scheme.Scheme))
|
||||
log.V(4).Info("Creating event broadcaster")
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(klog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
|
||||
ctl := &controller{
|
||||
kubeClientset: kubeclientset,
|
||||
kubesphereClientset: kubesphereclientset,
|
||||
nsnpInformer: nsnpInformer,
|
||||
nsnpLister: nsnpInformer.Lister(),
|
||||
nsnpSynced: nsnpInformer.Informer().HasSynced,
|
||||
nsNetworkPolicyProvider: nsNetworkPolicyProvider,
|
||||
|
||||
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "NamespaceNetworkPolicies"),
|
||||
recorder: recorder,
|
||||
}
|
||||
log.Info("Setting up event handlers")
|
||||
nsnpInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: ctl.enqueueNSNP,
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
ctl.enqueueNSNP(new)
|
||||
},
|
||||
DeleteFunc: ctl.enqueueNSNP,
|
||||
})
|
||||
return ctl
|
||||
}
|
||||
|
||||
func (c *controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.workqueue.ShutDown()
|
||||
|
||||
//init client
|
||||
|
||||
// Start the informer factories to begin populating the informer caches
|
||||
log.V(1).Info("Starting WSNP controller")
|
||||
|
||||
// Wait for the caches to be synced before starting workers
|
||||
log.V(2).Info("Waiting for informer caches to sync")
|
||||
if ok := cache.WaitForCacheSync(stopCh, c.nsnpSynced); !ok {
|
||||
return fmt.Errorf("failed to wait for caches to sync")
|
||||
}
|
||||
|
||||
log.Info("Starting workers")
|
||||
// Launch two workers to process Foo resources
|
||||
for i := 0; i < threadiness; i++ {
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
klog.V(2).Info("Started workers")
|
||||
<-stopCh
|
||||
log.V(2).Info("Shutting down workers")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) enqueueNSNP(obj interface{}) {
|
||||
var key string
|
||||
var err error
|
||||
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
c.workqueue.Add(key)
|
||||
}
|
||||
|
||||
func (c *controller) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) processNextWorkItem() bool {
|
||||
obj, shutdown := c.workqueue.Get()
|
||||
|
||||
if shutdown {
|
||||
return false
|
||||
}
|
||||
|
||||
// We wrap this block in a func so we can defer c.workqueue.Done.
|
||||
err := func(obj interface{}) error {
|
||||
// We call Done here so the workqueue knows we have finished
|
||||
// processing this item. We also must remember to call Forget if we
|
||||
// do not want this work item being re-queued. For example, we do
|
||||
// not call Forget if a transient error occurs, instead the item is
|
||||
// put back on the workqueue and attempted again after a back-off
|
||||
// period.
|
||||
defer c.workqueue.Done(obj)
|
||||
var key string
|
||||
var ok bool
|
||||
// We expect strings to come off the workqueue. These are of the
|
||||
// form namespace/name. We do this as the delayed nature of the
|
||||
// workqueue means the items in the informer cache may actually be
|
||||
// more up to date that when the item was initially put onto the
|
||||
// workqueue.
|
||||
if key, ok = obj.(string); !ok {
|
||||
// As the item in the workqueue is actually invalid, we call
|
||||
// Forget here else we'd go into a loop of attempting to
|
||||
// process a work item that is invalid.
|
||||
c.workqueue.Forget(obj)
|
||||
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
|
||||
return nil
|
||||
}
|
||||
// Run the reconcile, passing it the namespace/name string of the
|
||||
// Foo resource to be synced.
|
||||
if err := c.reconcile(key); err != nil {
|
||||
// Put the item back on the workqueue to handle any transient errors.
|
||||
c.workqueue.AddRateLimited(key)
|
||||
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
|
||||
}
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get queued again until another change happens.
|
||||
c.workqueue.Forget(obj)
|
||||
log.Info("Successfully synced", "key", key)
|
||||
return nil
|
||||
}(obj)
|
||||
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return true
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package nsnetworkpolicy
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"testing"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
func TestNsnetworkpolicy(t *testing.T) {
|
||||
klog.InitFlags(nil)
|
||||
flag.Set("logtostderr", "false")
|
||||
flag.Set("alsologtostderr", "false")
|
||||
flag.Set("v", "4")
|
||||
flag.Parse()
|
||||
klog.SetOutput(GinkgoWriter)
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "Nsnetworkpolicy Suite")
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
package nsnetworkpolicy
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
|
||||
nsnplister "kubesphere.io/kubesphere/pkg/client/listers/network/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/controller/network/controllerapi"
|
||||
"kubesphere.io/kubesphere/pkg/controller/network/provider"
|
||||
controllertesting "kubesphere.io/kubesphere/pkg/controller/network/testing"
|
||||
)
|
||||
|
||||
var (
|
||||
fakeControllerBuilder *controllertesting.FakeControllerBuilder
|
||||
c controllerapi.Controller
|
||||
stopCh chan struct{}
|
||||
calicoProvider *provider.FakeCalicoNetworkProvider
|
||||
nsnpLister nsnplister.NamespaceNetworkPolicyLister
|
||||
)
|
||||
|
||||
var _ = Describe("Nsnetworkpolicy", func() {
|
||||
BeforeEach(func() {
|
||||
fakeControllerBuilder = controllertesting.NewFakeControllerBuilder()
|
||||
stopCh = make(chan struct{})
|
||||
informer, _ := fakeControllerBuilder.NewControllerInformer()
|
||||
calicoProvider = provider.NewFakeCalicoNetworkProvider()
|
||||
c = NewController(fakeControllerBuilder.KubeClient, fakeControllerBuilder.KsClient, informer.Network().V1alpha1().NamespaceNetworkPolicies(), calicoProvider)
|
||||
go informer.Network().V1alpha1().NamespaceNetworkPolicies().Informer().Run(stopCh)
|
||||
originalController := c.(*controller)
|
||||
originalController.recorder = &record.FakeRecorder{}
|
||||
go c.Run(1, stopCh)
|
||||
nsnpLister = informer.Network().V1alpha1().NamespaceNetworkPolicies().Lister()
|
||||
})
|
||||
|
||||
It("Should create a new calico object", func() {
|
||||
objSrt := `{
|
||||
"apiVersion": "network.kubesphere.io/v1alpha1",
|
||||
"kind": "NetworkPolicy",
|
||||
"metadata": {
|
||||
"name": "allow-tcp-6379",
|
||||
"namespace": "production"
|
||||
},
|
||||
"spec": {
|
||||
"selector": "color == 'red'",
|
||||
"ingress": [
|
||||
{
|
||||
"action": "Allow",
|
||||
"protocol": "TCP",
|
||||
"source": {
|
||||
"selector": "color == 'blue'"
|
||||
},
|
||||
"destination": {
|
||||
"ports": [
|
||||
6379
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}`
|
||||
obj := &v1alpha1.NamespaceNetworkPolicy{}
|
||||
Expect(controllertesting.StringToObject(objSrt, obj)).ShouldNot(HaveOccurred())
|
||||
_, err := fakeControllerBuilder.KsClient.NetworkV1alpha1().NamespaceNetworkPolicies(obj.Namespace).Create(obj)
|
||||
Expect(err).ShouldNot(HaveOccurred())
|
||||
Eventually(func() bool {
|
||||
exist, _ := calicoProvider.CheckExist(obj)
|
||||
return exist
|
||||
}).Should(BeTrue())
|
||||
obj, _ = fakeControllerBuilder.KsClient.NetworkV1alpha1().NamespaceNetworkPolicies(obj.Namespace).Get(obj.Name, metav1.GetOptions{})
|
||||
Expect(obj.Finalizers).To(HaveLen(1))
|
||||
// TestUpdate
|
||||
newStr := "color == 'green'"
|
||||
obj.Spec.Selector = newStr
|
||||
_, err = fakeControllerBuilder.KsClient.NetworkV1alpha1().NamespaceNetworkPolicies(obj.Namespace).Update(obj)
|
||||
Expect(err).ShouldNot(HaveOccurred())
|
||||
Eventually(func() string {
|
||||
o, err := calicoProvider.Get(obj)
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
n := o.(*v1alpha1.NamespaceNetworkPolicy)
|
||||
return n.Spec.Selector
|
||||
}).Should(Equal(newStr))
|
||||
// TestDelete
|
||||
Expect(fakeControllerBuilder.KsClient.NetworkV1alpha1().NamespaceNetworkPolicies(obj.Namespace).Delete(obj.Name, &metav1.DeleteOptions{})).ShouldNot(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
close(stopCh)
|
||||
})
|
||||
})
|
||||
119
pkg/controller/network/nsnetworkpolicy/reconcile.go
Normal file
119
pkg/controller/network/nsnetworkpolicy/reconcile.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package nsnetworkpolicy
|
||||
|
||||
import (
|
||||
"github.com/go-logr/logr"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/controller/network/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
controllerFinalizier = "nsnp.finalizers.networking.kubesphere.io"
|
||||
)
|
||||
|
||||
var clog logr.Logger
|
||||
|
||||
func (c *controller) reconcile(key string) error {
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
clog = log.WithValues("name", name, "namespace", namespace)
|
||||
clog.V(1).Info("---------Begin to reconcile--------")
|
||||
defer clog.V(1).Info("---------Reconcile done--------")
|
||||
obj, err := c.nsnpLister.NamespaceNetworkPolicies(namespace).Get(name)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
clog.V(2).Info("Object is removed")
|
||||
return nil
|
||||
}
|
||||
clog.Error(err, "Failed to get resource")
|
||||
return err
|
||||
}
|
||||
stop, err := c.addOrRemoveFinalizer(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stop {
|
||||
return nil
|
||||
}
|
||||
clog.V(2).Info("Check if we need a create or update")
|
||||
ok, err := c.nsNetworkPolicyProvider.CheckExist(obj)
|
||||
if err != nil {
|
||||
clog.Error(err, "Failed to check exist of network policy")
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
clog.V(1).Info("Create a new object in backend")
|
||||
err = c.nsNetworkPolicyProvider.Add(obj)
|
||||
if err != nil {
|
||||
clog.Error(err, "Failed to create np")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
needUpdate, err := c.nsNetworkPolicyProvider.NeedUpdate(obj)
|
||||
if err != nil {
|
||||
clog.Error(err, "Failed to check if object need a update")
|
||||
return err
|
||||
}
|
||||
if needUpdate {
|
||||
clog.V(1).Info("Update object in backend")
|
||||
err = c.nsNetworkPolicyProvider.Update(obj)
|
||||
if err != nil {
|
||||
clog.Error(err, "Failed to update object")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) addOrRemoveFinalizer(obj *v1alpha1.NamespaceNetworkPolicy) (bool, error) {
|
||||
if obj.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
if !utils.ContainsString(obj.ObjectMeta.Finalizers, controllerFinalizier) {
|
||||
clog.V(2).Info("Detect no finalizer")
|
||||
obj.ObjectMeta.Finalizers = append(obj.ObjectMeta.Finalizers, controllerFinalizier)
|
||||
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
_, err := c.kubesphereClientset.NetworkV1alpha1().NamespaceNetworkPolicies(obj.Namespace).Update(obj)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
clog.Error(err, "Failed to add finalizer")
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
} else {
|
||||
// The object is being deleted
|
||||
if utils.ContainsString(obj.ObjectMeta.Finalizers, controllerFinalizier) {
|
||||
// our finalizer is present, so lets handle any external dependency
|
||||
if err := c.deleteProviderNSNP(obj); err != nil {
|
||||
// if fail to delete the external dependency here, return with error
|
||||
// so that it can be retried
|
||||
return false, err
|
||||
}
|
||||
clog.V(2).Info("Removing finalizer")
|
||||
// remove our finalizer from the list and update it.
|
||||
obj.ObjectMeta.Finalizers = utils.RemoveString(obj.ObjectMeta.Finalizers, controllerFinalizier)
|
||||
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
_, err := c.kubesphereClientset.NetworkV1alpha1().NamespaceNetworkPolicies(obj.Namespace).Update(obj)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
clog.Error(err, "Failed to remove finalizer")
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// deleteProviderNSNP delete network policy in the backend
|
||||
func (c *controller) deleteProviderNSNP(obj *v1alpha1.NamespaceNetworkPolicy) error {
|
||||
clog.V(2).Info("Deleting backend network policy")
|
||||
return c.nsNetworkPolicyProvider.Delete(obj)
|
||||
}
|
||||
66
pkg/controller/network/provider/fake_ns_calico.go
Normal file
66
pkg/controller/network/provider/fake_ns_calico.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
"github.com/projectcalico/libcalico-go/lib/errors"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
api "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
|
||||
)
|
||||
|
||||
func NewFakeCalicoNetworkProvider() *FakeCalicoNetworkProvider {
|
||||
f := new(FakeCalicoNetworkProvider)
|
||||
f.NSNPData = make(map[string]*api.NamespaceNetworkPolicy)
|
||||
return f
|
||||
}
|
||||
|
||||
type FakeCalicoNetworkProvider struct {
|
||||
NSNPData map[string]*api.NamespaceNetworkPolicy
|
||||
}
|
||||
|
||||
func (f *FakeCalicoNetworkProvider) Get(o *api.NamespaceNetworkPolicy) (interface{}, error) {
|
||||
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
|
||||
obj, ok := f.NSNPData[namespacename]
|
||||
if !ok {
|
||||
return nil, errors.ErrorResourceDoesNotExist{}
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (f *FakeCalicoNetworkProvider) Add(o *api.NamespaceNetworkPolicy) error {
|
||||
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
|
||||
if _, ok := f.NSNPData[namespacename]; ok {
|
||||
return errors.ErrorResourceAlreadyExists{}
|
||||
}
|
||||
f.NSNPData[namespacename] = o
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakeCalicoNetworkProvider) CheckExist(o *api.NamespaceNetworkPolicy) (bool, error) {
|
||||
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
|
||||
if _, ok := f.NSNPData[namespacename]; ok {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (f *FakeCalicoNetworkProvider) NeedUpdate(o *api.NamespaceNetworkPolicy) (bool, error) {
|
||||
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
|
||||
store := f.NSNPData[namespacename]
|
||||
if !reflect.DeepEqual(store, o) {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (f *FakeCalicoNetworkProvider) Update(o *api.NamespaceNetworkPolicy) error {
|
||||
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
|
||||
f.NSNPData[namespacename] = o
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakeCalicoNetworkProvider) Delete(o *api.NamespaceNetworkPolicy) error {
|
||||
namespacename, _ := cache.MetaNamespaceKeyFunc(o)
|
||||
delete(f.NSNPData, namespacename)
|
||||
return nil
|
||||
}
|
||||
1
pkg/controller/network/provider/global_np.go
Normal file
1
pkg/controller/network/provider/global_np.go
Normal file
@@ -0,0 +1 @@
|
||||
package provider
|
||||
35
pkg/controller/network/provider/namespace_np.go
Normal file
35
pkg/controller/network/provider/namespace_np.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
k8snetworkinformer "k8s.io/client-go/informers/networking/v1"
|
||||
k8snetworklister "k8s.io/client-go/listers/networking/v1"
|
||||
api "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
|
||||
)
|
||||
|
||||
// NsNetworkPolicyProvider is a interface to let different cnis to implement our api
|
||||
type NsNetworkPolicyProvider interface {
|
||||
Add(*api.NamespaceNetworkPolicy) error
|
||||
CheckExist(*api.NamespaceNetworkPolicy) (bool, error)
|
||||
NeedUpdate(*api.NamespaceNetworkPolicy) (bool, error)
|
||||
Update(*api.NamespaceNetworkPolicy) error
|
||||
Delete(*api.NamespaceNetworkPolicy) error
|
||||
Get(*api.NamespaceNetworkPolicy) (interface{}, error)
|
||||
}
|
||||
|
||||
// TODO: support no-calico CNI
|
||||
type k8sNetworkProvider struct {
|
||||
networkPolicyInformer k8snetworkinformer.NetworkPolicyInformer
|
||||
networkPolicyLister k8snetworklister.NetworkPolicyLister
|
||||
}
|
||||
|
||||
func (k *k8sNetworkProvider) Add(o *api.NamespaceNetworkPolicy) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *k8sNetworkProvider) CheckExist(o *api.NamespaceNetworkPolicy) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (k *k8sNetworkProvider) Delete(o *api.NamespaceNetworkPolicy) error {
|
||||
return nil
|
||||
}
|
||||
144
pkg/controller/network/provider/ns_calico.go
Normal file
144
pkg/controller/network/provider/ns_calico.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
v3 "github.com/projectcalico/libcalico-go/lib/apis/v3"
|
||||
"github.com/projectcalico/libcalico-go/lib/clientv3"
|
||||
"github.com/projectcalico/libcalico-go/lib/errors"
|
||||
"github.com/projectcalico/libcalico-go/lib/options"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog/klogr"
|
||||
api "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
|
||||
)
|
||||
|
||||
var log = klogr.New().WithName("calico-client")
|
||||
var defaultBackoff = wait.Backoff{
|
||||
Steps: 4,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 5.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
type calicoNetworkProvider struct {
|
||||
np clientv3.NetworkPolicyInterface
|
||||
}
|
||||
|
||||
func NewCalicoNetworkProvider(np clientv3.NetworkPolicyInterface) NsNetworkPolicyProvider {
|
||||
return &calicoNetworkProvider{
|
||||
np: np,
|
||||
}
|
||||
}
|
||||
func convertSpec(n *api.NamespaceNetworkPolicySpec) *v3.NetworkPolicySpec {
|
||||
bytes, err := json.Marshal(&n)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m := new(v3.NetworkPolicySpec)
|
||||
err = json.Unmarshal(bytes, m)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// ConvertAPIToCalico convert our api to calico api
|
||||
func ConvertAPIToCalico(n *api.NamespaceNetworkPolicy) *v3.NetworkPolicy {
|
||||
output := v3.NewNetworkPolicy()
|
||||
//Object Metadata
|
||||
output.ObjectMeta.Name = n.Name
|
||||
output.Namespace = n.Namespace
|
||||
output.Annotations = n.Annotations
|
||||
output.Labels = n.Labels
|
||||
//spec
|
||||
output.Spec = *(convertSpec(&n.Spec))
|
||||
return output
|
||||
}
|
||||
|
||||
func (k *calicoNetworkProvider) Get(o *api.NamespaceNetworkPolicy) (interface{}, error) {
|
||||
return k.np.Get(context.TODO(), o.Namespace, o.Name, options.GetOptions{})
|
||||
}
|
||||
|
||||
func (k *calicoNetworkProvider) Add(o *api.NamespaceNetworkPolicy) error {
|
||||
log.V(3).Info("Creating network policy", "name", o.Name, "namespace", o.Namespace)
|
||||
obj := ConvertAPIToCalico(o)
|
||||
log.V(4).Info("Show object spe detail", "name", o.Name, "namespace", o.Namespace, "Spec", obj.Spec)
|
||||
_, err := k.np.Create(context.TODO(), obj, options.SetOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (k *calicoNetworkProvider) CheckExist(o *api.NamespaceNetworkPolicy) (bool, error) {
|
||||
log.V(3).Info("Checking network policy whether exsits or not", "name", o.Name, "namespace", o.Namespace)
|
||||
out, err := k.np.Get(context.Background(), o.Namespace, o.Name, options.GetOptions{})
|
||||
if err != nil {
|
||||
if _, ok := err.(errors.ErrorResourceDoesNotExist); ok {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
if out != nil {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (k *calicoNetworkProvider) Delete(o *api.NamespaceNetworkPolicy) error {
|
||||
log.V(3).Info("Deleting network policy", "name", o.Name, "namespace", o.Namespace)
|
||||
_, err := k.np.Delete(context.Background(), o.Namespace, o.Name, options.DeleteOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (k *calicoNetworkProvider) NeedUpdate(o *api.NamespaceNetworkPolicy) (bool, error) {
|
||||
store, err := k.np.Get(context.Background(), o.Namespace, o.Name, options.GetOptions{})
|
||||
if err != nil {
|
||||
log.Error(err, "Failed to get resource", "name", o.Name, "namespace", o.Namespace)
|
||||
}
|
||||
expected := ConvertAPIToCalico(o)
|
||||
log.V(4).Info("Comparing Spec", "store", store.Spec, "current", expected.Spec)
|
||||
if !reflect.DeepEqual(store.Spec, expected.Spec) {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (k *calicoNetworkProvider) Update(o *api.NamespaceNetworkPolicy) error {
|
||||
log.V(3).Info("Updating network policy", "name", o.Name, "namespace", o.Namespace)
|
||||
updateObject, err := k.Get(o)
|
||||
if err != nil {
|
||||
log.Error(err, "Failed to get resource in store")
|
||||
return err
|
||||
}
|
||||
up := updateObject.(*v3.NetworkPolicy)
|
||||
up.Spec = *convertSpec(&o.Spec)
|
||||
err = RetryOnConflict(defaultBackoff, func() error {
|
||||
_, err := k.np.Update(context.Background(), up, options.SetOptions{})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err, "Failed to update resource", "name", o.Name, "namespace", o.Namespace)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// RetryOnConflict is same as the function in k8s, but replaced with error in calico
|
||||
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
|
||||
var lastConflictErr error
|
||||
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
||||
err := fn()
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
if _, ok := err.(errors.ErrorResourceUpdateConflict); ok {
|
||||
lastConflictErr = err
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = lastConflictErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
22
pkg/controller/network/utils/strings.go
Normal file
22
pkg/controller/network/utils/strings.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package utils
|
||||
|
||||
// ContainsString report if s is in a slice
|
||||
func ContainsString(slice []string, s string) bool {
|
||||
for _, item := range slice {
|
||||
if item == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// RemoveString remove s from slice if exists
|
||||
func RemoveString(slice []string, s string) (result []string) {
|
||||
for _, item := range slice {
|
||||
if item == s {
|
||||
continue
|
||||
}
|
||||
result = append(result, item)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
workspaceinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/tenant/v1alpha1"
|
||||
networklister "kubesphere.io/kubesphere/pkg/client/listers/network/v1alpha1"
|
||||
workspacelister "kubesphere.io/kubesphere/pkg/client/listers/tenant/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/controller/network/controllerapi"
|
||||
)
|
||||
|
||||
const controllerAgentName = "wsnp-controller"
|
||||
@@ -38,10 +39,6 @@ var (
|
||||
errCount = 0
|
||||
)
|
||||
|
||||
// Controller expose Run method
|
||||
type Controller interface {
|
||||
Run(threadiness int, stopCh <-chan struct{}) error
|
||||
}
|
||||
type controller struct {
|
||||
kubeClientset kubernetes.Interface
|
||||
kubesphereClientset kubesphereclient.Interface
|
||||
@@ -77,7 +74,7 @@ func NewController(kubeclientset kubernetes.Interface,
|
||||
wsnpInformer networkinformer.WorkspaceNetworkPolicyInformer,
|
||||
networkPolicyInformer k8snetworkinformer.NetworkPolicyInformer,
|
||||
namespaceInformer corev1informer.NamespaceInformer,
|
||||
workspaceInformer workspaceinformer.WorkspaceInformer) Controller {
|
||||
workspaceInformer workspaceinformer.WorkspaceInformer) controllerapi.Controller {
|
||||
utilruntime.Must(kubespherescheme.AddToScheme(scheme.Scheme))
|
||||
log.V(4).Info("Creating event broadcaster")
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
|
||||
@@ -16,12 +16,13 @@ import (
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
|
||||
tenant "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/controller/network/controllerapi"
|
||||
controllertesting "kubesphere.io/kubesphere/pkg/controller/network/testing"
|
||||
)
|
||||
|
||||
var (
|
||||
fakeControllerBuilder *controllertesting.FakeControllerBuilder
|
||||
c Controller
|
||||
c controllerapi.Controller
|
||||
npLister netv1lister.NetworkPolicyLister
|
||||
stopCh chan struct{}
|
||||
deletePolicy metav1.DeletionPropagation
|
||||
|
||||
Reference in New Issue
Block a user