Integate metering module and support metering data csv format export.

Signed-off-by: Rao Yunkun <yunkunrao@yunify.com>
This commit is contained in:
Rao Yunkun
2021-03-25 10:01:38 +08:00
parent ac275b6e98
commit d08e402384
23 changed files with 664 additions and 350 deletions

View File

@@ -23,6 +23,9 @@ import (
"regexp"
"strings"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/models/openpitrix"
"github.com/emicklei/go-restful"
"k8s.io/client-go/kubernetes"
@@ -34,12 +37,17 @@ import (
)
type handler struct {
k kubernetes.Interface
mo model.MonitoringOperator
k kubernetes.Interface
mo model.MonitoringOperator
opRelease openpitrix.ReleaseInterface
}
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, resourceGetter *resourcev1alpha3.ResourceGetter) *handler {
return &handler{k, model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter)}
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) *handler {
return &handler{
k: k,
mo: model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter),
opRelease: nil,
}
}
func (h handler) handleKubeSphereMetricsQuery(req *restful.Request, resp *restful.Response) {

View File

@@ -25,13 +25,14 @@ import (
"strings"
"time"
"kubesphere.io/kubesphere/pkg/api"
"github.com/jszwec/csvutil"
"github.com/emicklei/go-restful"
"github.com/pkg/errors"
corev1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"kubesphere.io/kubesphere/pkg/api"
model "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
@@ -217,9 +218,14 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
return q, errors.New(fmt.Sprintf(ErrParameterNotfound, "namespace"))
}
application := []string{}
if len(r.applications) != 0 {
application = strings.Split(r.applications, "|")
}
q.option = monitoring.ApplicationsOption{
NamespaceName: r.namespaceName,
Applications: strings.Split(r.applications, "|"),
Applications: application,
StorageClassName: r.storageClassName, // metering pvc
}
q.namedMetrics = model.ApplicationMetrics
@@ -247,10 +253,17 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
case monitoring.LevelService:
q.identifier = model.IdentifierService
svcs := []string{}
if len(r.services) != 0 {
svcs = strings.Split(r.services, "|")
}
q.option = monitoring.ServicesOption{
NamespaceName: r.namespaceName,
Services: strings.Split(r.services, "|"),
Services: svcs,
}
q.namedMetrics = model.ServiceMetrics
case monitoring.LevelContainer:
@@ -379,9 +392,8 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
return q, nil
}
func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
resp.Header().Set(restful.HEADER_ContentType, "text/plain")
resp.Header().Set("Content-Disposition", "attachment")
func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) {
var resBytes []byte
for i, _ := range metrics.Results {
ret := metrics.Results[i]
@@ -390,14 +402,72 @@ func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
}
}
resBytes, err := csvutil.Marshal(metrics.Results)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
for _, metric := range metrics.Results {
metricName := metric.MetricName
var csvpoints []monitoring.CSVPoint
for _, metricVal := range metric.MetricValues {
var targetList []string
for k, v := range metricVal.Metadata {
targetList = append(targetList, fmt.Sprintf("%s=%s", k, v))
}
selector := strings.Join(targetList, "|")
var startTime, endTime string
if len(metricVal.ExportedSeries) > 0 {
startTime = metricVal.ExportedSeries[0].Timestamp()
endTime = metricVal.ExportedSeries[len(metricVal.ExportedSeries)-1].Timestamp()
}
statsTab := "\nmetric_name,selector,start_time,end_time,min,max,avg,sum,fee, currency_unit\n" +
fmt.Sprintf("%s,%s,%s,%s,%.2f,%.2f,%.2f,%.2f,%.2f,%s\n\n",
metricName,
selector,
startTime,
endTime,
metricVal.MinValue,
metricVal.MaxValue,
metricVal.AvgValue,
metricVal.SumValue,
metricVal.Fee,
metricVal.CurrencyUnit)
csvpoints = nil
resourceUnit := metricVal.ResourceUnit
for _, p := range metricVal.ExportedSeries {
csvpoints = append(csvpoints, p.TransformToCSVPoint(metricName, selector, resourceUnit))
}
dataTab, err := csvutil.Marshal(csvpoints)
if err != nil {
return nil, err
}
resBytes = append(resBytes, statsTab...)
resBytes = append(resBytes, dataTab...)
}
}
if len(resBytes) == 0 {
resBytes = []byte("no data")
}
output := new(bytes.Buffer)
_, err = output.Write(resBytes)
_, err := output.Write(resBytes)
if err != nil {
return nil, err
}
return output, nil
}
func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
resp.Header().Set(restful.HEADER_ContentType, "text/plain")
resp.Header().Set("Content-Disposition", "attachment")
output, err := exportMetrics(metrics)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
@@ -408,5 +478,6 @@ func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
api.HandleBadRequest(resp, nil, err)
return
}
return
}

View File

