Merge branch 'master' into metering
Signed-off-by: Rao Yunkun <yunkunrao@yunify.com>
This commit is contained in:
@@ -34,12 +34,13 @@ func (j *Jenkins) SendPureRequest(path string, httpParameters *devops.HttpParame
|
||||
// provider request header to call jenkins api.
|
||||
// transfer bearer token to basic token for inner Oauth and Jeknins
|
||||
func (j *Jenkins) SendPureRequestWithHeaderResp(path string, httpParameters *devops.HttpParameters) ([]byte, http.Header, error) {
|
||||
Url, err := url.Parse(j.Server + path)
|
||||
apiURL, err := url.Parse(j.Server + path)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
klog.V(8).Info(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
apiURL.RawQuery = httpParameters.Url.RawQuery
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
|
||||
header := httpParameters.Header
|
||||
@@ -47,7 +48,7 @@ func (j *Jenkins) SendPureRequestWithHeaderResp(path string, httpParameters *dev
|
||||
|
||||
newRequest := &http.Request{
|
||||
Method: httpParameters.Method,
|
||||
URL: Url,
|
||||
URL: apiURL,
|
||||
Header: header,
|
||||
Body: httpParameters.Body,
|
||||
Form: httpParameters.Form,
|
||||
|
||||
@@ -74,10 +74,9 @@ func getRespBody(resp *http.Response) ([]byte, error) {
|
||||
|
||||
// ParseJenkinsQuery Parse the special query of jenkins.
|
||||
// ParseQuery in the standard library makes the query not re-encode
|
||||
func ParseJenkinsQuery(query string) (url.Values, error) {
|
||||
m := make(url.Values)
|
||||
err := error(nil)
|
||||
for query != "" {
|
||||
func ParseJenkinsQuery(query string) (result url.Values, err error) {
|
||||
result = make(url.Values)
|
||||
for query != "" && err == nil {
|
||||
key := query
|
||||
if i := strings.IndexAny(key, "&"); i >= 0 {
|
||||
key, query = key[:i], key[i+1:]
|
||||
@@ -91,23 +90,13 @@ func ParseJenkinsQuery(query string) (url.Values, error) {
|
||||
if i := strings.Index(key, "="); i >= 0 {
|
||||
key, value = key[:i], key[i+1:]
|
||||
}
|
||||
key, err1 := url.QueryUnescape(key)
|
||||
if err1 != nil {
|
||||
if err == nil {
|
||||
err = err1
|
||||
if key, err = url.QueryUnescape(key); err == nil {
|
||||
if value, err = url.QueryUnescape(value); err == nil {
|
||||
result[key] = append(result[key], value)
|
||||
}
|
||||
continue
|
||||
}
|
||||
value, err1 = url.QueryUnescape(value)
|
||||
if err1 != nil {
|
||||
if err == nil {
|
||||
err = err1
|
||||
}
|
||||
continue
|
||||
}
|
||||
m[key] = append(m[key], value)
|
||||
}
|
||||
return m, err
|
||||
return
|
||||
}
|
||||
|
||||
type JenkinsBlueTime time.Time
|
||||
|
||||
57
pkg/simple/client/devops/jenkins/utils_test.go
Normal file
57
pkg/simple/client/devops/jenkins/utils_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package jenkins
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"net/url"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseJenkinsQuery(t *testing.T) {
|
||||
table := []testData{
|
||||
{
|
||||
param: "start=0&limit=10&branch=master",
|
||||
expected: url.Values{
|
||||
"start": []string{"0"},
|
||||
"limit": []string{"10"},
|
||||
"branch": []string{"master"},
|
||||
}, err: false,
|
||||
},
|
||||
{
|
||||
param: "branch=master", expected: url.Values{
|
||||
"branch": []string{"master"},
|
||||
}, err: false,
|
||||
},
|
||||
{
|
||||
param: "&branch=master", expected: url.Values{
|
||||
"branch": []string{"master"},
|
||||
}, err: false,
|
||||
},
|
||||
{
|
||||
param: "branch=master&", expected: url.Values{
|
||||
"branch": []string{"master"},
|
||||
}, err: false,
|
||||
},
|
||||
{
|
||||
param: "branch=%gg", expected: url.Values{}, err: true,
|
||||
},
|
||||
{
|
||||
param: "%gg=fake", expected: url.Values{}, err: true,
|
||||
},
|
||||
}
|
||||
|
||||
for index, item := range table {
|
||||
result, err := ParseJenkinsQuery(item.param)
|
||||
if item.err {
|
||||
assert.NotNil(t, err, "index: [%d], unexpected error happen %v", index, err)
|
||||
} else {
|
||||
assert.Nil(t, err, "index: [%d], unexpected error happen %v", index, err)
|
||||
}
|
||||
assert.Equal(t, item.expected, result, "index: [%d], result do not match with the expect value", index)
|
||||
}
|
||||
}
|
||||
|
||||
type testData struct {
|
||||
param string
|
||||
expected interface{}
|
||||
err bool
|
||||
}
|
||||
@@ -31,8 +31,9 @@ type PipelineList struct {
|
||||
|
||||
// GetPipeline & SearchPipelines
|
||||
type Pipeline struct {
|
||||
Class string `json:"_class,omitempty" description:"It’s a fully qualified name and is an identifier of the producer of this resource's capability." `
|
||||
Links struct {
|
||||
Annotations map[string]string `json:"annotations,omitempty" description:"Add annotations from crd" `
|
||||
Class string `json:"_class,omitempty" description:"It’s a fully qualified name and is an identifier of the producer of this resource's capability." `
|
||||
Links struct {
|
||||
Self struct {
|
||||
Class string `json:"_class,omitempty"`
|
||||
Href string `json:"href,omitempty"`
|
||||
@@ -503,9 +504,9 @@ type PipelineBranchItem struct {
|
||||
Parameters []struct {
|
||||
Class string `json:"_class,omitempty" description:"It’s a fully qualified name and is an identifier of the producer of this resource's capability."`
|
||||
DefaultParameterValue struct {
|
||||
Class string `json:"_class,omitempty" description:"It’s a fully qualified name and is an identifier of the producer of this resource's capability."`
|
||||
Name string `json:"name,omitempty" description:"name"`
|
||||
Value string `json:"value,omitempty" description:"value"`
|
||||
Class string `json:"_class,omitempty" description:"It’s a fully qualified name and is an identifier of the producer of this resource's capability."`
|
||||
Name string `json:"name,omitempty" description:"name"`
|
||||
Value interface{} `json:"value,omitempty" description:"value"`
|
||||
} `json:"defaultParameterValue,omitempty"`
|
||||
Description string `json:"description,omitempty" description:"description"`
|
||||
Name string `json:"name,omitempty" description:"name"`
|
||||
@@ -535,8 +536,8 @@ type PipelineBranchItem struct {
|
||||
// RunPipeline
|
||||
type RunPayload struct {
|
||||
Parameters []struct {
|
||||
Name string `json:"name,omitempty" description:"name"`
|
||||
Value string `json:"value,omitempty" description:"value"`
|
||||
Name string `json:"name,omitempty" description:"name"`
|
||||
Value interface{} `json:"value,omitempty" description:"value"`
|
||||
} `json:"parameters,omitempty"`
|
||||
}
|
||||
|
||||
@@ -1035,8 +1036,8 @@ type ResJson struct {
|
||||
Arguments []struct {
|
||||
Key string `json:"key,omitempty" description:"key"`
|
||||
Value struct {
|
||||
IsLiteral bool `json:"isLiteral,omitempty" description:"is literal or not"`
|
||||
Value string `json:"value,omitempty" description:"value"`
|
||||
IsLiteral bool `json:"isLiteral,omitempty" description:"is literal or not"`
|
||||
Value interface{} `json:"value,omitempty" description:"value"`
|
||||
} `json:"value,omitempty"`
|
||||
} `json:"arguments,omitempty"`
|
||||
} `json:"parameters,omitempty"`
|
||||
|
||||
@@ -22,7 +22,7 @@ import (
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
const DefaultResyncPeriod = time.Duration(120) * time.Second
|
||||
const DefaultResyncPeriod = 120 * time.Second
|
||||
|
||||
type Options struct {
|
||||
// Enable
|
||||
@@ -79,5 +79,5 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, s *Options) {
|
||||
"This field is used when generating deployment yaml for agent.")
|
||||
|
||||
fs.DurationVar(&o.ClusterControllerResyncSecond, "cluster-controller-resync-second", s.ClusterControllerResyncSecond,
|
||||
"Cluster controller resync second to sync cluster resource.")
|
||||
"Cluster controller resync second to sync cluster resource. e.g. 2m 5m 10m ... default set to 2m")
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
k8sinformers "k8s.io/client-go/informers"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
@@ -38,7 +39,6 @@ import (
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/pkg/apis/network/calicov3"
|
||||
@@ -94,7 +94,7 @@ type provider struct {
|
||||
options Options
|
||||
}
|
||||
|
||||
func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error {
|
||||
func (p *provider) CreateIPPool(pool *v1alpha1.IPPool) error {
|
||||
calicoPool := &calicov3.IPPool{
|
||||
TypeMeta: v1.TypeMeta{},
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
@@ -104,9 +104,9 @@ func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error {
|
||||
CIDR: pool.Spec.CIDR,
|
||||
Disabled: pool.Spec.Disabled,
|
||||
NodeSelector: "all()",
|
||||
VXLANMode: v3.VXLANMode(c.options.VXLANMode),
|
||||
IPIPMode: v3.IPIPMode(c.options.IPIPMode),
|
||||
NATOutgoing: c.options.NATOutgoing,
|
||||
VXLANMode: v3.VXLANMode(p.options.VXLANMode),
|
||||
IPIPMode: v3.IPIPMode(p.options.IPIPMode),
|
||||
NATOutgoing: p.options.NATOutgoing,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error {
|
||||
klog.Warningf("cannot set reference for calico ippool %s, err=%v", pool.Name, err)
|
||||
}
|
||||
|
||||
_, err = c.client.CrdCalicov3().IPPools().Create(context.TODO(), calicoPool, v1.CreateOptions{})
|
||||
_, err = p.client.CrdCalicov3().IPPools().Create(context.TODO(), calicoPool, v1.CreateOptions{})
|
||||
if k8serrors.IsAlreadyExists(err) {
|
||||
return nil
|
||||
}
|
||||
@@ -129,19 +129,22 @@ func (c provider) CreateIPPool(pool *v1alpha1.IPPool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c provider) UpdateIPPool(pool *v1alpha1.IPPool) error {
|
||||
func (p *provider) UpdateIPPool(pool *v1alpha1.IPPool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error) {
|
||||
func (p *provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error) {
|
||||
stats := pool.DeepCopy()
|
||||
|
||||
calicoPool, err := c.client.CrdCalicov3().IPPools().Get(context.TODO(), pool.Name, v1.GetOptions{})
|
||||
calicoPool, err := p.client.CrdCalicov3().IPPools().Get(context.TODO(), pool.Name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blocks, err := c.listBlocks(calicoPool)
|
||||
blocks, err := p.block.Lister().List(labels.SelectorFromSet(
|
||||
labels.Set{
|
||||
v1alpha1.IPPoolNameLabel: calicoPool.Name,
|
||||
}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -152,9 +155,7 @@ func (c provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error
|
||||
stats.Status.Synced = true
|
||||
stats.Status.Allocations = 0
|
||||
stats.Status.Reserved = 0
|
||||
if stats.Status.Workspaces == nil {
|
||||
stats.Status.Workspaces = make(map[string]v1alpha1.WorkspaceStatus)
|
||||
}
|
||||
stats.Status.Workspaces = make(map[string]v1alpha1.WorkspaceStatus)
|
||||
|
||||
if len(blocks) <= 0 {
|
||||
stats.Status.Unallocated = pool.NumAddresses()
|
||||
@@ -168,23 +169,20 @@ func (c provider) GetIPPoolStats(pool *v1alpha1.IPPool) (*v1alpha1.IPPool, error
|
||||
stats.Status.Unallocated = stats.Status.Capacity - stats.Status.Allocations - stats.Status.Reserved
|
||||
}
|
||||
|
||||
wks, err := c.getAssociatedWorkspaces(pool)
|
||||
wks, err := p.getAssociatedWorkspaces(pool)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, wk := range wks {
|
||||
status, err := c.getWorkspaceStatus(wk, pool.GetName())
|
||||
status, err := p.getWorkspaceStatus(wk, pool.GetName())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stats.Status.Workspaces[wk] = *status
|
||||
}
|
||||
|
||||
for name, wk := range stats.Status.Workspaces {
|
||||
if wk.Allocations == 0 {
|
||||
delete(stats.Status.Workspaces, name)
|
||||
if status.Allocations == 0 {
|
||||
continue
|
||||
}
|
||||
stats.Status.Workspaces[wk] = *status
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
@@ -195,16 +193,18 @@ func setBlockAffiDeletion(c calicoset.Interface, blockAffi *calicov3.BlockAffini
|
||||
return nil
|
||||
}
|
||||
|
||||
blockAffi.Spec.State = string(model.StatePendingDeletion)
|
||||
_, err := c.CrdCalicov3().BlockAffinities().Update(context.TODO(), blockAffi, v1.UpdateOptions{})
|
||||
clone := blockAffi.DeepCopy()
|
||||
clone.Spec.State = string(model.StatePendingDeletion)
|
||||
_, err := c.CrdCalicov3().BlockAffinities().Update(context.TODO(), clone, v1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func deleteBlockAffi(c calicoset.Interface, blockAffi *calicov3.BlockAffinity) error {
|
||||
trueStr := fmt.Sprintf("%t", true)
|
||||
if blockAffi.Spec.Deleted != trueStr {
|
||||
blockAffi.Spec.Deleted = trueStr
|
||||
_, err := c.CrdCalicov3().BlockAffinities().Update(context.TODO(), blockAffi, v1.UpdateOptions{})
|
||||
clone := blockAffi.DeepCopy()
|
||||
clone.Spec.Deleted = trueStr
|
||||
_, err := c.CrdCalicov3().BlockAffinities().Update(context.TODO(), clone, v1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -218,10 +218,10 @@ func deleteBlockAffi(c calicoset.Interface, blockAffi *calicov3.BlockAffinity) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c provider) doBlockAffis(pool *calicov3.IPPool, do func(calicoset.Interface, *calicov3.BlockAffinity) error) error {
|
||||
func (p *provider) doBlockAffis(pool *calicov3.IPPool, do func(calicoset.Interface, *calicov3.BlockAffinity) error) error {
|
||||
_, cidrNet, _ := cnet.ParseCIDR(pool.Spec.CIDR)
|
||||
|
||||
blockAffis, err := c.client.CrdCalicov3().BlockAffinities().List(context.TODO(), v1.ListOptions{})
|
||||
blockAffis, err := p.client.CrdCalicov3().BlockAffinities().List(context.TODO(), v1.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -232,7 +232,7 @@ func (c provider) doBlockAffis(pool *calicov3.IPPool, do func(calicoset.Interfac
|
||||
continue
|
||||
}
|
||||
|
||||
err = do(c.client, &blockAffi)
|
||||
err = do(p.client, &blockAffi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -241,34 +241,17 @@ func (c provider) doBlockAffis(pool *calicov3.IPPool, do func(calicoset.Interfac
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c provider) listBlocks(pool *calicov3.IPPool) ([]calicov3.IPAMBlock, error) {
|
||||
_, cidrNet, _ := cnet.ParseCIDR(pool.Spec.CIDR)
|
||||
|
||||
blocks, err := c.client.CrdCalicov3().IPAMBlocks().List(context.TODO(), v1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []calicov3.IPAMBlock
|
||||
for _, block := range blocks.Items {
|
||||
_, blockCIDR, _ := cnet.ParseCIDR(block.Spec.CIDR)
|
||||
if !cidrNet.IsNetOverlap(blockCIDR.IPNet) {
|
||||
continue
|
||||
}
|
||||
result = append(result, block)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c provider) doBlocks(pool *calicov3.IPPool, do func(calicoset.Interface, *calicov3.IPAMBlock) error) error {
|
||||
blocks, err := c.listBlocks(pool)
|
||||
func (p *provider) doBlocks(pool *calicov3.IPPool, do func(calicoset.Interface, *calicov3.IPAMBlock) error) error {
|
||||
blocks, err := p.block.Lister().List(labels.SelectorFromSet(
|
||||
labels.Set{
|
||||
v1alpha1.IPPoolNameLabel: pool.Name,
|
||||
}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, block := range blocks {
|
||||
err = do(c.client, &block)
|
||||
err = do(p.client, block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -280,8 +263,9 @@ func (c provider) doBlocks(pool *calicov3.IPPool, do func(calicoset.Interface, *
|
||||
func deleteBlock(c calicoset.Interface, block *calicov3.IPAMBlock) error {
|
||||
if block.Empty() {
|
||||
if !block.Spec.Deleted {
|
||||
block.Spec.Deleted = true
|
||||
_, err := c.CrdCalicov3().IPAMBlocks().Update(context.TODO(), block, v1.UpdateOptions{})
|
||||
clone := block.DeepCopy()
|
||||
clone.Spec.Deleted = true
|
||||
_, err := c.CrdCalicov3().IPAMBlocks().Update(context.TODO(), clone, v1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -297,7 +281,7 @@ func deleteBlock(c calicoset.Interface, block *calicov3.IPAMBlock) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) {
|
||||
func (p *provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) {
|
||||
// Deleting a pool requires a little care because of existing endpoints
|
||||
// using IP addresses allocated in the pool. We do the deletion in
|
||||
// the following steps:
|
||||
@@ -306,7 +290,7 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) {
|
||||
// - delete the pool
|
||||
|
||||
// Get the pool so that we can find the CIDR associated with it.
|
||||
calicoPool, err := c.client.CrdCalicov3().IPPools().Get(context.TODO(), pool.Name, v1.GetOptions{})
|
||||
calicoPool, err := p.client.CrdCalicov3().IPPools().Get(context.TODO(), pool.Name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
if k8serrors.IsNotFound(err) {
|
||||
return true, nil
|
||||
@@ -318,14 +302,14 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) {
|
||||
if !calicoPool.Spec.Disabled {
|
||||
calicoPool.Spec.Disabled = true
|
||||
|
||||
calicoPool, err = c.client.CrdCalicov3().IPPools().Update(context.TODO(), calicoPool, v1.UpdateOptions{})
|
||||
calicoPool, err = p.client.CrdCalicov3().IPPools().Update(context.TODO(), calicoPool, v1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
//If the address pool is being used, we return, avoiding deletions that cause other problems.
|
||||
stat, err := c.GetIPPoolStats(pool)
|
||||
stat, err := p.GetIPPoolStats(pool)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -334,13 +318,13 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) {
|
||||
}
|
||||
|
||||
//set blockaffi to pendingdelete
|
||||
err = c.doBlockAffis(calicoPool, setBlockAffiDeletion)
|
||||
err = p.doBlockAffis(calicoPool, setBlockAffiDeletion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
//delete block
|
||||
err = c.doBlocks(calicoPool, deleteBlock)
|
||||
err = p.doBlocks(calicoPool, deleteBlock)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrBlockInuse) {
|
||||
return false, nil
|
||||
@@ -349,13 +333,13 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) {
|
||||
}
|
||||
|
||||
//delete blockaffi
|
||||
err = c.doBlockAffis(calicoPool, deleteBlockAffi)
|
||||
err = p.doBlockAffis(calicoPool, deleteBlockAffi)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
//delete calico ippool
|
||||
err = c.client.CrdCalicov3().IPPools().Delete(context.TODO(), calicoPool.Name, v1.DeleteOptions{})
|
||||
err = p.client.CrdCalicov3().IPPools().Delete(context.TODO(), calicoPool.Name, v1.DeleteOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -365,14 +349,14 @@ func (c provider) DeleteIPPool(pool *v1alpha1.IPPool) (bool, error) {
|
||||
}
|
||||
|
||||
//Synchronizing address pools at boot time
|
||||
func (c provider) syncIPPools() error {
|
||||
calicoPools, err := c.client.CrdCalicov3().IPPools().List(context.TODO(), v1.ListOptions{})
|
||||
func (p *provider) syncIPPools() error {
|
||||
calicoPools, err := p.client.CrdCalicov3().IPPools().List(context.TODO(), v1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.V(4).Infof("syncIPPools: cannot list calico ippools, err=%v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
pools, err := c.ksclient.NetworkV1alpha1().IPPools().List(context.TODO(), v1.ListOptions{})
|
||||
pools, err := p.ksclient.NetworkV1alpha1().IPPools().List(context.TODO(), v1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.V(4).Infof("syncIPPools: cannot list kubesphere ippools, err=%v", err)
|
||||
return err
|
||||
@@ -402,7 +386,7 @@ func (c provider) syncIPPools() error {
|
||||
Status: v1alpha1.IPPoolStatus{},
|
||||
}
|
||||
|
||||
_, err = c.ksclient.NetworkV1alpha1().IPPools().Create(context.TODO(), pool, v1.CreateOptions{})
|
||||
_, err = p.ksclient.NetworkV1alpha1().IPPools().Create(context.TODO(), pool, v1.CreateOptions{})
|
||||
if err != nil {
|
||||
klog.V(4).Infof("syncIPPools: cannot create kubesphere ippools, err=%v", err)
|
||||
return err
|
||||
@@ -413,7 +397,7 @@ func (c provider) syncIPPools() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p provider) getAssociatedWorkspaces(pool *v1alpha1.IPPool) ([]string, error) {
|
||||
func (p *provider) getAssociatedWorkspaces(pool *v1alpha1.IPPool) ([]string, error) {
|
||||
var result []string
|
||||
|
||||
poolLabel := constants.WorkspaceLabelKey
|
||||
@@ -430,10 +414,19 @@ func (p provider) getAssociatedWorkspaces(pool *v1alpha1.IPPool) ([]string, erro
|
||||
return result, nil
|
||||
}
|
||||
|
||||
return append(result, pool.GetLabels()[poolLabel]), nil
|
||||
wk := pool.GetLabels()[poolLabel]
|
||||
_, err := p.ksclient.TenantV1alpha1().Workspaces().Get(context.TODO(), wk, v1.GetOptions{})
|
||||
if k8serrors.IsNotFound(err) {
|
||||
clone := pool.DeepCopy()
|
||||
delete(clone.GetLabels(), poolLabel)
|
||||
_, err := p.ksclient.NetworkV1alpha1().IPPools().Update(context.TODO(), clone, v1.UpdateOptions{})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return append(result, wk), err
|
||||
}
|
||||
|
||||
func (p provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.WorkspaceStatus, error) {
|
||||
func (p *provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.WorkspaceStatus, error) {
|
||||
var result v1alpha1.WorkspaceStatus
|
||||
|
||||
namespaces, err := p.k8sclient.CoreV1().Namespaces().List(context.TODO(), v1.ListOptions{
|
||||
@@ -448,12 +441,19 @@ func (p provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.Wo
|
||||
}
|
||||
|
||||
for _, ns := range namespaces.Items {
|
||||
pods, err := p.k8sclient.CoreV1().Pods(ns.GetName()).List(context.TODO(), v1.ListOptions{})
|
||||
pods, err := p.k8sclient.CoreV1().Pods(ns.GetName()).List(context.TODO(), v1.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(
|
||||
labels.Set{
|
||||
v1alpha1.IPPoolNameLabel: poolName,
|
||||
},
|
||||
).String(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
if pod.GetLabels() != nil && pod.GetLabels()[v1alpha1.IPPoolNameLabel] == poolName {
|
||||
if pod.Status.Phase != corev1.PodSucceeded {
|
||||
result.Allocations++
|
||||
}
|
||||
}
|
||||
@@ -462,11 +462,25 @@ func (p provider) getWorkspaceStatus(name string, poolName string) (*v1alpha1.Wo
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (p provider) Type() string {
|
||||
func (p *provider) Type() string {
|
||||
return v1alpha1.IPPoolTypeCalico
|
||||
}
|
||||
|
||||
func (p provider) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInterface) error {
|
||||
func (p *provider) UpdateNamespace(ns *corev1.Namespace, pools []string) error {
|
||||
if pools != nil {
|
||||
annostrs, _ := json.Marshal(pools)
|
||||
if ns.Annotations == nil {
|
||||
ns.Annotations = make(map[string]string)
|
||||
}
|
||||
ns.Annotations[CalicoAnnotationIPPoolV4] = string(annostrs)
|
||||
} else {
|
||||
delete(ns.Annotations, CalicoAnnotationIPPoolV4)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInterface) error {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer p.queue.ShutDown()
|
||||
|
||||
@@ -488,7 +502,7 @@ func (p provider) SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInt
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p provider) processBlock(name string) error {
|
||||
func (p *provider) processBlock(name string) error {
|
||||
block, err := p.block.Lister().Get(name)
|
||||
if err != nil {
|
||||
if k8serrors.IsNotFound(err) {
|
||||
@@ -510,10 +524,11 @@ func (p provider) processBlock(name string) error {
|
||||
if poolCIDR.IsNetOverlap(blockCIDR.IPNet) {
|
||||
poolName = pool.Name
|
||||
|
||||
block.Labels = map[string]string{
|
||||
clone := block.DeepCopy()
|
||||
clone.Labels = map[string]string{
|
||||
v1alpha1.IPPoolNameLabel: pool.Name,
|
||||
}
|
||||
p.client.CrdCalicov3().IPAMBlocks().Update(context.TODO(), block, v1.UpdateOptions{})
|
||||
p.client.CrdCalicov3().IPAMBlocks().Update(context.TODO(), clone, v1.UpdateOptions{})
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -529,52 +544,35 @@ func (p provider) processBlock(name string) error {
|
||||
|
||||
pod, err := p.pods.Lister().Pods(namespace).Get(name)
|
||||
if err != nil {
|
||||
continue
|
||||
if k8serrors.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
labels := pod.GetLabels()
|
||||
clone := pod.DeepCopy()
|
||||
labels := clone.GetLabels()
|
||||
if labels != nil {
|
||||
poolLabel := labels[v1alpha1.IPPoolNameLabel]
|
||||
if poolLabel != "" {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
clone.Labels = make(map[string]string)
|
||||
}
|
||||
|
||||
retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
pod, err = p.k8sclient.CoreV1().Pods(namespace).Get(context.TODO(), name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
labels := pod.GetLabels()
|
||||
if labels != nil {
|
||||
poolLabel := labels[v1alpha1.IPPoolNameLabel]
|
||||
if poolLabel != "" {
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
pod.Labels = make(map[string]string)
|
||||
}
|
||||
|
||||
if pod.GetAnnotations() == nil {
|
||||
pod.Annotations = make(map[string]string)
|
||||
}
|
||||
|
||||
annostrs, _ := json.Marshal([]string{poolName})
|
||||
pod.GetAnnotations()[CalicoAnnotationIPPoolV4] = string(annostrs)
|
||||
pod.Labels[v1alpha1.IPPoolNameLabel] = poolName
|
||||
|
||||
_, err = p.k8sclient.CoreV1().Pods(namespace).Update(context.TODO(), pod, v1.UpdateOptions{})
|
||||
clone.Labels[v1alpha1.IPPoolNameLabel] = poolName
|
||||
|
||||
_, err = p.k8sclient.CoreV1().Pods(namespace).Update(context.TODO(), clone, v1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
p.poolQueue.Add(poolName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p provider) processBlockItem() bool {
|
||||
func (p *provider) processBlockItem() bool {
|
||||
key, quit := p.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
@@ -592,12 +590,12 @@ func (p provider) processBlockItem() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (p provider) runWorker() {
|
||||
func (p *provider) runWorker() {
|
||||
for p.processBlockItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p provider) addBlock(obj interface{}) {
|
||||
func (p *provider) addBlock(obj interface{}) {
|
||||
block, ok := obj.(*calicov3.IPAMBlock)
|
||||
if !ok {
|
||||
return
|
||||
@@ -606,7 +604,7 @@ func (p provider) addBlock(obj interface{}) {
|
||||
p.queue.Add(block.Name)
|
||||
}
|
||||
|
||||
func (p provider) Default(obj runtime.Object) error {
|
||||
func (p *provider) Default(obj runtime.Object) error {
|
||||
pod, ok := obj.(*corev1.Pod)
|
||||
if !ok {
|
||||
return nil
|
||||
@@ -639,7 +637,18 @@ func (p provider) Default(obj runtime.Object) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewProvider(podInformer informercorev1.PodInformer, ksclient kubesphereclient.Interface, k8sClient clientset.Interface, k8sOptions *k8s.KubernetesOptions) provider {
|
||||
func (p *provider) addPod(obj interface{}) {
|
||||
pod, _ := obj.(*corev1.Pod)
|
||||
|
||||
if pod.Labels != nil {
|
||||
pool := pod.Labels[v1alpha1.IPPoolNameLabel]
|
||||
if pool != "" && p.poolQueue != nil {
|
||||
p.poolQueue.Add(pool)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func NewProvider(k8sInformer k8sinformers.SharedInformerFactory, ksclient kubesphereclient.Interface, k8sClient clientset.Interface, k8sOptions *k8s.KubernetesOptions) *provider {
|
||||
config, err := clientcmd.BuildConfigFromFlags("", k8sOptions.KubeConfig)
|
||||
if err != nil {
|
||||
klog.Fatalf("failed to build k8s config , err=%v", err)
|
||||
@@ -677,14 +686,27 @@ func NewProvider(podInformer informercorev1.PodInformer, ksclient kubesphereclie
|
||||
}
|
||||
}
|
||||
|
||||
p := provider{
|
||||
p := &provider{
|
||||
client: client,
|
||||
ksclient: ksclient,
|
||||
k8sclient: k8sClient,
|
||||
pods: podInformer,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "calicoBlock"),
|
||||
options: opts,
|
||||
}
|
||||
p.pods = k8sInformer.Core().V1().Pods()
|
||||
|
||||
p.pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
poolOld := old.(*corev1.Pod).Labels[v1alpha1.IPPoolNameLabel]
|
||||
poolNew := new.(*corev1.Pod).Labels[v1alpha1.IPPoolNameLabel]
|
||||
if poolNew == poolOld {
|
||||
return
|
||||
}
|
||||
p.addPod(new)
|
||||
},
|
||||
DeleteFunc: p.addPod,
|
||||
AddFunc: p.addPod,
|
||||
})
|
||||
|
||||
blockI := calicoInformer.NewSharedInformerFactory(client, defaultResync).Crd().Calicov3().IPAMBlocks()
|
||||
blockI.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
@@ -695,9 +717,16 @@ func NewProvider(podInformer informercorev1.PodInformer, ksclient kubesphereclie
|
||||
})
|
||||
p.block = blockI
|
||||
|
||||
if err := p.syncIPPools(); err != nil {
|
||||
klog.Fatalf("failed to sync calico ippool to kubesphere ippool, err=%v", err)
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
if err := p.syncIPPools(); err != nil {
|
||||
klog.Infof("failed to sync calico ippool to kubesphere ippool, err=%v", err)
|
||||
time.Sleep(3 * time.Second)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
}()
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
@@ -423,6 +423,10 @@ func (c IPAMClient) GetUtilization(args GetUtilizationArgs) ([]*PoolUtilization,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(allPools) <= 0 {
|
||||
return nil, fmt.Errorf("not found pool")
|
||||
}
|
||||
|
||||
// Identify the ones we want and create a PoolUtilization for each of those.
|
||||
wantAllPools := len(args.Pools) == 0
|
||||
wantedPools := set.FromArray(args.Pools)
|
||||
|
||||
@@ -17,8 +17,9 @@ limitations under the License.
|
||||
package ippool
|
||||
|
||||
import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
v1 "k8s.io/client-go/informers/core/v1"
|
||||
k8sinformers "k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
networkv1alpha1 "kubesphere.io/kubesphere/pkg/apis/network/v1alpha1"
|
||||
@@ -35,6 +36,7 @@ type Provider interface {
|
||||
UpdateIPPool(pool *networkv1alpha1.IPPool) error
|
||||
GetIPPoolStats(pool *networkv1alpha1.IPPool) (*networkv1alpha1.IPPool, error)
|
||||
SyncStatus(stopCh <-chan struct{}, q workqueue.RateLimitingInterface) error
|
||||
UpdateNamespace(ns *corev1.Namespace, pools []string) error
|
||||
Type() string
|
||||
Default(obj runtime.Object) error
|
||||
}
|
||||
@@ -52,6 +54,10 @@ func (p provider) Default(obj runtime.Object) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p provider) UpdateNamespace(ns *corev1.Namespace, pools []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p provider) DeleteIPPool(pool *networkv1alpha1.IPPool) (bool, error) {
|
||||
blocks, err := p.ipamclient.ListBlocks(pool.Name)
|
||||
if err != nil {
|
||||
@@ -110,7 +116,7 @@ func newProvider(clientset kubesphereclient.Interface) provider {
|
||||
}
|
||||
}
|
||||
|
||||
func NewProvider(podInformer v1.PodInformer, clientset kubesphereclient.Interface, client clientset.Interface, pt string, k8sOptions *k8s.KubernetesOptions) Provider {
|
||||
func NewProvider(k8sInformer k8sinformers.SharedInformerFactory, clientset kubesphereclient.Interface, client clientset.Interface, pt string, k8sOptions *k8s.KubernetesOptions) Provider {
|
||||
var p Provider
|
||||
|
||||
switch pt {
|
||||
@@ -120,7 +126,7 @@ func NewProvider(podInformer v1.PodInformer, clientset kubesphereclient.Interfac
|
||||
ipamclient: ipam.NewIPAMClient(clientset, networkv1alpha1.VLAN),
|
||||
}
|
||||
case networkv1alpha1.IPPoolTypeCalico:
|
||||
p = calicoclient.NewProvider(podInformer, clientset, client, k8sOptions)
|
||||
p = calicoclient.NewProvider(k8sInformer, clientset, client, k8sOptions)
|
||||
}
|
||||
|
||||
return p
|
||||
|
||||
Reference in New Issue
Block a user