Merge pull request #3201 from wanjunlei/auditing-log2
fix bug get goroutine for audit timeout
This commit is contained in:
@@ -281,7 +281,7 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
|
||||
|
||||
if s.Config.AuditingOptions.Enable {
|
||||
handler = filters.WithAuditing(handler,
|
||||
audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions.WebhookUrl, stopCh))
|
||||
audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh))
|
||||
}
|
||||
|
||||
var authorizers authorizer.Authorizer
|
||||
|
||||
@@ -23,42 +23,62 @@ import (
|
||||
"encoding/json"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
|
||||
options "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
WaitTimeout = time.Second
|
||||
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event"
|
||||
GetSenderTimeout = time.Second
|
||||
SendTimeout = time.Second * 3
|
||||
DefaultSendersNum = 100
|
||||
DefaultBatchSize = 100
|
||||
DefaultBatchInterval = time.Second * 3
|
||||
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event"
|
||||
)
|
||||
|
||||
type Backend struct {
|
||||
url string
|
||||
channelCapacity int
|
||||
semCh chan interface{}
|
||||
cache chan *v1alpha1.EventList
|
||||
client http.Client
|
||||
sendTimeout time.Duration
|
||||
waitTimeout time.Duration
|
||||
stopCh <-chan struct{}
|
||||
url string
|
||||
senderCh chan interface{}
|
||||
cache chan *v1alpha1.Event
|
||||
client http.Client
|
||||
sendTimeout time.Duration
|
||||
getSenderTimeout time.Duration
|
||||
eventBatchSize int
|
||||
eventBatchInterval time.Duration
|
||||
stopCh <-chan struct{}
|
||||
}
|
||||
|
||||
func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend {
|
||||
func NewBackend(opts *options.Options, cache chan *v1alpha1.Event, stopCh <-chan struct{}) *Backend {
|
||||
|
||||
b := Backend{
|
||||
url: url,
|
||||
semCh: make(chan interface{}, channelCapacity),
|
||||
channelCapacity: channelCapacity,
|
||||
waitTimeout: WaitTimeout,
|
||||
cache: cache,
|
||||
sendTimeout: sendTimeout,
|
||||
stopCh: stopCh,
|
||||
url: opts.WebhookUrl,
|
||||
getSenderTimeout: GetSenderTimeout,
|
||||
cache: cache,
|
||||
sendTimeout: SendTimeout,
|
||||
eventBatchSize: opts.EventBatchSize,
|
||||
eventBatchInterval: opts.EventBatchInterval,
|
||||
stopCh: stopCh,
|
||||
}
|
||||
|
||||
if len(b.url) == 0 {
|
||||
b.url = WebhookURL
|
||||
}
|
||||
|
||||
if b.eventBatchInterval == 0 {
|
||||
b.eventBatchInterval = DefaultBatchInterval
|
||||
}
|
||||
|
||||
if b.eventBatchSize == 0 {
|
||||
b.eventBatchSize = DefaultBatchSize
|
||||
}
|
||||
|
||||
sendersNum := opts.EventSendersNum
|
||||
if sendersNum == 0 {
|
||||
sendersNum = DefaultSendersNum
|
||||
}
|
||||
b.senderCh = make(chan interface{}, sendersNum)
|
||||
|
||||
b.client = http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
@@ -76,53 +96,97 @@ func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList,
|
||||
func (b *Backend) worker() {
|
||||
|
||||
for {
|
||||
|
||||
var event *v1alpha1.EventList
|
||||
select {
|
||||
case event = <-b.cache:
|
||||
if event == nil {
|
||||
break
|
||||
}
|
||||
case <-b.stopCh:
|
||||
events := b.getEvents()
|
||||
if events == nil {
|
||||
break
|
||||
}
|
||||
|
||||
send := func(event *v1alpha1.EventList) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
klog.Errorf("get goroutine for audit(%s) timeout", event.Items[0].AuditID)
|
||||
return
|
||||
case b.semCh <- struct{}{}:
|
||||
}
|
||||
|
||||
defer func() {
|
||||
<-b.semCh
|
||||
}()
|
||||
|
||||
bs, err := b.eventToBytes(event)
|
||||
if err != nil {
|
||||
klog.V(6).Infof("json marshal error, %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(8).Infof("%s", string(bs))
|
||||
|
||||
response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs))
|
||||
if err != nil {
|
||||
klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
klog.Errorf("send audit event[%s] error[%d]", event.Items[0].AuditID, response.StatusCode)
|
||||
return
|
||||
}
|
||||
if len(events.Items) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
go send(event)
|
||||
go b.sendEvents(events)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Backend) getEvents() *v1alpha1.EventList {
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.eventBatchInterval)
|
||||
defer cancel()
|
||||
|
||||
events := &v1alpha1.EventList{}
|
||||
for {
|
||||
select {
|
||||
case event := <-b.cache:
|
||||
if event == nil {
|
||||
break
|
||||
}
|
||||
events.Items = append(events.Items, *event)
|
||||
if len(events.Items) >= b.eventBatchSize {
|
||||
return events
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return events
|
||||
case <-b.stopCh:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Backend) sendEvents(events *v1alpha1.EventList) {
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.sendTimeout)
|
||||
defer cancel()
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
|
||||
send := func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.getSenderTimeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
klog.Error("Get auditing event sender timeout")
|
||||
return
|
||||
case b.senderCh <- struct{}{}:
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
stopCh <- struct{}{}
|
||||
klog.V(8).Infof("send %d auditing logs used %d", len(events.Items), time.Now().Sub(start).Milliseconds())
|
||||
}()
|
||||
|
||||
bs, err := b.eventToBytes(events)
|
||||
if err != nil {
|
||||
klog.Errorf("json marshal error, %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(8).Infof("%s", string(bs))
|
||||
|
||||
response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs))
|
||||
if err != nil {
|
||||
klog.Errorf("send audit events error, %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
klog.Errorf("send audit events error[%d]", response.StatusCode)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
go send()
|
||||
|
||||
defer func() {
|
||||
<-b.senderCh
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
klog.Error("send audit events timeout")
|
||||
case <-stopCh:
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3"
|
||||
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/devops"
|
||||
options "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
|
||||
"kubesphere.io/kubesphere/pkg/utils/iputil"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -46,8 +47,6 @@ const (
|
||||
DefaultWebhook = "kube-auditing-webhook"
|
||||
DefaultCacheCapacity = 10000
|
||||
CacheTimeout = time.Second
|
||||
SendTimeout = time.Second * 3
|
||||
ChannelCapacity = 10
|
||||
)
|
||||
|
||||
type Auditing interface {
|
||||
@@ -60,19 +59,19 @@ type Auditing interface {
|
||||
type auditing struct {
|
||||
webhookLister v1alpha1.WebhookLister
|
||||
devopsGetter v1alpha3.Interface
|
||||
cache chan *auditv1alpha1.EventList
|
||||
cache chan *auditv1alpha1.Event
|
||||
backend *Backend
|
||||
}
|
||||
|
||||
func NewAuditing(informers informers.InformerFactory, url string, stopCh <-chan struct{}) Auditing {
|
||||
func NewAuditing(informers informers.InformerFactory, opts *options.Options, stopCh <-chan struct{}) Auditing {
|
||||
|
||||
a := &auditing{
|
||||
webhookLister: informers.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister(),
|
||||
devopsGetter: devops.New(informers.KubeSphereSharedInformerFactory()),
|
||||
cache: make(chan *auditv1alpha1.EventList, DefaultCacheCapacity),
|
||||
cache: make(chan *auditv1alpha1.Event, DefaultCacheCapacity),
|
||||
}
|
||||
|
||||
a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh)
|
||||
a.backend = NewBackend(opts, a.cache, stopCh)
|
||||
return a
|
||||
}
|
||||
|
||||
@@ -226,13 +225,11 @@ func (a *auditing) LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCaptu
|
||||
|
||||
func (a *auditing) cacheEvent(e auditv1alpha1.Event) {
|
||||
|
||||
eventList := &auditv1alpha1.EventList{}
|
||||
eventList.Items = append(eventList.Items, e)
|
||||
select {
|
||||
case a.cache <- eventList:
|
||||
case a.cache <- &e:
|
||||
return
|
||||
case <-time.After(CacheTimeout):
|
||||
klog.Errorf("cache audit event %s timeout", e.AuditID)
|
||||
klog.V(8).Infof("cache audit event %s timeout", e.AuditID)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,14 +19,21 @@ package elasticsearch
|
||||
import (
|
||||
"github.com/spf13/pflag"
|
||||
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Enable bool `json:"enable" yaml:"enable"`
|
||||
WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"`
|
||||
Host string `json:"host" yaml:"host"`
|
||||
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
|
||||
Version string `json:"version" yaml:"version"`
|
||||
Enable bool `json:"enable" yaml:"enable"`
|
||||
WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"`
|
||||
// The maximum concurrent senders which send auditing events to the auditing webhook.
|
||||
EventSendersNum int `json:"eventSendersNum" yaml:"eventSendersNum"`
|
||||
// The batch size of auditing events.
|
||||
EventBatchSize int `json:"eventBatchSize" yaml:"eventBatchSize"`
|
||||
// The batch interval of auditing events.
|
||||
EventBatchInterval time.Duration `json:"eventBatchInterval" yaml:"eventBatchInterval"`
|
||||
Host string `json:"host" yaml:"host"`
|
||||
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
|
||||
Version string `json:"version" yaml:"version"`
|
||||
}
|
||||
|
||||
func NewElasticSearchOptions() *Options {
|
||||
@@ -52,7 +59,12 @@ func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
|
||||
fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ")
|
||||
|
||||
fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url")
|
||||
|
||||
fs.IntVar(&s.EventSendersNum, "auditing-event-senders-num", c.EventSendersNum,
|
||||
"The maximum concurrent senders which send auditing events to the auditing webhook.")
|
||||
fs.IntVar(&s.EventBatchSize, "auditing-event-batch-size", c.EventBatchSize,
|
||||
"The batch size of auditing events.")
|
||||
fs.DurationVar(&s.EventBatchInterval, "auditing-event-batch-interval", c.EventBatchInterval,
|
||||
"The batch interval of auditing events.")
|
||||
fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+
|
||||
"Elasticsearch service host. KubeSphere is using elastic as auditing store, "+
|
||||
"if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+
|
||||
|
||||
Reference in New Issue
Block a user