@@ -29,55 +29,55 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
WaitTimeout = time.Second
|
||||
GetSenderTimeout = time.Second
|
||||
SendTimeout = time.Second * 3
|
||||
DefaultGoroutinesNum = 100
|
||||
DefaultSendersNum = 100
|
||||
DefaultBatchSize = 100
|
||||
DefaultBatchWait = time.Second * 3
|
||||
DefaultBatchInterval = time.Second * 3
|
||||
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event"
|
||||
)
|
||||
|
||||
type Backend struct {
|
||||
url string
|
||||
semCh chan interface{}
|
||||
cache chan *v1alpha1.Event
|
||||
client http.Client
|
||||
sendTimeout time.Duration
|
||||
waitTimeout time.Duration
|
||||
maxBatchSize int
|
||||
maxBatchWait time.Duration
|
||||
stopCh <-chan struct{}
|
||||
url string
|
||||
senderCh chan interface{}
|
||||
cache chan *v1alpha1.Event
|
||||
client http.Client
|
||||
sendTimeout time.Duration
|
||||
getSenderTimeout time.Duration
|
||||
eventBatchSize int
|
||||
eventBatchInterval time.Duration
|
||||
stopCh <-chan struct{}
|
||||
}
|
||||
|
||||
func NewBackend(opts *options.Options, cache chan *v1alpha1.Event, stopCh <-chan struct{}) *Backend {
|
||||
|
||||
b := Backend{
|
||||
url: opts.WebhookUrl,
|
||||
waitTimeout: WaitTimeout,
|
||||
cache: cache,
|
||||
sendTimeout: SendTimeout,
|
||||
maxBatchSize: opts.MaxBatchSize,
|
||||
maxBatchWait: opts.MaxBatchWait,
|
||||
stopCh: stopCh,
|
||||
url: opts.WebhookUrl,
|
||||
getSenderTimeout: GetSenderTimeout,
|
||||
cache: cache,
|
||||
sendTimeout: SendTimeout,
|
||||
eventBatchSize: opts.EventBatchSize,
|
||||
eventBatchInterval: opts.EventBatchInterval,
|
||||
stopCh: stopCh,
|
||||
}
|
||||
|
||||
if len(b.url) == 0 {
|
||||
b.url = WebhookURL
|
||||
}
|
||||
|
||||
if b.maxBatchWait == 0 {
|
||||
b.maxBatchWait = DefaultBatchWait
|
||||
if b.eventBatchInterval == 0 {
|
||||
b.eventBatchInterval = DefaultBatchInterval
|
||||
}
|
||||
|
||||
if b.maxBatchSize == 0 {
|
||||
b.maxBatchSize = DefaultBatchSize
|
||||
if b.eventBatchSize == 0 {
|
||||
b.eventBatchSize = DefaultBatchSize
|
||||
}
|
||||
|
||||
goroutinesNum := opts.GoroutinesNum
|
||||
if goroutinesNum == 0 {
|
||||
goroutinesNum = DefaultGoroutinesNum
|
||||
sendersNum := opts.EventSendersNum
|
||||
if sendersNum == 0 {
|
||||
sendersNum = DefaultSendersNum
|
||||
}
|
||||
b.semCh = make(chan interface{}, goroutinesNum)
|
||||
b.senderCh = make(chan interface{}, sendersNum)
|
||||
|
||||
b.client = http.Client{
|
||||
Transport: &http.Transport{
|
||||
@@ -111,7 +111,7 @@ func (b *Backend) worker() {
|
||||
|
||||
func (b *Backend) getEvents() *v1alpha1.EventList {
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.maxBatchWait)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.eventBatchInterval)
|
||||
defer cancel()
|
||||
|
||||
events := &v1alpha1.EventList{}
|
||||
@@ -122,7 +122,7 @@ func (b *Backend) getEvents() *v1alpha1.EventList {
|
||||
break
|
||||
}
|
||||
events.Items = append(events.Items, *event)
|
||||
if len(events.Items) >= b.maxBatchSize {
|
||||
if len(events.Items) >= b.eventBatchSize {
|
||||
return events
|
||||
}
|
||||
case <-ctx.Done():
|
||||
@@ -141,14 +141,14 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) {
|
||||
stopCh := make(chan struct{})
|
||||
|
||||
send := func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.getSenderTimeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
klog.Error("get goroutine timeout")
|
||||
klog.Error("Get auditing event sender timeout")
|
||||
return
|
||||
case b.semCh <- struct{}{}:
|
||||
case b.senderCh <- struct{}{}:
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
@@ -159,7 +159,7 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) {
|
||||
|
||||
bs, err := b.eventToBytes(events)
|
||||
if err != nil {
|
||||
klog.V(6).Infof("json marshal error, %s", err)
|
||||
klog.Errorf("json marshal error, %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -180,7 +180,7 @@ func (b *Backend) sendEvents(events *v1alpha1.EventList) {
|
||||
go send()
|
||||
|
||||
defer func() {
|
||||
<-b.semCh
|
||||
<-b.senderCh
|
||||
}()
|
||||
|
||||
select {
|
||||
|
||||
@@ -229,7 +229,7 @@ func (a *auditing) cacheEvent(e auditv1alpha1.Event) {
|
||||
case a.cache <- &e:
|
||||
return
|
||||
case <-time.After(CacheTimeout):
|
||||
klog.Errorf("cache audit event %s timeout", e.AuditID)
|
||||
klog.V(8).Infof("cache audit event %s timeout", e.AuditID)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,15 +25,15 @@ import (
|
||||
type Options struct {
|
||||
Enable bool `json:"enable" yaml:"enable"`
|
||||
WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"`
|
||||
// The number of goroutines which send auditing events to webhook.
|
||||
GoroutinesNum int `json:"goroutinesNum" yaml:"goroutinesNum"`
|
||||
// The max size of the auditing event in a batch.
|
||||
MaxBatchSize int `json:"batchSize" yaml:"batchSize"`
|
||||
// MaxBatchWait indicates the maximum interval between two batches.
|
||||
MaxBatchWait time.Duration `json:"batchTimeout" yaml:"batchTimeout"`
|
||||
Host string `json:"host" yaml:"host"`
|
||||
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
|
||||
Version string `json:"version" yaml:"version"`
|
||||
// The maximum concurrent senders which send auditing events to the auditing webhook.
|
||||
EventSendersNum int `json:"eventSendersNum" yaml:"eventSendersNum"`
|
||||
// The batch size of auditing events.
|
||||
EventBatchSize int `json:"eventBatchSize" yaml:"eventBatchSize"`
|
||||
// The batch interval of auditing events.
|
||||
EventBatchInterval time.Duration `json:"eventBatchInterval" yaml:"eventBatchInterval"`
|
||||
Host string `json:"host" yaml:"host"`
|
||||
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
|
||||
Version string `json:"version" yaml:"version"`
|
||||
}
|
||||
|
||||
func NewElasticSearchOptions() *Options {
|
||||
@@ -59,12 +59,12 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
|
||||
fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ")
|
||||
|
||||
fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url")
|
||||
fs.IntVar(&s.GoroutinesNum, "auditing-goroutines-num", c.GoroutinesNum,
|
||||
"The number of goroutines which send auditing events to webhook.")
|
||||
fs.IntVar(&s.MaxBatchSize, "auditing-batch-max-size", c.MaxBatchSize,
|
||||
"The max size of the auditing event in a batch.")
|
||||
fs.DurationVar(&s.MaxBatchWait, "auditing-batch-max-wait", c.MaxBatchWait,
|
||||
"MaxBatchWait indicates the maximum interval between two batches.")
|
||||
fs.IntVar(&s.EventSendersNum, "auditing-event-senders-num", c.EventSendersNum,
|
||||
"The maximum concurrent senders which send auditing events to the auditing webhook.")
|
||||
fs.IntVar(&s.EventBatchSize, "auditing-event-batch-size", c.EventBatchSize,
|
||||
"The batch size of auditing events.")
|
||||
fs.DurationVar(&s.EventBatchInterval, "auditing-event-batch-interval", c.EventBatchInterval,
|
||||
"The batch interval of auditing events.")
|
||||
fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+
|
||||
"Elasticsearch service host. KubeSphere is using elastic as auditing store, "+
|
||||
"if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+
|
||||
|
||||
Reference in New Issue
Block a user