Merge pull request #3643 from yunkunrao/master

Intergate OpenPitrix metrics into metering.
This commit is contained in:
KubeSphere CI Bot
2021-04-02 21:41:53 +08:00
committed by GitHub
18 changed files with 1201 additions and 193 deletions

View File

@@ -51,6 +51,7 @@ type Query struct {
Services string
StorageClassName string
PVCFilter string
Cluster string
}
func ParseQueryParameter(req *restful.Request) *Query {
@@ -85,6 +86,7 @@ func ParseQueryParameter(req *restful.Request) *Query {
q.Services = req.QueryParameter("services")
q.StorageClassName = req.QueryParameter("storageclass")
q.PVCFilter = req.QueryParameter("pvc_filter")
q.Cluster = req.QueryParameter("cluster")
return &q
}

View File

@@ -30,15 +30,16 @@ import (
)
type meterHandler interface {
HandleClusterMetersQuery(req *restful.Request, resp *restful.Response)
HandleNodeMetersQuery(req *restful.Request, resp *restful.Response)
HandleWorkspaceMetersQuery(req *restful.Request, resp *restful.Response)
HandleNamespaceMetersQuery(re *restful.Request, resp *restful.Response)
HandleWorkloadMetersQuery(req *restful.Request, resp *restful.Response)
HandleApplicationMetersQuery(req *restful.Request, resp *restful.Response)
HandlePodMetersQuery(req *restful.Request, resp *restful.Response)
HandleServiceMetersQuery(req *restful.Request, resp *restful.Response)
HandlePVCMetersQuery(req *restful.Request, resp *restful.Response)
HandleClusterMeterQuery(req *restful.Request, resp *restful.Response)
HandleNodeMeterQuery(req *restful.Request, resp *restful.Response)
HandleWorkspaceMeterQuery(req *restful.Request, resp *restful.Response)
HandleNamespaceMeterQuery(re *restful.Request, resp *restful.Response)
HandleOpenpitrixMeterQuery(req *restful.Request, resp *restful.Response)
HandleWorkloadMeterQuery(req *restful.Request, resp *restful.Response)
HandleApplicationMeterQuery(req *restful.Request, resp *restful.Response)
HandlePodMeterQuery(req *restful.Request, resp *restful.Response)
HandleServiceMeterQuery(req *restful.Request, resp *restful.Response)
HandlePVCMeterQuery(req *restful.Request, resp *restful.Response)
}
func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) meterHandler {

View File

@@ -50,7 +50,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache))
ws.Route(ws.GET("/cluster").
To(h.HandleClusterMetersQuery).
To(h.HandleClusterMeterQuery).
Doc("Get cluster-level meter data.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which meter data to return. For example, the following filter matches both cluster CPU usage and disk usage: `meter_cluster_cpu_usage|meter_cluster_memory_usage`.").DataType("string").Required(false)).
@@ -64,7 +64,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/nodes").
To(h.HandleNodeMetersQuery).
To(h.HandleNodeMeterQuery).
Doc("Get node-level meter data of all nodes.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which meter data to return. For example, the following filter matches both node CPU usage and disk usage: `meter_node_cpu_usage|meter_node_memory_usage`.").DataType("string").Required(false)).
@@ -85,7 +85,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/nodes/{node}").
To(h.HandleNodeMetersQuery).
To(h.HandleNodeMeterQuery).
Doc("Get node-level meter data of the specific node.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("node", "Node name.").DataType("string").Required(true)).
@@ -102,7 +102,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/workspaces").
To(h.HandleWorkspaceMetersQuery).
To(h.HandleWorkspaceMeterQuery).
Doc("Get workspace-level meter data of all workspaces.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both workspace CPU usage and memory usage: `meter_workspace_cpu_usage|meter_workspace_memory_usage`.").DataType("string").Required(false)).
@@ -123,7 +123,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/workspaces/{workspace}").
To(h.HandleWorkspaceMetersQuery).
To(h.HandleWorkspaceMeterQuery).
Doc("Get workspace-level meter data of a specific workspace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("workspace", "Workspace name.").DataType("string").Required(true)).
@@ -141,7 +141,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/workspaces/{workspace}/namespaces").
To(h.HandleNamespaceMetersQuery).
To(h.HandleNamespaceMeterQuery).
Doc("Get namespace-level meter data of a specific workspace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("workspace", "Workspace name.").DataType("string").Required(true)).
@@ -163,7 +163,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces").
To(h.HandleNamespaceMetersQuery).
To(h.HandleNamespaceMeterQuery).
Doc("Get namespace-level meter data of all namespaces.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both namespace CPU usage and memory usage: `meter_namespace_cpu_usage|meter_namespace_memory_usage_wo_cache`.").DataType("string").Required(false)).
@@ -183,8 +183,25 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Returns(http.StatusOK, respOK, model.Metrics{})).
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/workspaces/{workspace}/namespaces/{namespace}").
To(h.HandleNamespaceMeterQuery).
Doc("Get namespace-level meter data of the specific namespace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both namespace CPU usage and memory usage: `meter_namespace_cpu_usage|meter_namespace_memory_usage_wo_cache`.").DataType("string").Required(false)).
Param(ws.PathParameter("storageclass", "The name of the storageclass.").DataType("string").Required(false)).
Param(ws.QueryParameter("pvc_filter", "The PVC filter consists of a regexp pattern. It specifies which PVC data to return.").DataType("string").Required(false)).
Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)).
Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)).
Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)).
Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.NamespaceMetersTag}).
Writes(model.Metrics{}).
Returns(http.StatusOK, respOK, model.Metrics{})).
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}").
To(h.HandleNamespaceMetersQuery).
To(h.HandleNamespaceMeterQuery).
Doc("Get namespace-level meter data of the specific namespace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
@@ -201,7 +218,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}/workloads").
To(h.HandleWorkloadMetersQuery).
To(h.HandleWorkloadMeterQuery).
Doc("Get workload-level meter data of all workloads which belongs to a specific kind.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
@@ -222,7 +239,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}/applications").
To(h.HandleApplicationMetersQuery).
To(h.HandleApplicationMeterQuery).
Doc("Get app-level meter data of a specific application. Navigate to the app by the app's namespace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
@@ -240,8 +257,46 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Returns(http.StatusOK, respOK, model.Metrics{})).
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/clusters/{cluster}/namespaces/{namespace}/openpitrixs").
To(h.HandleOpenpitrixMeterQuery).
Doc("Get app-level meter data of a specific openpitrix app. Navigate to the app by the app's namespace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
Param(ws.QueryParameter("openpitrix_ids", "Openpitrix application ids which can be joined by \"|\" ").DataType("string").Required(false)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both pod CPU usage and memory usage: `meter_application_cpu_usage|meter_application_memory_usage_wo_cache`.").DataType("string").Required(false)).
Param(ws.PathParameter("storageclass", "The name of the storageclass.").DataType("string").Required(false)).
Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)).
Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)).
Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)).
Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)).
Param(ws.QueryParameter("sort_metric", "Sort pods by the specified metric. Not applicable if **start** and **end** are provided.").DataType("string").Required(false)).
Param(ws.QueryParameter("sort_type", "Sort order. One of asc, desc.").DefaultValue("desc.").DataType("string").Required(false)).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.PodMetersTag}).
Writes(model.Metrics{}).
Returns(http.StatusOK, respOK, model.Metrics{})).
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}/openpitrixs").
To(h.HandleOpenpitrixMeterQuery).
Doc("Get app-level meter data of a specific openpitrix app. Navigate to the app by the app's namespace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
Param(ws.QueryParameter("openpitrix_ids", "Openpitrix application ids which can be joined by \"|\" ").DataType("string").Required(false)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both pod CPU usage and memory usage: `meter_application_cpu_usage|meter_application_memory_usage_wo_cache`.").DataType("string").Required(false)).
Param(ws.PathParameter("storageclass", "The name of the storageclass.").DataType("string").Required(false)).
Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)).
Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)).
Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)).
Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)).
Param(ws.QueryParameter("sort_metric", "Sort pods by the specified metric. Not applicable if **start** and **end** are provided.").DataType("string").Required(false)).
Param(ws.QueryParameter("sort_type", "Sort order. One of asc, desc.").DefaultValue("desc.").DataType("string").Required(false)).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.PodMetersTag}).
Writes(model.Metrics{}).
Returns(http.StatusOK, respOK, model.Metrics{})).
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}/pods").
To(h.HandlePodMetersQuery).
To(h.HandlePodMeterQuery).
Doc("Get pod-level meter data of the specific namespace's pods.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
@@ -261,7 +316,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}/pods/{pod}").
To(h.HandlePodMetersQuery).
To(h.HandlePodMeterQuery).
Doc("Get pod-level meter data of a specific pod. Navigate to the pod by the pod's namespace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
@@ -277,7 +332,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}/workloads/{workload}/pods").
To(h.HandlePodMetersQuery).
To(h.HandlePodMeterQuery).
Doc("Get pod-level meter data of a specific workload's pods. Navigate to the workload by the namespace.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).
@@ -299,7 +354,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/nodes/{node}/pods").
To(h.HandlePodMetersQuery).
To(h.HandlePodMeterQuery).
Doc("Get pod-level meter data of all pods on a specific node.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("node", "Node name.").DataType("string").Required(true)).
@@ -319,7 +374,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/nodes/{node}/pods/{pod}").
To(h.HandlePodMetersQuery).
To(h.HandlePodMeterQuery).
Doc("Get pod-level meter data of a specific pod. Navigate to the pod by the node where it is scheduled.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("node", "Node name.").DataType("string").Required(true)).
@@ -335,7 +390,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri
Produces(restful.MIME_JSON)
ws.Route(ws.GET("/namespaces/{namespace}/services").
To(h.HandleServiceMetersQuery).
To(h.HandleServiceMeterQuery).
Doc("Get service-level meter data of the specific namespace's pods.").
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)).

View File

@@ -43,10 +43,15 @@ type handler struct {
}
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) *handler {
var opRelease openpitrix.Interface
if ksClient != nil {
opRelease = openpitrix.NewOpenpitrixOperator(f, ksClient, nil)
}
return &handler{
k: k,
mo: model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter),
opRelease: nil,
opRelease: opRelease,
}
}

