Fix the "get goroutine for audit timeout" bug when sending audit events

Signed-off-by: wanjunlei <wanjunlei@yunify.com>
This commit is contained in:
wanjunlei
2020-12-14 11:10:40 +08:00
parent a314b31bf0
commit b543ae1a12
4 changed files with 149 additions and 76 deletions

View File

@@ -281,7 +281,7 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
if s.Config.AuditingOptions.Enable { if s.Config.AuditingOptions.Enable {
handler = filters.WithAuditing(handler, handler = filters.WithAuditing(handler,
audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions.WebhookUrl, stopCh)) audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh))
} }
var authorizers authorizer.Authorizer var authorizers authorizer.Authorizer

View File

@@ -23,42 +23,62 @@ import (
"encoding/json" "encoding/json"
"k8s.io/klog" "k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1" "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
options "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
"net/http" "net/http"
"time" "time"
) )
const ( const (
WaitTimeout = time.Second WaitTimeout = time.Second
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event" SendTimeout = time.Second * 3
DefaultGoroutinesNum = 100
DefaultBatchSize = 100
DefaultBatchWait = time.Second * 3
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event"
) )
type Backend struct { type Backend struct {
url string url string
channelCapacity int semCh chan interface{}
semCh chan interface{} cache chan *v1alpha1.Event
cache chan *v1alpha1.EventList client http.Client
client http.Client sendTimeout time.Duration
sendTimeout time.Duration waitTimeout time.Duration
waitTimeout time.Duration maxBatchSize int
stopCh <-chan struct{} maxBatchWait time.Duration
stopCh <-chan struct{}
} }
func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend { func NewBackend(opts *options.Options, cache chan *v1alpha1.Event, stopCh <-chan struct{}) *Backend {
b := Backend{ b := Backend{
url: url, url: opts.WebhookUrl,
semCh: make(chan interface{}, channelCapacity), waitTimeout: WaitTimeout,
channelCapacity: channelCapacity, cache: cache,
waitTimeout: WaitTimeout, sendTimeout: SendTimeout,
cache: cache, maxBatchSize: opts.MaxBatchSize,
sendTimeout: sendTimeout, maxBatchWait: opts.MaxBatchWait,
stopCh: stopCh, stopCh: stopCh,
} }
if len(b.url) == 0 { if len(b.url) == 0 {
b.url = WebhookURL b.url = WebhookURL
} }
if b.maxBatchWait == 0 {
b.maxBatchWait = DefaultBatchWait
}
if b.maxBatchSize == 0 {
b.maxBatchSize = DefaultBatchSize
}
goroutinesNum := opts.GoroutinesNum
if goroutinesNum == 0 {
goroutinesNum = DefaultGoroutinesNum
}
b.semCh = make(chan interface{}, goroutinesNum)
b.client = http.Client{ b.client = http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
TLSClientConfig: &tls.Config{ TLSClientConfig: &tls.Config{
@@ -76,53 +96,97 @@ func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList,
func (b *Backend) worker() { func (b *Backend) worker() {
for { for {
events := b.getEvents()
var event *v1alpha1.EventList if events == nil {
select {
case event = <-b.cache:
if event == nil {
break
}
case <-b.stopCh:
break break
} }
send := func(event *v1alpha1.EventList) { if len(events.Items) == 0 {
ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout) continue
defer cancel()
select {
case <-ctx.Done():
klog.Errorf("get goroutine for audit(%s) timeout", event.Items[0].AuditID)
return
case b.semCh <- struct{}{}:
}
defer func() {
<-b.semCh
}()
bs, err := b.eventToBytes(event)
if err != nil {
klog.V(6).Infof("json marshal error, %s", err)
return
}
klog.V(8).Infof("%s", string(bs))
response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs))
if err != nil {
klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err)
return
}
if response.StatusCode != http.StatusOK {
klog.Errorf("send audit event[%s] error[%d]", event.Items[0].AuditID, response.StatusCode)
return
}
} }
go send(event) go b.sendEvents(events)
}
}
// getEvents collects events from b.cache into a single batch.
//
// It returns:
//   - the batch as soon as it holds b.maxBatchSize items;
//   - whatever has been collected so far (possibly an empty list) once
//     b.maxBatchWait has elapsed since the call started;
//   - nil when b.stopCh fires.
//
// A nil pointer received from the cache is ignored.
func (b *Backend) getEvents() *v1alpha1.EventList {
	deadline, release := context.WithTimeout(context.Background(), b.maxBatchWait)
	defer release()

	batch := &v1alpha1.EventList{}
	for {
		select {
		case <-b.stopCh:
			return nil
		case <-deadline.Done():
			return batch
		case item := <-b.cache:
			// Skip nil entries and keep waiting for the next event.
			if item != nil {
				batch.Items = append(batch.Items, *item)
				if len(batch.Items) >= b.maxBatchSize {
					return batch
				}
			}
		}
	}
}
func (b *Backend) sendEvents(events *v1alpha1.EventList) {
ctx, cancel := context.WithTimeout(context.Background(), b.sendTimeout)
defer cancel()
stopCh := make(chan struct{})
send := func() {
ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
defer cancel()
select {
case <-ctx.Done():
klog.Error("get goroutine timeout")
return
case b.semCh <- struct{}{}:
}
start := time.Now()
defer func() {
stopCh <- struct{}{}
klog.V(8).Infof("send %d auditing logs used %d", len(events.Items), time.Now().Sub(start).Milliseconds())
}()
bs, err := b.eventToBytes(events)
if err != nil {
klog.V(6).Infof("json marshal error, %s", err)
return
}
klog.V(8).Infof("%s", string(bs))
response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs))
if err != nil {
klog.Errorf("send audit events error, %s", err)
return
}
if response.StatusCode != http.StatusOK {
klog.Errorf("send audit events error[%d]", response.StatusCode)
return
}
}
go send()
defer func() {
<-b.semCh
}()
select {
case <-ctx.Done():
klog.Error("send audit events timeout")
case <-stopCh:
} }
} }

