From b543ae1a12e411d6bff72a98ae0c4cd95cc2bf62 Mon Sep 17 00:00:00 2001 From: wanjunlei Date: Mon, 14 Dec 2020 11:10:40 +0800 Subject: [PATCH 1/2] fix bug get goroutine for audit timeout Signed-off-by: wanjunlei --- pkg/apiserver/apiserver.go | 2 +- pkg/apiserver/auditing/backend.go | 184 ++++++++++++------ pkg/apiserver/auditing/types.go | 15 +- .../client/auditing/elasticsearch/options.go | 24 ++- 4 files changed, 149 insertions(+), 76 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index ab306fd87..392c9d014 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -281,7 +281,7 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) { if s.Config.AuditingOptions.Enable { handler = filters.WithAuditing(handler, - audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions.WebhookUrl, stopCh)) + audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh)) } var authorizers authorizer.Authorizer diff --git a/pkg/apiserver/auditing/backend.go b/pkg/apiserver/auditing/backend.go index 96041c854..eb5c01099 100644 --- a/pkg/apiserver/auditing/backend.go +++ b/pkg/apiserver/auditing/backend.go @@ -23,42 +23,62 @@ import ( "encoding/json" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1" + options "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" "net/http" "time" ) const ( - WaitTimeout = time.Second - WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event" + WaitTimeout = time.Second + SendTimeout = time.Second * 3 + DefaultGoroutinesNum = 100 + DefaultBatchSize = 100 + DefaultBatchWait = time.Second * 3 + WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event" ) type Backend struct { - url string - channelCapacity int - semCh chan interface{} - cache chan *v1alpha1.EventList - client http.Client - sendTimeout time.Duration - waitTimeout time.Duration - stopCh <-chan struct{} + url string + semCh chan interface{} + cache chan *v1alpha1.Event + client http.Client + sendTimeout time.Duration + waitTimeout time.Duration + maxBatchSize int + maxBatchWait time.Duration + stopCh <-chan struct{} } -func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend { +func NewBackend(opts *options.Options, cache chan *v1alpha1.Event, stopCh <-chan struct{}) *Backend { b := Backend{ - url: url, - semCh: make(chan interface{}, channelCapacity), - channelCapacity: channelCapacity, - waitTimeout: WaitTimeout, - cache: cache, - sendTimeout: sendTimeout, - stopCh: stopCh, + url: opts.WebhookUrl, + waitTimeout: WaitTimeout, + cache: cache, + sendTimeout: SendTimeout, + maxBatchSize: opts.MaxBatchSize, + maxBatchWait: opts.MaxBatchWait, + stopCh: stopCh, } if len(b.url) == 0 { b.url = WebhookURL } + if b.maxBatchWait == 0 { + b.maxBatchWait = DefaultBatchWait + } + + if b.maxBatchSize == 0 { + b.maxBatchSize = DefaultBatchSize + } + + goroutinesNum := opts.GoroutinesNum + if goroutinesNum == 0 { + goroutinesNum = DefaultGoroutinesNum + } + b.semCh = make(chan interface{}, goroutinesNum) + b.client = http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{ @@ -76,53 +96,97 @@ func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, func (b *Backend) worker() { for { - - var event *v1alpha1.EventList - select { - case event = <-b.cache: - if event == nil { - break - } - case <-b.stopCh: + events := b.getEvents() + if events == nil { break } - send := func(event *v1alpha1.EventList) { - ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout) - defer cancel() - - select { - case <-ctx.Done(): - klog.Errorf("get goroutine for audit(%s) timeout", event.Items[0].AuditID) - return - case b.semCh <- struct{}{}: - } - - defer func() { - <-b.semCh - }() - - bs, err := b.eventToBytes(event) - if err != nil { - klog.V(6).Infof("json marshal error, %s", err) - return - } - - klog.V(8).Infof("%s", string(bs)) - - response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs)) - if err != nil { - klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err) - return - } - - if response.StatusCode != http.StatusOK { - klog.Errorf("send audit event[%s] error[%d]", event.Items[0].AuditID, response.StatusCode) - return - } + if len(events.Items) == 0 { + continue } - go send(event) + go b.sendEvents(events) + } +} + +func (b *Backend) getEvents() *v1alpha1.EventList { + + ctx, cancel := context.WithTimeout(context.Background(), b.maxBatchWait) + defer cancel() + + events := &v1alpha1.EventList{} + for { + select { + case event := <-b.cache: + if event == nil { + break + } + events.Items = append(events.Items, *event) + if len(events.Items) >= b.maxBatchSize { + return events + } + case <-ctx.Done(): + return events + case <-b.stopCh: + return nil + } + } +} + +func (b *Backend) sendEvents(events *v1alpha1.EventList) { + + ctx, cancel := context.WithTimeout(context.Background(), b.sendTimeout) + defer cancel() + + stopCh := make(chan struct{}) + + send := func() { + ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout) + defer cancel() + + select { + case <-ctx.Done(): + klog.Error("get goroutine timeout") + return + case b.semCh <- struct{}{}: + } + + start := time.Now() + defer func() { + stopCh <- struct{}{} + klog.V(8).Infof("send %d auditing logs used %d", len(events.Items), time.Now().Sub(start).Milliseconds()) + }() + + bs, err := b.eventToBytes(events) + if err != nil { + klog.V(6).Infof("json marshal error, %s", err) + return + } + + klog.V(8).Infof("%s", string(bs)) + + response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs)) + if err != nil { + klog.Errorf("send audit events error, %s", err) + return + } + + if response.StatusCode != http.StatusOK { + klog.Errorf("send audit events error[%d]", response.StatusCode) + return + } + } + + go send() + + defer func() { + <-b.semCh + }() + + select { + case <-ctx.Done(): + klog.Error("send audit events timeout") + case <-stopCh: } } diff --git a/pkg/apiserver/auditing/types.go b/pkg/apiserver/auditing/types.go index d6dd0b08c..4ffc12b75 100644 --- a/pkg/apiserver/auditing/types.go +++ b/pkg/apiserver/auditing/types.go @@ -36,6 +36,7 @@ import ( "kubesphere.io/kubesphere/pkg/informers" "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3" "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/devops" + options "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch" "kubesphere.io/kubesphere/pkg/utils/iputil" "net" "net/http" @@ -46,8 +47,6 @@ const ( DefaultWebhook = "kube-auditing-webhook" DefaultCacheCapacity = 10000 CacheTimeout = time.Second - SendTimeout = time.Second * 3 - ChannelCapacity = 10 ) type Auditing interface { @@ -60,19 +59,19 @@ type Auditing interface { type auditing struct { webhookLister v1alpha1.WebhookLister devopsGetter v1alpha3.Interface - cache chan *auditv1alpha1.EventList + cache chan *auditv1alpha1.Event backend *Backend } -func NewAuditing(informers informers.InformerFactory, url string, stopCh <-chan struct{}) Auditing { +func NewAuditing(informers informers.InformerFactory, opts *options.Options, stopCh <-chan struct{}) Auditing { a := &auditing{ webhookLister: informers.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister(), devopsGetter: devops.New(informers.KubeSphereSharedInformerFactory()), - cache: make(chan *auditv1alpha1.EventList, DefaultCacheCapacity), + cache: make(chan *auditv1alpha1.Event, DefaultCacheCapacity), } - a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh) + a.backend = NewBackend(opts, a.cache, stopCh) return a } @@ -226,10 +225,8 @@ func (a *auditing) LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCaptu func (a *auditing) cacheEvent(e auditv1alpha1.Event) { - eventList := &auditv1alpha1.EventList{} - eventList.Items = append(eventList.Items, e) select { - case a.cache <- eventList: + case a.cache <- &e: return case <-time.After(CacheTimeout): klog.Errorf("cache audit event %s timeout", e.AuditID) diff --git a/pkg/simple/client/auditing/elasticsearch/options.go b/pkg/simple/client/auditing/elasticsearch/options.go index 9da5ac996..2b6063a00 100644 --- a/pkg/simple/client/auditing/elasticsearch/options.go +++ b/pkg/simple/client/auditing/elasticsearch/options.go @@ -19,14 +19,21 @@ package elasticsearch import ( "github.com/spf13/pflag" "kubesphere.io/kubesphere/pkg/utils/reflectutils" + "time" ) type Options struct { - Enable bool `json:"enable" yaml:"enable"` - WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"` - Host string `json:"host" yaml:"host"` - IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` - Version string `json:"version" yaml:"version"` + Enable bool `json:"enable" yaml:"enable"` + WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"` + // The number of goroutines which send auditing events to webhook. + GoroutinesNum int `json:"goroutinesNum" yaml:"goroutinesNum"` + // The max size of the auditing event in a batch. + MaxBatchSize int `json:"batchSize" yaml:"batchSize"` + // MaxBatchWait indicates the maximum interval between two batches. + MaxBatchWait time.Duration `json:"batchTimeout" yaml:"batchTimeout"` + Host string `json:"host" yaml:"host"` + IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` + Version string `json:"version" yaml:"version"` } func NewElasticSearchOptions() *Options { @@ -52,7 +59,12 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ") fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url") - + fs.IntVar(&s.GoroutinesNum, "auditing-goroutines-num", c.GoroutinesNum, + "The number of goroutines which send auditing events to webhook.") + fs.IntVar(&s.MaxBatchSize, "auditing-batch-max-size", c.MaxBatchSize, + "The max size of the auditing event in a batch.") + fs.DurationVar(&s.MaxBatchWait, "auditing-batch-max-wait", c.MaxBatchWait, + "MaxBatchWait indicates the maximum interval between two batches.") fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+ "Elasticsearch service host. KubeSphere is using elastic as auditing store, "+ "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+ From ee95aeff153c036b94bc42ea72db1d8c8796244f Mon Sep 17 00:00:00 2001 From: wanjunlei Date: Mon, 14 Dec 2020 15:38:11 +0800 Subject: [PATCH 2/2] resolve conversation Signed-off-by: wanjunlei --- pkg/apiserver/auditing/backend.go | 68 +++++++++---------- pkg/apiserver/auditing/types.go | 2 +- .../client/auditing/elasticsearch/options.go | 30 ++++---- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/pkg/apiserver/auditing/backend.go b/pkg/apiserver/auditing/backend.go index eb5c01099..078010156 100644 --- a/pkg/apiserver/auditing/backend.go +++ b/pkg/apiserver/auditing/backend.go @@ -29,55 +29,55 @@ import ( ) const ( - WaitTimeout = time.Second + GetSenderTimeout = time.Second SendTimeout = time.Second * 3 - DefaultGoroutinesNum = 100 + DefaultSendersNum = 100 DefaultBatchSize = 100 - DefaultBatchWait = time.Second * 3 + DefaultBatchInterval = time.Second * 3 WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event" ) type Backend struct { - url string - semCh chan interface{} - cache chan *v1alpha1.Event - client http.Client - sendTimeout time.Duration - waitTimeout time.Duration - maxBatchSize int - maxBatchWait time.Duration - stopCh <-chan struct{} + url string + senderCh chan interface{} + cache chan *v1alpha1.Event + client http.Client + sendTimeout time.Duration + getSenderTimeout time.Duration + eventBatchSize int + eventBatchInterval time.Duration + stopCh <-chan struct{} } func NewBackend(opts *options.Options, cache chan *v1alpha1.Event, stopCh <-chan struct{}) *Backend { b := Backend{ - url: opts.WebhookUrl, - waitTimeout: WaitTimeout, - cache: cache, - sendTimeout: SendTimeout, - maxBatchSize: opts.MaxBatchSize, - maxBatchWait: opts.MaxBatchWait, - stopCh: stopCh, + url: opts.WebhookUrl, + getSenderTimeout: GetSenderTimeout, + cache: cache, + sendTimeout: SendTimeout, + eventBatchSize: opts.EventBatchSize, + eventBatchInterval: opts.EventBatchInterval, + stopCh: stopCh, } if len(b.url) == 0 { b.url = WebhookURL } - if b.maxBatchWait == 0 { - b.maxBatchWait = DefaultBatchWait + if b.eventBatchInterval == 0 { + b.eventBatchInterval = DefaultBatchInterval } - if b.maxBatchSize == 0 { - b.maxBatchSize = DefaultBatchSize + if b.eventBatchSize == 0 { + b.eventBatchSize = DefaultBatchSize } - goroutinesNum := opts.GoroutinesNum - if goroutinesNum == 0 { - goroutinesNum = DefaultGoroutinesNum + sendersNum := opts.EventSendersNum + if sendersNum == 0 { + sendersNum = DefaultSendersNum } - b.semCh = make(chan interface{}, goroutinesNum) + b.senderCh = make(chan interface{}, sendersNum) b.client = http.Client{ Transport: &http.Transport{ @@ -111,7 +111,7 @@ func (b *Backend) worker() { func (b *Backend) getEvents() *v1alpha1.EventList { - ctx, cancel := context.WithTimeout(context.Background(), b.maxBatchWait) + ctx, cancel := context.WithTimeout(context.Background(), b.eventBatchInterval) defer cancel() events := &v1alpha1.EventList{} @@ -122,7 +122,7 @@ func (b *Backend) getEvents() *v1alpha1.EventList { break } events.Items = append(events.Items, *event) - if len(events.Items) >= b.maxBatchSize { + if len(events.Items) >= b.eventBatchSize { return events } case <-ctx.Done(): @@ -141,14 +141,14 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) { stopCh := make(chan struct{}) send := func() { - ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout) + ctx, cancel := context.WithTimeout(context.Background(), b.getSenderTimeout) defer cancel() select { case <-ctx.Done(): - klog.Error("get goroutine timeout") + klog.Error("Get auditing event sender timeout") return - case b.semCh <- struct{}{}: + case b.senderCh <- struct{}{}: } start := time.Now() @@ -159,7 +159,7 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) { bs, err := b.eventToBytes(events) if err != nil { - klog.V(6).Infof("json marshal error, %s", err) + klog.Errorf("json marshal error, %s", err) return } @@ -180,7 +180,7 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) { go send() defer func() { - <-b.semCh + <-b.senderCh }() select { diff --git a/pkg/apiserver/auditing/types.go b/pkg/apiserver/auditing/types.go index 4ffc12b75..049fa647f 100644 --- a/pkg/apiserver/auditing/types.go +++ b/pkg/apiserver/auditing/types.go @@ -229,7 +229,7 @@ func (a *auditing) cacheEvent(e auditv1alpha1.Event) { case a.cache <- &e: return case <-time.After(CacheTimeout): - klog.Errorf("cache audit event %s timeout", e.AuditID) + klog.V(8).Infof("cache audit event %s timeout", e.AuditID) break } } diff --git a/pkg/simple/client/auditing/elasticsearch/options.go b/pkg/simple/client/auditing/elasticsearch/options.go index 2b6063a00..161ed30ac 100644 --- a/pkg/simple/client/auditing/elasticsearch/options.go +++ b/pkg/simple/client/auditing/elasticsearch/options.go @@ -25,15 +25,15 @@ import ( type Options struct { Enable bool `json:"enable" yaml:"enable"` WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"` - // The number of goroutines which send auditing events to webhook. - GoroutinesNum int `json:"goroutinesNum" yaml:"goroutinesNum"` - // The max size of the auditing event in a batch. - MaxBatchSize int `json:"batchSize" yaml:"batchSize"` - // MaxBatchWait indicates the maximum interval between two batches. - MaxBatchWait time.Duration `json:"batchTimeout" yaml:"batchTimeout"` - Host string `json:"host" yaml:"host"` - IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` - Version string `json:"version" yaml:"version"` + // The maximum concurrent senders which send auditing events to the auditing webhook. + EventSendersNum int `json:"eventSendersNum" yaml:"eventSendersNum"` + // The batch size of auditing events. + EventBatchSize int `json:"eventBatchSize" yaml:"eventBatchSize"` + // The batch interval of auditing events. + EventBatchInterval time.Duration `json:"eventBatchInterval" yaml:"eventBatchInterval"` + Host string `json:"host" yaml:"host"` + IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` + Version string `json:"version" yaml:"version"` } func NewElasticSearchOptions() *Options { @@ -59,12 +59,12 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ") fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url") - fs.IntVar(&s.GoroutinesNum, "auditing-goroutines-num", c.GoroutinesNum, - "The number of goroutines which send auditing events to webhook.") - fs.IntVar(&s.MaxBatchSize, "auditing-batch-max-size", c.MaxBatchSize, - "The max size of the auditing event in a batch.") - fs.DurationVar(&s.MaxBatchWait, "auditing-batch-max-wait", c.MaxBatchWait, - "MaxBatchWait indicates the maximum interval between two batches.") + fs.IntVar(&s.EventSendersNum, "auditing-event-senders-num", c.EventSendersNum, + "The maximum concurrent senders which send auditing events to the auditing webhook.") + fs.IntVar(&s.EventBatchSize, "auditing-event-batch-size", c.EventBatchSize, + "The batch size of auditing events.") + fs.DurationVar(&s.EventBatchInterval, "auditing-event-batch-interval", c.EventBatchInterval, + "The batch interval of auditing events.") fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+ "Elasticsearch service host. KubeSphere is using elastic as auditing store, "+ "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+