Merge branch 'master' into options
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
genericoptions "kubesphere.io/kubesphere/pkg/server/options"
|
||||
auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/cache"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
|
||||
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch"
|
||||
@@ -62,6 +63,7 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
|
||||
s.LoggingOptions.AddFlags(fss.FlagSet("logging"), s.LoggingOptions)
|
||||
s.MultiClusterOptions.AddFlags(fss.FlagSet("multicluster"), s.MultiClusterOptions)
|
||||
s.EventsOptions.AddFlags(fss.FlagSet("events"), s.EventsOptions)
|
||||
s.AuditingOptions.AddFlags(fss.FlagSet("auditing"), s.AuditingOptions)
|
||||
|
||||
fs = fss.FlagSet("klog")
|
||||
local := flag.NewFlagSet("klog", flag.ExitOnError)
|
||||
@@ -169,6 +171,14 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS
|
||||
apiServer.EventsClient = eventsClient
|
||||
}
|
||||
|
||||
if s.AuditingOptions.Host != "" {
|
||||
auditingClient, err := auditingclient.NewClient(s.AuditingOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
apiServer.AuditingClient = auditingClient
|
||||
}
|
||||
|
||||
if s.OpenPitrixOptions != nil {
|
||||
opClient, err := openpitrix.NewClient(s.OpenPitrixOptions)
|
||||
if err != nil {
|
||||
|
||||
@@ -17,6 +17,7 @@ func (s *ServerRunOptions) Validate() []error {
|
||||
errors = append(errors, s.LoggingOptions.Validate()...)
|
||||
errors = append(errors, s.AuthorizationOptions.Validate()...)
|
||||
errors = append(errors, s.EventsOptions.Validate()...)
|
||||
errors = append(errors, s.AuditingOptions.Validate()...)
|
||||
|
||||
return errors
|
||||
}
|
||||
|
||||
@@ -26,6 +26,8 @@ import (
|
||||
apiserverconfig "kubesphere.io/kubesphere/pkg/apiserver/config"
|
||||
"kubesphere.io/kubesphere/pkg/utils/signals"
|
||||
"kubesphere.io/kubesphere/pkg/utils/term"
|
||||
|
||||
tracing "kubesphere.io/kubesphere/pkg/kapis/servicemesh/metrics/v1alpha2"
|
||||
)
|
||||
|
||||
func NewAPIServerCommand() *cobra.Command {
|
||||
@@ -90,7 +92,10 @@ func initializeServicemeshConfig(s *options.ServerRunOptions) {
|
||||
// Initialize kiali config
|
||||
config := kconfig.NewConfig()
|
||||
|
||||
//tracing.JaegerQueryUrl = s.ServiceMeshOptions.JaegerQueryHost
|
||||
// Config jaeger query endpoint address
|
||||
if s.ServiceMeshOptions != nil && len(s.ServiceMeshOptions.JaegerQueryHost) != 0 {
|
||||
tracing.JaegerQueryUrl = s.ServiceMeshOptions.JaegerQueryHost
|
||||
}
|
||||
|
||||
// Exclude system namespaces
|
||||
config.API.Namespaces.Exclude = []string{"istio-system", "kubesphere*", "kube*"}
|
||||
|
||||
108
pkg/api/auditing/v1alpha1/types.go
Normal file
108
pkg/api/auditing/v1alpha1/types.go
Normal file
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v1alpha1
|
||||
|
||||
import (
|
||||
"github.com/emicklei/go-restful"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type APIResponse struct {
|
||||
Events *auditing.Events `json:"query,omitempty" description:"query results"`
|
||||
Statistics *auditing.Statistics `json:"statistics,omitempty" description:"statistics results"`
|
||||
Histogram *auditing.Histogram `json:"histogram,omitempty" description:"histogram results"`
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
Operation string `json:"operation,omitempty"`
|
||||
WorkspaceFilter string `json:"workspace_filter,omitempty"`
|
||||
WorkspaceSearch string `json:"workspace_search,omitempty"`
|
||||
ObjectRefNamespaceFilter string `json:"objectref_namespace_filter,omitempty"`
|
||||
ObjectRefNamespaceSearch string `json:"objectref_namespace_search,omitempty"`
|
||||
ObjectRefNameFilter string `json:"objectref_name_filter,omitempty"`
|
||||
ObjectRefNameSearch string `json:"objectref_name_search,omitempty"`
|
||||
LevelFilter string `json:"level_filter,omitempty"`
|
||||
VerbFilter string `json:"verb_filter,omitempty"`
|
||||
UserFilter string `json:"user_filter,omitempty"`
|
||||
UserSearch string `json:"user_search,omitempty"`
|
||||
GroupSearch string `json:"group_search,omitempty"`
|
||||
SourceIpSearch string `json:"source_ip_search,omitempty"`
|
||||
ObjectRefResourceFilter string `json:"objectref_resource_filter,omitempty"`
|
||||
ObjectRefSubresourceFilter string `json:"objectref_subresource_filter,omitempty"`
|
||||
ResponesStatusFilter string `json:"response_status_filter,omitempty"`
|
||||
|
||||
StartTime *time.Time `json:"start_time,omitempty"`
|
||||
EndTime *time.Time `json:"end_time,omitempty"`
|
||||
|
||||
Interval string `json:"interval,omitempty"`
|
||||
Sort string `json:"sort,omitempty"`
|
||||
From int64 `json:"from,omitempty"`
|
||||
Size int64 `json:"size,omitempty"`
|
||||
}
|
||||
|
||||
func ParseQueryParameter(req *restful.Request) (*Query, error) {
|
||||
q := &Query{}
|
||||
|
||||
q.Operation = req.QueryParameter("operation")
|
||||
q.WorkspaceFilter = req.QueryParameter("workspace_filter")
|
||||
q.WorkspaceSearch = req.QueryParameter("workspace_search")
|
||||
q.ObjectRefNamespaceFilter = req.QueryParameter("objectref_namespace_filter")
|
||||
q.ObjectRefNamespaceSearch = req.QueryParameter("objectref_namespace_search")
|
||||
q.ObjectRefNameFilter = req.QueryParameter("objectref_name_filter")
|
||||
q.ObjectRefNameSearch = req.QueryParameter("objectref_name_search")
|
||||
q.LevelFilter = req.QueryParameter("level_filter")
|
||||
q.VerbFilter = req.QueryParameter("verb_filter")
|
||||
q.SourceIpSearch = req.QueryParameter("source_ip_search")
|
||||
q.UserFilter = req.QueryParameter("user_filter")
|
||||
q.UserSearch = req.QueryParameter("user_search")
|
||||
q.GroupSearch = req.QueryParameter("group_search")
|
||||
q.ObjectRefResourceFilter = req.QueryParameter("objectref_resource_filter")
|
||||
q.ObjectRefSubresourceFilter = req.QueryParameter("objectref_subresource_filter")
|
||||
q.ResponesStatusFilter = req.QueryParameter("response_status_filter")
|
||||
|
||||
if tstr := req.QueryParameter("start_time"); tstr != "" {
|
||||
sec, err := strconv.ParseInt(tstr, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t := time.Unix(sec, 0)
|
||||
q.StartTime = &t
|
||||
}
|
||||
if tstr := req.QueryParameter("end_time"); tstr != "" {
|
||||
sec, err := strconv.ParseInt(tstr, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t := time.Unix(sec, 0)
|
||||
q.EndTime = &t
|
||||
}
|
||||
if q.Interval = req.QueryParameter("interval"); q.Interval == "" {
|
||||
q.Interval = "15m"
|
||||
}
|
||||
q.From, _ = strconv.ParseInt(req.QueryParameter("from"), 10, 64)
|
||||
size, err := strconv.ParseInt(req.QueryParameter("size"), 10, 64)
|
||||
if err != nil {
|
||||
size = 10
|
||||
}
|
||||
q.Size = size
|
||||
if q.Sort = req.QueryParameter("sort"); q.Sort != "asc" {
|
||||
q.Sort = "desc"
|
||||
}
|
||||
return q, nil
|
||||
}
|
||||
@@ -18,26 +18,38 @@ package api
|
||||
|
||||
import (
|
||||
"github.com/emicklei/go-restful"
|
||||
"k8s.io/klog"
|
||||
"net/http"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
func HandleInternalError(response *restful.Response, req *restful.Request, err error) {
|
||||
response.WriteError(http.StatusInternalServerError, err)
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
klog.Errorf("%s:%d %v", fn, line, err)
|
||||
_ = response.WriteError(http.StatusInternalServerError, err)
|
||||
}
|
||||
|
||||
// HandleBadRequest writes http.StatusBadRequest and log error
|
||||
func HandleBadRequest(response *restful.Response, req *restful.Request, err error) {
|
||||
response.WriteError(http.StatusBadRequest, err)
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
klog.Errorf("%s:%d %v", fn, line, err)
|
||||
_ = response.WriteError(http.StatusBadRequest, err)
|
||||
}
|
||||
|
||||
func HandleNotFound(response *restful.Response, req *restful.Request, err error) {
|
||||
response.WriteError(http.StatusNotFound, err)
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
klog.Errorf("%s:%d %v", fn, line, err)
|
||||
_ = response.WriteError(http.StatusNotFound, err)
|
||||
}
|
||||
|
||||
func HandleForbidden(response *restful.Response, req *restful.Request, err error) {
|
||||
response.WriteError(http.StatusForbidden, err)
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
klog.Errorf("%s:%d %v", fn, line, err)
|
||||
_ = response.WriteError(http.StatusForbidden, err)
|
||||
}
|
||||
|
||||
func HandleConflict(response *restful.Response, req *restful.Request, err error) {
|
||||
response.WriteError(http.StatusConflict, err)
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
klog.Errorf("%s:%d %v", fn, line, err)
|
||||
_ = response.WriteError(http.StatusConflict, err)
|
||||
}
|
||||
|
||||
@@ -63,6 +63,7 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/kapis/version"
|
||||
"kubesphere.io/kubesphere/pkg/models/iam/am"
|
||||
"kubesphere.io/kubesphere/pkg/models/iam/im"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/cache"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/devops"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/events"
|
||||
@@ -135,6 +136,8 @@ type APIServer struct {
|
||||
SonarClient sonarqube.SonarInterface
|
||||
|
||||
EventsClient events.Client
|
||||
|
||||
AuditingClient auditing.Client
|
||||
}
|
||||
|
||||
func (s *APIServer) PrepareRun() error {
|
||||
@@ -172,7 +175,7 @@ func (s *APIServer) installKubeSphereAPIs() {
|
||||
urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
|
||||
s.KubernetesClient.Master()))
|
||||
urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
|
||||
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient))
|
||||
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient))
|
||||
urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.KubernetesClient.Config()))
|
||||
urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
|
||||
s.InformerFactory.KubernetesSharedInformerFactory(),
|
||||
@@ -241,7 +244,8 @@ func (s *APIServer) buildHandlerChain() {
|
||||
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})
|
||||
|
||||
if s.Config.MultiClusterOptions.Enable {
|
||||
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Lister())
|
||||
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters(),
|
||||
s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters().Lister())
|
||||
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
|
||||
}
|
||||
|
||||
@@ -480,10 +484,17 @@ func logStackOnRecover(panicReason interface{}, w http.ResponseWriter) {
|
||||
func logRequestAndResponse(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
|
||||
start := time.Now()
|
||||
chain.ProcessFilter(req, resp)
|
||||
klog.V(4).Infof("%s - \"%s %s %s\" %d %d %dms",
|
||||
|
||||
// Always log error response
|
||||
logWithVerbose := klog.V(4)
|
||||
if resp.StatusCode() > http.StatusBadRequest {
|
||||
logWithVerbose = klog.V(0)
|
||||
}
|
||||
|
||||
logWithVerbose.Infof("%s - \"%s %s %s\" %d %d %dms",
|
||||
getRequestIP(req),
|
||||
req.Request.Method,
|
||||
req.Request.RequestURI,
|
||||
req.Request.URL,
|
||||
req.Request.Proto,
|
||||
resp.StatusCode(),
|
||||
resp.ContentLength(),
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options"
|
||||
authorizationoptions "kubesphere.io/kubesphere/pkg/apiserver/authorization/options"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
|
||||
auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/cache"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
|
||||
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch"
|
||||
@@ -92,6 +93,7 @@ type Config struct {
|
||||
AuthorizationOptions *authorizationoptions.AuthorizationOptions `json:"authorization,omitempty" yaml:"authorization,omitempty" mapstructure:"authorization"`
|
||||
MultiClusterOptions *multicluster.Options `json:"multicluster,omitempty" yaml:"multicluster,omitempty" mapstructure:"multicluster"`
|
||||
EventsOptions *eventsclient.Options `json:"events,omitempty" yaml:"events,omitempty" mapstructure:"events"`
|
||||
AuditingOptions *auditingclient.Options `json:"auditing,omitempty" yaml:"auditing,omitempty" mapstructure:"auditing"`
|
||||
// Options used for enabling components, not actually used now. Once we switch Alerting/Notification API to kubesphere,
|
||||
// we can add these options to kubesphere command lines
|
||||
AlertingOptions *alerting.Options `json:"alerting,omitempty" yaml:"alerting,omitempty" mapstructure:"alerting"`
|
||||
@@ -118,6 +120,7 @@ func New() *Config {
|
||||
AuthorizationOptions: authorizationoptions.NewAuthorizationOptions(),
|
||||
MultiClusterOptions: multicluster.NewOptions(),
|
||||
EventsOptions: eventsclient.NewElasticSearchOptions(),
|
||||
AuditingOptions: auditingclient.NewElasticSearchOptions(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,8 +189,7 @@ func (conf *Config) stripEmptyOptions() {
|
||||
conf.DevopsOptions = nil
|
||||
}
|
||||
|
||||
if conf.MonitoringOptions != nil && conf.MonitoringOptions.Endpoint == "" &&
|
||||
conf.MonitoringOptions.SecondaryEndpoint == "" {
|
||||
if conf.MonitoringOptions != nil && conf.MonitoringOptions.Endpoint == "" {
|
||||
conf.MonitoringOptions = nil
|
||||
}
|
||||
|
||||
@@ -236,4 +238,8 @@ func (conf *Config) stripEmptyOptions() {
|
||||
if conf.EventsOptions != nil && conf.EventsOptions.Host == "" {
|
||||
conf.EventsOptions = nil
|
||||
}
|
||||
|
||||
if conf.AuditingOptions != nil && conf.AuditingOptions.Host == "" {
|
||||
conf.AuditingOptions = nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
authoptions "kubesphere.io/kubesphere/pkg/apiserver/authentication/options"
|
||||
authorizationoptions "kubesphere.io/kubesphere/pkg/apiserver/authorization/options"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
|
||||
auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing/elasticsearch"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/cache"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/devops/jenkins"
|
||||
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch"
|
||||
@@ -103,8 +104,7 @@ func newTestConfig() (*Config, error) {
|
||||
WeaveScopeHost: "weave-scope-app.weave.svc",
|
||||
},
|
||||
MonitoringOptions: &prometheus.Options{
|
||||
Endpoint: "http://prometheus.kubesphere-monitoring-system.svc",
|
||||
SecondaryEndpoint: "http://prometheus.kubesphere-monitoring-system.svc",
|
||||
Endpoint: "http://prometheus.kubesphere-monitoring-system.svc",
|
||||
},
|
||||
LoggingOptions: &elasticsearch.Options{
|
||||
Host: "http://elasticsearch-logging.kubesphere-logging-system.svc:9200",
|
||||
@@ -146,6 +146,11 @@ func newTestConfig() (*Config, error) {
|
||||
IndexPrefix: "ks-logstash-events",
|
||||
Version: "6",
|
||||
},
|
||||
AuditingOptions: &auditingclient.Options{
|
||||
Host: "http://elasticsearch-logging-data.kubesphere-logging-system.svc:9200",
|
||||
IndexPrefix: "ks-logstash-auditing",
|
||||
Version: "6",
|
||||
},
|
||||
}
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
@@ -22,32 +22,75 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/klog"
|
||||
clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/request"
|
||||
"kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
|
||||
clusterinformer "kubesphere.io/kubesphere/pkg/client/informers/externalversions/cluster/v1alpha1"
|
||||
clusterlister "kubesphere.io/kubesphere/pkg/client/listers/cluster/v1alpha1"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const proxyURLFormat = "/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy%s"
|
||||
|
||||
// Dispatcher defines how to forward request to designated cluster based on cluster name
|
||||
// This should only be used in host cluster when multicluster mode enabled, use in any other cases may cause
|
||||
// unexpected behavior
|
||||
type Dispatcher interface {
|
||||
Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler)
|
||||
}
|
||||
|
||||
type innerCluster struct {
|
||||
kubernetesURL *url.URL
|
||||
kubesphereURL *url.URL
|
||||
transport http.RoundTripper
|
||||
}
|
||||
|
||||
type clusterDispatch struct {
|
||||
clusterLister v1alpha1.ClusterLister
|
||||
clusterLister clusterlister.ClusterLister
|
||||
|
||||
// dispatcher will build a in memory cluster cache to speed things up
|
||||
innerClusters map[string]*innerCluster
|
||||
|
||||
clusterInformerSynced cache.InformerSynced
|
||||
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
func NewClusterDispatch(clusterLister v1alpha1.ClusterLister) Dispatcher {
|
||||
return &clusterDispatch{
|
||||
func NewClusterDispatch(clusterInformer clusterinformer.ClusterInformer, clusterLister clusterlister.ClusterLister) Dispatcher {
|
||||
clusterDispatcher := &clusterDispatch{
|
||||
clusterLister: clusterLister,
|
||||
innerClusters: make(map[string]*innerCluster),
|
||||
mutex: sync.RWMutex{},
|
||||
}
|
||||
|
||||
clusterDispatcher.clusterInformerSynced = clusterInformer.Informer().HasSynced
|
||||
clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: clusterDispatcher.updateInnerClusters,
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
clusterDispatcher.updateInnerClusters(newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
cluster := obj.(*clusterv1alpha1.Cluster)
|
||||
clusterDispatcher.mutex.Lock()
|
||||
if _, ok := clusterDispatcher.innerClusters[cluster.Name]; ok {
|
||||
delete(clusterDispatcher.innerClusters, cluster.Name)
|
||||
}
|
||||
clusterDispatcher.mutex.Unlock()
|
||||
|
||||
},
|
||||
})
|
||||
|
||||
return clusterDispatcher
|
||||
}
|
||||
|
||||
// Dispatch dispatch requests to designated cluster
|
||||
func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, handler http.Handler) {
|
||||
|
||||
info, _ := request.RequestInfoFrom(req.Context())
|
||||
|
||||
if len(info.Cluster) == 0 {
|
||||
@@ -74,21 +117,47 @@ func (c *clusterDispatch) Dispatch(w http.ResponseWriter, req *http.Request, han
|
||||
}
|
||||
|
||||
if !isClusterReady(cluster) {
|
||||
http.Error(w, fmt.Sprintf("cluster agent is not ready"), http.StatusInternalServerError)
|
||||
http.Error(w, fmt.Sprintf("cluster is not ready"), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
endpoint, err := url.Parse(cluster.Spec.Connection.KubeSphereAPIEndpoint)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
innCluster := c.getInnerCluster(cluster.Name)
|
||||
if innCluster == nil {
|
||||
http.Error(w, fmt.Sprintf("cluster not ready"), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
transport := http.DefaultTransport
|
||||
|
||||
// change request host to actually cluster hosts
|
||||
u := *req.URL
|
||||
u.Host = endpoint.Host
|
||||
u.Path = strings.Replace(u.Path, fmt.Sprintf("/clusters/%s", info.Cluster), "", 1)
|
||||
|
||||
httpProxy := proxy.NewUpgradeAwareHandler(&u, http.DefaultTransport, true, false, c)
|
||||
// if cluster connection is direct and kubesphere apiserver endpoint is empty
|
||||
// we use kube-apiserver proxy way
|
||||
if cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeDirect &&
|
||||
len(cluster.Spec.Connection.KubeSphereAPIEndpoint) == 0 {
|
||||
|
||||
u.Scheme = innCluster.kubernetesURL.Scheme
|
||||
u.Host = innCluster.kubernetesURL.Host
|
||||
u.Path = fmt.Sprintf(proxyURLFormat, u.Path)
|
||||
transport = innCluster.transport
|
||||
|
||||
// The reason we need this is kube-apiserver doesn't behave like a standard proxy, it will strip
|
||||
// authorization header of proxy requests. Use custom header to avoid stripping by kube-apiserver.
|
||||
// https://github.com/kubernetes/kubernetes/issues/38775#issuecomment-277915961
|
||||
// We first copy req.Header['Authorization'] to req.Header['X-KubeSphere-Authorization'] before sending
|
||||
// designated cluster kube-apiserver, then copy req.Header['X-KubeSphere-Authorization'] to
|
||||
// req.Header['Authorization'] before authentication.
|
||||
req.Header.Set("X-KubeSphere-Authorization", req.Header.Get("Authorization"))
|
||||
} else {
|
||||
// everything else goes to ks-apiserver, since our ks-apiserver has the ability to proxy kube-apiserver requests
|
||||
|
||||
u.Host = innCluster.kubesphereURL.Host
|
||||
u.Scheme = innCluster.kubesphereURL.Scheme
|
||||
}
|
||||
|
||||
httpProxy := proxy.NewUpgradeAwareHandler(&u, transport, false, false, c)
|
||||
httpProxy.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
@@ -96,6 +165,57 @@ func (c *clusterDispatch) Error(w http.ResponseWriter, req *http.Request, err er
|
||||
responsewriters.InternalError(w, req, err)
|
||||
}
|
||||
|
||||
func (c *clusterDispatch) getInnerCluster(name string) *innerCluster {
|
||||
c.mutex.RLock()
|
||||
defer c.mutex.RUnlock()
|
||||
if cluster, ok := c.innerClusters[name]; ok {
|
||||
return cluster
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *clusterDispatch) updateInnerClusters(obj interface{}) {
|
||||
cluster := obj.(*clusterv1alpha1.Cluster)
|
||||
|
||||
kubernetesEndpoint, err := url.Parse(cluster.Spec.Connection.KubernetesAPIEndpoint)
|
||||
if err != nil {
|
||||
klog.Errorf("Parse kubernetes apiserver endpoint %s failed, %v", cluster.Spec.Connection.KubernetesAPIEndpoint, err)
|
||||
return
|
||||
}
|
||||
|
||||
kubesphereEndpoint, err := url.Parse(cluster.Spec.Connection.KubeSphereAPIEndpoint)
|
||||
if err != nil {
|
||||
klog.Errorf("Parse kubesphere apiserver endpoint %s failed, %v", cluster.Spec.Connection.KubeSphereAPIEndpoint, err)
|
||||
return
|
||||
}
|
||||
|
||||
// prepare for
|
||||
clientConfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Connection.KubeConfig)
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to create client config from kubeconfig bytes, %#v", err)
|
||||
return
|
||||
}
|
||||
|
||||
clusterConfig, err := clientConfig.ClientConfig()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get client config, %#v", err)
|
||||
return
|
||||
}
|
||||
|
||||
transport, err := rest.TransportFor(clusterConfig)
|
||||
if err != nil {
|
||||
klog.Errorf("Create transport failed, %v", err)
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
c.innerClusters[cluster.Name] = &innerCluster{
|
||||
kubernetesURL: kubernetesEndpoint,
|
||||
kubesphereURL: kubesphereEndpoint,
|
||||
transport: transport,
|
||||
}
|
||||
c.mutex.Unlock()
|
||||
}
|
||||
|
||||
func isClusterReady(cluster *clusterv1alpha1.Cluster) bool {
|
||||
for _, condition := range cluster.Status.Conditions {
|
||||
if condition.Type == clusterv1alpha1.ClusterReady && condition.Status == corev1.ConditionTrue {
|
||||
|
||||
1
pkg/apiserver/dispatch/dispatch_test.go
Normal file
1
pkg/apiserver/dispatch/dispatch_test.go
Normal file
@@ -0,0 +1 @@
|
||||
package dispatch
|
||||
@@ -32,6 +32,19 @@ func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver)
|
||||
return
|
||||
}
|
||||
|
||||
// KubeSphere supports kube-apiserver proxy requests in multicluster mode. But kube-apiserver
|
||||
// stripped all authorization headers. Use custom header to carry token to avoid losing authentication token.
|
||||
// We may need a better way. See issue below.
|
||||
// https://github.com/kubernetes/kubernetes/issues/38775#issuecomment-277915961
|
||||
authorization := req.Header.Get("Authorization")
|
||||
if len(authorization) == 0 {
|
||||
xAuthorization := req.Header.Get("X-KubeSphere-Authorization")
|
||||
if len(xAuthorization) != 0 {
|
||||
req.Header.Set("Authorization", xAuthorization)
|
||||
req.Header.Del("X-KubeSphere-Authorization")
|
||||
}
|
||||
}
|
||||
|
||||
req = req.WithContext(request.WithRequestInfo(ctx, info))
|
||||
handler.ServeHTTP(w, req)
|
||||
})
|
||||
|
||||
@@ -83,6 +83,7 @@ const (
|
||||
LogQueryTag = "Log Query"
|
||||
TerminalTag = "Terminal"
|
||||
EventsQueryTag = "Events Query"
|
||||
AuditingQueryTag = "Auditing Query"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -388,6 +388,11 @@ func (c *ClusterController) syncCluster(key string) error {
|
||||
// is safe.
|
||||
if isConditionTrue(cluster, clusterv1alpha1.ClusterAgentAvailable) ||
|
||||
cluster.Spec.Connection.Type == clusterv1alpha1.ConnectionTypeDirect {
|
||||
|
||||
if len(cluster.Spec.Connection.KubernetesAPIEndpoint) == 0 {
|
||||
cluster.Spec.Connection.KubernetesAPIEndpoint = clusterConfig.Host
|
||||
}
|
||||
|
||||
version, err := clientSet.Discovery().ServerVersion()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get kubernetes version, %#v", err)
|
||||
|
||||
@@ -228,7 +228,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
if _, ok := copySecret.Annotations[devopsv1alpha3.CredentialAutoSyncAnnoKey]; ok {
|
||||
_, err := c.devopsClient.UpdateCredentialInProject(nsName, copySecret)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update secret %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update secret %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -238,7 +238,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
// Finalizers processing logic
|
||||
if sliceutil.HasString(copySecret.ObjectMeta.Finalizers, devopsv1alpha3.CredentialFinalizerName) {
|
||||
if _, err := c.devopsClient.DeleteCredentialInProject(nsName, secret.Name); err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to delete secret %s in devops", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to delete secret %s in devops", key))
|
||||
return err
|
||||
}
|
||||
copySecret.ObjectMeta.Finalizers = sliceutil.RemoveString(copySecret.ObjectMeta.Finalizers, func(item string) bool {
|
||||
@@ -250,7 +250,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
if !reflect.DeepEqual(secret, copySecret) {
|
||||
_, err = c.client.CoreV1().Secrets(nsName).Update(copySecret)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update secret %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update secret %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,7 +185,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
klog.Info(fmt.Sprintf("devopsproject '%s' in work queue no longer exists ", key))
|
||||
return nil
|
||||
}
|
||||
klog.Error(err, fmt.Sprintf("could not get devopsproject %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("could not get devopsproject %s ", key))
|
||||
return err
|
||||
}
|
||||
copyProject := project.DeepCopy()
|
||||
@@ -200,14 +200,14 @@ func (c *Controller) syncHandler(key string) error {
|
||||
if project.Status.AdminNamespace != "" {
|
||||
ns, err := c.namespaceLister.Get(project.Status.AdminNamespace)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
klog.Error(err, fmt.Sprintf("faild to get namespace"))
|
||||
klog.V(8).Info(err, fmt.Sprintf("faild to get namespace"))
|
||||
return err
|
||||
} else if errors.IsNotFound(err) {
|
||||
// if admin ns is not found, clean project status, rerun reconcile
|
||||
copyProject.Status.AdminNamespace = ""
|
||||
_, err := c.kubesphereClient.DevopsV1alpha3().DevOpsProjects().Update(copyProject)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update project %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update project %s ", key))
|
||||
return err
|
||||
}
|
||||
c.enqueueDevOpsProject(key)
|
||||
@@ -222,13 +222,13 @@ func (c *Controller) syncHandler(key string) error {
|
||||
copyNs := ns.DeepCopy()
|
||||
err := controllerutil.SetControllerReference(copyProject, copyNs, scheme.Scheme)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to set ownerreference %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to set ownerreference %s ", key))
|
||||
return err
|
||||
}
|
||||
copyNs.Labels[constants.DevOpsProjectLabelKey] = project.Name
|
||||
_, err = c.client.CoreV1().Namespaces().Update(copyNs)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update ns %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update ns %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -238,7 +238,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
namespaces, err := c.namespaceLister.List(
|
||||
labels.SelectorFromSet(labels.Set{constants.DevOpsProjectLabelKey: project.Name}))
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to list ns %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to list ns %s ", key))
|
||||
return err
|
||||
}
|
||||
// if there is no ns, generate new one
|
||||
@@ -246,7 +246,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
ns := c.generateNewNamespace(project)
|
||||
ns, err := c.client.CoreV1().Namespaces().Create(ns)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to create ns %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to create ns %s ", key))
|
||||
return err
|
||||
}
|
||||
copyProject.Status.AdminNamespace = ns.Name
|
||||
@@ -258,13 +258,13 @@ func (c *Controller) syncHandler(key string) error {
|
||||
copyNs := ns.DeepCopy()
|
||||
err := controllerutil.SetControllerReference(copyProject, copyNs, scheme.Scheme)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to set ownerreference %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to set ownerreference %s ", key))
|
||||
return err
|
||||
}
|
||||
copyNs.Labels[constants.DevOpsProjectLabelKey] = project.Name
|
||||
_, err = c.client.CoreV1().Namespaces().Update(copyNs)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update ns %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update ns %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -275,7 +275,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
if !reflect.DeepEqual(copyProject, project) {
|
||||
_, err := c.kubesphereClient.DevopsV1alpha3().DevOpsProjects().Update(copyProject)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update ns %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update ns %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -285,7 +285,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
klog.Error(err, fmt.Sprintf("failed to get project %s ", key))
|
||||
_, err := c.devopsClient.CreateDevOpsProject(copyProject.Status.AdminNamespace)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to get project %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to get project %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -294,7 +294,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
// Finalizers processing logic
|
||||
if sliceutil.HasString(project.ObjectMeta.Finalizers, devopsv1alpha3.DevOpsProjectFinalizerName) {
|
||||
if err := c.deleteDevOpsProjectInDevOps(project); err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to delete resource %s in devops", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to delete resource %s in devops", key))
|
||||
return err
|
||||
}
|
||||
project.ObjectMeta.Finalizers = sliceutil.RemoveString(project.ObjectMeta.Finalizers, func(item string) bool {
|
||||
@@ -303,7 +303,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
|
||||
_, err = c.kubesphereClient.DevopsV1alpha3().DevOpsProjects().Update(project)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update project %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update project %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,7 +186,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
klog.Info(fmt.Sprintf("namespace '%s' in work queue no longer exists ", key))
|
||||
return nil
|
||||
}
|
||||
klog.Error(err, fmt.Sprintf("could not get namespace %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("could not get namespace %s ", key))
|
||||
return err
|
||||
}
|
||||
if !isDevOpsProjectAdminNamespace(namespace) {
|
||||
@@ -198,7 +198,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
pipeline, err := c.devOpsProjectLister.Pipelines(nsName).Get(name)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
klog.Info(fmt.Sprintf("copyPipeline '%s' in work queue no longer exists ", key))
|
||||
klog.V(8).Info(fmt.Sprintf("copyPipeline '%s' in work queue no longer exists ", key))
|
||||
return nil
|
||||
}
|
||||
klog.Error(err, fmt.Sprintf("could not get copyPipeline %s ", key))
|
||||
@@ -220,14 +220,14 @@ func (c *Controller) syncHandler(key string) error {
|
||||
if !reflect.DeepEqual(jenkinsPipeline.Spec, copyPipeline.Spec) {
|
||||
_, err := c.devopsClient.UpdateProjectPipeline(nsName, copyPipeline)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update pipeline config %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update pipeline config %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
_, err := c.devopsClient.CreateProjectPipeline(nsName, copyPipeline)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to create copyPipeline %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to create copyPipeline %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -236,7 +236,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
// Finalizers processing logic
|
||||
if sliceutil.HasString(copyPipeline.ObjectMeta.Finalizers, devopsv1alpha3.PipelineFinalizerName) {
|
||||
if _, err := c.devopsClient.DeleteProjectPipeline(nsName, pipeline.Name); err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to delete pipeline %s in devops", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to delete pipeline %s in devops", key))
|
||||
}
|
||||
copyPipeline.ObjectMeta.Finalizers = sliceutil.RemoveString(copyPipeline.ObjectMeta.Finalizers, func(item string) bool {
|
||||
return item == devopsv1alpha3.PipelineFinalizerName
|
||||
@@ -247,7 +247,7 @@ func (c *Controller) syncHandler(key string) error {
|
||||
if !reflect.DeepEqual(pipeline, copyPipeline) {
|
||||
_, err = c.kubesphereClient.DevopsV1alpha3().Pipelines(nsName).Update(copyPipeline)
|
||||
if err != nil {
|
||||
klog.Error(err, fmt.Sprintf("failed to update pipeline %s ", key))
|
||||
klog.V(8).Info(err, fmt.Sprintf("failed to update pipeline %s ", key))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,7 +236,14 @@ func (h *handler) ValidateCluster(request *restful.Request, response *restful.Re
|
||||
return
|
||||
}
|
||||
|
||||
_, err = validateKubeSphereAPIServer(cluster.Spec.Connection.KubeSphereAPIEndpoint)
|
||||
// kubesphere apiserver endpoint not provided, that's allowed
|
||||
// Cluster dispatcher will use kube-apiserver proxy instead
|
||||
if len(cluster.Spec.Connection.KubeSphereAPIEndpoint) == 0 {
|
||||
response.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = validateKubeSphereAPIServer(cluster.Spec.Connection.KubeSphereAPIEndpoint, cluster.Spec.Connection.KubeConfig)
|
||||
if err != nil {
|
||||
api.HandleBadRequest(response, request, fmt.Errorf("unable validate kubesphere endpoint, %v", err))
|
||||
return
|
||||
@@ -279,16 +286,36 @@ func loadKubeConfigFromBytes(kubeconfig []byte) (*rest.Config, error) {
|
||||
}
|
||||
|
||||
// validateKubeSphereAPIServer uses version api to check the accessibility
|
||||
func validateKubeSphereAPIServer(ksEndpoint string) (*version.Info, error) {
|
||||
_, err := url.Parse(ksEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// If ksEndpoint is empty, use
|
||||
func validateKubeSphereAPIServer(ksEndpoint string, kubeconfig []byte) (*version.Info, error) {
|
||||
if len(ksEndpoint) == 0 && len(kubeconfig) == 0 {
|
||||
return nil, fmt.Errorf("neither kubesphere api endpoint nor kubeconfig was provided")
|
||||
}
|
||||
|
||||
client := http.Client{
|
||||
Timeout: defaultTimeout,
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("%s/kapis/version", ksEndpoint)
|
||||
|
||||
client := http.Client{
|
||||
Timeout: defaultTimeout,
|
||||
if len(ksEndpoint) != 0 {
|
||||
_, err := url.Parse(ksEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
config, err := loadKubeConfigFromBytes(kubeconfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transport, err := rest.TransportFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client.Transport = transport
|
||||
path = fmt.Sprintf("%s/api/v1/namespaces/kubesphere-system/services/:ks-apiserver:/proxy/kapis/version", config.Host)
|
||||
}
|
||||
|
||||
response, err := client.Get(path)
|
||||
|
||||
@@ -263,7 +263,7 @@ func TestValidateKubeSphereEndpoint(t *testing.T) {
|
||||
svr := httptest.NewServer(http.HandlerFunc(endpoint))
|
||||
defer svr.Close()
|
||||
|
||||
got, err := validateKubeSphereAPIServer(svr.URL)
|
||||
got, err := validateKubeSphereAPIServer(svr.URL, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -10,7 +10,8 @@ import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
var JaegerQueryUrl = "http://jaeger-query.istio-system.svc:16686/jaeger"
|
||||
// default jaeger query api endpoint address
|
||||
var JaegerQueryUrl = "http://jaeger-query.istio-system.svc:16686"
|
||||
|
||||
// Get app metrics
|
||||
func getAppMetrics(request *restful.Request, response *restful.Response) {
|
||||
@@ -84,9 +85,10 @@ func getServiceTracing(request *restful.Request, response *restful.Response) {
|
||||
url := fmt.Sprintf("%s/api/traces?%s&service=%s", JaegerQueryUrl, request.Request.URL.RawQuery, serviceName)
|
||||
|
||||
resp, err := http.Get(url)
|
||||
klog.V(4).Infof("Proxy trace request to %s", url)
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("query jaeger faile with err %v", err)
|
||||
klog.Errorf("query jaeger failed with err %v", err)
|
||||
api.HandleInternalError(response, nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/pkg/api"
|
||||
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1"
|
||||
eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1"
|
||||
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
|
||||
tenantv1alpha2 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha2"
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
"kubesphere.io/kubesphere/pkg/models/tenant"
|
||||
servererr "kubesphere.io/kubesphere/pkg/server/errors"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/events"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/logging"
|
||||
)
|
||||
@@ -25,10 +27,10 @@ type tenantHandler struct {
|
||||
tenant tenant.Interface
|
||||
}
|
||||
|
||||
func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface) *tenantHandler {
|
||||
func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface, auditingclient auditing.Client) *tenantHandler {
|
||||
|
||||
return &tenantHandler{
|
||||
tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient),
|
||||
tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,3 +284,29 @@ func (h *tenantHandler) QueryLogs(req *restful.Request, resp *restful.Response)
|
||||
resp.WriteAsJson(result)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *tenantHandler) Auditing(req *restful.Request, resp *restful.Response) {
|
||||
user, ok := request.UserFrom(req.Request.Context())
|
||||
if !ok {
|
||||
err := fmt.Errorf("cannot obtain user info")
|
||||
klog.Errorln(err)
|
||||
api.HandleForbidden(resp, req, err)
|
||||
return
|
||||
}
|
||||
queryParam, err := auditingv1alpha1.ParseQueryParameter(req)
|
||||
if err != nil {
|
||||
klog.Errorln(err)
|
||||
api.HandleInternalError(resp, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.tenant.Auditing(user, queryParam)
|
||||
if err != nil {
|
||||
klog.Errorln(err)
|
||||
api.HandleInternalError(resp, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
_ = resp.WriteEntity(result)
|
||||
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"kubesphere.io/kubesphere/pkg/api"
|
||||
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1"
|
||||
eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1"
|
||||
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
|
||||
tenantv1alpha2 "kubesphere.io/kubesphere/pkg/apis/tenant/v1alpha2"
|
||||
@@ -32,6 +33,7 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
"kubesphere.io/kubesphere/pkg/models"
|
||||
"kubesphere.io/kubesphere/pkg/server/errors"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/events"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/logging"
|
||||
"net/http"
|
||||
@@ -43,9 +45,9 @@ const (
|
||||
|
||||
var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"}
|
||||
|
||||
func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface) error {
|
||||
func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Interface, auditingclient auditing.Client) error {
|
||||
ws := runtime.NewWebService(GroupVersion)
|
||||
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient)
|
||||
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient)
|
||||
|
||||
ws.Route(ws.POST("/workspaces").
|
||||
To(handler.CreateWorkspace).
|
||||
@@ -146,6 +148,35 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s
|
||||
Consumes(restful.MIME_JSON, restful.MIME_XML).
|
||||
Produces(restful.MIME_JSON, "text/plain")
|
||||
|
||||
ws.Route(ws.GET("/auditing/events").
|
||||
To(handler.Auditing).
|
||||
Doc("Query auditing events against the cluster").
|
||||
Param(ws.QueryParameter("operation", "Operation type. This can be one of three types: `query` (for querying events), `statistics` (for retrieving statistical data), `histogram` (for displaying events count by time interval). Defaults to query.").DefaultValue("query")).
|
||||
Param(ws.QueryParameter("workspace_filter", "A comma-separated list of workspaces. This field restricts the query to specified workspaces. For example, the following filter matches the workspace my-ws and demo-ws: `my-ws,demo-ws`.")).
|
||||
Param(ws.QueryParameter("workspace_search", "A comma-separated list of keywords. Differing from **workspace_filter**, this field performs fuzzy matching on workspaces. For example, the following value limits the query to workspaces whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")).
|
||||
Param(ws.QueryParameter("objectref_namespace_filter", "A comma-separated list of namespaces. This field restricts the query to specified `ObjectRef.Namespace`.")).
|
||||
Param(ws.QueryParameter("objectref_namespace_search", "A comma-separated list of keywords. Differing from **objectref_namespace_filter**, this field performs fuzzy matching on `ObjectRef.Namespace`.")).
|
||||
Param(ws.QueryParameter("objectref_name_filter", "A comma-separated list of names. This field restricts the query to specified `ObjectRef.Name`.")).
|
||||
Param(ws.QueryParameter("objectref_name_search", "A comma-separated list of keywords. Differing from **objectref_name_filter**, this field performs fuzzy matching on `ObjectRef.Name`.")).
|
||||
Param(ws.QueryParameter("level_filter", "A comma-separated list of levels. This know values are Metadata, Request, RequestResponse.")).
|
||||
Param(ws.QueryParameter("verb_filter", "A comma-separated list of verbs. This field restricts the query to specified verb. This field restricts the query to specified `Verb`.")).
|
||||
Param(ws.QueryParameter("user_filter", "A comma-separated list of user. This field restricts the query to specified user. For example, the following filter matches the user user1 and user2: `user1,user2`.")).
|
||||
Param(ws.QueryParameter("user_search", "A comma-separated list of keywords. Differing from **user_filter**, this field performs fuzzy matching on 'User.username'. For example, the following value limits the query to user whose name contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")).
|
||||
Param(ws.QueryParameter("group_search", "A comma-separated list of keywords. This field performs fuzzy matching on 'User.Groups'. For example, the following value limits the query to group which contains the word my(My,MY,...) *OR* demo(Demo,DemO,...): `my,demo`.")).
|
||||
Param(ws.QueryParameter("source_ip_search", "A comma-separated list of keywords. This field performs fuzzy matching on 'SourceIPs'. For example, the following value limits the query to SourceIPs which contains 127.0 *OR* 192.168.: `127.0,192.168.`.")).
|
||||
Param(ws.QueryParameter("objectref_resource_filter", "A comma-separated list of resource. This field restricts the query to specified ip. This field restricts the query to specified `ObjectRef.Resource`.")).
|
||||
Param(ws.QueryParameter("objectref_subresource_filter", "A comma-separated list of subresource. This field restricts the query to specified subresource. This field restricts the query to specified `ObjectRef.Subresource`.")).
|
||||
Param(ws.QueryParameter("response_status_filter", "A comma-separated list of response status code. This field restricts the query to specified response status code. This field restricts the query to specified `ResponseStatus.Code`.")).
|
||||
Param(ws.QueryParameter("start_time", "Start time of query (limits `RequestReceivedTimestamp`). The format is a string representing seconds since the epoch, eg. 1136214245.")).
|
||||
Param(ws.QueryParameter("end_time", "End time of query (limits `RequestReceivedTimestamp`). The format is a string representing seconds since the epoch, eg. 1136214245.")).
|
||||
Param(ws.QueryParameter("interval", "Time interval. It requires **operation** is set to `histogram`. The format is [0-9]+[smhdwMqy]. Defaults to 15m (i.e. 15 min).").DefaultValue("15m")).
|
||||
Param(ws.QueryParameter("sort", "Sort order. One of asc, desc. This field sorts events by `RequestReceivedTimestamp`.").DataType("string").DefaultValue("desc")).
|
||||
Param(ws.QueryParameter("from", "The offset from the result set. This field returns query results from the specified offset. It requires **operation** is set to `query`. Defaults to 0 (i.e. from the beginning of the result set).").DataType("integer").DefaultValue("0").Required(false)).
|
||||
Param(ws.QueryParameter("size", "Size of result set to return. It requires **operation** is set to `query`. Defaults to 10 (i.e. 10 event records).").DataType("integer").DefaultValue("10").Required(false)).
|
||||
Metadata(restfulspec.KeyOpenAPITags, []string{constants.AuditingQueryTag}).
|
||||
Writes(auditingv1alpha1.APIResponse{}).
|
||||
Returns(http.StatusOK, api.StatusOK, auditingv1alpha1.APIResponse{}))
|
||||
|
||||
c.Add(ws)
|
||||
return nil
|
||||
}
|
||||
|
||||
94
pkg/models/auditing/events.go
Normal file
94
pkg/models/auditing/events.go
Normal file
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package auditing
|
||||
|
||||
import (
|
||||
"kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
|
||||
"kubesphere.io/kubesphere/pkg/utils/stringutils"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type Interface interface {
|
||||
Events(queryParam *v1alpha1.Query, MutateFilterFunc func(*auditing.Filter)) (*v1alpha1.APIResponse, error)
|
||||
}
|
||||
|
||||
type eventsOperator struct {
|
||||
client auditing.Client
|
||||
}
|
||||
|
||||
func NewEventsOperator(client auditing.Client) Interface {
|
||||
return &eventsOperator{client}
|
||||
}
|
||||
|
||||
func (eo *eventsOperator) Events(queryParam *v1alpha1.Query,
|
||||
MutateFilterFunc func(*auditing.Filter)) (*v1alpha1.APIResponse, error) {
|
||||
filter := &auditing.Filter{
|
||||
ObjectRefNames: stringutils.Split(queryParam.ObjectRefNameFilter, ","),
|
||||
ObjectRefNameFuzzy: stringutils.Split(queryParam.ObjectRefNameSearch, ","),
|
||||
Levels: stringutils.Split(queryParam.LevelFilter, ","),
|
||||
Verbs: stringutils.Split(queryParam.VerbFilter, ","),
|
||||
Users: stringutils.Split(queryParam.UserFilter, ","),
|
||||
UserFuzzy: stringutils.Split(queryParam.UserSearch, ","),
|
||||
GroupFuzzy: stringutils.Split(queryParam.GroupSearch, ","),
|
||||
SourceIpFuzzy: stringutils.Split(queryParam.SourceIpSearch, ","),
|
||||
ObjectRefResources: stringutils.Split(queryParam.ObjectRefResourceFilter, ","),
|
||||
ObjectRefSubresources: stringutils.Split(queryParam.ObjectRefSubresourceFilter, ","),
|
||||
StartTime: queryParam.StartTime,
|
||||
EndTime: queryParam.EndTime,
|
||||
}
|
||||
if MutateFilterFunc != nil {
|
||||
MutateFilterFunc(filter)
|
||||
}
|
||||
|
||||
cs := stringutils.Split(queryParam.ResponesStatusFilter, ",")
|
||||
for _, c := range cs {
|
||||
code, err := strconv.ParseInt(c, 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
filter.ResponseStatus = append(filter.ResponseStatus, int32(code))
|
||||
}
|
||||
|
||||
var ar v1alpha1.APIResponse
|
||||
var err error
|
||||
switch queryParam.Operation {
|
||||
case "histogram":
|
||||
if len(filter.ObjectRefNamespaceMap) == 0 {
|
||||
ar.Histogram = &auditing.Histogram{}
|
||||
} else {
|
||||
ar.Histogram, err = eo.client.CountOverTime(filter, queryParam.Interval)
|
||||
}
|
||||
case "statistics":
|
||||
if len(filter.ObjectRefNamespaceMap) == 0 {
|
||||
ar.Statistics = &auditing.Statistics{}
|
||||
} else {
|
||||
ar.Statistics, err = eo.client.StatisticsOnResources(filter)
|
||||
}
|
||||
default:
|
||||
if len(filter.ObjectRefNamespaceMap) == 0 {
|
||||
ar.Events = &auditing.Events{}
|
||||
} else {
|
||||
ar.Events, err = eo.client.SearchAuditingEvent(filter, queryParam.From, queryParam.Size, queryParam.Sort)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ar, nil
|
||||
}
|
||||
@@ -44,6 +44,7 @@ const (
|
||||
ingressControllerFolder = "/etc/kubesphere/ingress-controller"
|
||||
ingressControllerPrefix = "kubesphere-router-"
|
||||
ingressControllerNamespace = "kubesphere-controls-system"
|
||||
configMapSuffix = "-nginx"
|
||||
)
|
||||
|
||||
type RouterOperator interface {
|
||||
@@ -317,7 +318,7 @@ func (c *routerOperator) createOrUpdateRouterWorkload(namespace string, publishS
|
||||
deployment.Spec.Template.Labels["project"] = namespace
|
||||
|
||||
// Add configmap
|
||||
deployment.Spec.Template.Spec.Containers[0].Args = append(deployment.Spec.Template.Spec.Containers[0].Args, "--configmap=$(POD_NAMESPACE)/"+deployment.Name)
|
||||
deployment.Spec.Template.Spec.Containers[0].Args = append(deployment.Spec.Template.Spec.Containers[0].Args, "--configmap=$(POD_NAMESPACE)/"+deployment.Name+configMapSuffix)
|
||||
|
||||
// Isolate namespace
|
||||
deployment.Spec.Template.Spec.Containers[0].Args = append(deployment.Spec.Template.Spec.Containers[0].Args, "--watch-namespace="+namespace)
|
||||
|
||||
@@ -27,6 +27,7 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/klog"
|
||||
"kubesphere.io/kubesphere/pkg/api"
|
||||
auditingv1alpha1 "kubesphere.io/kubesphere/pkg/api/auditing/v1alpha1"
|
||||
eventsv1alpha1 "kubesphere.io/kubesphere/pkg/api/events/v1alpha1"
|
||||
loggingv1alpha2 "kubesphere.io/kubesphere/pkg/api/logging/v1alpha2"
|
||||
clusterv1alpha1 "kubesphere.io/kubesphere/pkg/apis/cluster/v1alpha1"
|
||||
@@ -37,11 +38,13 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/query"
|
||||
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
"kubesphere.io/kubesphere/pkg/models/auditing"
|
||||
"kubesphere.io/kubesphere/pkg/models/events"
|
||||
"kubesphere.io/kubesphere/pkg/models/iam/am"
|
||||
"kubesphere.io/kubesphere/pkg/models/logging"
|
||||
resources "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3"
|
||||
resourcesv1alpha3 "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/resource"
|
||||
auditingclient "kubesphere.io/kubesphere/pkg/simple/client/auditing"
|
||||
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events"
|
||||
loggingclient "kubesphere.io/kubesphere/pkg/simple/client/logging"
|
||||
"kubesphere.io/kubesphere/pkg/utils/stringutils"
|
||||
@@ -62,6 +65,7 @@ type Interface interface {
|
||||
Events(user user.Info, queryParam *eventsv1alpha1.Query) (*eventsv1alpha1.APIResponse, error)
|
||||
QueryLogs(user user.Info, query *loggingv1alpha2.Query) (*loggingv1alpha2.APIResponse, error)
|
||||
ExportLogs(user user.Info, query *loggingv1alpha2.Query, writer io.Writer) error
|
||||
Auditing(user user.Info, queryParam *auditingv1alpha1.Query) (*auditingv1alpha1.APIResponse, error)
|
||||
}
|
||||
|
||||
type tenantOperator struct {
|
||||
@@ -72,9 +76,10 @@ type tenantOperator struct {
|
||||
resourceGetter *resourcesv1alpha3.ResourceGetter
|
||||
events events.Interface
|
||||
lo logging.LoggingOperator
|
||||
auditing auditing.Interface
|
||||
}
|
||||
|
||||
func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Interface) Interface {
|
||||
func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Interface, auditingclient auditingclient.Client) Interface {
|
||||
amOperator := am.NewReadOnlyOperator(informers)
|
||||
authorizer := authorizerfactory.NewRBACAuthorizer(amOperator)
|
||||
return &tenantOperator{
|
||||
@@ -85,6 +90,7 @@ func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ks
|
||||
ksclient: ksclient,
|
||||
events: events.NewEventsOperator(evtsClient),
|
||||
lo: logging.NewLoggingOperator(loggingClient),
|
||||
auditing: auditing.NewEventsOperator(auditingclient),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -523,6 +529,48 @@ func (t *tenantOperator) ExportLogs(user user.Info, query *loggingv1alpha2.Query
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tenantOperator) Auditing(user user.Info, queryParam *auditingv1alpha1.Query) (*auditingv1alpha1.APIResponse, error) {
|
||||
iNamespaces, err := t.listIntersectedNamespaces(user,
|
||||
stringutils.Split(queryParam.WorkspaceFilter, ","),
|
||||
stringutils.Split(queryParam.WorkspaceSearch, ","),
|
||||
stringutils.Split(queryParam.ObjectRefNamespaceFilter, ","),
|
||||
stringutils.Split(queryParam.ObjectRefNamespaceSearch, ","))
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
namespaceCreateTimeMap := make(map[string]time.Time)
|
||||
for _, ns := range iNamespaces {
|
||||
namespaceCreateTimeMap[ns.Name] = ns.CreationTimestamp.Time
|
||||
}
|
||||
// If there are no ns and ws query conditions,
|
||||
// those events with empty `ObjectRef.Namespace` will also be listed when user can list all namespaces
|
||||
if len(queryParam.WorkspaceFilter) == 0 && len(queryParam.ObjectRefNamespaceFilter) == 0 &&
|
||||
len(queryParam.WorkspaceSearch) == 0 && len(queryParam.ObjectRefNamespaceSearch) == 0 {
|
||||
listEvts := authorizer.AttributesRecord{
|
||||
User: user,
|
||||
Verb: "list",
|
||||
APIGroup: "",
|
||||
APIVersion: "v1",
|
||||
Resource: "namespaces",
|
||||
ResourceRequest: true,
|
||||
}
|
||||
decision, _, err := t.authorizer.Authorize(listEvts)
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
if decision == authorizer.DecisionAllow {
|
||||
namespaceCreateTimeMap[""] = time.Time{}
|
||||
}
|
||||
}
|
||||
|
||||
return t.auditing.Events(queryParam, func(filter *auditingclient.Filter) {
|
||||
filter.ObjectRefNamespaceMap = namespaceCreateTimeMap
|
||||
})
|
||||
}
|
||||
|
||||
func contains(objects []runtime.Object, object runtime.Object) bool {
|
||||
for _, item := range objects {
|
||||
if item == object {
|
||||
|
||||
@@ -328,5 +328,5 @@ func prepare() Interface {
|
||||
RoleBindings().Informer().GetIndexer().Add(roleBinding)
|
||||
}
|
||||
|
||||
return New(fakeInformerFactory, nil, nil, nil, nil)
|
||||
return New(fakeInformerFactory, nil, nil, nil, nil, nil)
|
||||
}
|
||||
|
||||
171
pkg/simple/client/auditing/elasticsearch/clients.go
Normal file
171
pkg/simple/client/auditing/elasticsearch/clients.go
Normal file
@@ -0,0 +1,171 @@
|
||||
/*
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
es5 "github.com/elastic/go-elasticsearch/v5"
|
||||
es5api "github.com/elastic/go-elasticsearch/v5/esapi"
|
||||
es6 "github.com/elastic/go-elasticsearch/v6"
|
||||
es6api "github.com/elastic/go-elasticsearch/v6/esapi"
|
||||
es7 "github.com/elastic/go-elasticsearch/v7"
|
||||
es7api "github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type Request struct {
|
||||
Index string
|
||||
Body io.Reader
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Hits Hits `json:"hits"`
|
||||
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
|
||||
}
|
||||
|
||||
type Hits struct {
|
||||
Total int64 `json:"total"`
|
||||
Hits jsoniter.RawMessage `json:"hits"`
|
||||
}
|
||||
|
||||
type Error struct {
|
||||
Type string `json:"type"`
|
||||
Reason string `json:"reason"`
|
||||
Status int `json:"status"`
|
||||
}
|
||||
|
||||
func (e Error) Error() string {
|
||||
return fmt.Sprintf("%s %s: %s", http.StatusText(e.Status), e.Type, e.Reason)
|
||||
}
|
||||
|
||||
type ClientV5 es5.Client
|
||||
|
||||
func (c *ClientV5) ExSearch(r *Request) (*Response, error) {
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
|
||||
}
|
||||
func (c *ClientV5) parse(resp *es5api.Response, err error) (*Response, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting response: %s", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
if resp.IsError() {
|
||||
return nil, fmt.Errorf(resp.String())
|
||||
}
|
||||
var r struct {
|
||||
Hits struct {
|
||||
Total int64 `json:"total"`
|
||||
Hits jsoniter.RawMessage `json:"hits"`
|
||||
} `json:"hits"`
|
||||
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
|
||||
return nil, fmt.Errorf("error parsing the response body: %s", err)
|
||||
}
|
||||
return &Response{
|
||||
Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits},
|
||||
Aggregations: r.Aggregations,
|
||||
}, nil
|
||||
}
|
||||
func (c *ClientV5) Version() (string, error) {
|
||||
resp, err := c.Info()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
if resp.IsError() {
|
||||
return "", fmt.Errorf(resp.String())
|
||||
}
|
||||
var r map[string]interface{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
|
||||
return "", fmt.Errorf("error parsing the response body: %s", err)
|
||||
}
|
||||
return fmt.Sprintf("%s", r["version"].(map[string]interface{})["number"]), nil
|
||||
}
|
||||
|
||||
type ClientV6 es6.Client
|
||||
|
||||
func (c *ClientV6) ExSearch(r *Request) (*Response, error) {
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
|
||||
}
|
||||
func (c *ClientV6) parse(resp *es6api.Response, err error) (*Response, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting response: %s", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
if resp.IsError() {
|
||||
return nil, fmt.Errorf(resp.String())
|
||||
}
|
||||
var r struct {
|
||||
Hits *struct {
|
||||
Total int64 `json:"total"`
|
||||
Hits jsoniter.RawMessage `json:"hits"`
|
||||
} `json:"hits"`
|
||||
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
|
||||
return nil, fmt.Errorf("error parsing the response body: %s", err)
|
||||
}
|
||||
return &Response{
|
||||
Hits: Hits{Total: r.Hits.Total, Hits: r.Hits.Hits},
|
||||
Aggregations: r.Aggregations,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ClientV7 adapts the official Elasticsearch 7.x client to the internal
// client interface used by the auditing store.
type ClientV7 es7.Client
|
||||
|
||||
func (c *ClientV7) ExSearch(r *Request) (*Response, error) {
|
||||
return c.parse(c.Search(c.Search.WithIndex(r.Index), c.Search.WithBody(r.Body)))
|
||||
}
|
||||
func (c *ClientV7) parse(resp *es7api.Response, err error) (*Response, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting response: %s", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
if resp.IsError() {
|
||||
return nil, fmt.Errorf(resp.String())
|
||||
}
|
||||
var r struct {
|
||||
Hits *struct {
|
||||
Total struct {
|
||||
Value int64 `json:"value"`
|
||||
} `json:"total"`
|
||||
Hits jsoniter.RawMessage `json:"hits"`
|
||||
} `json:"hits"`
|
||||
Aggregations map[string]jsoniter.RawMessage `json:"aggregations"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
|
||||
return nil, fmt.Errorf("error parsing the response body: %s", err)
|
||||
}
|
||||
return &Response{
|
||||
Hits: Hits{Total: r.Hits.Total.Value, Hits: r.Hits.Hits},
|
||||
Aggregations: r.Aggregations,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// client abstracts the version-specific Elasticsearch clients behind the
// single search operation the auditing store needs.
type client interface {
	ExSearch(r *Request) (*Response, error)
}
|
||||
391
pkg/simple/client/auditing/elasticsearch/elasticsearch.go
Normal file
391
pkg/simple/client/auditing/elasticsearch/elasticsearch.go
Normal file
@@ -0,0 +1,391 @@
|
||||
/*
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
es5 "github.com/elastic/go-elasticsearch/v5"
|
||||
es6 "github.com/elastic/go-elasticsearch/v6"
|
||||
es7 "github.com/elastic/go-elasticsearch/v7"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
|
||||
)
|
||||
|
||||
// json is a drop-in replacement for encoding/json backed by jsoniter.
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
// Elasticsearch implements the auditing store on top of an Elasticsearch
// cluster, dispatching searches through a version-specific client.
type Elasticsearch struct {
	c    client // version-specific search client (v5/v6/v7)
	opts struct {
		index string // index pattern searched, "<prefix>*"
	}
}
|
||||
|
||||
func (es *Elasticsearch) SearchAuditingEvent(filter *auditing.Filter, from, size int64,
|
||||
sort string) (*auditing.Events, error) {
|
||||
queryPart := parseToQueryPart(filter)
|
||||
if sort == "" {
|
||||
sort = "desc"
|
||||
}
|
||||
sortPart := []map[string]interface{}{{
|
||||
"RequestReceivedTimestamp": map[string]string{"order": sort},
|
||||
}}
|
||||
b := map[string]interface{}{
|
||||
"from": from,
|
||||
"size": size,
|
||||
"query": queryPart,
|
||||
"sort": sortPart,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := es.c.ExSearch(&Request{
|
||||
Index: es.opts.index,
|
||||
Body: bytes.NewBuffer(body),
|
||||
})
|
||||
if err != nil || resp == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var innerHits []struct {
|
||||
*auditing.Event `json:"_source"`
|
||||
}
|
||||
if err := json.Unmarshal(resp.Hits.Hits, &innerHits); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
evts := auditing.Events{Total: resp.Hits.Total}
|
||||
for _, hit := range innerHits {
|
||||
evts.Records = append(evts.Records, hit.Event)
|
||||
}
|
||||
return &evts, nil
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) CountOverTime(filter *auditing.Filter, interval string) (*auditing.Histogram, error) {
|
||||
if interval == "" {
|
||||
interval = "15m"
|
||||
}
|
||||
|
||||
queryPart := parseToQueryPart(filter)
|
||||
aggName := "events_count_over_timestamp"
|
||||
aggsPart := map[string]interface{}{
|
||||
aggName: map[string]interface{}{
|
||||
"date_histogram": map[string]string{
|
||||
"field": "RequestReceivedTimestamp",
|
||||
"interval": interval,
|
||||
},
|
||||
},
|
||||
}
|
||||
b := map[string]interface{}{
|
||||
"query": queryPart,
|
||||
"aggs": aggsPart,
|
||||
"size": 0, // do not get docs
|
||||
}
|
||||
|
||||
body, err := json.Marshal(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := es.c.ExSearch(&Request{
|
||||
Index: es.opts.index,
|
||||
Body: bytes.NewBuffer(body),
|
||||
})
|
||||
if err != nil || resp == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
raw := resp.Aggregations[aggName]
|
||||
var agg struct {
|
||||
Buckets []struct {
|
||||
KeyAsString string `json:"key_as_string"`
|
||||
Key int64 `json:"key"`
|
||||
DocCount int64 `json:"doc_count"`
|
||||
} `json:"buckets"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &agg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
histo := auditing.Histogram{Total: int64(len(agg.Buckets))}
|
||||
for _, b := range agg.Buckets {
|
||||
histo.Buckets = append(histo.Buckets,
|
||||
auditing.Bucket{Time: b.Key, Count: b.DocCount})
|
||||
}
|
||||
return &histo, nil
|
||||
}
|
||||
|
||||
func (es *Elasticsearch) StatisticsOnResources(filter *auditing.Filter) (*auditing.Statistics, error) {
|
||||
queryPart := parseToQueryPart(filter)
|
||||
aggName := "resources_count"
|
||||
aggsPart := map[string]interface{}{
|
||||
aggName: map[string]interface{}{
|
||||
"cardinality": map[string]string{
|
||||
"field": "AuditID.keyword",
|
||||
},
|
||||
},
|
||||
}
|
||||
b := map[string]interface{}{
|
||||
"query": queryPart,
|
||||
"aggs": aggsPart,
|
||||
"size": 0, // do not get docs
|
||||
}
|
||||
|
||||
body, err := json.Marshal(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := es.c.ExSearch(&Request{
|
||||
Index: es.opts.index,
|
||||
Body: bytes.NewBuffer(body),
|
||||
})
|
||||
if err != nil || resp == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
raw := resp.Aggregations[aggName]
|
||||
var agg struct {
|
||||
Value int64 `json:"value"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &agg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &auditing.Statistics{
|
||||
Resources: agg.Value,
|
||||
Events: resp.Hits.Total,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewClient builds an auditing Elasticsearch client from options,
// selecting the implementation matching the configured major version.
// When options.Version is empty, the version is auto-detected via the
// v5 client's Info API. Supported major versions are 5, 6 and 7.
func NewClient(options *Options) (*Elasticsearch, error) {
	clientV5 := func() (*ClientV5, error) {
		c, err := es5.NewClient(es5.Config{Addresses: []string{options.Host}})
		if err != nil {
			return nil, err
		}
		return (*ClientV5)(c), nil
	}
	clientV6 := func() (*ClientV6, error) {
		c, err := es6.NewClient(es6.Config{Addresses: []string{options.Host}})
		if err != nil {
			return nil, err
		}
		return (*ClientV6)(c), nil
	}
	clientV7 := func() (*ClientV7, error) {
		c, err := es7.NewClient(es7.Config{Addresses: []string{options.Host}})
		if err != nil {
			return nil, err
		}
		return (*ClientV7)(c), nil
	}

	var (
		version = options.Version
		es      = Elasticsearch{}
		err     error
	)
	// Search all indices created under the configured prefix.
	es.opts.index = fmt.Sprintf("%s*", options.IndexPrefix)

	// No version configured: probe the cluster with the v5 client and keep
	// that client installed if the probe succeeds. The switch below may
	// replace it when the detected major version is not 5.
	if options.Version == "" {
		var c5 *ClientV5
		if c5, err = clientV5(); err == nil {
			if version, err = c5.Version(); err == nil {
				es.c = c5
			}
		}
	}
	if err != nil {
		return nil, err
	}

	// Dispatch on the major version; case "5" reuses the probe client when
	// one was installed above.
	switch strings.Split(version, ".")[0] {
	case "5":
		if es.c == nil {
			es.c, err = clientV5()
		}
	case "6":
		es.c, err = clientV6()
	case "7":
		es.c, err = clientV7()
	default:
		err = fmt.Errorf("unsupported elasticsearch version %s", version)
	}
	if err != nil {
		return nil, err
	}
	return &es, nil
}
|
||||
|
||||
func parseToQueryPart(f *auditing.Filter) interface{} {
|
||||
if f == nil {
|
||||
return nil
|
||||
}
|
||||
type BoolBody struct {
|
||||
Filter []map[string]interface{} `json:"filter,omitempty"`
|
||||
Should []map[string]interface{} `json:"should,omitempty"`
|
||||
MinimumShouldMatch *int `json:"minimum_should_match,omitempty"`
|
||||
}
|
||||
var mini = 1
|
||||
b := BoolBody{}
|
||||
queryBody := map[string]interface{}{
|
||||
"bool": &b,
|
||||
}
|
||||
|
||||
if len(f.ObjectRefNamespaceMap) > 0 {
|
||||
bi := BoolBody{MinimumShouldMatch: &mini}
|
||||
for k, v := range f.ObjectRefNamespaceMap {
|
||||
bi.Should = append(bi.Should, map[string]interface{}{
|
||||
"bool": &BoolBody{
|
||||
Filter: []map[string]interface{}{{
|
||||
"match_phrase": map[string]string{"ObjectRef.Namespace.keyword": k},
|
||||
}, {
|
||||
"range": map[string]interface{}{
|
||||
"RequestReceivedTimestamp": map[string]interface{}{
|
||||
"gte": v,
|
||||
},
|
||||
},
|
||||
}},
|
||||
},
|
||||
})
|
||||
}
|
||||
if len(bi.Should) > 0 {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": &bi})
|
||||
}
|
||||
}
|
||||
|
||||
shouldBoolbody := func(mtype, fieldName string, fieldValues []string, fieldValueMutate func(string) string) *BoolBody {
|
||||
bi := BoolBody{MinimumShouldMatch: &mini}
|
||||
for _, v := range fieldValues {
|
||||
if fieldValueMutate != nil {
|
||||
v = fieldValueMutate(v)
|
||||
}
|
||||
bi.Should = append(bi.Should, map[string]interface{}{
|
||||
mtype: map[string]string{fieldName: v},
|
||||
})
|
||||
}
|
||||
if len(bi.Should) == 0 {
|
||||
return nil
|
||||
}
|
||||
return &bi
|
||||
}
|
||||
|
||||
if len(f.ObjectRefNames) > 0 {
|
||||
if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Name.keyword",
|
||||
f.ObjectRefNames, nil); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
if len(f.ObjectRefNameFuzzy) > 0 {
|
||||
if bi := shouldBoolbody("wildcard", "ObjectRef.Name",
|
||||
f.ObjectRefNameFuzzy, func(s string) string {
|
||||
return fmt.Sprintf("*" + s + "*")
|
||||
}); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
|
||||
if len(f.Verbs) > 0 {
|
||||
if bi := shouldBoolbody("match_phrase", "Verb",
|
||||
f.Verbs, nil); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
if len(f.Levels) > 0 {
|
||||
if bi := shouldBoolbody("match_phrase", "Level",
|
||||
f.Levels, nil); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
|
||||
if len(f.SourceIpFuzzy) > 0 {
|
||||
if bi := shouldBoolbody("wildcard", "SourceIPs",
|
||||
f.SourceIpFuzzy, func(s string) string {
|
||||
return fmt.Sprintf("*" + s + "*")
|
||||
}); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
|
||||
if len(f.Users) > 0 {
|
||||
if bi := shouldBoolbody("match_phrase", "User.Username.keyword",
|
||||
f.Users, nil); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
if len(f.UserFuzzy) > 0 {
|
||||
if bi := shouldBoolbody("wildcard", "User.Username",
|
||||
f.UserFuzzy, func(s string) string {
|
||||
return fmt.Sprintf("*" + s + "*")
|
||||
}); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
|
||||
if len(f.GroupFuzzy) > 0 {
|
||||
if bi := shouldBoolbody("wildcard", "User.Groups",
|
||||
f.GroupFuzzy, func(s string) string {
|
||||
return fmt.Sprintf("*" + s + "*")
|
||||
}); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
|
||||
if len(f.ObjectRefResources) > 0 {
|
||||
if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Resource.keyword",
|
||||
f.ObjectRefResources, nil); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
|
||||
if len(f.ObjectRefSubresources) > 0 {
|
||||
if bi := shouldBoolbody("match_phrase_prefix", "ObjectRef.Subresource.keyword",
|
||||
f.ObjectRefSubresources, nil); bi != nil {
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
}
|
||||
|
||||
if f.ResponseStatus != nil && len(f.ResponseStatus) > 0 {
|
||||
|
||||
bi := BoolBody{MinimumShouldMatch: &mini}
|
||||
for _, v := range f.ResponseStatus {
|
||||
bi.Should = append(bi.Should, map[string]interface{}{
|
||||
"term": map[string]int32{"ResponseStatus.code": v},
|
||||
})
|
||||
}
|
||||
|
||||
b.Filter = append(b.Filter, map[string]interface{}{"bool": bi})
|
||||
}
|
||||
|
||||
if f.StartTime != nil || f.EndTime != nil {
|
||||
m := make(map[string]*time.Time)
|
||||
if f.StartTime != nil {
|
||||
m["gte"] = f.StartTime
|
||||
}
|
||||
if f.EndTime != nil {
|
||||
m["lte"] = f.EndTime
|
||||
}
|
||||
b.Filter = append(b.Filter, map[string]interface{}{
|
||||
"range": map[string]interface{}{"RequestReceivedTimestamp": m},
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
return queryBody
|
||||
}
|
||||
224
pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go
Normal file
224
pkg/simple/client/auditing/elasticsearch/elasticsearch_test.go
Normal file
@@ -0,0 +1,224 @@
|
||||
/*
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func MockElasticsearchService(pattern string, fakeCode int, fakeResp string) *httptest.Server {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(pattern, func(res http.ResponseWriter, req *http.Request) {
|
||||
res.WriteHeader(fakeCode)
|
||||
_, _ = res.Write([]byte(fakeResp))
|
||||
})
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
// TestStatisticsOnResources exercises StatisticsOnResources against a mock
// Elasticsearch server, covering both a successful aggregation response
// and an index-not-found error.
func TestStatisticsOnResources(t *testing.T) {
	var tests = []struct {
		description string
		filter      auditing.Filter
		// fakeVersion is declared but not read below; NewClient is called
		// with a hard-coded Version "6". NOTE(review): confirm intent.
		fakeVersion string
		fakeCode    int    // HTTP status the mock server returns
		fakeResp    string // raw JSON body the mock server returns
		expected    auditing.Statistics
		expectedError bool
	}{{
		description: "ES index exists",
		filter:      auditing.Filter{},
		fakeVersion: "6",
		fakeCode:    200,
		fakeResp: `
{
	"took": 16,
	"timed_out": false,
	"_shards": {
		"total": 1,
		"successful": 1,
		"skipped": 0,
		"failed": 0
	},
	"hits": {
		"total": 10000,
		"max_score": null,
		"hits": [

		]
	},
	"aggregations": {
		"resources_count": {
			"value": 100
		}
	}
}
`,
		expected: auditing.Statistics{
			Events:    10000,
			Resources: 100,
		},
		expectedError: false,
	}, {
		description: "ES index not exists",
		filter:      auditing.Filter{},
		fakeVersion: "6",
		fakeCode:    404,
		fakeResp: `
{
	"error": {
		"root_cause": [
			{
				"type": "index_not_found_exception",
				"reason": "no such index [events]",
				"resource.type": "index_or_alias",
				"resource.id": "events",
				"index_uuid": "_na_",
				"index": "events"
			}
		],
		"type": "index_not_found_exception",
		"reason": "no such index [events]",
		"resource.type": "index_or_alias",
		"resource.id": "events",
		"index_uuid": "_na_",
		"index": "events"
	},
	"status": 404
}
`,
		expectedError: true,
	}}

	for _, test := range tests {
		t.Run(test.description, func(t *testing.T) {
			// Every request to the mock returns the canned status and body.
			mes := MockElasticsearchService("/", test.fakeCode, test.fakeResp)
			defer mes.Close()

			es, err := NewClient(&Options{Host: mes.URL, IndexPrefix: "ks-logstash-events", Version: "6"})

			if err != nil {
				t.Fatal(err)
			}

			stats, err := es.StatisticsOnResources(&test.filter)

			if test.expectedError {
				// The error surfaced from the client embeds the HTTP
				// status code of the failed response.
				if err == nil {
					t.Fatalf("expected err like %s", test.fakeResp)
				} else if !strings.Contains(err.Error(), strconv.Itoa(test.fakeCode)) {
					t.Fatalf("err does not contain expected code: %d", test.fakeCode)
				}
			} else {
				if err != nil {
					t.Fatal(err)
				} else if diff := cmp.Diff(stats, &test.expected); diff != "" {
					t.Fatalf("%T differ (-got, +want): %s", test.expected, diff)
				}
			}
		})
	}
}
|
||||
|
||||
// TestParseToQueryPart checks that a filter combining a namespace (with
// creation-time cutoff), a fuzzy name, and a start time translates into
// the expected Elasticsearch bool query. Both sides are round-tripped
// through JSON maps so the comparison ignores key ordering.
func TestParseToQueryPart(t *testing.T) {
	q := `
{
	"bool": {
		"filter": [
			{
				"bool": {
					"should": [
						{
							"bool": {
								"filter": [
									{
										"match_phrase": {
											"ObjectRef.Namespace.keyword": "kubesphere-system"
										}
									},
									{
										"range": {
											"RequestReceivedTimestamp": {
												"gte": "2020-01-01T01:01:01.000000001Z"
											}
										}
									}
								]
							}
						}
					],
					"minimum_should_match": 1
				}
			},
			{
				"bool": {
					"should": [
						{
							"wildcard": {
								"ObjectRef.Name": "*istio*"
							}
						}
					],
					"minimum_should_match": 1
				}
			},
			{
				"range": {
					"RequestReceivedTimestamp": {
						"gte": "2019-12-01T01:01:01.000000001Z"
					}
				}
			}
		]
	}
}
`
	// Namespace created 2020-01-01; the overall query window starts one
	// month earlier.
	nsCreateTime := time.Date(2020, time.Month(1), 1, 1, 1, 1, 1, time.UTC)
	startTime := nsCreateTime.AddDate(0, -1, 0)

	filter := &auditing.Filter{
		ObjectRefNamespaceMap: map[string]time.Time{
			"kubesphere-system": nsCreateTime,
		},
		ObjectRefNameFuzzy: []string{"istio"},
		StartTime:          &startTime,
	}

	qp := parseToQueryPart(filter)
	bs, err := json.Marshal(qp)
	if err != nil {
		panic(err)
	}

	// Normalize generated and expected queries into generic maps before
	// comparing, so struct field order and formatting do not matter.
	queryPart := &map[string]interface{}{}
	if err := json.Unmarshal(bs, queryPart); err != nil {
		panic(err)
	}
	expectedQueryPart := &map[string]interface{}{}
	if err := json.Unmarshal([]byte(q), expectedQueryPart); err != nil {
		panic(err)
	}

	assert.Equal(t, expectedQueryPart, queryPart)
}
|
||||
61
pkg/simple/client/auditing/elasticsearch/options.go
Normal file
61
pkg/simple/client/auditing/elasticsearch/options.go
Normal file
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"github.com/spf13/pflag"
|
||||
"kubesphere.io/kubesphere/pkg/utils/reflectutils"
|
||||
)
|
||||
|
||||
// Options holds the connection settings for the auditing Elasticsearch
// backend.
type Options struct {
	// Host is the Elasticsearch endpoint; empty disables the auditing store.
	Host string `json:"host" yaml:"host"`
	// IndexPrefix selects the indices to search ("<prefix>*").
	IndexPrefix string `json:"indexPrefix,omitempty" yaml:"indexPrefix"`
	// Version is the Elasticsearch major version (5/6/7); empty means
	// auto-detect.
	Version string `json:"version" yaml:"version"`
}
|
||||
|
||||
func NewElasticSearchOptions() *Options {
|
||||
return &Options{
|
||||
Host: "",
|
||||
IndexPrefix: "ks-logstash-auditing",
|
||||
Version: "",
|
||||
}
|
||||
}
|
||||
|
||||
// ApplyTo overrides options with the values set on s, but only when s has
// a host configured (i.e. auditing is enabled on s).
func (s *Options) ApplyTo(options *Options) {
	if s.Host != "" {
		reflectutils.Override(options, s)
	}
}
|
||||
|
||||
func (s *Options) Validate() []error {
|
||||
errs := make([]error, 0)
|
||||
return errs
|
||||
}
|
||||
|
||||
func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
|
||||
fs.StringVar(&s.Host, "elasticsearch-host", c.Host, ""+
|
||||
"Elasticsearch service host. KubeSphere is using elastic as auditing store, "+
|
||||
"if this filed left blank, KubeSphere will use kubernetes builtin event API instead, and"+
|
||||
" the following elastic search options will be ignored.")
|
||||
|
||||
fs.StringVar(&s.IndexPrefix, "index-prefix", c.IndexPrefix, ""+
|
||||
"Index name prefix. KubeSphere will retrieve auditing against indices matching the prefix.")
|
||||
|
||||
fs.StringVar(&s.Version, "elasticsearch-version", c.Version, ""+
|
||||
"Elasticsearch major version, e.g. 5/6/7, if left blank, will detect automatically."+
|
||||
"Currently, minimum supported version is 5.x")
|
||||
}
|
||||
65
pkg/simple/client/auditing/interface.go
Normal file
65
pkg/simple/client/auditing/interface.go
Normal file
@@ -0,0 +1,65 @@
|
||||
/*
|
||||
Copyright 2020 The KubeSphere Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package auditing
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Client is the auditing store query interface: paged event search, a
// time histogram of event counts, and aggregate statistics.
type Client interface {
	SearchAuditingEvent(filter *Filter, from, size int64, sort string) (*Events, error)
	CountOverTime(filter *Filter, interval string) (*Histogram, error)
	StatisticsOnResources(filter *Filter) (*Statistics, error)
}
|
||||
|
||||
// Filter restricts an auditing query. Set dimensions are combined with
// AND; the values listed inside one dimension are alternatives (OR).
// "Fuzzy" fields match substrings; the other string fields match whole
// values.
type Filter struct {
	// ObjectRefNamespaceMap maps a namespace name to a time from which
	// events in that namespace count as matching.
	ObjectRefNamespaceMap map[string]time.Time
	ObjectRefNames        []string
	ObjectRefNameFuzzy    []string
	Levels                []string
	Verbs                 []string
	Users                 []string
	UserFuzzy             []string
	GroupFuzzy            []string
	SourceIpFuzzy         []string
	ObjectRefResources    []string
	ObjectRefSubresources []string
	// ResponseStatus matches the HTTP response codes of the audited calls.
	ResponseStatus []int32
	// StartTime/EndTime bound RequestReceivedTimestamp; nil leaves that
	// side unbounded.
	StartTime *time.Time
	EndTime   *time.Time
}
|
||||
|
||||
// Event is a single auditing event document in raw key/value form.
type Event map[string]interface{}
|
||||
|
||||
// Events is one page of auditing query results together with the total
// number of matches.
type Events struct {
	Total   int64    `json:"total" description:"total number of matched results"`
	Records []*Event `json:"records" description:"actual array of results"`
}
|
||||
|
||||
// Histogram is the result of counting auditing events over time.
type Histogram struct {
	Total   int64    `json:"total" description:"total number of events"`
	Buckets []Bucket `json:"buckets" description:"actual array of histogram results"`
}

// Bucket is one interval of a Histogram.
type Bucket struct {
	Time  int64 `json:"time" description:"timestamp"`
	Count int64 `json:"count" description:"total number of events at intervals"`
}
|
||||
|
||||
// Statistics aggregates auditing activity: how many distinct resources
// were touched and how many events were recorded.
type Statistics struct {
	Resources int64 `json:"resources" description:"total number of resources"`
	Events    int64 `json:"events" description:"total number of events"`
}
|
||||
@@ -119,14 +119,15 @@ func (bb *bodyBuilder) mainBool(sf logging.SearchFilter) *bodyBuilder {
|
||||
ms = append(ms, Match{Bool: &b})
|
||||
}
|
||||
|
||||
if !sf.Starttime.IsZero() || !sf.Endtime.IsZero() {
|
||||
fromTo := Match{
|
||||
Range: &Range{&Time{
|
||||
Gte: &sf.Starttime,
|
||||
Lte: &sf.Endtime,
|
||||
}},
|
||||
}
|
||||
ms = append(ms, fromTo)
|
||||
r := &Range{Time: &Time{}}
|
||||
if !sf.Starttime.IsZero() {
|
||||
r.Gte = &sf.Starttime
|
||||
}
|
||||
if !sf.Endtime.IsZero() {
|
||||
r.Lte = &sf.Endtime
|
||||
}
|
||||
if r.Lte != nil || r.Gte != nil {
|
||||
ms = append(ms, Match{Range: r})
|
||||
}
|
||||
|
||||
bb.Body.Query = &Query{Bool{Filter: ms}}
|
||||
|
||||
@@ -45,6 +45,12 @@ func TestMainBool(t *testing.T) {
|
||||
},
|
||||
expected: "api_body_4.json",
|
||||
},
|
||||
{
|
||||
filter: logging.SearchFilter{
|
||||
Starttime: time.Unix(1590744676, 0),
|
||||
},
|
||||
expected: "api_body_7.json",
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
|
||||
15
pkg/simple/client/logging/elasticsearch/testdata/api_body_7.json
vendored
Normal file
15
pkg/simple/client/logging/elasticsearch/testdata/api_body_7.json
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
{
|
||||
"query":{
|
||||
"bool":{
|
||||
"filter":[
|
||||
{
|
||||
"range":{
|
||||
"time":{
|
||||
"gte":"2020-05-29T17:31:16+08:00"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,14 +5,12 @@ import (
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Endpoint string `json:"endpoint,omitempty" yaml:"endpoint"`
|
||||
SecondaryEndpoint string `json:"secondaryEndpoint,omitempty" yaml:"secondaryEndpoint"`
|
||||
Endpoint string `json:"endpoint,omitempty" yaml:"endpoint"`
|
||||
}
|
||||
|
||||
func NewPrometheusOptions() *Options {
|
||||
return &Options{
|
||||
Endpoint: "",
|
||||
SecondaryEndpoint: "",
|
||||
Endpoint: "",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,17 +23,10 @@ func (s *Options) ApplyTo(options *Options) {
|
||||
if s.Endpoint != "" {
|
||||
options.Endpoint = s.Endpoint
|
||||
}
|
||||
|
||||
if s.SecondaryEndpoint != "" {
|
||||
options.SecondaryEndpoint = s.SecondaryEndpoint
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
|
||||
fs.StringVar(&s.Endpoint, "prometheus-endpoint", c.Endpoint, ""+
|
||||
"Prometheus service endpoint which stores KubeSphere monitoring data, if left "+
|
||||
"blank, will use builtin metrics-server as data source.")
|
||||
|
||||
fs.StringVar(&s.SecondaryEndpoint, "prometheus-secondary-endpoint", c.SecondaryEndpoint, ""+
|
||||
"Prometheus secondary service endpoint, if left empty and endpoint is set, will use endpoint instead.")
|
||||
}
|
||||
|
||||
@@ -117,7 +117,7 @@ func generateSwaggerJson() []byte {
|
||||
urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes()))
|
||||
urlruntime.Must(resourcesv1alpha2.AddToContainer(container, clientsets.Kubernetes(), informerFactory))
|
||||
urlruntime.Must(resourcesv1alpha3.AddToContainer(container, informerFactory))
|
||||
urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil))
|
||||
urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil, nil))
|
||||
urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil))
|
||||
urlruntime.Must(metricsv1alpha2.AddToContainer(container))
|
||||
urlruntime.Must(networkv1alpha2.AddToContainer(container, ""))
|
||||
|
||||
Reference in New Issue
Block a user