Merge pull request #2187 from wanjunlei/master

add audit components
This commit is contained in:
KubeSphere CI Bot
2020-06-15 11:40:40 +08:00
committed by GitHub
40 changed files with 2558 additions and 12 deletions

View File

@@ -27,6 +27,7 @@ import (
unionauth "k8s.io/apiserver/pkg/authentication/request/union"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/klog"
audit "kubesphere.io/kubesphere/pkg/apiserver/auditing"
"kubesphere.io/kubesphere/pkg/apiserver/authentication/authenticators/basic"
"kubesphere.io/kubesphere/pkg/apiserver/authentication/authenticators/jwttoken"
"kubesphere.io/kubesphere/pkg/apiserver/authentication/request/anonymous"
@@ -139,7 +140,7 @@ type APIServer struct {
AuditingClient auditing.Client
}
func (s *APIServer) PrepareRun() error {
func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
s.container = restful.NewContainer()
s.container.Filter(logRequestAndResponse)
@@ -156,7 +157,7 @@ func (s *APIServer) PrepareRun() error {
s.Server.Handler = s.container
s.buildHandlerChain()
s.buildHandlerChain(stopCh)
return nil
}
@@ -232,7 +233,7 @@ func (s *APIServer) Run(stopCh <-chan struct{}) (err error) {
return err
}
func (s *APIServer) buildHandlerChain() {
func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
requestInfoResolver := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis", "kapis", "kapi"),
GrouplessAPIPrefixes: sets.NewString("api", "kapi"),
@@ -241,6 +242,12 @@ func (s *APIServer) buildHandlerChain() {
handler := s.Server.Handler
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})
if s.Config.AuditingOptions.Enable {
handler = filters.WithAuditing(handler,
audit.NewAuditing(s.InformerFactory.KubeSphereSharedInformerFactory().Auditing().V1alpha1().Webhooks().Lister(),
s.Config.AuditingOptions.WebhookUrl, stopCh))
}
if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters(),
s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Lister())

View File

@@ -0,0 +1,109 @@
package auditing
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
"net/http"
"time"
)
const (
WaitTimeout = time.Second
WebhookURL = "https://kube-auditing-webhook-svc.kubesphere-logging-system.svc:443/audit/webhook/event"
)
type Backend struct {
url string
channelCapacity int
semCh chan interface{}
cache chan *v1alpha1.EventList
client http.Client
sendTimeout time.Duration
waitTimeout time.Duration
stopCh <-chan struct{}
}
func NewBackend(url string, channelCapacity int, cache chan *v1alpha1.EventList, sendTimeout time.Duration, stopCh <-chan struct{}) *Backend {
b := Backend{
url: url,
semCh: make(chan interface{}, channelCapacity),
channelCapacity: channelCapacity,
waitTimeout: WaitTimeout,
cache: cache,
sendTimeout: sendTimeout,
stopCh: stopCh,
}
if len(b.url) == 0 {
b.url = WebhookURL
}
b.client = http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
Timeout: b.sendTimeout,
}
go b.worker()
return &b
}
func (b *Backend) worker() {
for {
var event *v1alpha1.EventList
select {
case event = <-b.cache:
if event == nil {
break
}
case <-b.stopCh:
break
}
send := func(event *v1alpha1.EventList) {
ctx, cancel := context.WithTimeout(context.Background(), b.waitTimeout)
defer cancel()
select {
case <-ctx.Done():
klog.Errorf("get goroutine for audit(%s) timeout", event.Items[0].AuditID)
return
case b.semCh <- struct{}{}:
}
defer func() {
<-b.semCh
}()
bs, err := json.Marshal(event)
if err != nil {
klog.Errorf("json marshal error, %s", err)
return
}
response, err := b.client.Post(b.url, "application/json", bytes.NewBuffer(bs))
if err != nil {
klog.Errorf("send audit event[%s] error, %s", event.Items[0].AuditID, err)
return
}
if response.StatusCode != http.StatusOK {
klog.Errorf("send audit event[%s] error[%d]", event.Items[0].AuditID, response.StatusCode)
return
}
}
go send(event)
}
}

View File

