From 845f6bbe89a2ee56d1aa86a7858ee1dcc8e4f926 Mon Sep 17 00:00:00 2001 From: Rao Yunkun Date: Mon, 29 Mar 2021 15:53:22 +0800 Subject: [PATCH 1/4] Intergate OpenPitrix metrics into metering. Signed-off-by: Rao Yunkun --- pkg/api/metering/v1alpha1/types.go | 2 + pkg/kapis/metering/v1alpha1/handler.go | 19 +- pkg/kapis/metering/v1alpha1/register.go | 87 +++++++-- pkg/kapis/monitoring/v1alpha3/handler.go | 7 +- pkg/kapis/monitoring/v1alpha3/helper.go | 101 ++++++---- pkg/kapis/monitoring/v1alpha3/helper_test.go | 2 +- pkg/kapis/monitoring/v1alpha3/meter.go | 180 ++++++++++++++++-- pkg/kapis/tenant/v1alpha2/metering.go | 24 ++- pkg/kapis/tenant/v1alpha2/register.go | 6 +- pkg/models/monitoring/utils.go | 130 +++++++++---- pkg/models/tenant/metering.go | 115 +++++++++-- pkg/models/tenant/tenant.go | 2 +- .../monitoring/prometheus/prometheus.go | 1 + pkg/simple/client/monitoring/query_options.go | 12 ++ pkg/simple/client/monitoring/types.go | 14 +- 15 files changed, 551 insertions(+), 151 deletions(-) diff --git a/pkg/api/metering/v1alpha1/types.go b/pkg/api/metering/v1alpha1/types.go index 8afb1ce9e..f6917e971 100644 --- a/pkg/api/metering/v1alpha1/types.go +++ b/pkg/api/metering/v1alpha1/types.go @@ -51,6 +51,7 @@ type Query struct { Services string StorageClassName string PVCFilter string + Cluster string } func ParseQueryParameter(req *restful.Request) *Query { @@ -85,6 +86,7 @@ func ParseQueryParameter(req *restful.Request) *Query { q.Services = req.QueryParameter("services") q.StorageClassName = req.QueryParameter("storageclass") q.PVCFilter = req.QueryParameter("pvc_filter") + q.Cluster = req.QueryParameter("cluster") return &q } diff --git a/pkg/kapis/metering/v1alpha1/handler.go b/pkg/kapis/metering/v1alpha1/handler.go index 3e95b36ff..544029e3f 100644 --- a/pkg/kapis/metering/v1alpha1/handler.go +++ b/pkg/kapis/metering/v1alpha1/handler.go @@ -30,15 +30,16 @@ import ( ) type meterHandler interface { - HandleClusterMetersQuery(req *restful.Request, resp *restful.Response) - HandleNodeMetersQuery(req *restful.Request, resp *restful.Response) - HandleWorkspaceMetersQuery(req *restful.Request, resp *restful.Response) - HandleNamespaceMetersQuery(re *restful.Request, resp *restful.Response) - HandleWorkloadMetersQuery(req *restful.Request, resp *restful.Response) - HandleApplicationMetersQuery(req *restful.Request, resp *restful.Response) - HandlePodMetersQuery(req *restful.Request, resp *restful.Response) - HandleServiceMetersQuery(req *restful.Request, resp *restful.Response) - HandlePVCMetersQuery(req *restful.Request, resp *restful.Response) + HandleClusterMeterQuery(req *restful.Request, resp *restful.Response) + HandleNodeMeterQuery(req *restful.Request, resp *restful.Response) + HandleWorkspaceMeterQuery(req *restful.Request, resp *restful.Response) + HandleNamespaceMeterQuery(re *restful.Request, resp *restful.Response) + HandleOpenpitrixMeterQuery(req *restful.Request, resp *restful.Response) + HandleWorkloadMeterQuery(req *restful.Request, resp *restful.Response) + HandleApplicationMeterQuery(req *restful.Request, resp *restful.Response) + HandlePodMeterQuery(req *restful.Request, resp *restful.Response) + HandleServiceMeterQuery(req *restful.Request, resp *restful.Response) + HandlePVCMeterQuery(req *restful.Request, resp *restful.Response) } func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) meterHandler { diff --git a/pkg/kapis/metering/v1alpha1/register.go b/pkg/kapis/metering/v1alpha1/register.go index 5c5b53400..9b0433964 100644 --- a/pkg/kapis/metering/v1alpha1/register.go +++ b/pkg/kapis/metering/v1alpha1/register.go @@ -50,7 +50,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache)) ws.Route(ws.GET("/cluster"). - To(h.HandleClusterMetersQuery). + To(h.HandleClusterMeterQuery). Doc("Get cluster-level meter data."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which meter data to return. For example, the following filter matches both cluster CPU usage and disk usage: `meter_cluster_cpu_usage|meter_cluster_memory_usage`.").DataType("string").Required(false)). @@ -64,7 +64,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes"). - To(h.HandleNodeMetersQuery). + To(h.HandleNodeMeterQuery). Doc("Get node-level meter data of all nodes."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which meter data to return. For example, the following filter matches both node CPU usage and disk usage: `meter_node_cpu_usage|meter_node_memory_usage`.").DataType("string").Required(false)). @@ -85,7 +85,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes/{node}"). - To(h.HandleNodeMetersQuery). + To(h.HandleNodeMeterQuery). Doc("Get node-level meter data of the specific node."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("node", "Node name.").DataType("string").Required(true)). @@ -102,7 +102,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/workspaces"). - To(h.HandleWorkspaceMetersQuery). + To(h.HandleWorkspaceMeterQuery). Doc("Get workspace-level meter data of all workspaces."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both workspace CPU usage and memory usage: `meter_workspace_cpu_usage|meter_workspace_memory_usage`.").DataType("string").Required(false)). @@ -123,7 +123,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/workspaces/{workspace}"). - To(h.HandleWorkspaceMetersQuery). + To(h.HandleWorkspaceMeterQuery). Doc("Get workspace-level meter data of a specific workspace."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("workspace", "Workspace name.").DataType("string").Required(true)). @@ -141,7 +141,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/workspaces/{workspace}/namespaces"). - To(h.HandleNamespaceMetersQuery). + To(h.HandleNamespaceMeterQuery). Doc("Get namespace-level meter data of a specific workspace."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("workspace", "Workspace name.").DataType("string").Required(true)). @@ -163,7 +163,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces"). - To(h.HandleNamespaceMetersQuery). + To(h.HandleNamespaceMeterQuery). Doc("Get namespace-level meter data of all namespaces."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both namespace CPU usage and memory usage: `meter_namespace_cpu_usage|meter_namespace_memory_usage_wo_cache`.").DataType("string").Required(false)). @@ -183,8 +183,25 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Returns(http.StatusOK, respOK, model.Metrics{})). Produces(restful.MIME_JSON) + ws.Route(ws.GET("/workspaces/{workspace}/namespaces/{namespace}"). + To(h.HandleNamespaceMeterQuery). + Doc("Get namespace-level meter data of the specific namespace."). + Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). + Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). + Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both namespace CPU usage and memory usage: `meter_namespace_cpu_usage|meter_namespace_memory_usage_wo_cache`.").DataType("string").Required(false)). + Param(ws.PathParameter("storageclass", "The name of the storageclass.").DataType("string").Required(false)). + Param(ws.QueryParameter("pvc_filter", "The PVC filter consists of a regexp pattern. It specifies which PVC data to return.").DataType("string").Required(false)). + Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)). + Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)). + Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)). + Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.NamespaceMetersTag}). + Writes(model.Metrics{}). + Returns(http.StatusOK, respOK, model.Metrics{})). + Produces(restful.MIME_JSON) + ws.Route(ws.GET("/namespaces/{namespace}"). - To(h.HandleNamespaceMetersQuery). + To(h.HandleNamespaceMeterQuery). Doc("Get namespace-level meter data of the specific namespace."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). @@ -201,7 +218,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/workloads"). - To(h.HandleWorkloadMetersQuery). + To(h.HandleWorkloadMeterQuery). Doc("Get workload-level meter data of all workloads which belongs to a specific kind."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). @@ -222,7 +239,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/applications"). - To(h.HandleApplicationMetersQuery). + To(h.HandleApplicationMeterQuery). Doc("Get app-level meter data of a specific application. Navigate to the app by the app's namespace."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). @@ -240,8 +257,46 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Returns(http.StatusOK, respOK, model.Metrics{})). Produces(restful.MIME_JSON) + ws.Route(ws.GET("/clusters/{cluster}/namespaces/{namespace}/openpitrixs"). + To(h.HandleOpenpitrixMeterQuery). + Doc("Get app-level meter data of a specific openpitrix app. Navigate to the app by the app's namespace."). + Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). + Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). + Param(ws.QueryParameter("openpitrix_ids", "Openpitrix application ids which can be joined by \"|\" ").DataType("string").Required(false)). + Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both pod CPU usage and memory usage: `meter_application_cpu_usage|meter_application_memory_usage_wo_cache`.").DataType("string").Required(false)). + Param(ws.PathParameter("storageclass", "The name of the storageclass.").DataType("string").Required(false)). + Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)). + Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)). + Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)). + Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_metric", "Sort pods by the specified metric. Not applicable if **start** and **end** are provided.").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "Sort order. One of asc, desc.").DefaultValue("desc.").DataType("string").Required(false)). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.PodMetersTag}). + Writes(model.Metrics{}). + Returns(http.StatusOK, respOK, model.Metrics{})). + Produces(restful.MIME_JSON) + + ws.Route(ws.GET("/namespaces/{namespace}/openpitrixs"). + To(h.HandleOpenpitrixMeterQuery). + Doc("Get app-level meter data of a specific openpitrix app. Navigate to the app by the app's namespace."). + Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). + Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). + Param(ws.QueryParameter("openpitrix_ids", "Openpitrix application ids which can be joined by \"|\" ").DataType("string").Required(false)). + Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both pod CPU usage and memory usage: `meter_application_cpu_usage|meter_application_memory_usage_wo_cache`.").DataType("string").Required(false)). + Param(ws.PathParameter("storageclass", "The name of the storageclass.").DataType("string").Required(false)). + Param(ws.QueryParameter("start", "Start time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1559347200. ").DataType("string").Required(false)). + Param(ws.QueryParameter("end", "End time of query. Use **start** and **end** to retrieve metric data over a time span. It is a string with Unix time format, eg. 1561939200. ").DataType("string").Required(false)). + Param(ws.QueryParameter("step", "Time interval. Retrieve metric data at a fixed interval within the time range of start and end. It requires both **start** and **end** are provided. The format is [0-9]+[smhdwy]. Defaults to 10m (i.e. 10 min).").DataType("string").DefaultValue("10m").Required(false)). + Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_metric", "Sort pods by the specified metric. Not applicable if **start** and **end** are provided.").DataType("string").Required(false)). + Param(ws.QueryParameter("sort_type", "Sort order. One of asc, desc.").DefaultValue("desc.").DataType("string").Required(false)). + Metadata(restfulspec.KeyOpenAPITags, []string{constants.PodMetersTag}). + Writes(model.Metrics{}). + Returns(http.StatusOK, respOK, model.Metrics{})). + Produces(restful.MIME_JSON) + ws.Route(ws.GET("/namespaces/{namespace}/pods"). - To(h.HandlePodMetersQuery). + To(h.HandlePodMeterQuery). Doc("Get pod-level meter data of the specific namespace's pods."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). @@ -261,7 +316,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/pods/{pod}"). - To(h.HandlePodMetersQuery). + To(h.HandlePodMeterQuery). Doc("Get pod-level meter data of a specific pod. Navigate to the pod by the pod's namespace."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). @@ -277,7 +332,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/workloads/{workload}/pods"). - To(h.HandlePodMetersQuery). + To(h.HandlePodMeterQuery). Doc("Get pod-level meter data of a specific workload's pods. Navigate to the workload by the namespace."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). @@ -299,7 +354,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes/{node}/pods"). - To(h.HandlePodMetersQuery). + To(h.HandlePodMeterQuery). Doc("Get pod-level meter data of all pods on a specific node."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("node", "Node name.").DataType("string").Required(true)). @@ -319,7 +374,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/nodes/{node}/pods/{pod}"). - To(h.HandlePodMetersQuery). + To(h.HandlePodMeterQuery). Doc("Get pod-level meter data of a specific pod. Navigate to the pod by the node where it is scheduled."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("node", "Node name.").DataType("string").Required(true)). @@ -335,7 +390,7 @@ func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteri Produces(restful.MIME_JSON) ws.Route(ws.GET("/namespaces/{namespace}/services"). - To(h.HandleServiceMetersQuery). + To(h.HandleServiceMeterQuery). Doc("Get service-level meter data of the specific namespace's pods."). Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.PathParameter("namespace", "The name of the namespace.").DataType("string").Required(true)). diff --git a/pkg/kapis/monitoring/v1alpha3/handler.go b/pkg/kapis/monitoring/v1alpha3/handler.go index 69775050c..d8300a282 100644 --- a/pkg/kapis/monitoring/v1alpha3/handler.go +++ b/pkg/kapis/monitoring/v1alpha3/handler.go @@ -43,10 +43,15 @@ type handler struct { } func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) *handler { + var opRelease openpitrix.Interface + if ksClient != nil { + opRelease = openpitrix.NewOpenpitrixOperator(f, ksClient, nil) + } + return &handler{ k: k, mo: model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter), - opRelease: nil, + opRelease: opRelease, } } diff --git a/pkg/kapis/monitoring/v1alpha3/helper.go b/pkg/kapis/monitoring/v1alpha3/helper.go index 490e6ec61..f10e19a17 100644 --- a/pkg/kapis/monitoring/v1alpha3/helper.go +++ b/pkg/kapis/monitoring/v1alpha3/helper.go @@ -60,6 +60,7 @@ const ( ) type reqParams struct { + metering bool operation string time string start string @@ -84,6 +85,8 @@ type reqParams struct { expression string metric string applications string + openpitrixs string + cluster string services string pvcFilter string } @@ -118,7 +121,6 @@ func (q queryOptions) shouldSort() bool { func parseRequestParams(req *restful.Request) reqParams { var r reqParams - r.operation = req.QueryParameter("operation") r.time = req.QueryParameter("time") r.start = req.QueryParameter("start") r.end = req.QueryParameter("end") @@ -131,21 +133,8 @@ func parseRequestParams(req *restful.Request) reqParams { r.resourceFilter = req.QueryParameter("resources_filter") r.workspaceName = req.PathParameter("workspace") r.namespaceName = req.PathParameter("namespace") - - if req.QueryParameter("node") != "" { - r.nodeName = req.QueryParameter("node") - } else { - // compatible with monitoring request - r.nodeName = req.PathParameter("node") - } - - if req.QueryParameter("kind") != "" { - r.workloadKind = req.QueryParameter("kind") - } else { - // compatible with monitoring request - r.workloadKind = req.PathParameter("kind") - } - + r.workloadKind = req.PathParameter("kind") + r.nodeName = req.PathParameter("node") r.workloadName = req.PathParameter("workload") r.podName = req.PathParameter("pod") r.containerName = req.PathParameter("container") @@ -154,13 +143,47 @@ func parseRequestParams(req *restful.Request) reqParams { r.componentType = req.PathParameter("component") r.expression = req.QueryParameter("expr") r.metric = req.QueryParameter("metric") - r.applications = req.QueryParameter("applications") - r.services = req.QueryParameter("services") - r.pvcFilter = req.QueryParameter("pvc_filter") return r } +func parseMeteringRequestParams(req *restful.Request) reqParams { + params := parseRequestParams(req) + + // mark this request is metering req + params.metering = true + + // whether need to export metering data + params.operation = req.QueryParameter("operation") + + // OpenPitrix belongs to which cluster + params.cluster = req.PathParameter("cluster") + + // specified which application crds + params.applications = req.QueryParameter("applications") + + // specified which OpenPitrix apps + params.openpitrixs = req.QueryParameter("openpitrix_ids") + + // specified which service + params.services = req.QueryParameter("services") + + // specified which pvc + params.pvcFilter = req.QueryParameter("pvc_filter") + + // support node param in URL query + if req.QueryParameter("node") != "" { + params.nodeName = req.QueryParameter("node") + } + + // support kind param in URL query + if req.QueryParameter("kind") != "" { + params.workloadKind = req.QueryParameter("kind") + } + + return params +} + func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOptions, err error) { if r.resourceFilter == "" { r.resourceFilter = DefaultFilter @@ -230,6 +253,26 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt } q.namedMetrics = model.ApplicationMetrics + case monitoring.LevelOpenpitrix: + q.identifier = model.IdentifierApplication + if r.namespaceName == "" { + return q, errors.New(fmt.Sprintf(ErrParameterNotfound, "namespace")) + } + + ops := []string{} + if len(r.openpitrixs) != 0 { + ops = strings.Split(r.openpitrixs, "|") + } + q.option = monitoring.OpenpitrixsOption{ + Cluster: r.cluster, + NamespaceName: r.namespaceName, + Openpitrixs: ops, + StorageClassName: r.storageClassName, + } + + // op share the same metrics with application + q.namedMetrics = model.ApplicationMetrics + case monitoring.LevelWorkload: q.identifier = model.IdentifierWorkload q.option = monitoring.WorkloadOption{ @@ -339,7 +382,7 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt } // Ensure query start time to be after the namespace creation time - if r.namespaceName != "" { + if r.namespaceName != "" && !r.metering { ns, err := h.k.CoreV1().Namespaces().Get(context.Background(), r.namespaceName, corev1.GetOptions{}) if err != nil { return q, err @@ -392,7 +435,7 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt return q, nil } -func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) { +func exportMetrics(metrics model.Metrics, startTime, endTime time.Time) (*bytes.Buffer, error) { var resBytes []byte for i, _ := range metrics.Results { @@ -415,18 +458,12 @@ func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) { } selector := strings.Join(targetList, "|") - var startTime, endTime string - if len(metricVal.ExportedSeries) > 0 { - startTime = metricVal.ExportedSeries[0].Timestamp() - endTime = metricVal.ExportedSeries[len(metricVal.ExportedSeries)-1].Timestamp() - } - statsTab := "\nmetric_name,selector,start_time,end_time,min,max,avg,sum,fee, currency_unit\n" + - fmt.Sprintf("%s,%s,%s,%s,%.2f,%.2f,%.2f,%.2f,%.2f,%s\n\n", + fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n\n", metricName, selector, - startTime, - endTime, + startTime.String(), + endTime.String(), metricVal.MinValue, metricVal.MaxValue, metricVal.AvgValue, @@ -463,11 +500,11 @@ func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) { return output, nil } -func ExportMetrics(resp *restful.Response, metrics model.Metrics) { +func ExportMetrics(resp *restful.Response, metrics model.Metrics, startTime, endTime time.Time) { resp.Header().Set(restful.HEADER_ContentType, "text/plain") resp.Header().Set("Content-Disposition", "attachment") - output, err := exportMetrics(metrics) + output, err := exportMetrics(metrics, startTime, endTime) if err != nil { api.HandleBadRequest(resp, nil, err) return diff --git a/pkg/kapis/monitoring/v1alpha3/helper_test.go b/pkg/kapis/monitoring/v1alpha3/helper_test.go index efadff2fd..808bccb5f 100644 --- a/pkg/kapis/monitoring/v1alpha3/helper_test.go +++ b/pkg/kapis/monitoring/v1alpha3/helper_test.go @@ -372,7 +372,7 @@ func TestExportMetrics(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - _, err := exportMetrics(tt.metrics) + _, err := exportMetrics(tt.metrics, time.Now().Add(-time.Hour), time.Now()) if err != nil && !tt.expectedErr { t.Fatal("Failed to export metering metrics", err) } diff --git a/pkg/kapis/monitoring/v1alpha3/meter.go b/pkg/kapis/monitoring/v1alpha3/meter.go index cdb78526f..7f390a79a 100644 --- a/pkg/kapis/monitoring/v1alpha3/meter.go +++ b/pkg/kapis/monitoring/v1alpha3/meter.go @@ -4,7 +4,11 @@ import ( "regexp" "strings" + "kubesphere.io/kubesphere/pkg/models/openpitrix" + "kubesphere.io/kubesphere/pkg/server/params" + "github.com/emicklei/go-restful" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/klog" "kubesphere.io/kubesphere/pkg/api" @@ -12,8 +16,8 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/monitoring" ) -func (h handler) HandleClusterMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandleClusterMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) opt, err := h.makeQueryOptions(params, monitoring.LevelCluster) if err != nil { api.HandleBadRequest(resp, nil, err) @@ -87,7 +91,7 @@ func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Res } if q.Operation == OperationExport { - ExportMetrics(resp, res) + ExportMetrics(resp, res, q.start, q.end) return } @@ -144,7 +148,7 @@ func (h handler) handleServiceMetersQuery(meters []string, resp *restful.Respons } if q.Operation == OperationExport { - ExportMetrics(resp, res) + ExportMetrics(resp, res, q.start, q.end) return } @@ -180,6 +184,12 @@ func (h handler) handleNamedMetersQuery(resp *restful.Response, q queryOptions) return } + _, ok = q.option.(monitoring.OpenpitrixsOption) + if ok { + h.handleOpenpitrixMetersQuery(meters, resp, q) + return + } + _, ok = q.option.(monitoring.ServicesOption) if ok { h.handleServiceMetersQuery(meters, resp, q) @@ -205,15 +215,16 @@ func (h handler) handleNamedMetersQuery(resp *restful.Response, q queryOptions) } if q.Operation == OperationExport { - ExportMetrics(resp, res) + ExportMetrics(resp, res, q.start, q.end) return } resp.WriteAsJson(res) } -func (h handler) HandleNodeMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandleNodeMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) + params.metering = true opt, err := h.makeQueryOptions(params, monitoring.LevelNode) if err != nil { api.HandleBadRequest(resp, nil, err) @@ -222,8 +233,9 @@ func (h handler) HandleNodeMetersQuery(req *restful.Request, resp *restful.Respo h.handleNamedMetersQuery(resp, opt) } -func (h handler) HandleWorkspaceMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandleWorkspaceMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) + params.metering = true opt, err := h.makeQueryOptions(params, monitoring.LevelWorkspace) if err != nil { api.HandleBadRequest(resp, nil, err) @@ -233,8 +245,9 @@ func (h handler) HandleWorkspaceMetersQuery(req *restful.Request, resp *restful. h.handleNamedMetersQuery(resp, opt) } -func (h handler) HandleNamespaceMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandleNamespaceMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) + params.metering = true opt, err := h.makeQueryOptions(params, monitoring.LevelNamespace) if err != nil { if err.Error() == ErrNoHit { @@ -250,8 +263,8 @@ func (h handler) HandleNamespaceMetersQuery(req *restful.Request, resp *restful. h.handleNamedMetersQuery(resp, opt) } -func (h handler) HandleWorkloadMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandleWorkloadMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) opt, err := h.makeQueryOptions(params, monitoring.LevelWorkload) if err != nil { if err.Error() == ErrNoHit { @@ -266,8 +279,8 @@ func (h handler) HandleWorkloadMetersQuery(req *restful.Request, resp *restful.R h.handleNamedMetersQuery(resp, opt) } -func (h handler) HandleApplicationMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandleApplicationMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) opt, err := h.makeQueryOptions(params, monitoring.LevelApplication) if err != nil { if err.Error() == ErrNoHit { @@ -282,8 +295,25 @@ func (h handler) HandleApplicationMetersQuery(req *restful.Request, resp *restfu h.handleNamedMetersQuery(resp, opt) } -func (h handler) HandlePodMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandleOpenpitrixMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) + opt, err := h.makeQueryOptions(params, monitoring.LevelOpenpitrix) + if err != nil { + if err.Error() == ErrNoHit { + res := handleNoHit(opt.namedMetrics) + resp.WriteAsJson(res) + return + } + + api.HandleBadRequest(resp, nil, err) + return + } + + h.handleNamedMetersQuery(resp, opt) +} + +func (h handler) HandlePodMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) opt, err := h.makeQueryOptions(params, monitoring.LevelPod) if err != nil { if err.Error() == ErrNoHit { @@ -295,11 +325,12 @@ func (h handler) HandlePodMetersQuery(req *restful.Request, resp *restful.Respon api.HandleBadRequest(resp, nil, err) return } + h.handleNamedMetersQuery(resp, opt) } -func (h handler) HandleServiceMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandleServiceMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) opt, err := h.makeQueryOptions(params, monitoring.LevelService) if err != nil { if err.Error() == ErrNoHit { @@ -315,8 +346,8 @@ func (h handler) HandleServiceMetersQuery(req *restful.Request, resp *restful.Re h.handleNamedMetersQuery(resp, opt) } -func (h handler) HandlePVCMetersQuery(req *restful.Request, resp *restful.Response) { - params := parseRequestParams(req) +func (h handler) HandlePVCMeterQuery(req *restful.Request, resp *restful.Response) { + params := parseMeteringRequestParams(req) opt, err := h.makeQueryOptions(params, monitoring.LevelPVC) if err != nil { if err.Error() == ErrNoHit { @@ -330,3 +361,110 @@ func (h handler) HandlePVCMetersQuery(req *restful.Request, resp *restful.Respon } h.handleNamedMetersQuery(resp, opt) } + +func (h handler) collectOps(cluster, ns string) []string { + + var ops []string + + conditions := params.Conditions{ + Match: make(map[string]string), + Fuzzy: make(map[string]string), + } + + resp, err := h.opRelease.ListApplications("", cluster, ns, &conditions, 10, 0, "", false) + if err != nil { + klog.Error("failed to list op apps") + return nil + } + totalCount := resp.TotalCount + resp, err = h.opRelease.ListApplications("", cluster, ns, &conditions, totalCount, 0, "", false) + if err != nil { + klog.Error("failed to list op apps") + return nil + } + + for _, item := range resp.Items { + app := item.(*openpitrix.Application) + ops = append(ops, app.Cluster.ClusterId) + } + return ops +} +func (h handler) getOpWorkloads(cluster, ns string, ops []string) map[string][]string { + + componentsMap := make(map[string][]string) + + if len(ops) == 0 { + ops = h.collectOps(cluster, ns) + } + + for _, op := range ops { + app, err := h.opRelease.DescribeApplication("", cluster, ns, op) + if err != nil { + klog.Error(err) + return nil + } + for _, object := range app.ReleaseInfo { + unstructuredObj := object.(*unstructured.Unstructured) + componentsMap[op] = append(componentsMap[op], unstructuredObj.GetKind()+":"+unstructuredObj.GetName()) + } + } + + return componentsMap +} +func (h handler) handleOpenpitrixMetersQuery(meters []string, resp *restful.Response, q queryOptions) { + var metricMap = make(map[string]int) + var res model.Metrics + var current_res model.Metrics + var err error + + oso, ok := q.option.(monitoring.OpenpitrixsOption) + if !ok { + klog.Error("invalid openpitrix option") + return + } + + opWorkloads := h.getOpWorkloads(oso.Cluster, oso.NamespaceName, oso.Openpitrixs) + + for k, _ := range opWorkloads { + opt := monitoring.ApplicationOption{ + NamespaceName: oso.NamespaceName, + Application: k, + ApplicationComponents: opWorkloads[k], + StorageClassName: oso.StorageClassName, + } + + if q.isRangeQuery() { + current_res, err = h.mo.GetNamedMetersOverTime(meters, q.start, q.end, q.step, opt) + } else { + current_res, err = h.mo.GetNamedMeters(meters, q.time, opt) + } + if err != nil { + api.HandleBadRequest(resp, nil, err) + return + } + if res.Results == nil { + res = current_res + metricMap = getMetricPosMap(res.Results) + } else { + for _, cur_res := range current_res.Results { + pos, ok := metricMap[cur_res.MetricName] + if ok { + res.Results[pos].MetricValues = append(res.Results[pos].MetricValues, cur_res.MetricValues...) + } else { + res.Results = append(res.Results, cur_res) + } + } + } + } + + if !q.isRangeQuery() && q.shouldSort() { + res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit) + } + + if q.Operation == OperationExport { + ExportMetrics(resp, res, q.start, q.end) + return + } + + resp.WriteAsJson(res) +} diff --git a/pkg/kapis/tenant/v1alpha2/metering.go b/pkg/kapis/tenant/v1alpha2/metering.go index 748f12057..bca0b1b59 100644 --- a/pkg/kapis/tenant/v1alpha2/metering.go +++ b/pkg/kapis/tenant/v1alpha2/metering.go @@ -6,6 +6,9 @@ import ( "github.com/emicklei/go-restful" "k8s.io/klog" + "strconv" + "time" + "kubesphere.io/kubesphere/pkg/api" meteringv1alpha1 "kubesphere.io/kubesphere/pkg/api/metering/v1alpha1" "kubesphere.io/kubesphere/pkg/apiserver/request" @@ -15,7 +18,7 @@ import ( monitoringclient "kubesphere.io/kubesphere/pkg/simple/client/monitoring" ) -func (h *tenantHandler) QueryMeterings(req *restful.Request, resp *restful.Response) { +func (h *tenantHandler) QueryMetering(req *restful.Request, resp *restful.Response) { u, ok := request.UserFrom(req.Request.Context()) if !ok { @@ -34,14 +37,29 @@ func (h *tenantHandler) QueryMeterings(req *restful.Request, resp *restful.Respo } if q.Operation == monitoringv1alpha3.OperationExport { - monitoringv1alpha3.ExportMetrics(resp, res) + + start, err := strconv.ParseInt(q.Start, 10, 64) + if err != nil { + api.HandleBadRequest(resp, nil, err) + return + } + end, err := strconv.ParseInt(q.End, 10, 64) + if err != nil { + api.HandleBadRequest(resp, nil, err) + return + } + + startTime := time.Unix(start, 0) + endTime := time.Unix(end, 0) + + monitoringv1alpha3.ExportMetrics(resp, res, startTime, endTime) return } resp.WriteAsJson(res) } -func (h *tenantHandler) QueryMeteringsHierarchy(req *restful.Request, resp *restful.Response) { +func (h *tenantHandler) QueryMeteringHierarchy(req *restful.Request, resp *restful.Response) { u, ok := request.UserFrom(req.Request.Context()) if !ok { err := fmt.Errorf("cannot obtain user info") diff --git a/pkg/kapis/tenant/v1alpha2/register.go b/pkg/kapis/tenant/v1alpha2/register.go index 353068db3..755dd5849 100644 --- a/pkg/kapis/tenant/v1alpha2/register.go +++ b/pkg/kapis/tenant/v1alpha2/register.go @@ -40,7 +40,6 @@ import ( kubesphere "kubesphere.io/kubesphere/pkg/client/clientset/versioned" "kubesphere.io/kubesphere/pkg/constants" "kubesphere.io/kubesphere/pkg/informers" - monitoringv1alpha3 "kubesphere.io/kubesphere/pkg/kapis/monitoring/v1alpha3" "kubesphere.io/kubesphere/pkg/models" "kubesphere.io/kubesphere/pkg/models/iam/am" "kubesphere.io/kubesphere/pkg/models/monitoring" @@ -301,10 +300,9 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s Returns(http.StatusOK, api.StatusOK, auditingv1alpha1.APIResponse{})) ws.Route(ws.GET("/metering"). - To(handler.QueryMeterings). + To(handler.QueryMetering). Doc("Get meterings against the cluster."). Param(ws.QueryParameter("level", "Metering level.").DataType("string").Required(true)). - Param(ws.QueryParameter("operation", "Metering operation.").DataType("string").Required(false).DefaultValue(monitoringv1alpha3.OperationQuery)). Param(ws.QueryParameter("node", "Node name.").DataType("string").Required(false)). Param(ws.QueryParameter("workspace", "Workspace name.").DataType("string").Required(false)). Param(ws.QueryParameter("namespace", "Namespace name.").DataType("string").Required(false)). @@ -330,7 +328,7 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s Returns(http.StatusOK, api.StatusOK, monitoring.Metrics{})) ws.Route(ws.GET("/namespaces/{namespace}/metering/hierarchy"). - To(handler.QueryMeteringsHierarchy). + To(handler.QueryMeteringHierarchy). Param(ws.PathParameter("namespace", "Namespace name.").DataType("string").Required(false)). Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both workspace CPU usage and memory usage: `meter_pod_cpu_usage|meter_pod_memory_usage_wo_cache`.").DataType("string").Required(false)). Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)). diff --git a/pkg/models/monitoring/utils.go b/pkg/models/monitoring/utils.go index 5f6ab87c2..200ef527f 100644 --- a/pkg/models/monitoring/utils.go +++ b/pkg/models/monitoring/utils.go @@ -1,7 +1,8 @@ package monitoring import ( - "math" + "fmt" + "math/big" "os" "k8s.io/apimachinery/pkg/util/yaml" @@ -18,6 +19,13 @@ const ( METER_RESOURCE_TYPE_PVC meteringConfig = "/etc/kubesphere/metering/ks-metering.yaml" + + meteringDefaultPrecision = 10 + meteringCorePrecision = 3 + meteringMemPrecision = 1 + meteringIngressPrecision = 0 + meteringEgressPrecision = 0 + meteringPvcPrecision = 1 ) var meterResourceUnitMap = map[int]string{ @@ -110,48 +118,59 @@ func LoadYaml() (*MeterConfig, error) { return &meterConfig, nil } -func getMaxPointValue(points []monitoring.Point) float64 { - var max float64 +func getMaxPointValue(points []monitoring.Point) string { + var max *big.Float for i, p := range points { if i == 0 { - max = p.Value() + max = new(big.Float).SetFloat64(p.Value()) } - if p.Value() > max { - max = p.Value() + pf := new(big.Float).SetFloat64(p.Value()) + if pf.Cmp(max) == 1 { + max = pf } } - return max + return fmt.Sprintf(generateFloatFormat(meteringDefaultPrecision), max) } -func getMinPointValue(points []monitoring.Point) float64 { - var min float64 +func getMinPointValue(points []monitoring.Point) string { + var min *big.Float for i, p := range points { if i == 0 { - min = p.Value() + min = new(big.Float).SetFloat64(p.Value()) } - if p.Value() < min { - min = p.Value() + pf := new(big.Float).SetFloat64(p.Value()) + if min.Cmp(pf) == 1 { + min = pf } } - return min + return fmt.Sprintf(generateFloatFormat(meteringDefaultPrecision), min) } -func getSumPointValue(points []monitoring.Point) float64 { - avg := 0.0 +func getSumPointValue(points []monitoring.Point) string { + sum := new(big.Float).SetFloat64(0) for _, p := range points { - avg += p.Value() + pf := new(big.Float).SetFloat64(p.Value()) + sum.Add(sum, pf) } - return avg + return fmt.Sprintf(generateFloatFormat(meteringDefaultPrecision), sum) } -func getAvgPointValue(points []monitoring.Point) float64 { - return getSumPointValue(points) / float64(len(points)) +func getAvgPointValue(points []monitoring.Point) string { + sum, ok := new(big.Float).SetString(getSumPointValue(points)) + if !ok { + klog.Error("failed to parse big.Float") + return "" + } + + length := new(big.Float).SetFloat64(float64(len(points))) + + return fmt.Sprintf("%.3f", sum.Quo(sum, length)) } func getCurrencyUnit() string { @@ -164,6 +183,10 @@ func getCurrencyUnit() string { return meterConfig.GetPriceInfo().CurrencyUnit } +func generateFloatFormat(precision int) string { + return "%." + fmt.Sprintf("%d", precision) + "f" +} + func getResourceUnit(meterName string) string { if resourceType, ok := MeterResourceMap[meterName]; !ok { klog.Errorf("invlaid meter %v", meterName) @@ -173,43 +196,66 @@ func getResourceUnit(meterName string) string { } } -func getFeeWithMeterName(meterName string, sum float64) float64 { +func getFeeWithMeterName(meterName string, sum string) string { + + s, ok := new(big.Float).SetString(sum) + if !ok { + klog.Error("failed to parse string to float") + return "" + } meterConfig, err := LoadYaml() if err != nil { klog.Error(err) - return -1 + return "" } priceInfo := meterConfig.GetPriceInfo() if resourceType, ok := MeterResourceMap[meterName]; !ok { klog.Errorf("invlaid meter %v", meterName) - return -1 + return "" } else { switch resourceType { case METER_RESOURCE_TYPE_CPU: - // unit: core, precision: 0.001 - sum = math.Round(sum*1000) / 1000 - return priceInfo.CpuPerCorePerHour * sum + CpuPerCorePerHour := new(big.Float).SetFloat64(priceInfo.CpuPerCorePerHour) + tmp := s.Mul(s, CpuPerCorePerHour) + + return fmt.Sprintf(generateFloatFormat(meteringCorePrecision), tmp) case METER_RESOURCE_TYPE_MEM: - // unit: Gigabyte, precision: 0.1 - sum = math.Round(sum/1073741824*10) / 10 - return priceInfo.MemPerGigabytesPerHour * sum + oneGiga := new(big.Float).SetInt64(1073741824) + MemPerGigabytesPerHour := new(big.Float).SetFloat64(priceInfo.MemPerGigabytesPerHour) + + // transform unit from bytes to Gigabytes + s.Quo(s, oneGiga) + + return fmt.Sprintf(generateFloatFormat(meteringMemPrecision), s.Mul(s, MemPerGigabytesPerHour)) case METER_RESOURCE_TYPE_NET_INGRESS: - // unit: Megabyte, precision: 1 - sum = math.Round(sum / 1048576) - return priceInfo.IngressNetworkTrafficPerMegabytesPerHour * sum + oneMega := new(big.Float).SetInt64(1048576) + IngressNetworkTrafficPerMegabytesPerHour := new(big.Float).SetFloat64(priceInfo.IngressNetworkTrafficPerMegabytesPerHour) + + // transform unit from bytes to Migabytes + s.Quo(s, oneMega) + + return fmt.Sprintf(generateFloatFormat(meteringIngressPrecision), s.Mul(s, IngressNetworkTrafficPerMegabytesPerHour)) case METER_RESOURCE_TYPE_NET_EGRESS: - // unit: Megabyte, precision: 1 - sum = math.Round(sum / 1048576) - return priceInfo.EgressNetworkTrafficPerMegabytesPerHour * sum + oneMega := new(big.Float).SetInt64(1048576) + EgressNetworkTrafficPerMegabytesPerHour := new(big.Float).SetPrec(meteringEgressPrecision).SetFloat64(priceInfo.EgressNetworkTrafficPerMegabytesPerHour) + + // transform unit from bytes to Migabytes + s.Quo(s, oneMega) + + return fmt.Sprintf(generateFloatFormat(meteringEgressPrecision), s.Mul(s, EgressNetworkTrafficPerMegabytesPerHour)) case METER_RESOURCE_TYPE_PVC: - // unit: Gigabyte, precision: 0.1 - sum = math.Round(sum/1073741824*10) / 10 - return priceInfo.PvcPerGigabytesPerHour * sum + oneGiga := new(big.Float).SetInt64(1073741824) + PvcPerGigabytesPerHour := new(big.Float).SetPrec(meteringPvcPrecision).SetFloat64(priceInfo.PvcPerGigabytesPerHour) + + // transform unit from bytes to Gigabytes + s.Quo(s, oneGiga) + + return fmt.Sprintf(generateFloatFormat(meteringPvcPrecision), s.Mul(s, PvcPerGigabytesPerHour)) } - return -1 + return "" } } @@ -224,9 +270,9 @@ func updateMetricStatData(metric monitoring.Metric, scalingMap map[string]float6 metricData.MetricValues[index].MaxValue = getMaxPointValue(metricValue.Series) metricData.MetricValues[index].AvgValue = getAvgPointValue(metricValue.Series) } else { - metricData.MetricValues[index].MinValue = (*metricValue.Sample)[1] - metricData.MetricValues[index].MaxValue = (*metricValue.Sample)[1] - metricData.MetricValues[index].AvgValue = (*metricValue.Sample)[1] + metricData.MetricValues[index].MinValue = getMinPointValue([]monitoring.Point{*metricValue.Sample}) + metricData.MetricValues[index].MaxValue = getMaxPointValue([]monitoring.Point{*metricValue.Sample}) + metricData.MetricValues[index].AvgValue = getAvgPointValue([]monitoring.Point{*metricValue.Sample}) } // squash points if step is more than one hour and calculate sum and fee @@ -241,7 +287,7 @@ func updateMetricStatData(metric monitoring.Metric, scalingMap map[string]float6 metricData.MetricValues[index].SumValue = sum metricData.MetricValues[index].Fee = getFeeWithMeterName(metricName, sum) } else { - sum := (*metricValue.Sample)[1] + sum := getSumPointValue([]monitoring.Point{*metricValue.Sample}) metricData.MetricValues[index].SumValue = sum metricData.MetricValues[index].Fee = getFeeWithMeterName(metricName, sum) } diff --git a/pkg/models/tenant/metering.go b/pkg/models/tenant/metering.go index 2f129762e..21b252d28 100644 --- a/pkg/models/tenant/metering.go +++ b/pkg/models/tenant/metering.go @@ -15,6 +15,7 @@ import ( appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/klog" @@ -25,6 +26,8 @@ import ( "kubesphere.io/kubesphere/pkg/apiserver/query" "kubesphere.io/kubesphere/pkg/apiserver/request" monitoringmodel "kubesphere.io/kubesphere/pkg/models/monitoring" + "kubesphere.io/kubesphere/pkg/models/openpitrix" + "kubesphere.io/kubesphere/pkg/server/params" "kubesphere.io/kubesphere/pkg/simple/client/monitoring" ) @@ -691,7 +694,12 @@ func (t *tenantOperator) transformMetricData(metrics monitoringmodel.Metrics) me for _, metricValue := range metric.MetricValues { //metricValue.SumValue podName := metricValue.Metadata["pod"] - podsStats.Set(podName, metricName, metricValue.SumValue) + if s, err := strconv.ParseFloat(metricValue.SumValue, 64); err != nil { + klog.Error("failed to parse string to float64") + return nil + } else { + podsStats.Set(podName, metricName, s) + } } } @@ -769,9 +777,15 @@ func (t *tenantOperator) updateDeploysStats(user user.Info, cluster, ns string, return err } - if ok, _ := t.isOpenPitrixComponent(cluster, ns, "deployment", deploy.Name); ok { - // TODO: for op deployment - continue + if ok, opName := t.isOpenPitrixComponent(cluster, ns, "deployment", deploy.Name); ok { + // for op deployment + for _, pod := range pods { + podsStat := podsStats[pod] + if err := resourceStats.GetOpenPitrixStats(opName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat); err != nil { + klog.Error(err) + return err + } + } } else if ok, appName := t.isAppComponent(ns, "deployment", deploy.Name); ok { // for app deployment for _, pod := range pods { @@ -798,7 +812,7 @@ func (t *tenantOperator) updateDeploysStats(user user.Info, cluster, ns string, } } - // TODO: op aggregate for deployment components + // OpenPitrix aggregate for deployment components for _, op := range resourceStats.OpenPitrixs { op.Aggregate() } @@ -833,9 +847,14 @@ func (t *tenantOperator) updateDaemonsetsStats(user user.Info, cluster, ns strin return err } - if ok, _ := t.isOpenPitrixComponent(cluster, ns, "daemonset", daemonset.Name); ok { - // TODO: for op daemonset - continue + if ok, opName := t.isOpenPitrixComponent(cluster, ns, "daemonset", daemonset.Name); ok { + // for op daemonset + for _, pod := range pods { + if err := resourceStats.GetOpenPitrixStats(opName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil { + klog.Error(err) + return err + } + } } else if ok, appName := t.isAppComponent(ns, "daemonset", daemonset.Name); ok { // for app daemonset for _, pod := range pods { @@ -860,7 +879,7 @@ func (t *tenantOperator) updateDaemonsetsStats(user user.Info, cluster, ns strin // here pod stats and level struct are ready - // TODO: op aggregate for daemonset components + // OpenPitrix aggregate for daemonset components for _, op := range resourceStats.OpenPitrixs { op.Aggregate() } @@ -877,8 +896,71 @@ func (t *tenantOperator) updateDaemonsetsStats(user user.Info, cluster, ns strin return nil } -// TODO: include op metering part +func (t *tenantOperator) collectOpenPitrixComponents(cluster, ns string) map[string][]string { + var opComponentsMap = make(map[string][]string) + var ops []string + conditions := params.Conditions{ + Match: make(map[string]string), + Fuzzy: make(map[string]string), + } + + resp, err := t.opRelease.ListApplications("", cluster, ns, &conditions, 10, 0, "", false) + if err != nil { + klog.Error("failed to list op apps") + return nil + } + totalCount := resp.TotalCount + resp, err = t.opRelease.ListApplications("", cluster, ns, &conditions, totalCount, 0, "", false) + if err != nil { + klog.Error("failed to list op apps") + return nil + } + + for _, item := range resp.Items { + app := item.(*openpitrix.Application) + ops = append(ops, app.Cluster.ClusterId) + } + + for _, op := range ops { + app, err := t.opRelease.DescribeApplication("", cluster, ns, op) + if err != nil { + klog.Error(err) + return nil + } + for _, object := range app.ReleaseInfo { + unstructuredObj := object.(*unstructured.Unstructured) + if unstructuredObj.GetKind() == "Service" || + unstructuredObj.GetKind() == "Deployment" || + unstructuredObj.GetKind() == "Daemonset" || + unstructuredObj.GetKind() == "Statefulset" { + opComponentsMap[op+":"+unstructuredObj.GetKind()] = append(opComponentsMap[unstructuredObj.GetKind()], unstructuredObj.GetName()) + } + } + } + + return opComponentsMap +} + func (t *tenantOperator) isOpenPitrixComponent(cluster, ns, kind, componentName string) (bool, string) { + opComponentsMap := t.collectOpenPitrixComponents(cluster, ns) + + for k, v := range opComponentsMap { + + kk := strings.Split(k, ":") + if len(kk) != 2 { + klog.Errorf("invalid op key %s", k) + return false, "" + } + opName := kk[0] + if kk[1] == strings.Title(kind) { + for _, svc := range v { + if componentName == svc { + return true, opName + } + } + } + } + return false, "" } @@ -916,9 +998,14 @@ func (t *tenantOperator) updateStatefulsetsStats(user user.Info, cluster, ns str return err } - if ok, _ := t.isOpenPitrixComponent(cluster, ns, "statefulset", statefulset.Name); ok { - // TODO: for op statefulset - continue + if ok, opName := t.isOpenPitrixComponent(cluster, ns, "statefulset", statefulset.Name); ok { + // for op statefulset + for _, pod := range pods { + if err := resourceStats.GetOpenPitrixStats(opName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil { + klog.Error(err) + return err + } + } } else if ok, appName := t.isAppComponent(ns, "daemonset", statefulset.Name); ok { // for app statefulset for _, pod := range pods { @@ -943,7 +1030,7 @@ func (t *tenantOperator) updateStatefulsetsStats(user user.Info, cluster, ns str } } - // TODO: op aggregate for statefulset components + // OpenPitrix aggregate for statefulset components for _, op := range resourceStats.OpenPitrixs { op.Aggregate() } diff --git a/pkg/models/tenant/tenant.go b/pkg/models/tenant/tenant.go index 0c018ef2a..1b9001acc 100644 --- a/pkg/models/tenant/tenant.go +++ b/pkg/models/tenant/tenant.go @@ -998,7 +998,7 @@ func (t *tenantOperator) MeteringHierarchy(user user.Info, queryParam *meteringv podsStats := t.transformMetricData(res) // classify pods stats - resourceStats, err := t.classifyPodStats(user, "", queryParam.NamespaceName, podsStats) + resourceStats, err := t.classifyPodStats(user, queryParam.Cluster, queryParam.NamespaceName, podsStats) if err != nil { klog.Error(err) return metering.ResourceStatistic{}, err diff --git a/pkg/simple/client/monitoring/prometheus/prometheus.go b/pkg/simple/client/monitoring/prometheus/prometheus.go index 22f36d769..b8745ea15 100644 --- a/pkg/simple/client/monitoring/prometheus/prometheus.go +++ b/pkg/simple/client/monitoring/prometheus/prometheus.go @@ -226,6 +226,7 @@ func (p prometheus) GetNamedMetersOverTime(meters []string, start, end time.Time go func(metric string) { parsedResp := monitoring.Metric{MetricName: metric} begin := time.Now() + value, _, err := p.client.QueryRange(prometheusCtx, makeMeterExpr(metric, *queryOptions), timeRange) end := time.Now() timeElapsed := end.Unix() - begin.Unix() diff --git a/pkg/simple/client/monitoring/query_options.go b/pkg/simple/client/monitoring/query_options.go index 7def5aeca..d1664d80c 100644 --- a/pkg/simple/client/monitoring/query_options.go +++ b/pkg/simple/client/monitoring/query_options.go @@ -151,6 +151,18 @@ func (aso ApplicationsOption) Apply(o *QueryOptions) { return } +type OpenpitrixsOption struct { + Cluster string + NamespaceName string + Openpitrixs []string + StorageClassName string +} + +func (oso OpenpitrixsOption) Apply(o *QueryOptions) { + // nothing should be done + return +} + // ApplicationsOption & OpenpitrixsOption share the same ApplicationOption struct type ApplicationOption struct { NamespaceName string diff --git a/pkg/simple/client/monitoring/types.go b/pkg/simple/client/monitoring/types.go index 353058a75..2874b1d4f 100644 --- a/pkg/simple/client/monitoring/types.go +++ b/pkg/simple/client/monitoring/types.go @@ -63,13 +63,13 @@ type MetricValue struct { ExportSample *ExportPoint `json:"exported_value,omitempty" description:"exported time series, values of vector type"` ExportedSeries []ExportPoint `json:"exported_values,omitempty" description:"exported time series, values of matrix type"` - MinValue float64 `json:"min_value" description:"minimum value from monitor points"` - MaxValue float64 `json:"max_value" description:"maximum value from monitor points"` - AvgValue float64 `json:"avg_value" description:"average value from monitor points"` - SumValue float64 `json:"sum_value" description:"sum value from monitor points"` - Fee float64 `json:"fee" description:"resource fee"` - ResourceUnit string `json:"resource_unit"` - CurrencyUnit string `json:"currency_unit"` + MinValue string `json:"min_value" description:"minimum value from monitor points"` + MaxValue string `json:"max_value" description:"maximum value from monitor points"` + AvgValue string `json:"avg_value" description:"average value from monitor points"` + SumValue string `json:"sum_value" description:"sum value from monitor points"` + Fee string `json:"fee" description:"resource fee"` + ResourceUnit string `json:"resource_unit"` + CurrencyUnit string `json:"currency_unit"` } func (mv *MetricValue) TransferToExportedMetricValue() { From 76ffaa87941e6e336418cc17b661bf3dc0f08f7d Mon Sep 17 00:00:00 2001 From: Rao Yunkun Date: Wed, 31 Mar 2021 16:58:10 +0800 Subject: [PATCH 2/4] Update UT Signed-off-by: Rao Yunkun --- pkg/kapis/monitoring/v1alpha3/helper_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/kapis/monitoring/v1alpha3/helper_test.go b/pkg/kapis/monitoring/v1alpha3/helper_test.go index 808bccb5f..9ddc6015e 100644 --- a/pkg/kapis/monitoring/v1alpha3/helper_test.go +++ b/pkg/kapis/monitoring/v1alpha3/helper_test.go @@ -274,10 +274,25 @@ func TestParseRequestParams(t *testing.T) { { params: reqParams{ namespaceName: "default", + openpitrixs: "op1|op2", }, lvl: monitoring.LevelOpenpitrix, expectedErr: true, }, + { + params: reqParams{ + namespaceName: "default", + }, + lvl: monitoring.LevelOpenpitrix, + expectedErr: true, + }, + { + params: reqParams{ + + }, + lvl: monitoring.LevelOpenpitrix, + expectedErr: true, + }, } for i, tt := range tests { From 836b279feeb62c82b57e09a69d711f9fa443e78d Mon Sep 17 00:00:00 2001 From: Rao Yunkun Date: Wed, 31 Mar 2021 21:18:56 +0800 Subject: [PATCH 3/4] Update metering UT. Signed-off-by: Rao Yunkun --- pkg/kapis/monitoring/v1alpha3/helper_test.go | 8 +- pkg/models/monitoring/utils.go | 24 +- pkg/models/monitoring/utils_test.go | 403 +++++++++++++++++++ pkg/models/tenant/metering_test.go | 193 +++++++++ 4 files changed, 617 insertions(+), 11 deletions(-) create mode 100644 pkg/models/monitoring/utils_test.go create mode 100644 pkg/models/tenant/metering_test.go diff --git a/pkg/kapis/monitoring/v1alpha3/helper_test.go b/pkg/kapis/monitoring/v1alpha3/helper_test.go index 9ddc6015e..77b8fd7a8 100644 --- a/pkg/kapis/monitoring/v1alpha3/helper_test.go +++ b/pkg/kapis/monitoring/v1alpha3/helper_test.go @@ -274,7 +274,7 @@ func TestParseRequestParams(t *testing.T) { { params: reqParams{ namespaceName: "default", - openpitrixs: "op1|op2", + openpitrixs: "op1|op2", }, lvl: monitoring.LevelOpenpitrix, expectedErr: true, @@ -287,10 +287,8 @@ func TestParseRequestParams(t *testing.T) { expectedErr: true, }, { - params: reqParams{ - - }, - lvl: monitoring.LevelOpenpitrix, + params: reqParams{}, + lvl: monitoring.LevelOpenpitrix, expectedErr: true, }, } diff --git a/pkg/models/monitoring/utils.go b/pkg/models/monitoring/utils.go index 200ef527f..4aae8e989 100644 --- a/pkg/models/monitoring/utils.go +++ b/pkg/models/monitoring/utils.go @@ -4,6 +4,7 @@ import ( "fmt" "math/big" "os" + "path/filepath" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/klog" @@ -18,7 +19,8 @@ const ( METER_RESOURCE_TYPE_NET_EGRESS METER_RESOURCE_TYPE_PVC - meteringConfig = "/etc/kubesphere/metering/ks-metering.yaml" + meteringConfigDir = "/etc/kubesphere/metering/" + meteringConfigName = "ks-metering.yaml" meteringDefaultPrecision = 10 meteringCorePrecision = 3 @@ -103,11 +105,21 @@ func (mc MeterConfig) GetPriceInfo() PriceInfo { func LoadYaml() (*MeterConfig, error) { var meterConfig MeterConfig + var mf *os.File + var err error - mf, err := os.Open(meteringConfig) - if err != nil { - klog.Error(err) - return nil, err + if _, err := os.Stat(meteringConfigName); os.IsNotExist(err) { + mf, err = os.Open(filepath.Join(meteringConfigDir, meteringConfigName)) + if err != nil { + klog.Error(err) + return nil, err + } + } else { + mf, err = os.Open(meteringConfigName) + if err != nil { + klog.Error(err) + return nil, err + } } if err = yaml.NewYAMLOrJSONDecoder(mf, 1024).Decode(&meterConfig); err != nil { @@ -170,7 +182,7 @@ func getAvgPointValue(points []monitoring.Point) string { length := new(big.Float).SetFloat64(float64(len(points))) - return fmt.Sprintf("%.3f", sum.Quo(sum, length)) + return fmt.Sprintf(generateFloatFormat(meteringDefaultPrecision), sum.Quo(sum, length)) } func getCurrencyUnit() string { diff --git a/pkg/models/monitoring/utils_test.go b/pkg/models/monitoring/utils_test.go new file mode 100644 index 000000000..29ecc47f4 --- /dev/null +++ b/pkg/models/monitoring/utils_test.go @@ -0,0 +1,403 @@ +package monitoring + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/google/go-cmp/cmp" + "gopkg.in/yaml.v2" + + "kubesphere.io/kubesphere/pkg/simple/client/monitoring" +) + +func TestGetMaxPointValue(t *testing.T) { + tests := []struct { + actualPoints []monitoring.Point + expectedValue string + }{ + { + actualPoints: []monitoring.Point{ + {1.0, 2.0}, + {3.0, 4.0}, + }, + expectedValue: "4.0000000000", + }, + { + actualPoints: []monitoring.Point{ + {2, 1}, + {4, 3.1}, + }, + expectedValue: "3.1000000000", + }, + { + actualPoints: []monitoring.Point{ + {5, 100}, + {6, 100000.001}, + }, + expectedValue: "100000.0010000000", + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + max := getMaxPointValue(tt.actualPoints) + if max != tt.expectedValue { + t.Fatal("max point value caculation is wrong.") + } + }) + } +} + +func TestGetMinPointValue(t *testing.T) { + tests := []struct { + actualPoints []monitoring.Point + expectedValue string + }{ + { + actualPoints: []monitoring.Point{ + {1.0, 2.0}, + {3.0, 4.0}, + }, + expectedValue: "2.0000000000", + }, + { + actualPoints: []monitoring.Point{ + {2, 1}, + {4, 3.1}, + }, + expectedValue: "1.0000000000", + }, + { + actualPoints: []monitoring.Point{ + {5, 100}, + {6, 100000.001}, + }, + expectedValue: "100.0000000000", + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + max := getMinPointValue(tt.actualPoints) + if max != tt.expectedValue { + t.Fatal("min point value caculation is wrong.") + } + }) + } +} + +func TestGetSumPointValue(t *testing.T) { + tests := []struct { + actualPoints []monitoring.Point + expectedValue string + }{ + { + actualPoints: []monitoring.Point{ + {1.0, 2.0}, + {3.0, 4.0}, + }, + expectedValue: "6.0000000000", + }, + { + actualPoints: []monitoring.Point{ + {2, 1}, + {4, 3.1}, + }, + expectedValue: "4.1000000000", + }, + { + actualPoints: []monitoring.Point{ + {5, 100}, + {6, 100000.001}, + }, + expectedValue: "100100.0010000000", + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + max := getSumPointValue(tt.actualPoints) + if max != tt.expectedValue { + t.Fatal("sum point value caculation is wrong.") + } + }) + } +} + +func TestGetAvgPointValue(t *testing.T) { + tests := []struct { + actualPoints []monitoring.Point + expectedValue string + }{ + { + actualPoints: []monitoring.Point{ + {1.0, 2.0}, + {3.0, 4.0}, + }, + expectedValue: "3.0000000000", + }, + { + actualPoints: []monitoring.Point{ + {2, 1}, + {4, 3.1}, + }, + expectedValue: "2.0500000000", + }, + { + actualPoints: []monitoring.Point{ + {5, 100}, + {6, 100000.001}, + }, + expectedValue: "50050.0005000000", + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + max := getAvgPointValue(tt.actualPoints) + if max != tt.expectedValue { + t.Fatal("avg point value caculattion is wrong.") + } + }) + } +} + +func saveTestConfig(t *testing.T, conf *MeterConfig) { + content, err := yaml.Marshal(conf) + if err != nil { + t.Fatalf("error marshal config. %v", err) + } + + err = ioutil.WriteFile(meteringConfigName, content, 0640) + if err != nil { + t.Fatalf("error write configuration file, %v", err) + } +} + +func cleanTestConfig(t *testing.T) { + + if _, err := os.Stat(meteringConfigName); os.IsNotExist(err) { + t.Log("file not exists, skipping") + return + } + + err := os.Remove(meteringConfigName) + if err != nil { + t.Fatalf("remove %s file failed", meteringConfigName) + } + +} + +func TestGetCurrencyUnit(t *testing.T) { + if getCurrencyUnit() != "" { + t.Fatal("currency unit should be empty") + } + + saveTestConfig(t, &MeterConfig{ + Billing: Billing{ + PriceInfo: PriceInfo{ + IngressNetworkTrafficPerMegabytesPerHour: 1, + EgressNetworkTrafficPerMegabytesPerHour: 2, + CpuPerCorePerHour: 3, + MemPerGigabytesPerHour: 4, + PvcPerGigabytesPerHour: 5, + CurrencyUnit: "CNY", + }, + }, + }) + defer cleanTestConfig(t) + + if getCurrencyUnit() != "CNY" { + t.Fatal("failed to get currency unit from config") + } +} + +func TestGenerateFloatFormat(t *testing.T) { + format := generateFloatFormat(10) + if format != "%.10f" { + t.Fatalf("get currency float format failed, %s", format) + } +} + +func TestGetResourceUnit(t *testing.T) { + + tests := []struct { + meterName string + expectedValue string + }{ + { + meterName: "no-exist", + expectedValue: "", + }, + { + meterName: "meter_cluster_cpu_usage", + expectedValue: "cores", + }, + } + for _, tt := range tests { + if getResourceUnit(tt.meterName) != tt.expectedValue { + t.Fatal("get resource unit failed") + } + } + +} + +func TestSquashPoints(t *testing.T) { + + tests := []struct { + input []monitoring.Point + factor int + expected []monitoring.Point + }{ + { + input: []monitoring.Point{ + {1, 1}, + {2, 2}, + {3, 3}, + {4, 4}, + {5, 5}, + {6, 6}, + {7, 7}, + {8, 8}, + }, + factor: 1, + expected: []monitoring.Point{ + {1, 1}, + {2, 2}, + {3, 3}, + {4, 4}, + {5, 5}, + {6, 6}, + {7, 7}, + {8, 8}, + }, + }, + { + input: []monitoring.Point{ + {1, 1}, + {2, 2}, + {3, 3}, + {4, 4}, + {5, 5}, + {6, 6}, + {7, 7}, + {8, 8}, + }, + factor: 2, + expected: []monitoring.Point{ + {2, 3}, + {4, 7}, + {6, 11}, + {8, 15}, + }, + }, + } + + for _, tt := range tests { + got := squashPoints(tt.input, tt.factor) + if diff := cmp.Diff(got, tt.expected); diff != "" { + t.Errorf("%T differ (-got, +want): %s", tt.expected, diff) + } + } +} + +func TestGetFeeWithMeterName(t *testing.T) { + + saveTestConfig(t, &MeterConfig{ + Billing: Billing{ + PriceInfo: PriceInfo{ + IngressNetworkTrafficPerMegabytesPerHour: 1, + EgressNetworkTrafficPerMegabytesPerHour: 2, + CpuPerCorePerHour: 3, + MemPerGigabytesPerHour: 4, + PvcPerGigabytesPerHour: 5, + CurrencyUnit: "CNY", + }, + }, + }) + defer cleanTestConfig(t) + + tests := []struct { + metric monitoring.Metric + scalingMap map[string]float64 + expected monitoring.MetricData + }{ + { + metric: monitoring.Metric{ + MetricName: "test", + MetricData: monitoring.MetricData{ + MetricType: monitoring.MetricTypeMatrix, + MetricValues: []monitoring.MetricValue{ + { + Metadata: map[string]string{}, + Series: []monitoring.Point{ + {1, 1}, + {2, 2}, + }, + }, + }, + }, + }, + scalingMap: map[string]float64{ + "test": 1, + }, + expected: monitoring.MetricData{ + MetricType: monitoring.MetricTypeMatrix, + MetricValues: []monitoring.MetricValue{ + { + Metadata: map[string]string{}, + Series: []monitoring.Point{ + {1, 1}, + {2, 2}, + }, + MinValue: "1.0000000000", + MaxValue: "2.0000000000", + AvgValue: "1.5000000000", + SumValue: "3.0000000000", + CurrencyUnit: "CNY", + }, + }, + }, + }, + { + metric: monitoring.Metric{ + MetricName: "test", + MetricData: monitoring.MetricData{ + MetricType: monitoring.MetricTypeVector, + MetricValues: []monitoring.MetricValue{ + { + Metadata: map[string]string{}, + Sample: &monitoring.Point{1, 2}, + }, + }, + }, + }, + scalingMap: nil, + expected: monitoring.MetricData{ + MetricType: monitoring.MetricTypeVector, + MetricValues: []monitoring.MetricValue{ + { + Metadata: map[string]string{}, + Sample: &monitoring.Point{1, 2}, + MinValue: "2.0000000000", + MaxValue: "2.0000000000", + AvgValue: "2.0000000000", + SumValue: "2.0000000000", + CurrencyUnit: "CNY", + }, + }, + }, + }, + } + + for _, test := range tests { + got := updateMetricStatData(test.metric, test.scalingMap) + if diff := cmp.Diff(got, test.expected); diff != "" { + t.Errorf("%T differ (-got, +want): %s", test.expected, diff) + return + } + } + +} diff --git a/pkg/models/tenant/metering_test.go b/pkg/models/tenant/metering_test.go new file mode 100644 index 000000000..434377705 --- /dev/null +++ b/pkg/models/tenant/metering_test.go @@ -0,0 +1,193 @@ +package tenant + +import ( + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "kubesphere.io/kubesphere/pkg/models/metering" + monitoringmodel "kubesphere.io/kubesphere/pkg/models/monitoring" + "kubesphere.io/kubesphere/pkg/simple/client/monitoring" +) + +func TestIsRangeQuery(t *testing.T) { + tests := []struct { + options QueryOptions + expectedValue bool + }{ + { + options: QueryOptions{}, + expectedValue: true, + }, + { + options: QueryOptions{Time: time.Now()}, + expectedValue: false, + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + if tt.options.isRangeQuery() != tt.expectedValue { + t.Fatal("error isRangeQuery") + } + }) + } +} + +func TestShouldSort(t *testing.T) { + tests := []struct { + options QueryOptions + expectedValue bool + }{ + { + options: QueryOptions{ + Target: "test", + Identifier: "test", + }, + expectedValue: true, + }, + { + options: QueryOptions{}, + expectedValue: false, + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + if tt.options.shouldSort() != tt.expectedValue { + t.Fatal("error shouldSort") + } + }) + } +} + +func TestGetMetricPosMap(t *testing.T) { + tests := []struct { + metrics []monitoring.Metric + expectedValue map[string]int + }{ + { + metrics: []monitoring.Metric{ + {MetricName: "one"}, + {MetricName: "two"}, + {MetricName: "three"}, + {MetricName: "four"}, + }, + expectedValue: map[string]int{ + "one": 0, + "two": 1, + "three": 2, + "four": 3, + }, + }, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + if diff := cmp.Diff(getMetricPosMap(tt.metrics), tt.expectedValue); diff != "" { + t.Errorf("%T differ (-got, +want): %s", tt.expectedValue, diff) + } + }) + } +} + +func TestTransformMetricData(t *testing.T) { + tests := []struct { + metrics monitoringmodel.Metrics + expectedValue metering.PodsStats + }{ + { + metrics: monitoringmodel.Metrics{ + Results: []monitoring.Metric{ + { + MetricName: "meter_pod_cpu_usage", + MetricData: monitoring.MetricData{ + MetricValues: monitoring.MetricValues{ + { + Metadata: map[string]string{ + "pod": "pod1", + }, + SumValue: "10", + }, + }, + }, + }, + { + MetricName: "meter_pod_memory_usage_wo_cache", + MetricData: monitoring.MetricData{ + MetricValues: monitoring.MetricValues{ + { + Metadata: map[string]string{ + "pod": "pod1", + }, + SumValue: "200", + }, + }, + }, + }, + { + MetricName: "meter_pod_net_bytes_transmitted", + MetricData: monitoring.MetricData{ + MetricValues: monitoring.MetricValues{ + { + Metadata: map[string]string{ + "pod": "pod1", + }, + SumValue: "300", + }, + }, + }, + }, + { + MetricName: "meter_pod_net_bytes_received", + MetricData: monitoring.MetricData{ + MetricValues: monitoring.MetricValues{ + { + Metadata: map[string]string{ + "pod": "pod1", + }, + SumValue: "300", + }, + }, + }, + }, + { + MetricName: "meter_pod_pvc_bytes_total", + MetricData: monitoring.MetricData{ + MetricValues: monitoring.MetricValues{ + { + Metadata: map[string]string{ + "pod": "pod1", + }, + SumValue: "400", + }, + }, + }, + }, + }, + }, + expectedValue: metering.PodsStats{ + "pod1": &metering.PodStatistic{ + CPUUsage: 10, + MemoryUsageWoCache: 200, + NetBytesReceived: 300, + NetBytesTransmitted: 300, + PVCBytesTotal: 400, + }, + }, + }, + } + + var tOperator tenantOperator + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + if diff := cmp.Diff(tOperator.transformMetricData(tt.metrics), tt.expectedValue); diff != "" { + t.Errorf("%T differ (-got, +want): %s", tt.expectedValue, diff) + } + }) + } + +} From bb024d8ad4cba325b423dd9b5fe06b12de1fd987 Mon Sep 17 00:00:00 2001 From: Rao Yunkun Date: Thu, 1 Apr 2021 18:37:33 +0800 Subject: [PATCH 4/4] Add code comments and clean code. Signed-off-by: Rao Yunkun --- pkg/models/metering/type.go | 12 ++---- pkg/models/tenant/metering.go | 73 +++++++++++++++-------------------- 2 files changed, 36 insertions(+), 49 deletions(-) diff --git a/pkg/models/metering/type.go b/pkg/models/metering/type.go index 8414593f2..511274056 100644 --- a/pkg/models/metering/type.go +++ b/pkg/models/metering/type.go @@ -161,12 +161,11 @@ type ServiceStatistic struct { Pods map[string]*PodStatistic `json:"pods" description:"pod statistic"` } -func (ss *ServiceStatistic) SetPodStats(name string, podStat *PodStatistic) error { +func (ss *ServiceStatistic) SetPodStats(name string, podStat *PodStatistic) { if ss.Pods == nil { ss.Pods = make(map[string]*PodStatistic) } ss.Pods[name] = podStat - return nil } func (ss *ServiceStatistic) GetPodStats(name string) *PodStatistic { @@ -211,12 +210,11 @@ func (ds *DeploymentStatistic) GetPodStats(name string) *PodStatistic { return ds.Pods[name] } -func (ds *DeploymentStatistic) SetPodStats(name string, podStat *PodStatistic) error { +func (ds *DeploymentStatistic) SetPodStats(name string, podStat *PodStatistic) { if ds.Pods == nil { ds.Pods = make(map[string]*PodStatistic) } ds.Pods[name] = podStat - return nil } func (ds *DeploymentStatistic) Aggregate() { @@ -252,12 +250,11 @@ func (ss *StatefulsetStatistic) GetPodStats(name string) *PodStatistic { return ss.Pods[name] } -func (ss *StatefulsetStatistic) SetPodStats(name string, podStat *PodStatistic) error { +func (ss *StatefulsetStatistic) SetPodStats(name string, podStat *PodStatistic) { if ss.Pods == nil { ss.Pods = make(map[string]*PodStatistic) } ss.Pods[name] = podStat - return nil } func (ss *StatefulsetStatistic) Aggregate() { @@ -293,12 +290,11 @@ func (ds *DaemonsetStatistic) GetPodStats(name string) *PodStatistic { return ds.Pods[name] } -func (ds *DaemonsetStatistic) SetPodStats(name string, podStat *PodStatistic) error { +func (ds *DaemonsetStatistic) SetPodStats(name string, podStat *PodStatistic) { if ds.Pods == nil { ds.Pods = make(map[string]*PodStatistic) } ds.Pods[name] = podStat - return nil } func (ds *DaemonsetStatistic) Aggregate() { diff --git a/pkg/models/tenant/metering.go b/pkg/models/tenant/metering.go index 21b252d28..8f4aa1886 100644 --- a/pkg/models/tenant/metering.go +++ b/pkg/models/tenant/metering.go @@ -781,10 +781,10 @@ func (t *tenantOperator) updateDeploysStats(user user.Info, cluster, ns string, // for op deployment for _, pod := range pods { podsStat := podsStats[pod] - if err := resourceStats.GetOpenPitrixStats(opName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat); err != nil { - klog.Error(err) - return err - } + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // OpenPitrix field(create if not existed) -> deployments field(create if not existed) -> pod + resourceStats.GetOpenPitrixStats(opName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat) } } else if ok, appName := t.isAppComponent(ns, "deployment", deploy.Name); ok { // for app deployment @@ -794,20 +794,19 @@ func (t *tenantOperator) updateDeploysStats(user user.Info, cluster, ns string, klog.Warningf("%v not found", pod) continue } - - if err := resourceStats.GetAppStats(appName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat); err != nil { - klog.Error(err) - return err - } + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // App field(create if not existed) -> deployments field(create if not existed) -> pod + resourceStats.GetAppStats(appName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat) } } else { // for k8s deployment only for _, pod := range pods { - if err := resourceStats.GetDeployStats(deploy.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // Deployments field(create if not existed) -> pod + resourceStats.GetDeployStats(deploy.Name).SetPodStats(pod, podsStats[pod]) } } } @@ -850,29 +849,26 @@ func (t *tenantOperator) updateDaemonsetsStats(user user.Info, cluster, ns strin if ok, opName := t.isOpenPitrixComponent(cluster, ns, "daemonset", daemonset.Name); ok { // for op daemonset for _, pod := range pods { - if err := resourceStats.GetOpenPitrixStats(opName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // OpenPitrix field(create if not existed) -> daemonsets field(create if not existed) -> pod + resourceStats.GetOpenPitrixStats(opName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod]) } } else if ok, appName := t.isAppComponent(ns, "daemonset", daemonset.Name); ok { // for app daemonset for _, pod := range pods { // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists // and then set pod stats data, the direction is as follows: - // app field(create if not existed) -> statefulsets field(create if not existed) -> pod - if err := resourceStats.GetAppStats(appName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + // App field(create if not existed) -> daemonsets field(create if not existed) -> pod + resourceStats.GetAppStats(appName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod]) } } else { // for k8s daemonset for _, pod := range pods { - if err := resourceStats.GetDaemonsetStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // Daemonsets field(create if not existed) -> pod + resourceStats.GetDaemonsetStats(daemonset.Name).SetPodStats(pod, podsStats[pod]) } } } @@ -1001,31 +997,26 @@ func (t *tenantOperator) updateStatefulsetsStats(user user.Info, cluster, ns str if ok, opName := t.isOpenPitrixComponent(cluster, ns, "statefulset", statefulset.Name); ok { // for op statefulset for _, pod := range pods { - if err := resourceStats.GetOpenPitrixStats(opName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // OpenPitrix field(create if not existed) -> statefulsets field(create if not existed) -> pod + resourceStats.GetOpenPitrixStats(opName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]) } } else if ok, appName := t.isAppComponent(ns, "daemonset", statefulset.Name); ok { // for app statefulset for _, pod := range pods { // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists // and then set pod stats data, the direction is as follows: - // app field(create if not existed) -> statefulsets field(create if not existed) -> pod - if err := resourceStats.GetAppStats(appName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + // App field(create if not existed) -> statefulsets field(create if not existed) -> pod + resourceStats.GetAppStats(appName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]) } } else { // for k8s statefulset for _, pod := range pods { - // same as above, the direction is similar: - // k8s field(create if not existed) -> statefulsets field(create if not existed) -> pod - if err := resourceStats.GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // Statefulsets field(create if not existed) -> pod + resourceStats.GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]) } } }