Add two elements Message and Devops into Event struct.
Pass the event object instead of RequestInfo by request context to request handler.
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
"kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -19,14 +20,14 @@ type Backend struct {
|
|||||||
url string
|
url string
|
||||||
channelCapacity int
|
channelCapacity int
|
||||||
semCh chan interface{}
|
semCh chan interface{}
|
||||||
cache chan *EventList
|
cache chan *v1alpha1.EventList
|
||||||
client http.Client
|
client http.Client
|
||||||
sendTimeout time.Duration
|
sendTimeout time.Duration
|
||||||
waitTimeout time.Duration
|
waitTimeout time.Duration
|
||||||
stopCh <-chan struct{}
|
stopCh <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBackend(url string, channelCapacity int, cache chan *EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend {
|
func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend {
|
||||||
|
|
||||||
b := Backend{
|
b := Backend{
|
||||||
url: url,
|
url: url,
|
||||||
@@ -60,7 +61,7 @@ func (b *Backend) worker() {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
||||||
var event *EventList
|
var event *v1alpha1.EventList
|
||||||
select {
|
select {
|
||||||
case event = <-b.cache:
|
case event = <-b.cache:
|
||||||
if event == nil {
|
if event == nil {
|
||||||
@@ -70,7 +71,7 @@ func (b *Backend) worker() {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
send := func(event *EventList) {
|
send := func(event *v1alpha1.EventList) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apiserver/pkg/apis/audit"
|
"k8s.io/apiserver/pkg/apis/audit"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
auditv1alpha1 "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
|
||||||
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
||||||
"kubesphere.io/kubesphere/pkg/client/listers/auditing/v1alpha1"
|
"kubesphere.io/kubesphere/pkg/client/listers/auditing/v1alpha1"
|
||||||
"kubesphere.io/kubesphere/pkg/utils/iputil"
|
"kubesphere.io/kubesphere/pkg/utils/iputil"
|
||||||
@@ -28,26 +29,13 @@ const (
|
|||||||
type Auditing interface {
|
type Auditing interface {
|
||||||
Enabled() bool
|
Enabled() bool
|
||||||
K8sAuditingEnabled() bool
|
K8sAuditingEnabled() bool
|
||||||
LogRequestObject(req *http.Request) *Event
|
LogRequestObject(req *http.Request, info *request.RequestInfo) *auditv1alpha1.Event
|
||||||
LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo)
|
LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCapture, info *request.RequestInfo)
|
||||||
}
|
|
||||||
|
|
||||||
type Event struct {
|
|
||||||
//The workspace which this audit event happened
|
|
||||||
Workspace string
|
|
||||||
//The cluster which this audit event happened
|
|
||||||
Cluster string
|
|
||||||
|
|
||||||
audit.Event
|
|
||||||
}
|
|
||||||
|
|
||||||
type EventList struct {
|
|
||||||
Items []Event
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type auditing struct {
|
type auditing struct {
|
||||||
lister v1alpha1.WebhookLister
|
lister v1alpha1.WebhookLister
|
||||||
cache chan *EventList
|
cache chan *auditv1alpha1.EventList
|
||||||
backend *Backend
|
backend *Backend
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -55,7 +43,7 @@ func NewAuditing(lister v1alpha1.WebhookLister, url string, stopCh <-chan struct
|
|||||||
|
|
||||||
a := &auditing{
|
a := &auditing{
|
||||||
lister: lister,
|
lister: lister,
|
||||||
cache: make(chan *EventList, DefaultCacheCapacity),
|
cache: make(chan *auditv1alpha1.EventList, DefaultCacheCapacity),
|
||||||
}
|
}
|
||||||
|
|
||||||
a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh)
|
a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh)
|
||||||
@@ -91,9 +79,26 @@ func (a *auditing) K8sAuditingEnabled() bool {
|
|||||||
return wh.Spec.K8sAuditingEnabled
|
return wh.Spec.K8sAuditingEnabled
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *auditing) LogRequestObject(req *http.Request) *Event {
|
// If the request is not a standard request, or a resource request,
|
||||||
e := &Event{
|
// or part of the audit information cannot be obtained through url,
|
||||||
|
// the function that handles the request can obtain Event from
|
||||||
|
// the context of the request, assign value to audit information,
|
||||||
|
// including name, verb, resource, subresource, message etc like this.
|
||||||
|
//
|
||||||
|
// info, ok := request.AuditEventFrom(request.Request.Context())
|
||||||
|
// if ok {
|
||||||
|
// info.Verb = "post"
|
||||||
|
// info.Name = created.Name
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
func (a *auditing) LogRequestObject(req *http.Request, info *request.RequestInfo) *auditv1alpha1.Event {
|
||||||
|
|
||||||
|
e := &auditv1alpha1.Event{
|
||||||
|
Workspace: info.Workspace,
|
||||||
|
Cluster: info.Cluster,
|
||||||
Event: audit.Event{
|
Event: audit.Event{
|
||||||
|
RequestURI: info.Path,
|
||||||
|
Verb: info.Verb,
|
||||||
Level: a.getAuditLevel(),
|
Level: a.getAuditLevel(),
|
||||||
AuditID: types.UID(uuid.New().String()),
|
AuditID: types.UID(uuid.New().String()),
|
||||||
Stage: audit.StageResponseComplete,
|
Stage: audit.StageResponseComplete,
|
||||||
@@ -101,6 +106,16 @@ func (a *auditing) LogRequestObject(req *http.Request) *Event {
|
|||||||
UserAgent: req.UserAgent(),
|
UserAgent: req.UserAgent(),
|
||||||
RequestReceivedTimestamp: v1.NewMicroTime(time.Now()),
|
RequestReceivedTimestamp: v1.NewMicroTime(time.Now()),
|
||||||
Annotations: nil,
|
Annotations: nil,
|
||||||
|
ObjectRef: &audit.ObjectReference{
|
||||||
|
Resource: info.Resource,
|
||||||
|
Namespace: info.Namespace,
|
||||||
|
Name: info.Name,
|
||||||
|
UID: "",
|
||||||
|
APIGroup: info.APIGroup,
|
||||||
|
APIVersion: info.APIVersion,
|
||||||
|
ResourceVersion: info.ResourceScope,
|
||||||
|
Subresource: info.Subresource,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -133,7 +148,7 @@ func (a *auditing) LogRequestObject(req *http.Request) *Event {
|
|||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo) {
|
func (a *auditing) LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCapture, info *request.RequestInfo) {
|
||||||
|
|
||||||
// Auditing should igonre k8s request when k8s auditing is enabled.
|
// Auditing should igonre k8s request when k8s auditing is enabled.
|
||||||
if info.IsKubernetesRequest && a.K8sAuditingEnabled() {
|
if info.IsKubernetesRequest && a.K8sAuditingEnabled() {
|
||||||
@@ -146,43 +161,16 @@ func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *requ
|
|||||||
e.ResponseObject = &runtime.Unknown{Raw: resp.Bytes()}
|
e.ResponseObject = &runtime.Unknown{Raw: resp.Bytes()}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the request is not a standard request, or a resource request,
|
|
||||||
// or part of the audit information cannot be obtained through url,
|
|
||||||
// the function that handles the request can obtain RequestInfo from
|
|
||||||
// the context of the request, assign value to audit information,
|
|
||||||
// including name, verb, resource, subresource, etc like this.
|
|
||||||
//
|
|
||||||
// info, ok := request.RequestInfoFrom(request.Request.Context())
|
|
||||||
// if ok {
|
|
||||||
// info.Verb = "post"
|
|
||||||
// info.Name = created.Name
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
e.Workspace = info.Workspace
|
|
||||||
e.Cluster = info.Cluster
|
|
||||||
e.RequestURI = info.Path
|
|
||||||
e.Verb = info.Verb
|
|
||||||
e.ObjectRef = &audit.ObjectReference{
|
|
||||||
Resource: info.Resource,
|
|
||||||
Namespace: info.Namespace,
|
|
||||||
Name: info.Name,
|
|
||||||
UID: "",
|
|
||||||
APIGroup: info.APIGroup,
|
|
||||||
APIVersion: info.APIVersion,
|
|
||||||
ResourceVersion: info.ResourceScope,
|
|
||||||
Subresource: info.Subresource,
|
|
||||||
}
|
|
||||||
|
|
||||||
a.cacheEvent(*e)
|
a.cacheEvent(*e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *auditing) cacheEvent(e Event) {
|
func (a *auditing) cacheEvent(e auditv1alpha1.Event) {
|
||||||
if klog.V(8) {
|
if klog.V(8) {
|
||||||
bs, _ := json.Marshal(e)
|
bs, _ := json.Marshal(e)
|
||||||
klog.Infof("%s", string(bs))
|
klog.Infof("%s", string(bs))
|
||||||
}
|
}
|
||||||
|
|
||||||
eventList := &EventList{}
|
eventList := &auditv1alpha1.EventList{}
|
||||||
eventList.Items = append(eventList.Items, e)
|
eventList.Items = append(eventList.Items, e)
|
||||||
select {
|
select {
|
||||||
case a.cache <- eventList:
|
case a.cache <- eventList:
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
|
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/apis/auditing/v1alpha1"
|
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/apis/auditing/v1alpha1"
|
||||||
|
v1alpha12 "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
|
||||||
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
||||||
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
|
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
|
||||||
ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
|
ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
|
||||||
@@ -145,12 +146,25 @@ func TestAuditing_LogRequestObject(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
e := a.LogRequestObject(req)
|
info := &request.RequestInfo{
|
||||||
|
RequestInfo: &k8srequest.RequestInfo{
|
||||||
|
IsResourceRequest: false,
|
||||||
|
Path: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces",
|
||||||
|
Verb: "create",
|
||||||
|
APIGroup: "tenant.kubesphere.io",
|
||||||
|
APIVersion: "v1alpha2",
|
||||||
|
Resource: "workspaces",
|
||||||
|
Name: "test",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
expectedEvent := &Event{
|
e := a.LogRequestObject(req, info)
|
||||||
|
|
||||||
|
expectedEvent := &v1alpha12.Event{
|
||||||
Event: audit.Event{
|
Event: audit.Event{
|
||||||
AuditID: e.AuditID,
|
AuditID: e.AuditID,
|
||||||
Level: "RequestResponse",
|
Level: "RequestResponse",
|
||||||
|
Verb: "create",
|
||||||
Stage: "ResponseComplete",
|
Stage: "ResponseComplete",
|
||||||
User: v1.UserInfo{
|
User: v1.UserInfo{
|
||||||
Username: "admin",
|
Username: "admin",
|
||||||
@@ -161,8 +175,18 @@ func TestAuditing_LogRequestObject(t *testing.T) {
|
|||||||
SourceIPs: []string{
|
SourceIPs: []string{
|
||||||
"192.168.0.2",
|
"192.168.0.2",
|
||||||
},
|
},
|
||||||
|
RequestURI: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces",
|
||||||
RequestReceivedTimestamp: e.RequestReceivedTimestamp,
|
RequestReceivedTimestamp: e.RequestReceivedTimestamp,
|
||||||
|
ObjectRef: &audit.ObjectReference{
|
||||||
|
Resource: "workspaces",
|
||||||
|
Namespace: "",
|
||||||
|
Name: "test",
|
||||||
|
UID: "",
|
||||||
|
APIGroup: "tenant.kubesphere.io",
|
||||||
|
APIVersion: "v1alpha2",
|
||||||
|
ResourceVersion: "",
|
||||||
|
Subresource: "",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -210,8 +234,6 @@ func TestAuditing_LogResponseObject(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
e := a.LogRequestObject(req)
|
|
||||||
|
|
||||||
info := &request.RequestInfo{
|
info := &request.RequestInfo{
|
||||||
RequestInfo: &k8srequest.RequestInfo{
|
RequestInfo: &k8srequest.RequestInfo{
|
||||||
IsResourceRequest: false,
|
IsResourceRequest: false,
|
||||||
@@ -224,12 +246,14 @@ func TestAuditing_LogResponseObject(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e := a.LogRequestObject(req, info)
|
||||||
|
|
||||||
resp := &ResponseCapture{}
|
resp := &ResponseCapture{}
|
||||||
resp.WriteHeader(200)
|
resp.WriteHeader(200)
|
||||||
|
|
||||||
a.LogResponseObject(e, resp, info)
|
a.LogResponseObject(e, resp, info)
|
||||||
|
|
||||||
expectedEvent := &Event{
|
expectedEvent := &v1alpha12.Event{
|
||||||
Event: audit.Event{
|
Event: audit.Event{
|
||||||
Verb: "create",
|
Verb: "create",
|
||||||
AuditID: e.AuditID,
|
AuditID: e.AuditID,
|
||||||
|
|||||||
20
pkg/apiserver/auditing/v1alpha1/event.go
Normal file
20
pkg/apiserver/auditing/v1alpha1/event.go
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
package v1alpha1
|
||||||
|
|
||||||
|
import "k8s.io/apiserver/pkg/apis/audit"
|
||||||
|
|
||||||
|
type Event struct {
|
||||||
|
// Devops project
|
||||||
|
Devops string
|
||||||
|
// The workspace which this audit event happened
|
||||||
|
Workspace string
|
||||||
|
// The cluster which this audit event happened
|
||||||
|
Cluster string
|
||||||
|
// Message send to user.s
|
||||||
|
Message string
|
||||||
|
|
||||||
|
audit.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventList struct {
|
||||||
|
Items []Event
|
||||||
|
}
|
||||||
@@ -19,16 +19,18 @@ func WithAuditing(handler http.Handler, a auditing.Auditing) http.Handler {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
e := a.LogRequestObject(req)
|
|
||||||
resp := auditing.NewResponseCapture(w)
|
|
||||||
handler.ServeHTTP(resp, req)
|
|
||||||
|
|
||||||
info, ok := request.RequestInfoFrom(req.Context())
|
info, ok := request.RequestInfoFrom(req.Context())
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Error("Unable to retrieve request info from request")
|
klog.Error("Unable to retrieve request info from request")
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e := a.LogRequestObject(req, info)
|
||||||
|
req = req.WithContext(request.WithAuditEvent(req.Context(), e))
|
||||||
|
resp := auditing.NewResponseCapture(w)
|
||||||
|
handler.ServeHTTP(resp, req)
|
||||||
|
|
||||||
go a.LogResponseObject(e, resp, info)
|
go a.LogResponseObject(e, resp, info)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,9 +18,9 @@ package request
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apiserver/pkg/apis/audit"
|
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -87,12 +87,12 @@ func UserFrom(ctx context.Context) (user.Info, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WithAuditEvent returns set audit event struct.
|
// WithAuditEvent returns set audit event struct.
|
||||||
func WithAuditEvent(parent context.Context, ev *audit.Event) context.Context {
|
func WithAuditEvent(parent context.Context, ev *v1alpha1.Event) context.Context {
|
||||||
return WithValue(parent, auditKey, ev)
|
return WithValue(parent, auditKey, ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AuditEventFrom returns the audit event struct on the ctx
|
// AuditEventFrom returns the audit event struct on the ctx
|
||||||
func AuditEventFrom(ctx context.Context) *audit.Event {
|
func AuditEventFrom(ctx context.Context) *v1alpha1.Event {
|
||||||
ev, _ := ctx.Value(auditKey).(*audit.Event)
|
ev, _ := ctx.Value(auditKey).(*v1alpha1.Event)
|
||||||
return ev
|
return ev
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user