View File

@@ -60,6 +60,7 @@ const (
)
type reqParams struct {
metering bool
operation string
time string
start string
@@ -84,6 +85,8 @@ type reqParams struct {
expression string
metric string
applications string
openpitrixs string
cluster string
services string
pvcFilter string
}
@@ -118,7 +121,6 @@ func (q queryOptions) shouldSort() bool {
func parseRequestParams(req *restful.Request) reqParams {
var r reqParams
r.operation = req.QueryParameter("operation")
r.time = req.QueryParameter("time")
r.start = req.QueryParameter("start")
r.end = req.QueryParameter("end")
@@ -131,21 +133,8 @@ func parseRequestParams(req *restful.Request) reqParams {
r.resourceFilter = req.QueryParameter("resources_filter")
r.workspaceName = req.PathParameter("workspace")
r.namespaceName = req.PathParameter("namespace")
if req.QueryParameter("node") != "" {
r.nodeName = req.QueryParameter("node")
} else {
// compatible with monitoring request
r.nodeName = req.PathParameter("node")
}
if req.QueryParameter("kind") != "" {
r.workloadKind = req.QueryParameter("kind")
} else {
// compatible with monitoring request
r.workloadKind = req.PathParameter("kind")
}
r.workloadKind = req.PathParameter("kind")
r.nodeName = req.PathParameter("node")
r.workloadName = req.PathParameter("workload")
r.podName = req.PathParameter("pod")
r.containerName = req.PathParameter("container")
@@ -154,13 +143,47 @@ func parseRequestParams(req *restful.Request) reqParams {
r.componentType = req.PathParameter("component")
r.expression = req.QueryParameter("expr")
r.metric = req.QueryParameter("metric")
r.applications = req.QueryParameter("applications")
r.services = req.QueryParameter("services")
r.pvcFilter = req.QueryParameter("pvc_filter")
return r
}
func parseMeteringRequestParams(req *restful.Request) reqParams {
params := parseRequestParams(req)
// mark this request is metering req
params.metering = true
// whether need to export metering data
params.operation = req.QueryParameter("operation")
// OpenPitrix belongs to which cluster
params.cluster = req.PathParameter("cluster")
// specified which application crds
params.applications = req.QueryParameter("applications")
// specified which OpenPitrix apps
params.openpitrixs = req.QueryParameter("openpitrix_ids")
// specified which service
params.services = req.QueryParameter("services")
// specified which pvc
params.pvcFilter = req.QueryParameter("pvc_filter")
// support node param in URL query
if req.QueryParameter("node") != "" {
params.nodeName = req.QueryParameter("node")
}
// support kind param in URL query
if req.QueryParameter("kind") != "" {
params.workloadKind = req.QueryParameter("kind")
}
return params
}
func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOptions, err error) {
if r.resourceFilter == "" {
r.resourceFilter = DefaultFilter
@@ -230,6 +253,26 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
}
q.namedMetrics = model.ApplicationMetrics
case monitoring.LevelOpenpitrix:
q.identifier = model.IdentifierApplication
if r.namespaceName == "" {
return q, errors.New(fmt.Sprintf(ErrParameterNotfound, "namespace"))
}
ops := []string{}
if len(r.openpitrixs) != 0 {
ops = strings.Split(r.openpitrixs, "|")
}
q.option = monitoring.OpenpitrixsOption{
Cluster: r.cluster,
NamespaceName: r.namespaceName,
Openpitrixs: ops,
StorageClassName: r.storageClassName,
}
// op share the same metrics with application
q.namedMetrics = model.ApplicationMetrics
case monitoring.LevelWorkload:
q.identifier = model.IdentifierWorkload
q.option = monitoring.WorkloadOption{
@@ -339,7 +382,7 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
}
// Ensure query start time to be after the namespace creation time
if r.namespaceName != "" {
if r.namespaceName != "" && !r.metering {
ns, err := h.k.CoreV1().Namespaces().Get(context.Background(), r.namespaceName, corev1.GetOptions{})
if err != nil {
return q, err
@@ -392,7 +435,7 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
return q, nil
}
func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) {
func exportMetrics(metrics model.Metrics, startTime, endTime time.Time) (*bytes.Buffer, error) {
var resBytes []byte
for i, _ := range metrics.Results {
@@ -415,18 +458,12 @@ func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) {
}
selector := strings.Join(targetList, "|")
var startTime, endTime string
if len(metricVal.ExportedSeries) > 0 {
startTime = metricVal.ExportedSeries[0].Timestamp()
endTime = metricVal.ExportedSeries[len(metricVal.ExportedSeries)-1].Timestamp()
}
statsTab := "\nmetric_name,selector,start_time,end_time,min,max,avg,sum,fee, currency_unit\n" +
fmt.Sprintf("%s,%s,%s,%s,%.2f,%.2f,%.2f,%.2f,%.2f,%s\n\n",
fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n\n",
metricName,
selector,
startTime,
endTime,
startTime.String(),
endTime.String(),
metricVal.MinValue,
metricVal.MaxValue,
metricVal.AvgValue,
@@ -463,11 +500,11 @@ func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) {
return output, nil
}
func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
func ExportMetrics(resp *restful.Response, metrics model.Metrics, startTime, endTime time.Time) {
resp.Header().Set(restful.HEADER_ContentType, "text/plain")
resp.Header().Set("Content-Disposition", "attachment")
output, err := exportMetrics(metrics)
output, err := exportMetrics(metrics, startTime, endTime)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return

View File

@@ -274,10 +274,23 @@ func TestParseRequestParams(t *testing.T) {
{
params: reqParams{
namespaceName: "default",
openpitrixs: "op1|op2",
},
lvl: monitoring.LevelOpenpitrix,
expectedErr: true,
},
{
params: reqParams{
namespaceName: "default",
},
lvl: monitoring.LevelOpenpitrix,
expectedErr: true,
},
{
params: reqParams{},
lvl: monitoring.LevelOpenpitrix,
expectedErr: true,
},
}
for i, tt := range tests {
@@ -372,7 +385,7 @@ func TestExportMetrics(t *testing.T) {
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
_, err := exportMetrics(tt.metrics)
_, err := exportMetrics(tt.metrics, time.Now().Add(-time.Hour), time.Now())
if err != nil && !tt.expectedErr {
t.Fatal("Failed to export metering metrics", err)
}

View File

@@ -4,7 +4,11 @@ import (
"regexp"
"strings"
"kubesphere.io/kubesphere/pkg/models/openpitrix"
"kubesphere.io/kubesphere/pkg/server/params"
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api"
@@ -12,8 +16,8 @@ import (
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
func (h handler) HandleClusterMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleClusterMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelCluster)
if err != nil {
api.HandleBadRequest(resp, nil, err)
@@ -87,7 +91,7 @@ func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Res
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
ExportMetrics(resp, res, q.start, q.end)
return
}
@@ -144,7 +148,7 @@ func (h handler) handleServiceMetersQuery(meters []string, resp *restful.Respons
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
ExportMetrics(resp, res, q.start, q.end)
return
}
@@ -180,6 +184,12 @@ func (h handler) handleNamedMetersQuery(resp *restful.Response, q queryOptions)
return
}
_, ok = q.option.(monitoring.OpenpitrixsOption)
if ok {
h.handleOpenpitrixMetersQuery(meters, resp, q)
return
}
_, ok = q.option.(monitoring.ServicesOption)
if ok {
h.handleServiceMetersQuery(meters, resp, q)
@@ -205,15 +215,16 @@ func (h handler) handleNamedMetersQuery(resp *restful.Response, q queryOptions)
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
ExportMetrics(resp, res, q.start, q.end)
return
}
resp.WriteAsJson(res)
}
func (h handler) HandleNodeMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleNodeMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
params.metering = true
opt, err := h.makeQueryOptions(params, monitoring.LevelNode)
if err != nil {
api.HandleBadRequest(resp, nil, err)
@@ -222,8 +233,9 @@ func (h handler) HandleNodeMetersQuery(req *restful.Request, resp *restful.Respo
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleWorkspaceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleWorkspaceMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
params.metering = true
opt, err := h.makeQueryOptions(params, monitoring.LevelWorkspace)
if err != nil {
api.HandleBadRequest(resp, nil, err)
@@ -233,8 +245,9 @@ func (h handler) HandleWorkspaceMetersQuery(req *restful.Request, resp *restful.
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleNamespaceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleNamespaceMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
params.metering = true
opt, err := h.makeQueryOptions(params, monitoring.LevelNamespace)
if err != nil {
if err.Error() == ErrNoHit {
@@ -250,8 +263,8 @@ func (h handler) HandleNamespaceMetersQuery(req *restful.Request, resp *restful.
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleWorkloadMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleWorkloadMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelWorkload)
if err != nil {
if err.Error() == ErrNoHit {
@@ -266,8 +279,8 @@ func (h handler) HandleWorkloadMetersQuery(req *restful.Request, resp *restful.R
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleApplicationMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleApplicationMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelApplication)
if err != nil {
if err.Error() == ErrNoHit {
@@ -282,8 +295,25 @@ func (h handler) HandleApplicationMetersQuery(req *restful.Request, resp *restfu
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandlePodMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleOpenpitrixMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelOpenpitrix)
if err != nil {
if err.Error() == ErrNoHit {
res := handleNoHit(opt.namedMetrics)
resp.WriteAsJson(res)
return
}
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandlePodMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelPod)
if err != nil {
if err.Error() == ErrNoHit {
@@ -295,11 +325,12 @@ func (h handler) HandlePodMetersQuery(req *restful.Request, resp *restful.Respon
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleServiceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleServiceMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelService)
if err != nil {
if err.Error() == ErrNoHit {
@@ -315,8 +346,8 @@ func (h handler) HandleServiceMetersQuery(req *restful.Request, resp *restful.Re
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandlePVCMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandlePVCMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelPVC)
if err != nil {
if err.Error() == ErrNoHit {
@@ -330,3 +361,110 @@ func (h handler) HandlePVCMetersQuery(req *restful.Request, resp *restful.Respon
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) collectOps(cluster, ns string) []string {
var ops []string
conditions := params.Conditions{
Match: make(map[string]string),
Fuzzy: make(map[string]string),
}
resp, err := h.opRelease.ListApplications("", cluster, ns, &conditions, 10, 0, "", false)
if err != nil {
klog.Error("failed to list op apps")
return nil
}
totalCount := resp.TotalCount
resp, err = h.opRelease.ListApplications("", cluster, ns, &conditions, totalCount, 0, "", false)
if err != nil {
klog.Error("failed to list op apps")
return nil
}
for _, item := range resp.Items {
app := item.(*openpitrix.Application)
ops = append(ops, app.Cluster.ClusterId)
}
return ops
}
func (h handler) getOpWorkloads(cluster, ns string, ops []string) map[string][]string {
componentsMap := make(map[string][]string)
if len(ops) == 0 {
ops = h.collectOps(cluster, ns)
}
for _, op := range ops {
app, err := h.opRelease.DescribeApplication("", cluster, ns, op)
if err != nil {
klog.Error(err)
return nil
}
for _, object := range app.ReleaseInfo {
unstructuredObj := object.(*unstructured.Unstructured)
componentsMap[op] = append(componentsMap[op], unstructuredObj.GetKind()+":"+unstructuredObj.GetName())
}
}
return componentsMap
}
func (h handler) handleOpenpitrixMetersQuery(meters []string, resp *restful.Response, q queryOptions) {
var metricMap = make(map[string]int)
var res model.Metrics
var current_res model.Metrics
var err error
oso, ok := q.option.(monitoring.OpenpitrixsOption)
if !ok {
klog.Error("invalid openpitrix option")
return
}
opWorkloads := h.getOpWorkloads(oso.Cluster, oso.NamespaceName, oso.Openpitrixs)
for k, _ := range opWorkloads {
opt := monitoring.ApplicationOption{
NamespaceName: oso.NamespaceName,
Application: k,
ApplicationComponents: opWorkloads[k],
StorageClassName: oso.StorageClassName,
}
if q.isRangeQuery() {
current_res, err = h.mo.GetNamedMetersOverTime(meters, q.start, q.end, q.step, opt)
} else {
current_res, err = h.mo.GetNamedMeters(meters, q.time, opt)
}
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
if res.Results == nil {
res = current_res
metricMap = getMetricPosMap(res.Results)
} else {
for _, cur_res := range current_res.Results {
pos, ok := metricMap[cur_res.MetricName]
if ok {
res.Results[pos].MetricValues = append(res.Results[pos].MetricValues, cur_res.MetricValues...)
} else {
res.Results = append(res.Results, cur_res)
}
}
}
}
if !q.isRangeQuery() && q.shouldSort() {
res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
}
if q.Operation == OperationExport {
ExportMetrics(resp, res, q.start, q.end)
return
}
resp.WriteAsJson(res)
}

View File

@@ -6,6 +6,9 @@ import (
"github.com/emicklei/go-restful"
"k8s.io/klog"
"strconv"
"time"
"kubesphere.io/kubesphere/pkg/api"
meteringv1alpha1 "kubesphere.io/kubesphere/pkg/api/metering/v1alpha1"
"kubesphere.io/kubesphere/pkg/apiserver/request"
@@ -15,7 +18,7 @@ import (
monitoringclient "kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
func (h *tenantHandler) QueryMeterings(req *restful.Request, resp *restful.Response) {
func (h *tenantHandler) QueryMetering(req *restful.Request, resp *restful.Response) {
u, ok := request.UserFrom(req.Request.Context())
if !ok {
@@ -34,14 +37,29 @@ func (h *tenantHandler) QueryMeterings(req *restful.Request, resp *restful.Respo
}
if q.Operation == monitoringv1alpha3.OperationExport {
monitoringv1alpha3.ExportMetrics(resp, res)
start, err := strconv.ParseInt(q.Start, 10, 64)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
end, err := strconv.ParseInt(q.End, 10, 64)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
startTime := time.Unix(start, 0)
endTime := time.Unix(end, 0)
monitoringv1alpha3.ExportMetrics(resp, res, startTime, endTime)
return
}
resp.WriteAsJson(res)
}
func (h *tenantHandler) QueryMeteringsHierarchy(req *restful.Request, resp *restful.Response) {
func (h *tenantHandler) QueryMeteringHierarchy(req *restful.Request, resp *restful.Response) {
u, ok := request.UserFrom(req.Request.Context())
if !ok {
err := fmt.Errorf("cannot obtain user info")

View File

@@ -40,7 +40,6 @@ import (
kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/informers"
monitoringv1alpha3 "kubesphere.io/kubesphere/pkg/kapis/monitoring/v1alpha3"
"kubesphere.io/kubesphere/pkg/models"
"kubesphere.io/kubesphere/pkg/models/iam/am"
"kubesphere.io/kubesphere/pkg/models/monitoring"
@@ -301,10 +300,9 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s
Returns(http.StatusOK, api.StatusOK, auditingv1alpha1.APIResponse{}))
ws.Route(ws.GET("/metering").
To(handler.QueryMeterings).
To(handler.QueryMetering).
Doc("Get meterings against the cluster.").
Param(ws.QueryParameter("level", "Metering level.").DataType("string").Required(true)).
Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)).
Param(ws.QueryParameter("node", "Node name.").DataType("string").Required(false)).
Param(ws.QueryParameter("workspace", "Workspace name.").DataType("string").Required(false)).
Param(ws.QueryParameter("namespace", "Namespace name.").DataType("string").Required(false)).
@@ -330,7 +328,7 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s
Returns(http.StatusOK, api.StatusOK, monitoring.Metrics{}))
ws.Route(ws.GET("/namespaces/{namespace}/metering/hierarchy").
To(handler.QueryMeteringsHierarchy).
To(handler.QueryMeteringHierarchy).
Param(ws.PathParameter("namespace", "Namespace name.").DataType("string").Required(false)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both workspace CPU usage and memory usage: `meter_pod_cpu_usage|meter_pod_memory_usage_wo_cache`.").DataType("string").Required(false)).
Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)).

View File

@@ -161,12 +161,11 @@ type ServiceStatistic struct {
Pods map[string]*PodStatistic `json:"pods" description:"pod statistic"`
}
func (ss *ServiceStatistic) SetPodStats(name string, podStat *PodStatistic) error {
func (ss *ServiceStatistic) SetPodStats(name string, podStat *PodStatistic) {
if ss.Pods == nil {
ss.Pods = make(map[string]*PodStatistic)
}
ss.Pods[name] = podStat
return nil
}
func (ss *ServiceStatistic) GetPodStats(name string) *PodStatistic {
@@ -211,12 +210,11 @@ func (ds *DeploymentStatistic) GetPodStats(name string) *PodStatistic {
return ds.Pods[name]
}
func (ds *DeploymentStatistic) SetPodStats(name string, podStat *PodStatistic) error {
func (ds *DeploymentStatistic) SetPodStats(name string, podStat *PodStatistic) {
if ds.Pods == nil {
ds.Pods = make(map[string]*PodStatistic)
}
ds.Pods[name] = podStat
return nil
}
func (ds *DeploymentStatistic) Aggregate() {
@@ -252,12 +250,11 @@ func (ss *StatefulsetStatistic) GetPodStats(name string) *PodStatistic {
return ss.Pods[name]
}
func (ss *StatefulsetStatistic) SetPodStats(name string, podStat *PodStatistic) error {
func (ss *StatefulsetStatistic) SetPodStats(name string, podStat *PodStatistic) {
if ss.Pods == nil {
ss.Pods = make(map[string]*PodStatistic)
}
ss.Pods[name] = podStat
return nil
}
func (ss *StatefulsetStatistic) Aggregate() {
@@ -293,12 +290,11 @@ func (ds *DaemonsetStatistic) GetPodStats(name string) *PodStatistic {
return ds.Pods[name]
}
func (ds *DaemonsetStatistic) SetPodStats(name string, podStat *PodStatistic) error {
func (ds *DaemonsetStatistic) SetPodStats(name string, podStat *PodStatistic) {
if ds.Pods == nil {
ds.Pods = make(map[string]*PodStatistic)
}
ds.Pods[name] = podStat
return nil
}
func (ds *DaemonsetStatistic) Aggregate() {

View File

@@ -1,8 +1,10 @@
package monitoring
import (
"math"
"fmt"
"math/big"
"os"
"path/filepath"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/klog"
@@ -17,7 +19,15 @@ const (
METER_RESOURCE_TYPE_NET_EGRESS
METER_RESOURCE_TYPE_PVC
meteringConfig = "/etc/kubesphere/metering/ks-metering.yaml"
meteringConfigDir = "/etc/kubesphere/metering/"
meteringConfigName = "ks-metering.yaml"
meteringDefaultPrecision = 10
meteringCorePrecision = 3
meteringMemPrecision = 1
meteringIngressPrecision = 0
meteringEgressPrecision = 0
meteringPvcPrecision = 1
)
var meterResourceUnitMap = map[int]string{
@@ -95,11 +105,21 @@ func (mc MeterConfig) GetPriceInfo() PriceInfo {
func LoadYaml() (*MeterConfig, error) {
var meterConfig MeterConfig
var mf *os.File
var err error
mf, err := os.Open(meteringConfig)
if err != nil {
klog.Error(err)
return nil, err
if _, err := os.Stat(meteringConfigName); os.IsNotExist(err) {
mf, err = os.Open(filepath.Join(meteringConfigDir, meteringConfigName))
if err != nil {
klog.Error(err)
return nil, err
}
} else {
mf, err = os.Open(meteringConfigName)
if err != nil {
klog.Error(err)
return nil, err
}
}
if err = yaml.NewYAMLOrJSONDecoder(mf, 1024).Decode(&meterConfig); err != nil {
@@ -110,48 +130,59 @@ func LoadYaml() (*MeterConfig, error) {
return &meterConfig, nil
}
func getMaxPointValue(points []monitoring.Point) float64 {
var max float64
func getMaxPointValue(points []monitoring.Point) string {
var max *big.Float
for i, p := range points {
if i == 0 {
max = p.Value()
max = new(big.Float).SetFloat64(p.Value())
}
if p.Value() > max {
max = p.Value()
pf := new(big.Float).SetFloat64(p.Value())
if pf.Cmp(max) == 1 {
max = pf
}
}
return max
return fmt.Sprintf(generateFloatFormat(meteringDefaultPrecision), max)
}
func getMinPointValue(points []monitoring.Point) float64 {
var min float64
func getMinPointValue(points []monitoring.Point) string {
var min *big.Float
for i, p := range points {
if i == 0 {
min = p.Value()
min = new(big.Float).SetFloat64(p.Value())
}
if p.Value() < min {
min = p.Value()
pf := new(big.Float).SetFloat64(p.Value())
if min.Cmp(pf) == 1 {
min = pf
}
}
return min
return fmt.Sprintf(generateFloatFormat(meteringDefaultPrecision), min)
}
func getSumPointValue(points []monitoring.Point) float64 {
avg := 0.0
func getSumPointValue(points []monitoring.Point) string {
sum := new(big.Float).SetFloat64(0)
for _, p := range points {
avg += p.Value()
pf := new(big.Float).SetFloat64(p.Value())
sum.Add(sum, pf)
}
return avg
return fmt.Sprintf(generateFloatFormat(meteringDefaultPrecision), sum)
}
func getAvgPointValue(points []monitoring.Point) float64 {
return getSumPointValue(points) / float64(len(points))
func getAvgPointValue(points []monitoring.Point) string {
sum, ok := new(big.Float).SetString(getSumPointValue(points))
if !ok {
klog.Error("failed to parse big.Float")
return ""
}
length := new(big.Float).SetFloat64(float64(len(points)))
return fmt.Sprintf(generateFloatFormat(meteringDefaultPrecision), sum.Quo(sum, length))
}
func getCurrencyUnit() string {
@@ -164,6 +195,10 @@ func getCurrencyUnit() string {
return meterConfig.GetPriceInfo().CurrencyUnit
}
func generateFloatFormat(precision int) string {
return "%." + fmt.Sprintf("%d", precision) + "f"
}
func getResourceUnit(meterName string) string {
if resourceType, ok := MeterResourceMap[meterName]; !ok {
klog.Errorf("invlaid meter %v", meterName)
@@ -173,43 +208,66 @@ func getResourceUnit(meterName string) string {
}
}
func getFeeWithMeterName(meterName string, sum float64) float64 {
func getFeeWithMeterName(meterName string, sum string) string {
s, ok := new(big.Float).SetString(sum)
if !ok {
klog.Error("failed to parse string to float")
return ""
}
meterConfig, err := LoadYaml()
if err != nil {
klog.Error(err)
return -1
return ""
}
priceInfo := meterConfig.GetPriceInfo()
if resourceType, ok := MeterResourceMap[meterName]; !ok {
klog.Errorf("invlaid meter %v", meterName)
return -1
return ""
} else {
switch resourceType {
case METER_RESOURCE_TYPE_CPU:
// unit: core, precision: 0.001
sum = math.Round(sum*1000) / 1000
return priceInfo.CpuPerCorePerHour * sum
CpuPerCorePerHour := new(big.Float).SetFloat64(priceInfo.CpuPerCorePerHour)
tmp := s.Mul(s, CpuPerCorePerHour)
return fmt.Sprintf(generateFloatFormat(meteringCorePrecision), tmp)
case METER_RESOURCE_TYPE_MEM:
// unit: Gigabyte, precision: 0.1
sum = math.Round(sum/1073741824*10) / 10
return priceInfo.MemPerGigabytesPerHour * sum
oneGiga := new(big.Float).SetInt64(1073741824)
MemPerGigabytesPerHour := new(big.Float).SetFloat64(priceInfo.MemPerGigabytesPerHour)
// transform unit from bytes to Gigabytes
s.Quo(s, oneGiga)
return fmt.Sprintf(generateFloatFormat(meteringMemPrecision), s.Mul(s, MemPerGigabytesPerHour))
case METER_RESOURCE_TYPE_NET_INGRESS:
// unit: Megabyte, precision: 1
sum = math.Round(sum / 1048576)
return priceInfo.IngressNetworkTrafficPerMegabytesPerHour * sum
oneMega := new(big.Float).SetInt64(1048576)
IngressNetworkTrafficPerMegabytesPerHour := new(big.Float).SetFloat64(priceInfo.IngressNetworkTrafficPerMegabytesPerHour)
// transform unit from bytes to Migabytes
s.Quo(s, oneMega)
return fmt.Sprintf(generateFloatFormat(meteringIngressPrecision), s.Mul(s, IngressNetworkTrafficPerMegabytesPerHour))
case METER_RESOURCE_TYPE_NET_EGRESS:
// unit: Megabyte, precision: 1
sum = math.Round(sum / 1048576)
return priceInfo.EgressNetworkTrafficPerMegabytesPerHour * sum
oneMega := new(big.Float).SetInt64(1048576)
EgressNetworkTrafficPerMegabytesPerHour := new(big.Float).SetPrec(meteringEgressPrecision).SetFloat64(priceInfo.EgressNetworkTrafficPerMegabytesPerHour)
// transform unit from bytes to Migabytes
s.Quo(s, oneMega)
return fmt.Sprintf(generateFloatFormat(meteringEgressPrecision), s.Mul(s, EgressNetworkTrafficPerMegabytesPerHour))
case METER_RESOURCE_TYPE_PVC:
// unit: Gigabyte, precision: 0.1
sum = math.Round(sum/1073741824*10) / 10
return priceInfo.PvcPerGigabytesPerHour * sum
oneGiga := new(big.Float).SetInt64(1073741824)
PvcPerGigabytesPerHour := new(big.Float).SetPrec(meteringPvcPrecision).SetFloat64(priceInfo.PvcPerGigabytesPerHour)
// transform unit from bytes to Gigabytes
s.Quo(s, oneGiga)
return fmt.Sprintf(generateFloatFormat(meteringPvcPrecision), s.Mul(s, PvcPerGigabytesPerHour))
}
return -1
return ""
}
}
@@ -224,9 +282,9 @@ func updateMetricStatData(metric monitoring.Metric, scalingMap map[string]float6
metricData.MetricValues[index].MaxValue = getMaxPointValue(metricValue.Series)
metricData.MetricValues[index].AvgValue = getAvgPointValue(metricValue.Series)
} else {
metricData.MetricValues[index].MinValue = (*metricValue.Sample)[1]
metricData.MetricValues[index].MaxValue = (*metricValue.Sample)[1]
metricData.MetricValues[index].AvgValue = (*metricValue.Sample)[1]
metricData.MetricValues[index].MinValue = getMinPointValue([]monitoring.Point{*metricValue.Sample})
metricData.MetricValues[index].MaxValue = getMaxPointValue([]monitoring.Point{*metricValue.Sample})
metricData.MetricValues[index].AvgValue = getAvgPointValue([]monitoring.Point{*metricValue.Sample})
}
// squash points if step is more than one hour and calculate sum and fee
@@ -241,7 +299,7 @@ func updateMetricStatData(metric monitoring.Metric, scalingMap map[string]float6
metricData.MetricValues[index].SumValue = sum
metricData.MetricValues[index].Fee = getFeeWithMeterName(metricName, sum)
} else {
sum := (*metricValue.Sample)[1]
sum := getSumPointValue([]monitoring.Point{*metricValue.Sample})
metricData.MetricValues[index].SumValue = sum
metricData.MetricValues[index].Fee = getFeeWithMeterName(metricName, sum)
}

View File

@@ -0,0 +1,403 @@
package monitoring
import (
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/google/go-cmp/cmp"
"gopkg.in/yaml.v2"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
func TestGetMaxPointValue(t *testing.T) {
tests := []struct {
actualPoints []monitoring.Point
expectedValue string
}{
{
actualPoints: []monitoring.Point{
{1.0, 2.0},
{3.0, 4.0},
},
expectedValue: "4.0000000000",
},
{
actualPoints: []monitoring.Point{
{2, 1},
{4, 3.1},
},
expectedValue: "3.1000000000",
},
{
actualPoints: []monitoring.Point{
{5, 100},
{6, 100000.001},
},
expectedValue: "100000.0010000000",
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
max := getMaxPointValue(tt.actualPoints)
if max != tt.expectedValue {
t.Fatal("max point value caculation is wrong.")
}
})
}
}
func TestGetMinPointValue(t *testing.T) {
tests := []struct {
actualPoints []monitoring.Point
expectedValue string
}{
{
actualPoints: []monitoring.Point{
{1.0, 2.0},
{3.0, 4.0},
},
expectedValue: "2.0000000000",
},
{
actualPoints: []monitoring.Point{
{2, 1},
{4, 3.1},
},
expectedValue: "1.0000000000",
},
{
actualPoints: []monitoring.Point{
{5, 100},
{6, 100000.001},
},
expectedValue: "100.0000000000",
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
max := getMinPointValue(tt.actualPoints)
if max != tt.expectedValue {
t.Fatal("min point value caculation is wrong.")
}
})
}
}
func TestGetSumPointValue(t *testing.T) {
tests := []struct {
actualPoints []monitoring.Point
expectedValue string
}{
{
actualPoints: []monitoring.Point{
{1.0, 2.0},
{3.0, 4.0},
},
expectedValue: "6.0000000000",
},
{
actualPoints: []monitoring.Point{
{2, 1},
{4, 3.1},
},
expectedValue: "4.1000000000",
},
{
actualPoints: []monitoring.Point{
{5, 100},
{6, 100000.001},
},
expectedValue: "100100.0010000000",
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
max := getSumPointValue(tt.actualPoints)
if max != tt.expectedValue {
t.Fatal("sum point value caculation is wrong.")
}
})
}
}
func TestGetAvgPointValue(t *testing.T) {
tests := []struct {
actualPoints []monitoring.Point
expectedValue string
}{
{
actualPoints: []monitoring.Point{
{1.0, 2.0},
{3.0, 4.0},
},
expectedValue: "3.0000000000",
},
{
actualPoints: []monitoring.Point{
{2, 1},
{4, 3.1},
},
expectedValue: "2.0500000000",
},
{
actualPoints: []monitoring.Point{
{5, 100},
{6, 100000.001},
},
expectedValue: "50050.0005000000",
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
max := getAvgPointValue(tt.actualPoints)
if max != tt.expectedValue {
t.Fatal("avg point value caculattion is wrong.")
}
})
}
}
func saveTestConfig(t *testing.T, conf *MeterConfig) {
content, err := yaml.Marshal(conf)
if err != nil {
t.Fatalf("error marshal config. %v", err)
}
err = ioutil.WriteFile(meteringConfigName, content, 0640)
if err != nil {
t.Fatalf("error write configuration file, %v", err)
}
}
func cleanTestConfig(t *testing.T) {
if _, err := os.Stat(meteringConfigName); os.IsNotExist(err) {
t.Log("file not exists, skipping")
return
}
err := os.Remove(meteringConfigName)
if err != nil {
t.Fatalf("remove %s file failed", meteringConfigName)
}
}
func TestGetCurrencyUnit(t *testing.T) {
if getCurrencyUnit() != "" {
t.Fatal("currency unit should be empty")
}
saveTestConfig(t, &MeterConfig{
Billing: Billing{
PriceInfo: PriceInfo{
IngressNetworkTrafficPerMegabytesPerHour: 1,
EgressNetworkTrafficPerMegabytesPerHour: 2,
CpuPerCorePerHour: 3,
MemPerGigabytesPerHour: 4,
PvcPerGigabytesPerHour: 5,
CurrencyUnit: "CNY",
},
},
})
defer cleanTestConfig(t)
if getCurrencyUnit() != "CNY" {
t.Fatal("failed to get currency unit from config")
}
}
func TestGenerateFloatFormat(t *testing.T) {
format := generateFloatFormat(10)
if format != "%.10f" {
t.Fatalf("get currency float format failed, %s", format)
}
}
func TestGetResourceUnit(t *testing.T) {
tests := []struct {
meterName string
expectedValue string
}{
{
meterName: "no-exist",
expectedValue: "",
},
{
meterName: "meter_cluster_cpu_usage",
expectedValue: "cores",
},
}
for _, tt := range tests {
if getResourceUnit(tt.meterName) != tt.expectedValue {
t.Fatal("get resource unit failed")
}
}
}
func TestSquashPoints(t *testing.T) {
tests := []struct {
input []monitoring.Point
factor int
expected []monitoring.Point
}{
{
input: []monitoring.Point{
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 5},
{6, 6},
{7, 7},
{8, 8},
},
factor: 1,
expected: []monitoring.Point{
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 5},
{6, 6},
{7, 7},
{8, 8},
},
},
{
input: []monitoring.Point{
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 5},
{6, 6},
{7, 7},
{8, 8},
},
factor: 2,
expected: []monitoring.Point{
{2, 3},
{4, 7},
{6, 11},
{8, 15},
},
},
}
for _, tt := range tests {
got := squashPoints(tt.input, tt.factor)
if diff := cmp.Diff(got, tt.expected); diff != "" {
t.Errorf("%T differ (-got, +want): %s", tt.expected, diff)
}
}
}
func TestGetFeeWithMeterName(t *testing.T) {
saveTestConfig(t, &MeterConfig{
Billing: Billing{
PriceInfo: PriceInfo{
IngressNetworkTrafficPerMegabytesPerHour: 1,
EgressNetworkTrafficPerMegabytesPerHour: 2,
CpuPerCorePerHour: 3,
MemPerGigabytesPerHour: 4,
PvcPerGigabytesPerHour: 5,
CurrencyUnit: "CNY",
},
},
})
defer cleanTestConfig(t)
tests := []struct {
metric monitoring.Metric
scalingMap map[string]float64
expected monitoring.MetricData
}{
{
metric: monitoring.Metric{
MetricName: "test",
MetricData: monitoring.MetricData{
MetricType: monitoring.MetricTypeMatrix,
MetricValues: []monitoring.MetricValue{
{
Metadata: map[string]string{},
Series: []monitoring.Point{
{1, 1},
{2, 2},
},
},
},
},
},
scalingMap: map[string]float64{
"test": 1,
},
expected: monitoring.MetricData{
MetricType: monitoring.MetricTypeMatrix,
MetricValues: []monitoring.MetricValue{
{
Metadata: map[string]string{},
Series: []monitoring.Point{
{1, 1},
{2, 2},
},
MinValue: "1.0000000000",
MaxValue: "2.0000000000",
AvgValue: "1.5000000000",
SumValue: "3.0000000000",
CurrencyUnit: "CNY",
},
},
},
},
{
metric: monitoring.Metric{
MetricName: "test",
MetricData: monitoring.MetricData{
MetricType: monitoring.MetricTypeVector,
MetricValues: []monitoring.MetricValue{
{
Metadata: map[string]string{},
Sample: &monitoring.Point{1, 2},
},
},
},
},
scalingMap: nil,
expected: monitoring.MetricData{
MetricType: monitoring.MetricTypeVector,
MetricValues: []monitoring.MetricValue{
{
Metadata: map[string]string{},
Sample: &monitoring.Point{1, 2},
MinValue: "2.0000000000",
MaxValue: "2.0000000000",
AvgValue: "2.0000000000",
SumValue: "2.0000000000",
CurrencyUnit: "CNY",
},
},
},
},
}
for _, test := range tests {
got := updateMetricStatData(test.metric, test.scalingMap)
if diff := cmp.Diff(got, test.expected); diff != "" {
t.Errorf("%T differ (-got, +want): %s", test.expected, diff)
return
}
}
}

View File

@@ -15,6 +15,7 @@ import (
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/klog"
@@ -25,6 +26,8 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/query"
"kubesphere.io/kubesphere/pkg/apiserver/request"
monitoringmodel "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/models/openpitrix"
"kubesphere.io/kubesphere/pkg/server/params"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
@@ -691,7 +694,12 @@ func (t *tenantOperator) transformMetricData(metrics monitoringmodel.Metrics) me
for _, metricValue := range metric.MetricValues {
//metricValue.SumValue
podName := metricValue.Metadata["pod"]
podsStats.Set(podName, metricName, metricValue.SumValue)
if s, err := strconv.ParseFloat(metricValue.SumValue, 64); err != nil {
klog.Error("failed to parse string to float64")
return nil
} else {
podsStats.Set(podName, metricName, s)
}
}
}
@@ -769,9 +777,15 @@ func (t *tenantOperator) updateDeploysStats(user user.Info, cluster, ns string,
return err
}
if ok, _ := t.isOpenPitrixComponent(cluster, ns, "deployment", deploy.Name); ok {
// TODO: for op deployment
continue
if ok, opName := t.isOpenPitrixComponent(cluster, ns, "deployment", deploy.Name); ok {
// for op deployment
for _, pod := range pods {
podsStat := podsStats[pod]
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// OpenPitrix field(create if not existed) -> deployments field(create if not existed) -> pod
resourceStats.GetOpenPitrixStats(opName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat)
}
} else if ok, appName := t.isAppComponent(ns, "deployment", deploy.Name); ok {
// for app deployment
for _, pod := range pods {
@@ -780,25 +794,24 @@ func (t *tenantOperator) updateDeploysStats(user user.Info, cluster, ns string,
klog.Warningf("%v not found", pod)
continue
}
if err := resourceStats.GetAppStats(appName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat); err != nil {
klog.Error(err)
return err
}
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// App field(create if not existed) -> deployments field(create if not existed) -> pod
resourceStats.GetAppStats(appName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat)
}
} else {
// for k8s deployment only
for _, pod := range pods {
if err := resourceStats.GetDeployStats(deploy.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// Deployments field(create if not existed) -> pod
resourceStats.GetDeployStats(deploy.Name).SetPodStats(pod, podsStats[pod])
}
}
}
// TODO: op aggregate for deployment components
// OpenPitrix aggregate for deployment components
for _, op := range resourceStats.OpenPitrixs {
op.Aggregate()
}
@@ -833,34 +846,36 @@ func (t *tenantOperator) updateDaemonsetsStats(user user.Info, cluster, ns strin
return err
}
if ok, _ := t.isOpenPitrixComponent(cluster, ns, "daemonset", daemonset.Name); ok {
// TODO: for op daemonset
continue
if ok, opName := t.isOpenPitrixComponent(cluster, ns, "daemonset", daemonset.Name); ok {
// for op daemonset
for _, pod := range pods {
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// OpenPitrix field(create if not existed) -> daemonsets field(create if not existed) -> pod
resourceStats.GetOpenPitrixStats(opName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod])
}
} else if ok, appName := t.isAppComponent(ns, "daemonset", daemonset.Name); ok {
// for app daemonset
for _, pod := range pods {
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// app field(create if not existed) -> statefulsets field(create if not existed) -> pod
if err := resourceStats.GetAppStats(appName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
// App field(create if not existed) -> daemonsets field(create if not existed) -> pod
resourceStats.GetAppStats(appName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod])
}
} else {
// for k8s daemonset
for _, pod := range pods {
if err := resourceStats.GetDaemonsetStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// Daemonsets field(create if not existed) -> pod
resourceStats.GetDaemonsetStats(daemonset.Name).SetPodStats(pod, podsStats[pod])
}
}
}
// here pod stats and level struct are ready
// TODO: op aggregate for daemonset components
// OpenPitrix aggregate for daemonset components
for _, op := range resourceStats.OpenPitrixs {
op.Aggregate()
}
@@ -877,8 +892,71 @@ func (t *tenantOperator) updateDaemonsetsStats(user user.Info, cluster, ns strin
return nil
}
// TODO: include op metering part
func (t *tenantOperator) collectOpenPitrixComponents(cluster, ns string) map[string][]string {
var opComponentsMap = make(map[string][]string)
var ops []string
conditions := params.Conditions{
Match: make(map[string]string),
Fuzzy: make(map[string]string),
}
resp, err := t.opRelease.ListApplications("", cluster, ns, &conditions, 10, 0, "", false)
if err != nil {
klog.Error("failed to list op apps")
return nil
}
totalCount := resp.TotalCount
resp, err = t.opRelease.ListApplications("", cluster, ns, &conditions, totalCount, 0, "", false)
if err != nil {
klog.Error("failed to list op apps")
return nil
}
for _, item := range resp.Items {
app := item.(*openpitrix.Application)
ops = append(ops, app.Cluster.ClusterId)
}
for _, op := range ops {
app, err := t.opRelease.DescribeApplication("", cluster, ns, op)
if err != nil {
klog.Error(err)
return nil
}
for _, object := range app.ReleaseInfo {
unstructuredObj := object.(*unstructured.Unstructured)
if unstructuredObj.GetKind() == "Service" ||
unstructuredObj.GetKind() == "Deployment" ||
unstructuredObj.GetKind() == "Daemonset" ||
unstructuredObj.GetKind() == "Statefulset" {
opComponentsMap[op+":"+unstructuredObj.GetKind()] = append(opComponentsMap[unstructuredObj.GetKind()], unstructuredObj.GetName())
}
}
}
return opComponentsMap
}
func (t *tenantOperator) isOpenPitrixComponent(cluster, ns, kind, componentName string) (bool, string) {
opComponentsMap := t.collectOpenPitrixComponents(cluster, ns)
for k, v := range opComponentsMap {
kk := strings.Split(k, ":")
if len(kk) != 2 {
klog.Errorf("invalid op key %s", k)
return false, ""
}
opName := kk[0]
if kk[1] == strings.Title(kind) {
for _, svc := range v {
if componentName == svc {
return true, opName
}
}
}
}
return false, ""
}
@@ -916,34 +994,34 @@ func (t *tenantOperator) updateStatefulsetsStats(user user.Info, cluster, ns str
return err
}
if ok, _ := t.isOpenPitrixComponent(cluster, ns, "statefulset", statefulset.Name); ok {
// TODO: for op statefulset
continue
if ok, opName := t.isOpenPitrixComponent(cluster, ns, "statefulset", statefulset.Name); ok {
// for op statefulset
for _, pod := range pods {
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// OpenPitrix field(create if not existed) -> statefulsets field(create if not existed) -> pod
resourceStats.GetOpenPitrixStats(opName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod])
}
} else if ok, appName := t.isAppComponent(ns, "daemonset", statefulset.Name); ok {
// for app statefulset
for _, pod := range pods {
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// app field(create if not existed) -> statefulsets field(create if not existed) -> pod
if err := resourceStats.GetAppStats(appName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
// App field(create if not existed) -> statefulsets field(create if not existed) -> pod
resourceStats.GetAppStats(appName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod])
}
} else {
// for k8s statefulset
for _, pod := range pods {
// same as above, the direction is similar:
// k8s field(create if not existed) -> statefulsets field(create if not existed) -> pod
if err := resourceStats.GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// Statefulsets field(create if not existed) -> pod
resourceStats.GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod])
}
}
}
// TODO: op aggregate for statefulset components
// OpenPitrix aggregate for statefulset components
for _, op := range resourceStats.OpenPitrixs {
op.Aggregate()
}

View File

@@ -0,0 +1,193 @@
package tenant
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"kubesphere.io/kubesphere/pkg/models/metering"
monitoringmodel "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
func TestIsRangeQuery(t *testing.T) {
tests := []struct {
options QueryOptions
expectedValue bool
}{
{
options: QueryOptions{},
expectedValue: true,
},
{
options: QueryOptions{Time: time.Now()},
expectedValue: false,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
if tt.options.isRangeQuery() != tt.expectedValue {
t.Fatal("error isRangeQuery")
}
})
}
}
func TestShouldSort(t *testing.T) {
tests := []struct {
options QueryOptions
expectedValue bool
}{
{
options: QueryOptions{
Target: "test",
Identifier: "test",
},
expectedValue: true,
},
{
options: QueryOptions{},
expectedValue: false,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
if tt.options.shouldSort() != tt.expectedValue {
t.Fatal("error shouldSort")
}
})
}
}
func TestGetMetricPosMap(t *testing.T) {
tests := []struct {
metrics []monitoring.Metric
expectedValue map[string]int
}{
{
metrics: []monitoring.Metric{
{MetricName: "one"},
{MetricName: "two"},
{MetricName: "three"},
{MetricName: "four"},
},
expectedValue: map[string]int{
"one": 0,
"two": 1,
"three": 2,
"four": 3,
},
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
if diff := cmp.Diff(getMetricPosMap(tt.metrics), tt.expectedValue); diff != "" {
t.Errorf("%T differ (-got, +want): %s", tt.expectedValue, diff)
}
})
}
}
func TestTransformMetricData(t *testing.T) {
tests := []struct {
metrics monitoringmodel.Metrics
expectedValue metering.PodsStats
}{
{
metrics: monitoringmodel.Metrics{
Results: []monitoring.Metric{
{
MetricName: "meter_pod_cpu_usage",
MetricData: monitoring.MetricData{
MetricValues: monitoring.MetricValues{
{
Metadata: map[string]string{
"pod": "pod1",
},
SumValue: "10",
},
},
},
},
{
MetricName: "meter_pod_memory_usage_wo_cache",
MetricData: monitoring.MetricData{
MetricValues: monitoring.MetricValues{
{
Metadata: map[string]string{
"pod": "pod1",
},
SumValue: "200",
},
},
},
},
{
MetricName: "meter_pod_net_bytes_transmitted",
MetricData: monitoring.MetricData{
MetricValues: monitoring.MetricValues{
{
Metadata: map[string]string{
"pod": "pod1",
},
SumValue: "300",
},
},
},
},
{
MetricName: "meter_pod_net_bytes_received",
MetricData: monitoring.MetricData{
MetricValues: monitoring.MetricValues{
{
Metadata: map[string]string{
"pod": "pod1",
},
SumValue: "300",
},
},
},
},
{
MetricName: "meter_pod_pvc_bytes_total",
MetricData: monitoring.MetricData{
MetricValues: monitoring.MetricValues{
{
Metadata: map[string]string{
"pod": "pod1",
},
SumValue: "400",
},
},
},
},
},
},
expectedValue: metering.PodsStats{
"pod1": &metering.PodStatistic{
CPUUsage: 10,
MemoryUsageWoCache: 200,
NetBytesReceived: 300,
NetBytesTransmitted: 300,
PVCBytesTotal: 400,
},
},
},
}
var tOperator tenantOperator
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
if diff := cmp.Diff(tOperator.transformMetricData(tt.metrics), tt.expectedValue); diff != "" {
t.Errorf("%T differ (-got, +want): %s", tt.expectedValue, diff)
}
})
}
}

View File

@@ -998,7 +998,7 @@ func (t *tenantOperator) MeteringHierarchy(user user.Info, queryParam *meteringv
podsStats := t.transformMetricData(res)
// classify pods stats
resourceStats, err := t.classifyPodStats(user, "", queryParam.NamespaceName, podsStats)
resourceStats, err := t.classifyPodStats(user, queryParam.Cluster, queryParam.NamespaceName, podsStats)
if err != nil {
klog.Error(err)
return metering.ResourceStatistic{}, err

View File

@@ -226,6 +226,7 @@ func (p prometheus) GetNamedMetersOverTime(meters []string, start, end time.Time
go func(metric string) {
parsedResp := monitoring.Metric{MetricName: metric}
begin := time.Now()
value, _, err := p.client.QueryRange(prometheusCtx, makeMeterExpr(metric, *queryOptions), timeRange)
end := time.Now()
timeElapsed := end.Unix() - begin.Unix()

View File

@@ -151,6 +151,18 @@ func (aso ApplicationsOption) Apply(o *QueryOptions) {
return
}
type OpenpitrixsOption struct {
Cluster string
NamespaceName string
Openpitrixs []string
StorageClassName string
}
func (oso OpenpitrixsOption) Apply(o *QueryOptions) {
// nothing should be done
return
}
// ApplicationsOption & OpenpitrixsOption share the same ApplicationOption struct
type ApplicationOption struct {
NamespaceName string

View File

@@ -63,13 +63,13 @@ type MetricValue struct {
ExportSample *ExportPoint `json:"exported_value,omitempty" description:"exported time series, values of vector type"`
ExportedSeries []ExportPoint `json:"exported_values,omitempty" description:"exported time series, values of matrix type"`
MinValue float64 `json:"min_value" description:"minimum value from monitor points"`
MaxValue float64 `json:"max_value" description:"maximum value from monitor points"`
AvgValue float64 `json:"avg_value" description:"average value from monitor points"`
SumValue float64 `json:"sum_value" description:"sum value from monitor points"`
Fee float64 `json:"fee" description:"resource fee"`
ResourceUnit string `json:"resource_unit"`
CurrencyUnit string `json:"currency_unit"`
MinValue string `json:"min_value" description:"minimum value from monitor points"`
MaxValue string `json:"max_value" description:"maximum value from monitor points"`
AvgValue string `json:"avg_value" description:"average value from monitor points"`
SumValue string `json:"sum_value" description:"sum value from monitor points"`
Fee string `json:"fee" description:"resource fee"`
ResourceUnit string `json:"resource_unit"`
CurrencyUnit string `json:"currency_unit"`
}
func (mv *MetricValue) TransferToExportedMetricValue() {