@@ -0,0 +1,229 @@
package auditing
import (
"bytes"
"encoding/json"
"github.com/google/uuid"
"io/ioutil"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/apis/audit"
"k8s.io/klog"
auditv1alpha1 "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"kubesphere.io/kubesphere/pkg/client/listers/auditing/v1alpha1"
"kubesphere.io/kubesphere/pkg/utils/iputil"
"net/http"
"time"
)
const (
DefaultWebhook = "kube-auditing-webhook"
DefaultCacheCapacity = 10000
CacheTimeout = time.Second
SendTimeout = time.Second * 3
ChannelCapacity = 10
)
type Auditing interface {
Enabled() bool
K8sAuditingEnabled() bool
LogRequestObject(req *http.Request, info *request.RequestInfo) *auditv1alpha1.Event
LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCapture, info *request.RequestInfo)
}
type auditing struct {
lister v1alpha1.WebhookLister
cache chan *auditv1alpha1.EventList
backend *Backend
}
func NewAuditing(lister v1alpha1.WebhookLister, url string, stopCh <-chan struct{}) Auditing {
a := &auditing{
lister: lister,
cache: make(chan *auditv1alpha1.EventList, DefaultCacheCapacity),
}
a.backend = NewBackend(url, ChannelCapacity, a.cache, SendTimeout, stopCh)
return a
}
func (a *auditing) getAuditLevel() audit.Level {
wh, err := a.lister.Get(DefaultWebhook)
if err != nil {
klog.V(8).Info(err)
return audit.LevelNone
}
return (audit.Level)(wh.Spec.AuditLevel)
}
func (a *auditing) Enabled() bool {
level := a.getAuditLevel()
if level.Less(audit.LevelMetadata) {
return false
}
return true
}
func (a *auditing) K8sAuditingEnabled() bool {
wh, err := a.lister.Get(DefaultWebhook)
if err != nil {
klog.V(8).Info(err)
return false
}
return wh.Spec.K8sAuditingEnabled
}
// If the request is not a standard request, or a resource request,
// or part of the audit information cannot be obtained through url,
// the function that handles the request can obtain Event from
// the context of the request, assign value to audit information,
// including name, verb, resource, subresource, message etc like this.
//
// info, ok := request.AuditEventFrom(request.Request.Context())
// if ok {
// info.Verb = "post"
// info.Name = created.Name
// }
//
func (a *auditing) LogRequestObject(req *http.Request, info *request.RequestInfo) *auditv1alpha1.Event {
e := &auditv1alpha1.Event{
Workspace: info.Workspace,
Cluster: info.Cluster,
Event: audit.Event{
RequestURI: info.Path,
Verb: info.Verb,
Level: a.getAuditLevel(),
AuditID: types.UID(uuid.New().String()),
Stage: audit.StageResponseComplete,
ImpersonatedUser: nil,
UserAgent: req.UserAgent(),
RequestReceivedTimestamp: v1.NewMicroTime(time.Now()),
Annotations: nil,
ObjectRef: &audit.ObjectReference{
Resource: info.Resource,
Namespace: info.Namespace,
Name: info.Name,
UID: "",
APIGroup: info.APIGroup,
APIVersion: info.APIVersion,
ResourceVersion: info.ResourceScope,
Subresource: info.Subresource,
},
},
}
ips := make([]string, 1)
ips[0] = iputil.RemoteIp(req)
e.SourceIPs = ips
user, ok := request.UserFrom(req.Context())
if ok {
e.User.Username = user.GetName()
e.User.UID = user.GetUID()
e.User.Groups = user.GetGroups()
for k, v := range user.GetExtra() {
e.User.Extra[k] = v
}
}
if e.Level.GreaterOrEqual(audit.LevelRequest) && req.ContentLength > 0 {
body, err := ioutil.ReadAll(req.Body)
if err != nil {
klog.Error(err)
return e
}
_ = req.Body.Close()
req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
e.RequestObject = &runtime.Unknown{Raw: body}
}
return e
}
func (a *auditing) LogResponseObject(e *auditv1alpha1.Event, resp *ResponseCapture, info *request.RequestInfo) {
// Auditing should igonre k8s request when k8s auditing is enabled.
if info.IsKubernetesRequest && a.K8sAuditingEnabled() {
return
}
e.StageTimestamp = v1.NewMicroTime(time.Now())
e.ResponseStatus = &v1.Status{Code: int32(resp.StatusCode())}
if e.Level.GreaterOrEqual(audit.LevelRequestResponse) {
e.ResponseObject = &runtime.Unknown{Raw: resp.Bytes()}
}
a.cacheEvent(*e)
}
func (a *auditing) cacheEvent(e auditv1alpha1.Event) {
if klog.V(8) {
bs, _ := json.Marshal(e)
klog.Infof("%s", string(bs))
}
eventList := &auditv1alpha1.EventList{}
eventList.Items = append(eventList.Items, e)
select {
case a.cache <- eventList:
return
case <-time.After(CacheTimeout):
klog.Errorf("cache audit event %s timeout", e.AuditID)
break
}
}
type ResponseCapture struct {
http.ResponseWriter
wroteHeader bool
status int
body *bytes.Buffer
StopCh chan interface{}
}
func NewResponseCapture(w http.ResponseWriter) *ResponseCapture {
return &ResponseCapture{
ResponseWriter: w,
wroteHeader: false,
body: new(bytes.Buffer),
StopCh: make(chan interface{}, 1),
}
}
func (c *ResponseCapture) Header() http.Header {
return c.ResponseWriter.Header()
}
func (c *ResponseCapture) Write(data []byte) (int, error) {
defer func() {
c.StopCh <- struct{}{}
}()
c.WriteHeader(http.StatusOK)
c.body.Write(data)
return c.ResponseWriter.Write(data)
}
func (c *ResponseCapture) WriteHeader(statusCode int) {
if !c.wroteHeader {
c.status = statusCode
c.wroteHeader = true
}
}
func (c *ResponseCapture) Bytes() []byte {
return c.body.Bytes()
}
func (c *ResponseCapture) StatusCode() int {
return c.status
}

