From 356560ac749297efc570b49352845413318dd3b4 Mon Sep 17 00:00:00 2001 From: wanjunlei Date: Thu, 11 Jun 2020 17:44:18 +0800 Subject: [PATCH] resolve conversation formater Signed-off-by: wanjunlei debug debug Signed-off-by: wanjunlei --- cmd/ks-apiserver/app/server.go | 2 +- pkg/apis/auditing/v1alpha1/webhook_types.go | 2 +- pkg/apiserver/apiserver.go | 12 ++++--- pkg/apiserver/auditing/backend.go | 34 +++++++------------ pkg/apiserver/auditing/types.go | 24 ++++++++----- pkg/apiserver/auditing/types_test.go | 20 +++++------ pkg/apiserver/filters/auditing.go | 26 +++++--------- .../client/auditing/elasticsearch/options.go | 8 +++-- 8 files changed, 61 insertions(+), 67 deletions(-) diff --git a/cmd/ks-apiserver/app/server.go b/cmd/ks-apiserver/app/server.go index 09c4fa7db..0cbc44671 100644 --- a/cmd/ks-apiserver/app/server.go +++ b/cmd/ks-apiserver/app/server.go @@ -80,7 +80,7 @@ func Run(s *options.ServerRunOptions, stopCh <-chan struct{}) error { return err } - err = apiserver.PrepareRun() + err = apiserver.PrepareRun(stopCh) if err != nil { return nil } diff --git a/pkg/apis/auditing/v1alpha1/webhook_types.go b/pkg/apis/auditing/v1alpha1/webhook_types.go index a4ac92fb7..b579b5fe9 100644 --- a/pkg/apis/auditing/v1alpha1/webhook_types.go +++ b/pkg/apis/auditing/v1alpha1/webhook_types.go @@ -118,7 +118,7 @@ type WebhookSpec struct { // +optional AuditLevel v1alpha1.Level `json:"auditLevel" protobuf:"bytes,1,opt,name=auditLevel"` // K8s auditing is enabled or not. - K8sAuditingEnable bool `json:"k8sAuditingEnable,omitempty" protobuf:"bytes,8,opt,name=priority"` + K8sAuditingEnabled bool `json:"k8sAuditingEnabled,omitempty" protobuf:"bytes,8,opt,name=priority"` } // WebhookStatus defines the observed state of Webhook diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 0afd5ec74..d3cd54b86 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -141,7 +141,7 @@ type APIServer struct { AuditingClient auditing.Client } -func (s *APIServer) PrepareRun() error { +func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error { s.container = restful.NewContainer() s.container.Filter(logRequestAndResponse) @@ -158,7 +158,7 @@ func (s *APIServer) PrepareRun() error { s.Server.Handler = s.container - s.buildHandlerChain() + s.buildHandlerChain(stopCh) return nil } @@ -235,7 +235,7 @@ func (s *APIServer) Run(stopCh <-chan struct{}) (err error) { return err } -func (s *APIServer) buildHandlerChain() { +func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) { requestInfoResolver := &request.RequestInfoFactory{ APIPrefixes: sets.NewString("api", "apis", "kapis", "kapi"), GrouplessAPIPrefixes: sets.NewString("api", "kapi"), @@ -244,8 +244,10 @@ func (s *APIServer) buildHandlerChain() { handler := s.Server.Handler handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{}) - if s.Config.AuditingOptions.Enabled { - handler = filters.WithAuditing(handler, audit.NewAuditing(s.InformerFactory.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister())) + if s.Config.AuditingOptions.Enable { + handler = filters.WithAuditing(handler, + audit.NewAuditing(s.InformerFactory.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister(), + s.Config.AuditingOptions.WebhookUrl, stopCh)) } if s.Config.MultiClusterOptions.Enable { diff --git a/pkg/apiserver/auditing/backend.go b/pkg/apiserver/auditing/backend.go index 558f6f395..87287070f 100644 --- a/pkg/apiserver/auditing/backend.go +++ b/pkg/apiserver/auditing/backend.go @@ -7,33 +7,39 @@ import ( "encoding/json" "k8s.io/klog" "net/http" - "os" - "os/signal" "time" ) const ( WaitTimeout = time.Second - WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443" + WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event" ) type Backend struct { + url string channelCapacity int semCh chan interface{} cache chan *EventList client http.Client sendTimeout time.Duration waitTimeout time.Duration + stopCh <-chan struct{} } -func NewBackend(channelCapacity int, cache chan *EventList, sendTimeout time.Duration) *Backend { +func NewBackend(url string, channelCapacity int, cache chan *EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend { b := Backend{ + url: url, semCh: make(chan interface{}, channelCapacity), channelCapacity: channelCapacity, waitTimeout: WaitTimeout, cache: cache, sendTimeout: sendTimeout, + stopCh: stopCh, + } + + if len(b.url) == 0 { + b.url = WebhookURL } b.client = http.Client{ @@ -52,9 +58,6 @@ func NewBackend(channelCapacity int, cache chan *EventList, sendTimeout time.Dur func (b *Backend) worker() { - // Stop when receiver signal Interrupt. - stopCh := b.SetupSignalHandler() - for { var event *EventList @@ -63,7 +66,7 @@ func (b *Backend) worker() { if event == nil { break } - case <-stopCh: + case <-b.stopCh: break } @@ -88,7 +91,7 @@ func (b *Backend) worker() { return } - response, err := b.client.Post(WebhookURL, "application/json", bytes.NewBuffer(bs)) + response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs)) if err != nil { klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err) return @@ -103,16 +106,3 @@ func (b *Backend) worker() { go send(event) } } - -func (b *Backend) SetupSignalHandler() (stopCh <-chan struct{}) { - - stop := make(chan struct{}) - c := make(chan os.Signal, 2) - signal.Notify(c, []os.Signal{os.Interrupt}...) - go func() { - <-c - close(stop) - }() - - return stop -} diff --git a/pkg/apiserver/auditing/types.go b/pkg/apiserver/auditing/types.go index 374f8277c..7c5cecd1e 100644 --- a/pkg/apiserver/auditing/types.go +++ b/pkg/apiserver/auditing/types.go @@ -2,6 +2,7 @@ package auditing import ( "bytes" + "encoding/json" "github.com/google/uuid" "io/ioutil" "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -25,8 +26,8 @@ const ( ) type Auditing interface { - Enable() bool - K8sAuditingEnable() bool + Enabled() bool + K8sAuditingEnabled() bool LogRequestObject(req *http.Request) *Event LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo) } @@ -34,7 +35,7 @@ type Auditing interface { type Event struct { //The workspace which this audit event happened Workspace string - //The devops project which this audit event happened + //The cluster which this audit event happened Cluster string audit.Event @@ -50,14 +51,14 @@ type auditing struct { backend *Backend } -func NewAuditing(lister v1alpha1.WebhookLister) Auditing { +func NewAuditing(lister v1alpha1.WebhookLister, url string, stopCh <-chan struct{}) Auditing { a := &auditing{ lister: lister, cache: make(chan *EventList, DefaultCacheCapacity), } - a.backend = NewBackend(ChannelCapacity, a.cache, SendTimeout) + a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh) return a } @@ -71,7 +72,7 @@ func (a *auditing) getAuditLevel() audit.Level { return (audit.Level)(wh.Spec.AuditLevel) } -func (a *auditing) Enable() bool { +func (a *auditing) Enabled() bool { level := a.getAuditLevel() if level.Less(audit.LevelMetadata) { @@ -80,14 +81,14 @@ func (a *auditing) Enable() bool { return true } -func (a *auditing) K8sAuditingEnable() bool { +func (a *auditing) K8sAuditingEnabled() bool { wh, err := a.lister.Get(DefaultWebhook) if err != nil { klog.Error(err) return false } - return wh.Spec.K8sAuditingEnable + return wh.Spec.K8sAuditingEnabled } func (a *auditing) LogRequestObject(req *http.Request) *Event { @@ -135,7 +136,7 @@ func (a *auditing) LogRequestObject(req *http.Request) *Event { func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo) { // Auditing should igonre k8s request when k8s auditing is enabled. - if info.IsKubernetesRequest && a.K8sAuditingEnable() { + if info.IsKubernetesRequest && a.K8sAuditingEnabled() { return } @@ -176,6 +177,11 @@ func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *requ } func (a *auditing) cacheEvent(e Event) { + if klog.V(8) { + bs, _ := json.Marshal(e) + klog.Infof("%s", string(bs)) + } + eventList := &EventList{} eventList.Items = append(eventList.Items, e) select { diff --git a/pkg/apiserver/auditing/types_test.go b/pkg/apiserver/auditing/types_test.go index e9d7ad888..df9d02c17 100644 --- a/pkg/apiserver/auditing/types_test.go +++ b/pkg/apiserver/auditing/types_test.go @@ -49,7 +49,7 @@ func TestGetAuditLevel(t *testing.T) { assert.Equal(t, string(webhook.Spec.AuditLevel), string(a.getAuditLevel())) } -func TestAuditing_Enable(t *testing.T) { +func TestAuditing_Enabled(t *testing.T) { webhook := &auditingv1alpha1.Webhook{ TypeMeta: metav1.TypeMeta{ APIVersion: auditingv1alpha1.SchemeGroupVersion.String(), @@ -73,10 +73,10 @@ func TestAuditing_Enable(t *testing.T) { panic(err) } - assert.Equal(t, false, a.Enable()) + assert.Equal(t, false, a.Enabled()) } -func TestAuditing_K8sAuditingEnable(t *testing.T) { +func TestAuditing_K8sAuditingEnabled(t *testing.T) { webhook := &auditingv1alpha1.Webhook{ TypeMeta: metav1.TypeMeta{ APIVersion: auditingv1alpha1.SchemeGroupVersion.String(), @@ -85,8 +85,8 @@ func TestAuditing_K8sAuditingEnable(t *testing.T) { Name: "kube-auditing-webhook", }, Spec: auditingv1alpha1.WebhookSpec{ - AuditLevel: v1alpha1.LevelNone, - K8sAuditingEnable: true, + AuditLevel: v1alpha1.LevelNone, + K8sAuditingEnabled: true, }, } @@ -101,7 +101,7 @@ func TestAuditing_K8sAuditingEnable(t *testing.T) { panic(err) } - assert.Equal(t, true, a.K8sAuditingEnable()) + assert.Equal(t, true, a.K8sAuditingEnabled()) } func TestAuditing_LogRequestObject(t *testing.T) { @@ -113,8 +113,8 @@ func TestAuditing_LogRequestObject(t *testing.T) { Name: "kube-auditing-webhook", }, Spec: auditingv1alpha1.WebhookSpec{ - AuditLevel: v1alpha1.LevelRequestResponse, - K8sAuditingEnable: true, + AuditLevel: v1alpha1.LevelRequestResponse, + K8sAuditingEnabled: true, }, } @@ -178,8 +178,8 @@ func TestAuditing_LogResponseObject(t *testing.T) { Name: "kube-auditing-webhook", }, Spec: auditingv1alpha1.WebhookSpec{ - AuditLevel: v1alpha1.LevelMetadata, - K8sAuditingEnable: true, + AuditLevel: v1alpha1.LevelMetadata, + K8sAuditingEnabled: true, }, } diff --git a/pkg/apiserver/filters/auditing.go b/pkg/apiserver/filters/auditing.go index e57bd5f09..28b46ef81 100644 --- a/pkg/apiserver/filters/auditing.go +++ b/pkg/apiserver/filters/auditing.go @@ -12,31 +12,23 @@ func WithAuditing(handler http.Handler, a auditing.Auditing) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { // When auditing level is LevelNone, request should not be auditing. - if !a.Enable() { + // Auditing level can be modified with cr kube-auditing-webhook, + // so it need to judge every time. + if !a.Enabled() { handler.ServeHTTP(w, req) return } e := a.LogRequestObject(req) resp := auditing.NewResponseCapture(w) + handler.ServeHTTP(resp, req) - // Create a new goroutine to finish the request, and wait for the response body. - // The advantage of using goroutine is that recording the return value of the - // request will not affect the processing of the request, even if the auditing fails. - go handler.ServeHTTP(resp, req) - - select { - case <-req.Context().Done(): - klog.Error("Server timeout") - return - case <-resp.StopCh: - info, ok := request.RequestInfoFrom(req.Context()) - if !ok { - klog.Error("Unable to retrieve request info from request") - return - } - a.LogResponseObject(e, resp, info) + info, ok := request.RequestInfoFrom(req.Context()) + if !ok { + klog.Error("Unable to retrieve request info from request") return } + + go a.LogResponseObject(e, resp, info) }) } diff --git a/pkg/simple/client/auditing/elasticsearch/options.go b/pkg/simple/client/auditing/elasticsearch/options.go index b1be3a714..9da5ac996 100644 --- a/pkg/simple/client/auditing/elasticsearch/options.go +++ b/pkg/simple/client/auditing/elasticsearch/options.go @@ -22,7 +22,8 @@ import ( ) type Options struct { - Enabled bool `json:"enabled" yaml:"enabled"` + Enable bool `json:"enable" yaml:"enable"` + WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"` Host string `json:"host" yaml:"host"` IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"` Version string `json:"version" yaml:"version"` @@ -48,7 +49,10 @@ func (s *Options) Validate() []error { } func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) { - fs.BoolVar(&s.Enabled, "auditing-enabled", c.Enabled, "Enable auditing component or not. ") + fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ") + + fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url") + fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+ "Elasticsearch service host. KubeSphere is using elastic as auditing store, "+ "if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+