@@ -22,10 +22,14 @@ import (
"time"
"github.com/google/go-cmp/cmp"
fakesnapshot "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned/fake"
fakeistio "istio.io/client-go/pkg/clientset/versioned/fake"
corev1 "k8s.io/api/core/v1"
fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
fakeks "kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
"kubesphere.io/kubesphere/pkg/informers"
model "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
@@ -217,13 +221,78 @@ func TestParseRequestParams(t *testing.T) {
},
expectedErr: false,
},
{
params: reqParams{
time: "1585830000",
operation: OperationQuery,
},
lvl: monitoring.LevelApplication,
expectedErr: true,
},
{
params: reqParams{
start: "1585880000",
end: "1585830000",
operation: OperationQuery,
namespaceName: "default",
applications: "app1|app2",
},
lvl: monitoring.LevelApplication,
expectedErr: true,
},
{
params: reqParams{
start: "1585880000",
end: "1585830000",
operation: OperationQuery,
namespaceName: "default",
},
lvl: monitoring.LevelApplication,
expectedErr: true,
},
{
params: reqParams{
target: "meter_service_cpu_usage",
time: "1585880000",
operation: OperationQuery,
namespaceName: "default",
},
lvl: monitoring.LevelService,
expectedErr: true,
},
{
params: reqParams{
target: "meter_service_cpu_usage",
time: "1585880000",
operation: OperationQuery,
namespaceName: "default",
services: "svc1|svc2",
},
lvl: monitoring.LevelService,
expectedErr: true,
},
{
params: reqParams{
namespaceName: "default",
},
lvl: monitoring.LevelOpenpitrix,
expectedErr: true,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
client := fake.NewSimpleClientset(&tt.namespace)
fakeInformerFactory := informers.NewInformerFactories(client, nil, nil, nil, nil, nil)
handler := NewHandler(client, nil, nil, fakeInformerFactory, nil)
ksClient := fakeks.NewSimpleClientset()
istioClient := fakeistio.NewSimpleClientset()
snapshotClient := fakesnapshot.NewSimpleClientset()
apiextensionsClient := fakeapiextensions.NewSimpleClientset()
fakeInformerFactory := informers.NewInformerFactories(client, ksClient, istioClient, snapshotClient, apiextensionsClient, nil)
fakeInformerFactory.KubeSphereSharedInformerFactory()
handler := NewHandler(client, nil, nil, fakeInformerFactory, ksClient, nil)
result, err := handler.makeQueryOptions(tt.params, tt.lvl)
if err != nil {
if !tt.expectedErr {
@@ -242,3 +311,71 @@ func TestParseRequestParams(t *testing.T) {
})
}
}
func TestExportMetrics(t *testing.T) {
fakeMetadata := map[string]string{
"k1": "v1",
"k2": "v2",
"k3": "v3",
}
fakeExportedSeries := []monitoring.ExportPoint{
{1616641733, 2},
{1616641800, 4},
}
tests := []struct {
metrics model.Metrics
expectedErr bool
}{
{
metrics: model.Metrics{
Results: []monitoring.Metric{
{
MetricName: "test",
MetricData: monitoring.MetricData{
MetricType: "",
MetricValues: []monitoring.MetricValue{
{
Metadata: fakeMetadata,
ExportedSeries: fakeExportedSeries,
},
},
},
},
},
},
expectedErr: false,
},
{
metrics: model.Metrics{
Results: []monitoring.Metric{
{
MetricName: "test",
MetricData: monitoring.MetricData{
MetricType: "",
MetricValues: []monitoring.MetricValue{
{
Metadata: fakeMetadata,
ExportedSeries: nil,
},
},
},
},
},
},
expectedErr: true,
},
{},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
_, err := exportMetrics(tt.metrics)
if err != nil && !tt.expectedErr {
t.Fatal("Failed to export metering metrics", err)
}
})
}
}

View File

@@ -32,6 +32,10 @@ func getMetricPosMap(metrics []monitoring.Metric) map[string]int {
return metricMap
}
func (h handler) getAppWorkloads(ns string, apps []string) map[string][]string {
return h.mo.GetAppWorkloads(ns, apps)
}
func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Response, q queryOptions) {
var metricMap = make(map[string]int)
var res model.Metrics
@@ -43,13 +47,13 @@ func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Res
klog.Error("invalid application option")
return
}
componentsMap := h.mo.GetAppComponentsMap(aso.NamespaceName, aso.Applications)
appWorkloads := h.getAppWorkloads(aso.NamespaceName, aso.Applications)
for k, _ := range componentsMap {
for k, _ := range appWorkloads {
opt := monitoring.ApplicationOption{
NamespaceName: aso.NamespaceName,
Application: k,
ApplicationComponents: componentsMap[k],
ApplicationComponents: appWorkloads[k],
StorageClassName: aso.StorageClassName,
}

View File

@@ -20,6 +20,8 @@ package v1alpha3
import (
"net/http"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"github.com/emicklei/go-restful"
restfulspec "github.com/emicklei/go-restful-openapi"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -39,10 +41,10 @@ const (
var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"}
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory) error {
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface) error {
ws := runtime.NewWebService(GroupVersion)
h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, nil)
h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, ksClient, nil)
ws.Route(ws.GET("/kubesphere").
To(h.handleKubeSphereMetricsQuery).