View File

@@ -0,0 +1,297 @@
package auditing
import (
"encoding/json"
"github.com/stretchr/testify/assert"
"k8s.io/api/auditregistration/v1alpha1"
v1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authentication/user"
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/apis/auditing/v1alpha1"
v1alpha12 "kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
ksinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
"kubesphere.io/kubesphere/pkg/utils/iputil"
"net/http"
"net/url"
"testing"
"time"
)
var noResyncPeriodFunc = func() time.Duration { return 0 }
func TestGetAuditLevel(t *testing.T) {
webhook := &auditingv1alpha1.Webhook{
TypeMeta: metav1.TypeMeta{
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "kube-auditing-webhook",
},
Spec: auditingv1alpha1.WebhookSpec{
AuditLevel: v1alpha1.LevelRequestResponse,
},
}
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
a := auditing{
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
}
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
if err != nil {
panic(err)
}
assert.Equal(t, string(webhook.Spec.AuditLevel), string(a.getAuditLevel()))
}
func TestAuditing_Enabled(t *testing.T) {
webhook := &auditingv1alpha1.Webhook{
TypeMeta: metav1.TypeMeta{
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "kube-auditing-webhook",
},
Spec: auditingv1alpha1.WebhookSpec{
AuditLevel: v1alpha1.LevelNone,
},
}
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
a := auditing{
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
}
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
if err != nil {
panic(err)
}
assert.Equal(t, false, a.Enabled())
}
func TestAuditing_K8sAuditingEnabled(t *testing.T) {
webhook := &auditingv1alpha1.Webhook{
TypeMeta: metav1.TypeMeta{
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "kube-auditing-webhook",
},
Spec: auditingv1alpha1.WebhookSpec{
AuditLevel: v1alpha1.LevelNone,
K8sAuditingEnabled: true,
},
}
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
a := auditing{
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
}
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
if err != nil {
panic(err)
}
assert.Equal(t, true, a.K8sAuditingEnabled())
}
func TestAuditing_LogRequestObject(t *testing.T) {
webhook := &auditingv1alpha1.Webhook{
TypeMeta: metav1.TypeMeta{
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "kube-auditing-webhook",
},
Spec: auditingv1alpha1.WebhookSpec{
AuditLevel: v1alpha1.LevelRequestResponse,
K8sAuditingEnabled: true,
},
}
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
a := auditing{
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
}
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
if err != nil {
panic(err)
}
req := &http.Request{}
u, err := url.Parse("http://139.198.121.143:32306//kapis/tenant.kubesphere.io/v1alpha2/workspaces")
if err != nil {
panic(err)
}
req.URL = u
req.Header = http.Header{}
req.Header.Add(iputil.XClientIP, "192.168.0.2")
req = req.WithContext(request.WithUser(req.Context(), &user.DefaultInfo{
Name: "admin",
Groups: []string{
"system",
},
}))
info := &request.RequestInfo{
RequestInfo: &k8srequest.RequestInfo{
IsResourceRequest: false,
Path: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces",
Verb: "create",
APIGroup: "tenant.kubesphere.io",
APIVersion: "v1alpha2",
Resource: "workspaces",
Name: "test",
},
}
e := a.LogRequestObject(req, info)
expectedEvent := &v1alpha12.Event{
Event: audit.Event{
AuditID: e.AuditID,
Level: "RequestResponse",
Verb: "create",
Stage: "ResponseComplete",
User: v1.UserInfo{
Username: "admin",
Groups: []string{
"system",
},
},
SourceIPs: []string{
"192.168.0.2",
},
RequestURI: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces",
RequestReceivedTimestamp: e.RequestReceivedTimestamp,
ObjectRef: &audit.ObjectReference{
Resource: "workspaces",
Namespace: "",
Name: "test",
UID: "",
APIGroup: "tenant.kubesphere.io",
APIVersion: "v1alpha2",
ResourceVersion: "",
Subresource: "",
},
},
}
assert.Equal(t, expectedEvent, e)
}
func TestAuditing_LogResponseObject(t *testing.T) {
webhook := &auditingv1alpha1.Webhook{
TypeMeta: metav1.TypeMeta{
APIVersion: auditingv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "kube-auditing-webhook",
},
Spec: auditingv1alpha1.WebhookSpec{
AuditLevel: v1alpha1.LevelMetadata,
K8sAuditingEnabled: true,
},
}
informer := ksinformers.NewSharedInformerFactory(fake.NewSimpleClientset(), noResyncPeriodFunc())
a := auditing{
lister: informer.Auditing().V1alpha1().Webhooks().Lister(),
}
err := informer.Auditing().V1alpha1().Webhooks().Informer().GetIndexer().Add(webhook)
if err != nil {
panic(err)
}
req := &http.Request{}
u, err := url.Parse("http://139.198.121.143:32306//kapis/tenant.kubesphere.io/v1alpha2/workspaces")
if err != nil {
panic(err)
}
req.URL = u
req.Header = http.Header{}
req.Header.Add(iputil.XClientIP, "192.168.0.2")
req = req.WithContext(request.WithUser(req.Context(), &user.DefaultInfo{
Name: "admin",
Groups: []string{
"system",
},
}))
info := &request.RequestInfo{
RequestInfo: &k8srequest.RequestInfo{
IsResourceRequest: false,
Path: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces",
Verb: "create",
APIGroup: "tenant.kubesphere.io",
APIVersion: "v1alpha2",
Resource: "workspaces",
Name: "test",
},
}
e := a.LogRequestObject(req, info)
resp := &ResponseCapture{}
resp.WriteHeader(200)
a.LogResponseObject(e, resp, info)
expectedEvent := &v1alpha12.Event{
Event: audit.Event{
Verb: "create",
AuditID: e.AuditID,
Level: "Metadata",
Stage: "ResponseComplete",
User: v1.UserInfo{
Username: "admin",
Groups: []string{
"system",
},
},
SourceIPs: []string{
"192.168.0.2",
},
ObjectRef: &audit.ObjectReference{
Resource: "workspaces",
Name: "test",
APIGroup: "tenant.kubesphere.io",
APIVersion: "v1alpha2",
},
RequestReceivedTimestamp: e.RequestReceivedTimestamp,
StageTimestamp: e.StageTimestamp,
RequestURI: "/kapis/tenant.kubesphere.io/v1alpha2/workspaces",
ResponseStatus: &metav1.Status{
Code: 200,
},
},
}
expectedBs, err := json.Marshal(expectedEvent)
if err != nil {
panic(err)
}
bs, err := json.Marshal(e)
if err != nil {
panic(err)
}
assert.EqualValues(t, string(expectedBs), string(bs))
}

