add audit components
Signed-off-by: wanjunlei <wanjunlei@yunify.com> debug add test add test add test
This commit is contained in:
118
pkg/apiserver/auditing/backend.go
Normal file
118
pkg/apiserver/auditing/backend.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package auditing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"k8s.io/klog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
WaitTimeout = time.Second
|
||||
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443"
|
||||
)
|
||||
|
||||
type Backend struct {
|
||||
channelCapacity int
|
||||
semCh chan interface{}
|
||||
cache chan *EventList
|
||||
client http.Client
|
||||
sendTimeout time.Duration
|
||||
waitTimeout time.Duration
|
||||
}
|
||||
|
||||
func NewBackend(channelCapacity int, cache chan *EventList, sendTimeout time.Duration) *Backend {
|
||||
|
||||
b := Backend{
|
||||
semCh: make(chan interface{}, channelCapacity),
|
||||
channelCapacity: channelCapacity,
|
||||
waitTimeout: WaitTimeout,
|
||||
cache: cache,
|
||||
sendTimeout: sendTimeout,
|
||||
}
|
||||
|
||||
b.client = http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
},
|
||||
Timeout: b.sendTimeout,
|
||||
}
|
||||
|
||||
go b.worker()
|
||||
|
||||
return &b
|
||||
}
|
||||
|
||||
func (b *Backend) worker() {
|
||||
|
||||
// Stop when receiver signal Interrupt.
|
||||
stopCh := b.SetupSignalHandler()
|
||||
|
||||
for {
|
||||
|
||||
var event *EventList
|
||||
select {
|
||||
case event = <-b.cache:
|
||||
if event == nil {
|
||||
break
|
||||
}
|
||||
case <-stopCh:
|
||||
break
|
||||
}
|
||||
|
||||
send := func(event *EventList) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
klog.Errorf("get goroutine for audit(%s) timeout", event.Items[0].AuditID)
|
||||
return
|
||||
case b.semCh <- struct{}{}:
|
||||
}
|
||||
|
||||
defer func() {
|
||||
<-b.semCh
|
||||
}()
|
||||
|
||||
bs, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
klog.Errorf("json marshal error, %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
response, err := b.client.Post(WebhookURL, "application/json", bytes.NewBuffer(bs))
|
||||
if err != nil {
|
||||
klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
klog.Errorf("send audit event[%s] error[%d]", event.Items[0].AuditID, response.StatusCode)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
go send(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Backend) SetupSignalHandler() (stopCh <-chan struct{}) {
|
||||
|
||||
stop := make(chan struct{})
|
||||
c := make(chan os.Signal, 2)
|
||||
signal.Notify(c, []os.Signal{os.Interrupt}...)
|
||||
go func() {
|
||||
<-c
|
||||
close(stop)
|
||||
}()
|
||||
|
||||
return stop
|
||||
}
|
||||
235
pkg/apiserver/auditing/types.go
Normal file
235
pkg/apiserver/auditing/types.go
Normal file
@@ -0,0 +1,235 @@
|
||||
package auditing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/google/uuid"
|
||||
"io/ioutil"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/apis/audit"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
||||
"kubesphere.io/kubesphere/pkg/client/listers/auditing/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/utils/iputil"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultWebhook = "kube-auditing-webhook"
|
||||
DefaultCacheCapacity = 10000
|
||||
CacheTimeout = time.Second
|
||||
SendTimeout = time.Second * 3
|
||||
ChannelCapacity = 10
|
||||
)
|
||||
|
||||
type Auditing interface {
|
||||
Enable() bool
|
||||
K8sAuditingEnable() bool
|
||||
LogRequestObject(req *http.Request) *Event
|
||||
LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo)
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
//The workspace which this audit event happened
|
||||
Workspace string
|
||||
//The devops project which this audit event happened
|
||||
Cluster string
|
||||
|
||||
audit.Event
|
||||
}
|
||||
|
||||
type EventList struct {
|
||||
Items []Event
|
||||
}
|
||||
|
||||
type auditing struct {
|
||||
lister v1alpha1.WebhookLister
|
||||
cache chan *EventList
|
||||
backend *Backend
|
||||
}
|
||||
|
||||
func NewAuditing(lister v1alpha1.WebhookLister) Auditing {
|
||||
|
||||
a := &auditing{
|
||||
lister: lister,
|
||||
cache: make(chan *EventList, DefaultCacheCapacity),
|
||||
}
|
||||
|
||||
a.backend = NewBackend(ChannelCapacity, a.cache, SendTimeout)
|
||||
return a
|
||||
}
|
||||
|
||||
func (a *auditing) getAuditLevel() audit.Level {
|
||||
wh, err := a.lister.Get(DefaultWebhook)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return audit.LevelNone
|
||||
}
|
||||
|
||||
return (audit.Level)(wh.Spec.AuditLevel)
|
||||
}
|
||||
|
||||
func (a *auditing) Enable() bool {
|
||||
|
||||
level := a.getAuditLevel()
|
||||
if level.Less(audit.LevelMetadata) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *auditing) K8sAuditingEnable() bool {
|
||||
wh, err := a.lister.Get(DefaultWebhook)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return false
|
||||
}
|
||||
|
||||
return wh.Spec.K8sAuditingEnable
|
||||
}
|
||||
|
||||
func (a *auditing) LogRequestObject(req *http.Request) *Event {
|
||||
e := &Event{
|
||||
Event: audit.Event{
|
||||
Level: a.getAuditLevel(),
|
||||
AuditID: types.UID(uuid.New().String()),
|
||||
Stage: audit.StageResponseComplete,
|
||||
ImpersonatedUser: nil,
|
||||
UserAgent: req.UserAgent(),
|
||||
RequestReceivedTimestamp: v1.NewMicroTime(time.Now()),
|
||||
Annotations: nil,
|
||||
},
|
||||
}
|
||||
|
||||
ips := make([]string, 1)
|
||||
ips[0] = iputil.RemoteIp(req)
|
||||
e.SourceIPs = ips
|
||||
|
||||
user, ok := request.UserFrom(req.Context())
|
||||
if ok {
|
||||
e.User.Username = user.GetName()
|
||||
e.User.UID = user.GetUID()
|
||||
e.User.Groups = user.GetGroups()
|
||||
|
||||
for k, v := range user.GetExtra() {
|
||||
e.User.Extra[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if e.Level.GreaterOrEqual(audit.LevelRequest) && req.ContentLength > 0 {
|
||||
body, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return e
|
||||
}
|
||||
_ = req.Body.Close()
|
||||
req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
|
||||
e.RequestObject = &runtime.Unknown{Raw: body}
|
||||
}
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
func (a *auditing) LogResponseObject(e *Event, resp *ResponseCapture, info *request.RequestInfo) {
|
||||
|
||||
// Auditing should igonre k8s request when k8s auditing is enabled.
|
||||
if info.IsKubernetesRequest && a.K8sAuditingEnable() {
|
||||
return
|
||||
}
|
||||
|
||||
e.StageTimestamp = v1.NewMicroTime(time.Now())
|
||||
e.ResponseStatus = &v1.Status{Code: int32(resp.StatusCode())}
|
||||
if e.Level.GreaterOrEqual(audit.LevelRequestResponse) {
|
||||
e.ResponseObject = &runtime.Unknown{Raw: resp.Bytes()}
|
||||
}
|
||||
|
||||
// If the request is not a standard request, or a resource request,
|
||||
// or part of the audit information cannot be obtained through url,
|
||||
// the function that handles the request can obtain RequestInfo from
|
||||
// the context of the request, assign value to audit information,
|
||||
// including name, verb, resource, subresource, etc like this.
|
||||
//
|
||||
// info, ok := request.RequestInfoFrom(request.Request.Context())
|
||||
// if ok {
|
||||
// info.Verb = "post"
|
||||
// info.Name = created.Name
|
||||
// }
|
||||
//
|
||||
e.Workspace = info.Workspace
|
||||
e.Cluster = info.Cluster
|
||||
e.RequestURI = info.Path
|
||||
e.Verb = info.Verb
|
||||
e.ObjectRef = &audit.ObjectReference{
|
||||
Resource: info.Resource,
|
||||
Namespace: info.Namespace,
|
||||
Name: info.Name,
|
||||
UID: "",
|
||||
APIGroup: info.APIGroup,
|
||||
APIVersion: info.APIVersion,
|
||||
ResourceVersion: info.ResourceScope,
|
||||
Subresource: info.Subresource,
|
||||
}
|
||||
|
||||
a.cacheEvent(*e)
|
||||
}
|
||||
|
||||
func (a *auditing) cacheEvent(e Event) {
|
||||
eventList := &EventList{}
|
||||
eventList.Items = append(eventList.Items, e)
|
||||
select {
|
||||
case a.cache <- eventList:
|
||||
return
|
||||
case <-time.After(CacheTimeout):
|
||||
klog.Errorf("cache audit event %s timeout", e.AuditID)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
type ResponseCapture struct {
|
||||
http.ResponseWriter
|
||||
wroteHeader bool
|
||||
status int
|
||||
body *bytes.Buffer
|
||||
StopCh chan interface{}
|
||||
}
|
||||
|
||||
func NewResponseCapture(w http.ResponseWriter) *ResponseCapture {
|
||||
return &ResponseCapture{
|
||||
ResponseWriter: w,
|
||||
wroteHeader: false,
|
||||
body: new(bytes.Buffer),
|
||||
StopCh: make(chan interface{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ResponseCapture) Header() http.Header {
|
||||
return c.ResponseWriter.Header()
|
||||
}
|
||||
|
||||
func (c *ResponseCapture) Write(data []byte) (int, error) {
|
||||
|
||||
defer func() {
|
||||
c.StopCh <- struct{}{}
|
||||
}()
|
||||
|
||||
c.WriteHeader(http.StatusOK)
|
||||
c.body.Write(data)
|
||||
return c.ResponseWriter.Write(data)
|
||||
}
|
||||
|
||||
func (c *ResponseCapture) WriteHeader(statusCode int) {
|
||||
if !c.wroteHeader {
|
||||
c.status = statusCode
|
||||
c.wroteHeader = true
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ResponseCapture) Bytes() []byte {
|
||||
return c.body.Bytes()
|
||||
}
|
||||
|
||||
func (c *ResponseCapture) StatusCode() int {
|
||||
return c.status
|
||||
}
|
||||
273
pkg/apiserver/auditing/types_test.go
Normal file
273
pkg/apiserver/auditing/types_test.go
Normal file
@@ -0,0 +1,273 @@
|
||||
package auditing
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/api/auditregistration/v1alpha1"
|
||||
v1 "k8s.io/api/authentication/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apiserver/pkg/apis/audit"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/apis/auditing/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
||||
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
|
||||
ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
|
||||
"kubesphere.io/kubesphere/pkg/utils/iputil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var noResyncPeriodFunc = func() time.Duration { return 0 }
|
||||
|
||||
func TestGetAuditLevel(t *testing.T) {
|
||||
webhook := &auditingv1alpha1.Webhook{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "kube-auditing-webhook",
|
||||
},
|
||||
Spec: auditingv1alpha1.WebhookSpec{
|
||||
AuditLevel: v1alpha1.LevelRequestResponse,
|
||||
},
|
||||
}
|
||||
|
||||
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
|
||||
|
||||
a := auditing{
|
||||
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
|
||||
}
|
||||
|
||||
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
assert.Equal(t, string(webhook.Spec.AuditLevel), string(a.getAuditLevel()))
|
||||
}
|
||||
|
||||
func TestAuditing_Enable(t *testing.T) {
|
||||
webhook := &auditingv1alpha1.Webhook{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "kube-auditing-webhook",
|
||||
},
|
||||
Spec: auditingv1alpha1.WebhookSpec{
|
||||
AuditLevel: v1alpha1.LevelNone,
|
||||
},
|
||||
}
|
||||
|
||||
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
|
||||
|
||||
a := auditing{
|
||||
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
|
||||
}
|
||||
|
||||
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
assert.Equal(t, false, a.Enable())
|
||||
}
|
||||
|
||||
func TestAuditing_K8sAuditingEnable(t *testing.T) {
|
||||
webhook := &auditingv1alpha1.Webhook{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "kube-auditing-webhook",
|
||||
},
|
||||
Spec: auditingv1alpha1.WebhookSpec{
|
||||
AuditLevel: v1alpha1.LevelNone,
|
||||
K8sAuditingEnable: true,
|
||||
},
|
||||
}
|
||||
|
||||
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
|
||||
|
||||
a := auditing{
|
||||
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
|
||||
}
|
||||
|
||||
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
assert.Equal(t, true, a.K8sAuditingEnable())
|
||||
}
|
||||
|
||||
func TestAuditing_LogRequestObject(t *testing.T) {
|
||||
webhook := &auditingv1alpha1.Webhook{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "kube-auditing-webhook",
|
||||
},
|
||||
Spec: auditingv1alpha1.WebhookSpec{
|
||||
AuditLevel: v1alpha1.LevelRequestResponse,
|
||||
K8sAuditingEnable: true,
|
||||
},
|
||||
}
|
||||
|
||||
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
|
||||
|
||||
a := auditing{
|
||||
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
|
||||
}
|
||||
|
||||
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
req := &http.Request{}
|
||||
u, err := url.Parse("http://139.198.121.143:32306//kapis/tenant.kubesphere.io/v1alpha2/workspaces")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
req.URL = u
|
||||
req.Header = http.Header{}
|
||||
req.Header.Add(iputil.XClientIP, "192.168.0.2")
|
||||
req = req.WithContext(request.WithUser(req.Context(), &user.DefaultInfo{
|
||||
Name: "admin",
|
||||
Groups: []string{
|
||||
"system",
|
||||
},
|
||||
}))
|
||||
|
||||
e := a.LogRequestObject(req)
|
||||
|
||||
expectedEvent := &Event{
|
||||
Event: audit.Event{
|
||||
AuditID: e.AuditID,
|
||||
Level: "RequestResponse",
|
||||
Stage: "ResponseComplete",
|
||||
User: v1.UserInfo{
|
||||
Username: "admin",
|
||||
Groups: []string{
|
||||
"system",
|
||||
},
|
||||
},
|
||||
SourceIPs: []string{
|
||||
"192.168.0.2",
|
||||
},
|
||||
|
||||
RequestReceivedTimestamp: e.RequestReceivedTimestamp,
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, expectedEvent, e)
|
||||
}
|
||||
|
||||
func TestAuditing_LogResponseObject(t *testing.T) {
|
||||
webhook := &auditingv1alpha1.Webhook{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "kube-auditing-webhook",
|
||||
},
|
||||
Spec: auditingv1alpha1.WebhookSpec{
|
||||
AuditLevel: v1alpha1.LevelMetadata,
|
||||
K8sAuditingEnable: true,
|
||||
},
|
||||
}
|
||||
|
||||
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
|
||||
|
||||
a := auditing{
|
||||
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
|
||||
}
|
||||
|
||||
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
req := &http.Request{}
|
||||
u, err := url.Parse("http://139.198.121.143:32306//kapis/tenant.kubesphere.io/v1alpha2/workspaces")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
req.URL = u
|
||||
req.Header = http.Header{}
|
||||
req.Header.Add(iputil.XClientIP, "192.168.0.2")
|
||||
req = req.WithContext(request.WithUser(req.Context(), &user.DefaultInfo{
|
||||
Name: "admin",
|
||||
Groups: []string{
|
||||
"system",
|
||||
},
|
||||
}))
|
||||
|
||||
e := a.LogRequestObject(req)
|
||||
|
||||
info := &request.RequestInfo{
|
||||
RequestInfo: &k8srequest.RequestInfo{
|
||||
IsResourceRequest: false,
|
||||
Path: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces",
|
||||
Verb: "create",
|
||||
APIGroup: "tenant.kubesphere.io",
|
||||
APIVersion: "v1alpha2",
|
||||
Resource: "workspaces",
|
||||
Name: "test",
|
||||
},
|
||||
}
|
||||
|
||||
resp := &ResponseCapture{}
|
||||
resp.WriteHeader(200)
|
||||
|
||||
a.LogResponseObject(e, resp, info)
|
||||
|
||||
expectedEvent := &Event{
|
||||
Event: audit.Event{
|
||||
Verb: "create",
|
||||
AuditID: e.AuditID,
|
||||
Level: "Metadata",
|
||||
Stage: "ResponseComplete",
|
||||
User: v1.UserInfo{
|
||||
Username: "admin",
|
||||
Groups: []string{
|
||||
"system",
|
||||
},
|
||||
},
|
||||
SourceIPs: []string{
|
||||
"192.168.0.2",
|
||||
},
|
||||
ObjectRef: &audit.ObjectReference{
|
||||
Resource: "workspaces",
|
||||
Name: "test",
|
||||
APIGroup: "tenant.kubesphere.io",
|
||||
APIVersion: "v1alpha2",
|
||||
},
|
||||
|
||||
RequestReceivedTimestamp: e.RequestReceivedTimestamp,
|
||||
StageTimestamp: e.StageTimestamp,
|
||||
RequestURI: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces",
|
||||
ResponseStatus: &metav1.Status{
|
||||
Code: 200,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
expectedBs, err := json.Marshal(expectedEvent)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
bs, err := json.Marshal(e)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
assert.EqualValues(t, string(expectedBs), string(bs))
|
||||
}
|
||||
Reference in New Issue
Block a user