resolve conversation

formater

Signed-off-by: wanjunlei <wanjunlei@yunify.com>

debug

debug

Signed-off-by: wanjunlei <wanjunlei@yunify.com>
This commit is contained in:
wanjunlei
2020-06-11 17:44:18 +08:00
parent 4cb84de44d
commit 356560ac74
8 changed files with 61 additions and 67 deletions

View File

@@ -80,7 +80,7 @@ func Run(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
return err
}
err = apiserver.PrepareRun()
err = apiserver.PrepareRun(stopCh)
if err != nil {
return nil
}

View File

@@ -118,7 +118,7 @@ type WebhookSpec struct {
// +optional
AuditLevel v1alpha1.Level `json:"auditLevel" protobuf:"bytes,1,opt,name=auditLevel"`
// K8s auditing is enabled or not.
K8sAuditingEnable bool `json:"k8sAuditingEnable,omitempty" protobuf:"bytes,8,opt,name=priority"`
K8sAuditingEnabled bool `json:"k8sAuditingEnabled,omitempty" protobuf:"bytes,8,opt,name=priority"`
}
// WebhookStatus defines the observed state of Webhook

View File

@@ -141,7 +141,7 @@ type APIServer struct {
AuditingClient auditing.Client
}
func (s *APIServer) PrepareRun() error {
func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
s.container = restful.NewContainer()
s.container.Filter(logRequestAndResponse)
@@ -158,7 +158,7 @@ func (s *APIServer) PrepareRun() error {
s.Server.Handler = s.container
s.buildHandlerChain()
s.buildHandlerChain(stopCh)
return nil
}
@@ -235,7 +235,7 @@ func (s *APIServer) Run(stopCh <-chan struct{}) (err error) {
return err
}
func (s *APIServer) buildHandlerChain() {
func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
requestInfoResolver := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis", "kapis", "kapi"),
GrouplessAPIPrefixes: sets.NewString("api", "kapi"),
@@ -244,8 +244,10 @@ func (s *APIServer) buildHandlerChain() {
handler := s.Server.Handler
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})
if s.Config.AuditingOptions.Enabled {
handler = filters.WithAuditing(handler, audit.NewAuditing(s.InformerFactory.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister()))
if s.Config.AuditingOptions.Enable {
handler = filters.WithAuditing(handler,
audit.NewAuditing(s.InformerFactory.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister(),
s.Config.AuditingOptions.WebhookUrl, stopCh))
}
if s.Config.MultiClusterOptions.Enable {

View File

@@ -7,33 +7,39 @@ import (
"encoding/json"
"k8s.io/klog"
"net/http"
"os"
"os/signal"
"time"
)
const (
WaitTimeout = time.Second
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443"
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event"
)
type Backend struct {
url string
channelCapacity int
semCh chan interface{}
cache chan *EventList
client http.Client
sendTimeout time.Duration
waitTimeout time.Duration
stopCh <-chan struct{}
}
func NewBackend(channelCapacity int, cache chan *EventList, sendTimeout time.Duration) *Backend {
func NewBackend(url string, channelCapacity int, cache chan *EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend {
b := Backend{
url: url,
semCh: make(chan interface{}, channelCapacity),
channelCapacity: channelCapacity,
waitTimeout: WaitTimeout,
cache: cache,
sendTimeout: sendTimeout,
stopCh: stopCh,
}
if len(b.url) == 0 {
b.url = WebhookURL
}
b.client = http.Client{
@@ -52,9 +58,6 @@ func NewBackend(channelCapacity int, cache chan *EventList, sendTimeout time.Dur
func (b *Backend) worker() {
// Stop when receiver signal Interrupt.
stopCh := b.SetupSignalHandler()
for {
var event *EventList
@@ -63,7 +66,7 @@ func (b *Backend) worker() {
if event == nil {
break
}
case <-stopCh:
case <-b.stopCh:
break
}
@@ -88,7 +91,7 @@ func (b *Backend) worker() {
return
}
response, err := b.client.Post(WebhookURL, "application/json", bytes.NewBuffer(bs))
response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs))
if err != nil {
klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err)
return
@@ -103,16 +106,3 @@ func (b *Backend) worker() {
go send(event)
}
}
func (b *Backend) SetupSignalHandler() (stopCh <-chan struct{}) {
stop := make(chan struct{})
c := make(chan os.Signal, 2)
signal.Notify(c, []os.Signal{os.Interrupt}...)
go func() {
<-c
close(stop)
}()
return stop
}

View File

@@ -2,6 +2,7 @@ package auditing
import (
"bytes"
"encoding/json"
"github.com/google/uuid"
"io/ioutil"
"k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -25,8 +26,8 @@ const (
)
type Auditing interface {
Enable() bool
K8sAuditingEnable() bool
Enabled() bool
K8sAuditingEnabled() bool
LogRequestObject(req *http.Request) *Event
LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo)
}
@@ -34,7 +35,7 @@ type Auditing interface {
type Event struct {
//The workspace which this audit event happened
Workspace string
//The devops project which this audit event happened
//The cluster which this audit event happened
Cluster string
audit.Event
@@ -50,14 +51,14 @@ type auditing struct {
backend *Backend
}
func NewAuditing(lister v1alpha1.WebhookLister) Auditing {
func NewAuditing(lister v1alpha1.WebhookLister, url string, stopCh <-chan struct{}) Auditing {
a := &auditing{
lister: lister,
cache: make(chan *EventList, DefaultCacheCapacity),
}
a.backend = NewBackend(ChannelCapacity, a.cache, SendTimeout)
a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh)
return a
}
@@ -71,7 +72,7 @@ func (a *auditing) getAuditLevel() audit.Level {
return (audit.Level)(wh.Spec.AuditLevel)
}
func (a *auditing) Enable() bool {
func (a *auditing) Enabled() bool {
level := a.getAuditLevel()
if level.Less(audit.LevelMetadata) {
@@ -80,14 +81,14 @@ func (a *auditing) Enable() bool {
return true
}
func (a *auditing) K8sAuditingEnable() bool {
func (a *auditing) K8sAuditingEnabled() bool {
wh, err := a.lister.Get(DefaultWebhook)
if err != nil {
klog.Error(err)
return false
}
return wh.Spec.K8sAuditingEnable
return wh.Spec.K8sAuditingEnabled
}
func (a *auditing) LogRequestObject(req *http.Request) *Event {
@@ -135,7 +136,7 @@ func (a *auditing) LogRequestObject(req *http.Request) *Event {
func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo) {
// Auditing should igonre k8s request when k8s auditing is enabled.
if info.IsKubernetesRequest && a.K8sAuditingEnable() {
if info.IsKubernetesRequest && a.K8sAuditingEnabled() {
return
}
@@ -176,6 +177,11 @@ func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *requ
}
func (a *auditing) cacheEvent(e Event) {
if klog.V(8) {
bs, _ := json.Marshal(e)
klog.Infof("%s", string(bs))
}
eventList := &EventList{}
eventList.Items = append(eventList.Items, e)
select {

View File

@@ -49,7 +49,7 @@ func TestGetAuditLevel(t *testing.T) {
assert.Equal(t, string(webhook.Spec.AuditLevel), string(a.getAuditLevel()))
}
func TestAuditing_Enable(t *testing.T) {
func TestAuditing_Enabled(t *testing.T) {
webhook := &auditingv1alpha1.Webhook{
TypeMeta: metav1.TypeMeta{
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
@@ -73,10 +73,10 @@ func TestAuditing_Enable(t *testing.T) {
panic(err)
}
assert.Equal(t, false, a.Enable())
assert.Equal(t, false, a.Enabled())
}
func TestAuditing_K8sAuditingEnable(t *testing.T) {
func TestAuditing_K8sAuditingEnabled(t *testing.T) {
webhook := &auditingv1alpha1.Webhook{
TypeMeta: metav1.TypeMeta{
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
@@ -85,8 +85,8 @@ func TestAuditing_K8sAuditingEnable(t *testing.T) {
Name: "kube-auditing-webhook",
},
Spec: auditingv1alpha1.WebhookSpec{
AuditLevel: v1alpha1.LevelNone,
K8sAuditingEnable: true,
AuditLevel: v1alpha1.LevelNone,
K8sAuditingEnabled: true,
},
}
@@ -101,7 +101,7 @@ func TestAuditing_K8sAuditingEnable(t *testing.T) {
panic(err)
}
assert.Equal(t, true, a.K8sAuditingEnable())
assert.Equal(t, true, a.K8sAuditingEnabled())
}
func TestAuditing_LogRequestObject(t *testing.T) {
@@ -113,8 +113,8 @@ func TestAuditing_LogRequestObject(t *testing.T) {
Name: "kube-auditing-webhook",
},
Spec: auditingv1alpha1.WebhookSpec{
AuditLevel: v1alpha1.LevelRequestResponse,
K8sAuditingEnable: true,
AuditLevel: v1alpha1.LevelRequestResponse,
K8sAuditingEnabled: true,
},
}
@@ -178,8 +178,8 @@ func TestAuditing_LogResponseObject(t *testing.T) {
Name: "kube-auditing-webhook",
},
Spec: auditingv1alpha1.WebhookSpec{
AuditLevel: v1alpha1.LevelMetadata,
K8sAuditingEnable: true,
AuditLevel: v1alpha1.LevelMetadata,
K8sAuditingEnabled: true,
},
}

View File

@@ -12,31 +12,23 @@ func WithAuditing(handler http.Handler, a auditing.Auditing) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// When auditing level is LevelNone, request should not be auditing.
if !a.Enable() {
// Auditing level can be modified with cr kube-auditing-webhook,
// so it need to judge every time.
if !a.Enabled() {
handler.ServeHTTP(w, req)
return
}
e := a.LogRequestObject(req)
resp := auditing.NewResponseCapture(w)
handler.ServeHTTP(resp, req)
// Create a new goroutine to finish the request, and wait for the response body.
// The advantage of using goroutine is that recording the return value of the
// request will not affect the processing of the request, even if the auditing fails.
go handler.ServeHTTP(resp, req)
select {
case <-req.Context().Done():
klog.Error("Server timeout")
return
case <-resp.StopCh:
info, ok := request.RequestInfoFrom(req.Context())
if !ok {
klog.Error("Unable to retrieve request info from request")
return
}
a.LogResponseObject(e, resp, info)
info, ok := request.RequestInfoFrom(req.Context())
if !ok {
klog.Error("Unable to retrieve request info from request")
return
}
go a.LogResponseObject(e, resp, info)
})
}

View File

@@ -22,7 +22,8 @@ import (
)
type Options struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Enable bool `json:"enable" yaml:"enable"`
WebhookUrl string `json:"webhookUrl" yaml:"webhookUrl"`
Host string `json:"host" yaml:"host"`
IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
Version string `json:"version" yaml:"version"`
@@ -48,7 +49,10 @@ func (s *Options) Validate() []error {
}
func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
fs.BoolVar(&s.Enabled, "auditing-enabled", c.Enabled, "Enable auditing component or not. ")
fs.BoolVar(&s.Enable, "auditing-enabled", c.Enable, "Enable auditing component or not. ")
fs.StringVar(&s.WebhookUrl, "auditing-webhook-url", c.WebhookUrl, "Auditing wehook url")
fs.StringVar(&s.Host, "auditing-elasticsearch-host", c.Host, ""+
"Elasticsearch service host. KubeSphere is using elastic as auditing store, "+
"if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+