Intergate OpenPitrix metrics into metering.

Signed-off-by: Rao Yunkun <yunkunrao@yunify.com>
This commit is contained in:
Rao Yunkun
2021-03-29 15:53:22 +08:00
parent ea93f3832d
commit 845f6bbe89
15 changed files with 551 additions and 151 deletions

View File

@@ -43,10 +43,15 @@ type handler struct {
}
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) *handler {
var opRelease openpitrix.Interface
if ksClient != nil {
opRelease = openpitrix.NewOpenpitrixOperator(f, ksClient, nil)
}
return &handler{
k: k,
mo: model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter),
opRelease: nil,
opRelease: opRelease,
}
}

View File

@@ -60,6 +60,7 @@ const (
)
type reqParams struct {
metering bool
operation string
time string
start string
@@ -84,6 +85,8 @@ type reqParams struct {
expression string
metric string
applications string
openpitrixs string
cluster string
services string
pvcFilter string
}
@@ -118,7 +121,6 @@ func (q queryOptions) shouldSort() bool {
func parseRequestParams(req *restful.Request) reqParams {
var r reqParams
r.operation = req.QueryParameter("operation")
r.time = req.QueryParameter("time")
r.start = req.QueryParameter("start")
r.end = req.QueryParameter("end")
@@ -131,21 +133,8 @@ func parseRequestParams(req *restful.Request) reqParams {
r.resourceFilter = req.QueryParameter("resources_filter")
r.workspaceName = req.PathParameter("workspace")
r.namespaceName = req.PathParameter("namespace")
if req.QueryParameter("node") != "" {
r.nodeName = req.QueryParameter("node")
} else {
// compatible with monitoring request
r.nodeName = req.PathParameter("node")
}
if req.QueryParameter("kind") != "" {
r.workloadKind = req.QueryParameter("kind")
} else {
// compatible with monitoring request
r.workloadKind = req.PathParameter("kind")
}
r.workloadKind = req.PathParameter("kind")
r.nodeName = req.PathParameter("node")
r.workloadName = req.PathParameter("workload")
r.podName = req.PathParameter("pod")
r.containerName = req.PathParameter("container")
@@ -154,13 +143,47 @@ func parseRequestParams(req *restful.Request) reqParams {
r.componentType = req.PathParameter("component")
r.expression = req.QueryParameter("expr")
r.metric = req.QueryParameter("metric")
r.applications = req.QueryParameter("applications")
r.services = req.QueryParameter("services")
r.pvcFilter = req.QueryParameter("pvc_filter")
return r
}
func parseMeteringRequestParams(req *restful.Request) reqParams {
params := parseRequestParams(req)
// mark this request is metering req
params.metering = true
// whether need to export metering data
params.operation = req.QueryParameter("operation")
// OpenPitrix belongs to which cluster
params.cluster = req.PathParameter("cluster")
// specified which application crds
params.applications = req.QueryParameter("applications")
// specified which OpenPitrix apps
params.openpitrixs = req.QueryParameter("openpitrix_ids")
// specified which service
params.services = req.QueryParameter("services")
// specified which pvc
params.pvcFilter = req.QueryParameter("pvc_filter")
// support node param in URL query
if req.QueryParameter("node") != "" {
params.nodeName = req.QueryParameter("node")
}
// support kind param in URL query
if req.QueryParameter("kind") != "" {
params.workloadKind = req.QueryParameter("kind")
}
return params
}
func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOptions, err error) {
if r.resourceFilter == "" {
r.resourceFilter = DefaultFilter
@@ -230,6 +253,26 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
}
q.namedMetrics = model.ApplicationMetrics
case monitoring.LevelOpenpitrix:
q.identifier = model.IdentifierApplication
if r.namespaceName == "" {
return q, errors.New(fmt.Sprintf(ErrParameterNotfound, "namespace"))
}
ops := []string{}
if len(r.openpitrixs) != 0 {
ops = strings.Split(r.openpitrixs, "|")
}
q.option = monitoring.OpenpitrixsOption{
Cluster: r.cluster,
NamespaceName: r.namespaceName,
Openpitrixs: ops,
StorageClassName: r.storageClassName,
}
// op share the same metrics with application
q.namedMetrics = model.ApplicationMetrics
case monitoring.LevelWorkload:
q.identifier = model.IdentifierWorkload
q.option = monitoring.WorkloadOption{
@@ -339,7 +382,7 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
}
// Ensure query start time to be after the namespace creation time
if r.namespaceName != "" {
if r.namespaceName != "" && !r.metering {
ns, err := h.k.CoreV1().Namespaces().Get(context.Background(), r.namespaceName, corev1.GetOptions{})
if err != nil {
return q, err
@@ -392,7 +435,7 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
return q, nil
}
func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) {
func exportMetrics(metrics model.Metrics, startTime, endTime time.Time) (*bytes.Buffer, error) {
var resBytes []byte
for i, _ := range metrics.Results {
@@ -415,18 +458,12 @@ func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) {
}
selector := strings.Join(targetList, "|")
var startTime, endTime string
if len(metricVal.ExportedSeries) > 0 {
startTime = metricVal.ExportedSeries[0].Timestamp()
endTime = metricVal.ExportedSeries[len(metricVal.ExportedSeries)-1].Timestamp()
}
statsTab := "\nmetric_name,selector,start_time,end_time,min,max,avg,sum,fee, currency_unit\n" +
fmt.Sprintf("%s,%s,%s,%s,%.2f,%.2f,%.2f,%.2f,%.2f,%s\n\n",
fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n\n",
metricName,
selector,
startTime,
endTime,
startTime.String(),
endTime.String(),
metricVal.MinValue,
metricVal.MaxValue,
metricVal.AvgValue,
@@ -463,11 +500,11 @@ func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) {
return output, nil
}
func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
func ExportMetrics(resp *restful.Response, metrics model.Metrics, startTime, endTime time.Time) {
resp.Header().Set(restful.HEADER_ContentType, "text/plain")
resp.Header().Set("Content-Disposition", "attachment")
output, err := exportMetrics(metrics)
output, err := exportMetrics(metrics, startTime, endTime)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return

View File

@@ -372,7 +372,7 @@ func TestExportMetrics(t *testing.T) {
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
_, err := exportMetrics(tt.metrics)
_, err := exportMetrics(tt.metrics, time.Now().Add(-time.Hour), time.Now())
if err != nil && !tt.expectedErr {
t.Fatal("Failed to export metering metrics", err)
}

View File

@@ -4,7 +4,11 @@ import (
"regexp"
"strings"
"kubesphere.io/kubesphere/pkg/models/openpitrix"
"kubesphere.io/kubesphere/pkg/server/params"
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/api"
@@ -12,8 +16,8 @@ import (
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
func (h handler) HandleClusterMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleClusterMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelCluster)
if err != nil {
api.HandleBadRequest(resp, nil, err)
@@ -87,7 +91,7 @@ func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Res
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
ExportMetrics(resp, res, q.start, q.end)
return
}
@@ -144,7 +148,7 @@ func (h handler) handleServiceMetersQuery(meters []string, resp *restful.Respons
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
ExportMetrics(resp, res, q.start, q.end)
return
}
@@ -180,6 +184,12 @@ func (h handler) handleNamedMetersQuery(resp *restful.Response, q queryOptions)
return
}
_, ok = q.option.(monitoring.OpenpitrixsOption)
if ok {
h.handleOpenpitrixMetersQuery(meters, resp, q)
return
}
_, ok = q.option.(monitoring.ServicesOption)
if ok {
h.handleServiceMetersQuery(meters, resp, q)
@@ -205,15 +215,16 @@ func (h handler) handleNamedMetersQuery(resp *restful.Response, q queryOptions)
}
if q.Operation == OperationExport {
ExportMetrics(resp, res)
ExportMetrics(resp, res, q.start, q.end)
return
}
resp.WriteAsJson(res)
}
func (h handler) HandleNodeMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleNodeMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
params.metering = true
opt, err := h.makeQueryOptions(params, monitoring.LevelNode)
if err != nil {
api.HandleBadRequest(resp, nil, err)
@@ -222,8 +233,9 @@ func (h handler) HandleNodeMetersQuery(req *restful.Request, resp *restful.Respo
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleWorkspaceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleWorkspaceMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
params.metering = true
opt, err := h.makeQueryOptions(params, monitoring.LevelWorkspace)
if err != nil {
api.HandleBadRequest(resp, nil, err)
@@ -233,8 +245,9 @@ func (h handler) HandleWorkspaceMetersQuery(req *restful.Request, resp *restful.
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleNamespaceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleNamespaceMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
params.metering = true
opt, err := h.makeQueryOptions(params, monitoring.LevelNamespace)
if err != nil {
if err.Error() == ErrNoHit {
@@ -250,8 +263,8 @@ func (h handler) HandleNamespaceMetersQuery(req *restful.Request, resp *restful.
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleWorkloadMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleWorkloadMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelWorkload)
if err != nil {
if err.Error() == ErrNoHit {
@@ -266,8 +279,8 @@ func (h handler) HandleWorkloadMetersQuery(req *restful.Request, resp *restful.R
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleApplicationMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleApplicationMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelApplication)
if err != nil {
if err.Error() == ErrNoHit {
@@ -282,8 +295,25 @@ func (h handler) HandleApplicationMetersQuery(req *restful.Request, resp *restfu
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandlePodMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleOpenpitrixMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelOpenpitrix)
if err != nil {
if err.Error() == ErrNoHit {
res := handleNoHit(opt.namedMetrics)
resp.WriteAsJson(res)
return
}
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandlePodMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelPod)
if err != nil {
if err.Error() == ErrNoHit {
@@ -295,11 +325,12 @@ func (h handler) HandlePodMetersQuery(req *restful.Request, resp *restful.Respon
api.HandleBadRequest(resp, nil, err)
return
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandleServiceMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandleServiceMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelService)
if err != nil {
if err.Error() == ErrNoHit {
@@ -315,8 +346,8 @@ func (h handler) HandleServiceMetersQuery(req *restful.Request, resp *restful.Re
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) HandlePVCMetersQuery(req *restful.Request, resp *restful.Response) {
params := parseRequestParams(req)
func (h handler) HandlePVCMeterQuery(req *restful.Request, resp *restful.Response) {
params := parseMeteringRequestParams(req)
opt, err := h.makeQueryOptions(params, monitoring.LevelPVC)
if err != nil {
if err.Error() == ErrNoHit {
@@ -330,3 +361,110 @@ func (h handler) HandlePVCMetersQuery(req *restful.Request, resp *restful.Respon
}
h.handleNamedMetersQuery(resp, opt)
}
func (h handler) collectOps(cluster, ns string) []string {
var ops []string
conditions := params.Conditions{
Match: make(map[string]string),
Fuzzy: make(map[string]string),
}
resp, err := h.opRelease.ListApplications("", cluster, ns, &conditions, 10, 0, "", false)
if err != nil {
klog.Error("failed to list op apps")
return nil
}
totalCount := resp.TotalCount
resp, err = h.opRelease.ListApplications("", cluster, ns, &conditions, totalCount, 0, "", false)
if err != nil {
klog.Error("failed to list op apps")
return nil
}
for _, item := range resp.Items {
app := item.(*openpitrix.Application)
ops = append(ops, app.Cluster.ClusterId)
}
return ops
}
func (h handler) getOpWorkloads(cluster, ns string, ops []string) map[string][]string {
componentsMap := make(map[string][]string)
if len(ops) == 0 {
ops = h.collectOps(cluster, ns)
}
for _, op := range ops {
app, err := h.opRelease.DescribeApplication("", cluster, ns, op)
if err != nil {
klog.Error(err)
return nil
}
for _, object := range app.ReleaseInfo {
unstructuredObj := object.(*unstructured.Unstructured)
componentsMap[op] = append(componentsMap[op], unstructuredObj.GetKind()+":"+unstructuredObj.GetName())
}
}
return componentsMap
}
func (h handler) handleOpenpitrixMetersQuery(meters []string, resp *restful.Response, q queryOptions) {
var metricMap = make(map[string]int)
var res model.Metrics
var current_res model.Metrics
var err error
oso, ok := q.option.(monitoring.OpenpitrixsOption)
if !ok {
klog.Error("invalid openpitrix option")
return
}
opWorkloads := h.getOpWorkloads(oso.Cluster, oso.NamespaceName, oso.Openpitrixs)
for k, _ := range opWorkloads {
opt := monitoring.ApplicationOption{
NamespaceName: oso.NamespaceName,
Application: k,
ApplicationComponents: opWorkloads[k],
StorageClassName: oso.StorageClassName,
}
if q.isRangeQuery() {
current_res, err = h.mo.GetNamedMetersOverTime(meters, q.start, q.end, q.step, opt)
} else {
current_res, err = h.mo.GetNamedMeters(meters, q.time, opt)
}
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
if res.Results == nil {
res = current_res
metricMap = getMetricPosMap(res.Results)
} else {
for _, cur_res := range current_res.Results {
pos, ok := metricMap[cur_res.MetricName]
if ok {
res.Results[pos].MetricValues = append(res.Results[pos].MetricValues, cur_res.MetricValues...)
} else {
res.Results = append(res.Results, cur_res)
}
}
}
}
if !q.isRangeQuery() && q.shouldSort() {
res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
}
if q.Operation == OperationExport {
ExportMetrics(resp, res, q.start, q.end)
return
}
resp.WriteAsJson(res)
}