View File

@@ -0,0 +1,20 @@
package v1alpha1
import "k8s.io/apiserver/pkg/apis/audit"
type Event struct {
// Devops project
Devops string
// The workspace which this audit event happened
Workspace string
// The cluster which this audit event happened
Cluster string
// Message send to user.
Message string
audit.Event
}
type EventList struct {
Items []Event
}

View File

@@ -0,0 +1,36 @@
package filters
import (
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apiserver/auditing"
"kubesphere.io/kubesphere/pkg/apiserver/request"
"net/http"
)
func WithAuditing(handler http.Handler, a auditing.Auditing) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// When auditing level is LevelNone, request should not be auditing.
// Auditing level can be modified with cr kube-auditing-webhook,
// so it need to judge every time.
if !a.Enabled() {
handler.ServeHTTP(w, req)
return
}
info, ok := request.RequestInfoFrom(req.Context())
if !ok {
klog.Error("Unable to retrieve request info from request")
handler.ServeHTTP(w, req)
return
}
e := a.LogRequestObject(req, info)
req = req.WithContext(request.WithAuditEvent(req.Context(), e))
resp := auditing.NewResponseCapture(w)
handler.ServeHTTP(resp, req)
go a.LogResponseObject(e, resp, info)
})
}

View File

@@ -18,9 +18,9 @@ package request
import (
"context"
"kubesphere.io/kubesphere/pkg/apiserver/auditing/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/authentication/user"
)
@@ -87,12 +87,12 @@ func UserFrom(ctx context.Context) (user.Info, bool) {
}
// WithAuditEvent returns set audit event struct.
func WithAuditEvent(parent context.Context, ev *audit.Event) context.Context {
func WithAuditEvent(parent context.Context, ev *v1alpha1.Event) context.Context {
return WithValue(parent, auditKey, ev)
}
// AuditEventFrom returns the audit event struct on the ctx
func AuditEventFrom(ctx context.Context) *audit.Event {
ev, _ := ctx.Value(auditKey).(*audit.Event)
func AuditEventFrom(ctx context.Context) *v1alpha1.Event {
ev, _ := ctx.Value(auditKey).(*v1alpha1.Event)
return ev
}