View File

@@ -36,6 +36,7 @@ import (
"kubesphere.io/kubesphere/pkg/informers" "kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3" "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/devops" "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/devops"
options "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
"kubesphere.io/kubesphere/pkg/utils/iputil" "kubesphere.io/kubesphere/pkg/utils/iputil"
"net" "net"
"net/http" "net/http"
@@ -46,8 +47,6 @@ const (
DefaultWebhook = "kube-auditing-webhook" DefaultWebhook = "kube-auditing-webhook"
DefaultCacheCapacity = 10000 DefaultCacheCapacity = 10000
CacheTimeout = time.Second CacheTimeout = time.Second
SendTimeout = time.Second * 3
ChannelCapacity = 10
) )
type Auditing interface { type Auditing interface {
@@ -60,19 +59,19 @@ type Auditing interface {
type auditing struct { type auditing struct {
webhookLister v1alpha1.WebhookLister webhookLister v1alpha1.WebhookLister
devopsGetter v1alpha3.Interface devopsGetter v1alpha3.Interface
cache chan *auditv1alpha1.EventList cache chan *auditv1alpha1.Event
backend *Backend backend *Backend
} }
func NewAuditing(informers informers.InformerFactory, url string, stopCh <-chan struct{}) Auditing { func NewAuditing(informers informers.InformerFactory, opts *options.Options, stopCh <-chan struct{}) Auditing {
a := &auditing{ a := &auditing{
webhookLister: informers.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister(), webhookLister: informers.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister(),
devopsGetter: devops.New(informers.KubeSphereSharedInformerFactory()), devopsGetter: devops.New(informers.KubeSphereSharedInformerFactory()),
cache: make(chan *auditv1alpha1.EventList, DefaultCacheCapacity), cache: make(chan *auditv1alpha1.Event, DefaultCacheCapacity),
} }
a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh) a.backend = NewBackend(opts, a.cache, stopCh)
return a return a
} }
@@ -226,10 +225,8 @@ func (a *auditing) LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCaptu
func (a *auditing) cacheEvent(e auditv1alpha1.Event) { func (a *auditing) cacheEvent(e auditv1alpha1.Event) {
eventList := &auditv1alpha1.EventList{}
eventList.Items = append(eventList.Items, e)
select { select {
case a.cache <- eventList: case a.cache <- &e:
return return
case <-time.After(CacheTimeout): case <-time.After(CacheTimeout):
klog.Errorf("cache audit event %s timeout", e.AuditID) klog.Errorf("cache audit event %s timeout", e.AuditID)

View File

@@ -19,14 +19,21 @@ package elasticsearch
import ( import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"kubesphere.io/kubesphere/pkg/utils/reflectutils" "kubesphere.io/kubesphere/pkg/utils/reflectutils"
"time"
) )
type Options struct { type Options struct {
Enable bool `json:"enable" yaml:"enable"` Enable bool `json:"enable" yaml:"enable"`
WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"` WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"`
Host string `json:"host" yaml:"host"` // The number of goroutines which send auditing events to webhook.
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` GoroutinesNum int `json:"goroutinesNum" yaml:"goroutinesNum"`
Version string `json:"version" yaml:"version"` // The max size of the auditing event in a batch.
MaxBatchSize int `json:"batchSize" yaml:"batchSize"`
// MaxBatchWait indicates the maximum interval between two batches.
MaxBatchWait time.Duration `json:"batchTimeout" yaml:"batchTimeout"`
Host string `json:"host" yaml:"host"`
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
Version string `json:"version" yaml:"version"`
} }
func NewElasticSearchOptions() *Options { func NewElasticSearchOptions() *Options {
@@ -52,7 +59,12 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ") fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ")
fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url") fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url")
fs.IntVar(&s.GoroutinesNum, "auditing-goroutines-num", c.GoroutinesNum,
"The number of goroutines which send auditing events to webhook.")
fs.IntVar(&s.MaxBatchSize, "auditing-batch-max-size", c.MaxBatchSize,
"The max size of the auditing event in a batch.")
fs.DurationVar(&s.MaxBatchWait, "auditing-batch-max-wait", c.MaxBatchWait,
"MaxBatchWait indicates the maximum interval between two batches.")
fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+ fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+
"Elasticsearch service host. KubeSphere is using elastic as auditing store, "+ "Elasticsearch service host. KubeSphere is using elastic as auditing store, "+
"if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+ "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+