diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 7eb243d8e..271869515 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -219,8 +219,8 @@ func (s *APIServer) installKubeSphereAPIs() { urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config)) urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache)) - urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory)) - urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.RuntimeCache)) + urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere())) + urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache)) urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions)) urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions)) urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes())) diff --git a/pkg/apiserver/config/config.go b/pkg/apiserver/config/config.go index bf7db349b..d8e26ce2e 100644 --- a/pkg/apiserver/config/config.go +++ b/pkg/apiserver/config/config.go @@ -35,6 +35,7 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/kubeedge" "kubesphere.io/kubesphere/pkg/simple/client/ldap" "kubesphere.io/kubesphere/pkg/simple/client/logging" + "kubesphere.io/kubesphere/pkg/simple/client/metering" "kubesphere.io/kubesphere/pkg/simple/client/monitoring/prometheus" "kubesphere.io/kubesphere/pkg/simple/client/multicluster" "kubesphere.io/kubesphere/pkg/simple/client/network" @@ -101,6 +102,7 @@ type Config struct { AlertingOptions *alerting.Options `json:"alerting,omitempty" yaml:"alerting,omitempty" mapstructure:"alerting"` NotificationOptions *notification.Options `json:"notification,omitempty" yaml:"notification,omitempty" mapstructure:"notification"` KubeEdgeOptions *kubeedge.Options `json:"kubeedge,omitempty" yaml:"kubeedge,omitempty" mapstructure:"kubeedge"` + MeteringOptions *metering.Options `json:"metering,omitempty" yaml:"metering,omitempty" mapstructure:"metering"` } // newConfig creates a default non-empty Config @@ -125,6 +127,7 @@ func New() *Config { EventsOptions: events.NewEventsOptions(), AuditingOptions: auditing.NewAuditingOptions(), KubeEdgeOptions: kubeedge.NewKubeEdgeOptions(), + MeteringOptions: metering.NewMeteringOptions(), } } @@ -287,4 +290,8 @@ func (conf *Config) stripEmptyOptions() { if conf.KubeEdgeOptions != nil && conf.KubeEdgeOptions.Endpoint == "" { conf.KubeEdgeOptions = nil } + + if conf.MeteringOptions != nil && !conf.MeteringOptions.Enable { + conf.MeteringOptions = nil + } } diff --git a/pkg/apiserver/config/config_test.go b/pkg/apiserver/config/config_test.go index fc4979211..6a3eca9e9 100644 --- a/pkg/apiserver/config/config_test.go +++ b/pkg/apiserver/config/config_test.go @@ -39,6 +39,7 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/kubeedge" "kubesphere.io/kubesphere/pkg/simple/client/ldap" "kubesphere.io/kubesphere/pkg/simple/client/logging" + "kubesphere.io/kubesphere/pkg/simple/client/metering" "kubesphere.io/kubesphere/pkg/simple/client/monitoring/prometheus" "kubesphere.io/kubesphere/pkg/simple/client/multicluster" "kubesphere.io/kubesphere/pkg/simple/client/network" @@ -170,6 +171,9 @@ func newTestConfig() (*Config, error) { KubeEdgeOptions: &kubeedge.Options{ Endpoint: "http://edge-watcher.kubeedge.svc/api/", }, + MeteringOptions: &metering.Options{ + Enable: false, + }, } return conf, nil } @@ -185,6 +189,13 @@ func saveTestConfig(t *testing.T, conf *Config) { } } +func testMeteringConfig(t *testing.T, conf *Config) { + conf.ToMap() + if conf.MeteringOptions != nil { + t.Fatalf("setting metering options failed") + } +} + func cleanTestConfig(t *testing.T) { file := fmt.Sprintf("%s.yaml", defaultConfigurationName) if _, err := os.Stat(file); os.IsNotExist(err) { @@ -214,4 +225,7 @@ func TestGet(t *testing.T) { if diff := cmp.Diff(conf, conf2); diff != "" { t.Fatal(diff) } + + testMeteringConfig(t, conf) + } diff --git a/pkg/kapis/metering/v1alpha1/handler.go b/pkg/kapis/metering/v1alpha1/handler.go index b1e5204bf..3e95b36ff 100644 --- a/pkg/kapis/metering/v1alpha1/handler.go +++ b/pkg/kapis/metering/v1alpha1/handler.go @@ -22,6 +22,7 @@ import ( "github.com/emicklei/go-restful" "k8s.io/client-go/kubernetes" + "kubesphere.io/kubesphere/pkg/client/clientset/versioned" "kubesphere.io/kubesphere/pkg/informers" monitorhle "kubesphere.io/kubesphere/pkg/kapis/monitoring/v1alpha3" resourcev1alpha3 "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/resource" @@ -40,6 +41,6 @@ type meterHandler interface { HandlePVCMetersQuery(req *restful.Request, resp *restful.Response) } -func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, resourceGetter *resourcev1alpha3.ResourceGetter) meterHandler { - return monitorhle.NewHandler(k, m, nil, f, resourceGetter) +func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) meterHandler { + return monitorhle.NewHandler(k, m, nil, f, ksClient, resourceGetter) } diff --git a/pkg/kapis/metering/v1alpha1/register.go b/pkg/kapis/metering/v1alpha1/register.go index 7d6e5ab88..5c5b53400 100644 --- a/pkg/kapis/metering/v1alpha1/register.go +++ b/pkg/kapis/metering/v1alpha1/register.go @@ -20,6 +20,8 @@ package v1alpha1 import ( "net/http" + "kubesphere.io/kubesphere/pkg/client/clientset/versioned" + "github.com/emicklei/go-restful" restfulspec "github.com/emicklei/go-restful-openapi" "k8s.io/apimachinery/pkg/runtime/schema" @@ -42,10 +44,10 @@ const ( var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha1"} -func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, cache cache.Cache) error { +func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, cache cache.Cache) error { ws := runtime.NewWebService(GroupVersion) - h := newHandler(k8sClient, meteringClient, factory, resourcev1alpha3.NewResourceGetter(factory, cache)) + h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache)) ws.Route(ws.GET("/cluster"). To(h.HandleClusterMetersQuery). diff --git a/pkg/kapis/monitoring/v1alpha3/handler.go b/pkg/kapis/monitoring/v1alpha3/handler.go index fcd83d1f3..69775050c 100644 --- a/pkg/kapis/monitoring/v1alpha3/handler.go +++ b/pkg/kapis/monitoring/v1alpha3/handler.go @@ -23,6 +23,9 @@ import ( "regexp" "strings" + "kubesphere.io/kubesphere/pkg/client/clientset/versioned" + "kubesphere.io/kubesphere/pkg/models/openpitrix" + "github.com/emicklei/go-restful" "k8s.io/client-go/kubernetes" @@ -34,12 +37,17 @@ import ( ) type handler struct { - k kubernetes.Interface - mo model.MonitoringOperator + k kubernetes.Interface + mo model.MonitoringOperator + opRelease openpitrix.ReleaseInterface } -func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, resourceGetter *resourcev1alpha3.ResourceGetter) *handler { - return &handler{k, model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter)} +func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) *handler { + return &handler{ + k: k, + mo: model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter), + opRelease: nil, + } } func (h handler) handleKubeSphereMetricsQuery(req *restful.Request, resp *restful.Response) { diff --git a/pkg/kapis/monitoring/v1alpha3/helper.go b/pkg/kapis/monitoring/v1alpha3/helper.go index b541fa542..490e6ec61 100644 --- a/pkg/kapis/monitoring/v1alpha3/helper.go +++ b/pkg/kapis/monitoring/v1alpha3/helper.go @@ -25,13 +25,14 @@ import ( "strings" "time" + "kubesphere.io/kubesphere/pkg/api" + "github.com/jszwec/csvutil" "github.com/emicklei/go-restful" "github.com/pkg/errors" corev1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "kubesphere.io/kubesphere/pkg/api" model "kubesphere.io/kubesphere/pkg/models/monitoring" "kubesphere.io/kubesphere/pkg/simple/client/monitoring" ) @@ -217,9 +218,14 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt return q, errors.New(fmt.Sprintf(ErrParameterNotfound, "namespace")) } + application := []string{} + if len(r.applications) != 0 { + application = strings.Split(r.applications, "|") + } + q.option = monitoring.ApplicationsOption{ NamespaceName: r.namespaceName, - Applications: strings.Split(r.applications, "|"), + Applications: application, StorageClassName: r.storageClassName, // metering pvc } q.namedMetrics = model.ApplicationMetrics @@ -247,10 +253,17 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt case monitoring.LevelService: q.identifier = model.IdentifierService + + svcs := []string{} + if len(r.services) != 0 { + svcs = strings.Split(r.services, "|") + } + q.option = monitoring.ServicesOption{ NamespaceName: r.namespaceName, - Services: strings.Split(r.services, "|"), + Services: svcs, } + q.namedMetrics = model.ServiceMetrics case monitoring.LevelContainer: @@ -379,9 +392,8 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt return q, nil } -func ExportMetrics(resp *restful.Response, metrics model.Metrics) { - resp.Header().Set(restful.HEADER_ContentType, "text/plain") - resp.Header().Set("Content-Disposition", "attachment") +func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) { + var resBytes []byte for i, _ := range metrics.Results { ret := metrics.Results[i] @@ -390,14 +402,72 @@ func ExportMetrics(resp *restful.Response, metrics model.Metrics) { } } - resBytes, err := csvutil.Marshal(metrics.Results) - if err != nil { - api.HandleBadRequest(resp, nil, err) - return + for _, metric := range metrics.Results { + + metricName := metric.MetricName + + var csvpoints []monitoring.CSVPoint + for _, metricVal := range metric.MetricValues { + + var targetList []string + for k, v := range metricVal.Metadata { + targetList = append(targetList, fmt.Sprintf("%s=%s", k, v)) + } + selector := strings.Join(targetList, "|") + + var startTime, endTime string + if len(metricVal.ExportedSeries) > 0 { + startTime = metricVal.ExportedSeries[0].Timestamp() + endTime = metricVal.ExportedSeries[len(metricVal.ExportedSeries)-1].Timestamp() + } + + statsTab := "\nmetric_name,selector,start_time,end_time,min,max,avg,sum,fee, currency_unit\n" + + fmt.Sprintf("%s,%s,%s,%s,%.2f,%.2f,%.2f,%.2f,%.2f,%s\n\n", + metricName, + selector, + startTime, + endTime, + metricVal.MinValue, + metricVal.MaxValue, + metricVal.AvgValue, + metricVal.SumValue, + metricVal.Fee, + metricVal.CurrencyUnit) + + csvpoints = nil + resourceUnit := metricVal.ResourceUnit + for _, p := range metricVal.ExportedSeries { + csvpoints = append(csvpoints, p.TransformToCSVPoint(metricName, selector, resourceUnit)) + } + + dataTab, err := csvutil.Marshal(csvpoints) + if err != nil { + return nil, err + } + + resBytes = append(resBytes, statsTab...) + resBytes = append(resBytes, dataTab...) + } + } + + if len(resBytes) == 0 { + resBytes = []byte("no data") } output := new(bytes.Buffer) - _, err = output.Write(resBytes) + _, err := output.Write(resBytes) + if err != nil { + return nil, err + } + + return output, nil +} + +func ExportMetrics(resp *restful.Response, metrics model.Metrics) { + resp.Header().Set(restful.HEADER_ContentType, "text/plain") + resp.Header().Set("Content-Disposition", "attachment") + + output, err := exportMetrics(metrics) if err != nil { api.HandleBadRequest(resp, nil, err) return @@ -408,5 +478,6 @@ func ExportMetrics(resp *restful.Response, metrics model.Metrics) { api.HandleBadRequest(resp, nil, err) return } + return } diff --git a/pkg/kapis/monitoring/v1alpha3/helper_test.go b/pkg/kapis/monitoring/v1alpha3/helper_test.go index 514ee886b..efadff2fd 100644 --- a/pkg/kapis/monitoring/v1alpha3/helper_test.go +++ b/pkg/kapis/monitoring/v1alpha3/helper_test.go @@ -22,10 +22,14 @@ import ( "time" "github.com/google/go-cmp/cmp" + fakesnapshot "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned/fake" + fakeistio "istio.io/client-go/pkg/clientset/versioned/fake" corev1 "k8s.io/api/core/v1" + fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + fakeks "kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake" "kubesphere.io/kubesphere/pkg/informers" model "kubesphere.io/kubesphere/pkg/models/monitoring" "kubesphere.io/kubesphere/pkg/simple/client/monitoring" @@ -217,13 +221,78 @@ func TestParseRequestParams(t *testing.T) { }, expectedErr: false, }, + { + params: reqParams{ + time: "1585830000", + operation: OperationQuery, + }, + lvl: monitoring.LevelApplication, + expectedErr: true, + }, + { + params: reqParams{ + start: "1585880000", + end: "1585830000", + operation: OperationQuery, + namespaceName: "default", + applications: "app1|app2", + }, + lvl: monitoring.LevelApplication, + expectedErr: true, + }, + { + params: reqParams{ + start: "1585880000", + end: "1585830000", + operation: OperationQuery, + namespaceName: "default", + }, + lvl: monitoring.LevelApplication, + expectedErr: true, + }, + { + params: reqParams{ + target: "meter_service_cpu_usage", + time: "1585880000", + operation: OperationQuery, + namespaceName: "default", + }, + lvl: monitoring.LevelService, + expectedErr: true, + }, + { + params: reqParams{ + target: "meter_service_cpu_usage", + time: "1585880000", + operation: OperationQuery, + namespaceName: "default", + services: "svc1|svc2", + }, + lvl: monitoring.LevelService, + expectedErr: true, + }, + { + params: reqParams{ + namespaceName: "default", + }, + lvl: monitoring.LevelOpenpitrix, + expectedErr: true, + }, } for i, tt := range tests { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { client := fake.NewSimpleClientset(&tt.namespace) - fakeInformerFactory := informers.NewInformerFactories(client, nil, nil, nil, nil, nil) - handler := NewHandler(client, nil, nil, fakeInformerFactory, nil) + ksClient := fakeks.NewSimpleClientset() + istioClient := fakeistio.NewSimpleClientset() + snapshotClient := fakesnapshot.NewSimpleClientset() + apiextensionsClient := fakeapiextensions.NewSimpleClientset() + fakeInformerFactory := informers.NewInformerFactories(client, ksClient, istioClient, snapshotClient, apiextensionsClient, nil) + + fakeInformerFactory.KubeSphereSharedInformerFactory() + + handler := NewHandler(client, nil, nil, fakeInformerFactory, ksClient, nil) + result, err := handler.makeQueryOptions(tt.params, tt.lvl) if err != nil { if !tt.expectedErr { @@ -242,3 +311,71 @@ func TestParseRequestParams(t *testing.T) { }) } } + +func TestExportMetrics(t *testing.T) { + + fakeMetadata := map[string]string{ + "k1": "v1", + "k2": "v2", + "k3": "v3", + } + + fakeExportedSeries := []monitoring.ExportPoint{ + {1616641733, 2}, + {1616641800, 4}, + } + + tests := []struct { + metrics model.Metrics + expectedErr bool + }{ + { + metrics: model.Metrics{ + Results: []monitoring.Metric{ + { + MetricName: "test", + MetricData: monitoring.MetricData{ + MetricType: "", + MetricValues: []monitoring.MetricValue{ + { + Metadata: fakeMetadata, + ExportedSeries: fakeExportedSeries, + }, + }, + }, + }, + }, + }, + expectedErr: false, + }, + { + metrics: model.Metrics{ + Results: []monitoring.Metric{ + { + MetricName: "test", + MetricData: monitoring.MetricData{ + MetricType: "", + MetricValues: []monitoring.MetricValue{ + { + Metadata: fakeMetadata, + ExportedSeries: nil, + }, + }, + }, + }, + }, + }, + expectedErr: true, + }, + {}, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + _, err := exportMetrics(tt.metrics) + if err != nil && !tt.expectedErr { + t.Fatal("Failed to export metering metrics", err) + } + }) + } +} diff --git a/pkg/kapis/monitoring/v1alpha3/meter.go b/pkg/kapis/monitoring/v1alpha3/meter.go index 22bf4a7b6..cdb78526f 100644 --- a/pkg/kapis/monitoring/v1alpha3/meter.go +++ b/pkg/kapis/monitoring/v1alpha3/meter.go @@ -32,6 +32,10 @@ func getMetricPosMap(metrics []monitoring.Metric) map[string]int { return metricMap } +func (h handler) getAppWorkloads(ns string, apps []string) map[string][]string { + return h.mo.GetAppWorkloads(ns, apps) +} + func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Response, q queryOptions) { var metricMap = make(map[string]int) var res model.Metrics @@ -43,13 +47,13 @@ func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Res klog.Error("invalid application option") return } - componentsMap := h.mo.GetAppComponentsMap(aso.NamespaceName, aso.Applications) + appWorkloads := h.getAppWorkloads(aso.NamespaceName, aso.Applications) - for k, _ := range componentsMap { + for k, _ := range appWorkloads { opt := monitoring.ApplicationOption{ NamespaceName: aso.NamespaceName, Application: k, - ApplicationComponents: componentsMap[k], + ApplicationComponents: appWorkloads[k], StorageClassName: aso.StorageClassName, } diff --git a/pkg/kapis/monitoring/v1alpha3/register.go b/pkg/kapis/monitoring/v1alpha3/register.go index 507b78e5f..3531fd60e 100644 --- a/pkg/kapis/monitoring/v1alpha3/register.go +++ b/pkg/kapis/monitoring/v1alpha3/register.go @@ -20,6 +20,8 @@ package v1alpha3 import ( "net/http" + "kubesphere.io/kubesphere/pkg/client/clientset/versioned" + "github.com/emicklei/go-restful" restfulspec "github.com/emicklei/go-restful-openapi" "k8s.io/apimachinery/pkg/runtime/schema" @@ -39,10 +41,10 @@ const ( var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"} -func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory) error { +func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface) error { ws := runtime.NewWebService(GroupVersion) - h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, nil) + h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, ksClient, nil) ws.Route(ws.GET("/kubesphere"). To(h.handleKubeSphereMetricsQuery). diff --git a/pkg/kapis/tenant/v1alpha2/handler.go b/pkg/kapis/tenant/v1alpha2/handler.go index 219d09f97..70611d683 100644 --- a/pkg/kapis/tenant/v1alpha2/handler.go +++ b/pkg/kapis/tenant/v1alpha2/handler.go @@ -55,8 +55,7 @@ type tenantHandler struct { func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, - monitoringclient monitoringclient.Interface, - resourceGetter *resourcev1alpha3.ResourceGetter) *tenantHandler { + monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) *tenantHandler { return &tenantHandler{ tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter), diff --git a/pkg/kapis/tenant/v1alpha2/metering.go b/pkg/kapis/tenant/v1alpha2/metering.go index 5f1fef10b..748f12057 100644 --- a/pkg/kapis/tenant/v1alpha2/metering.go +++ b/pkg/kapis/tenant/v1alpha2/metering.go @@ -64,25 +64,26 @@ func (h *tenantHandler) QueryMeteringsHierarchy(req *restful.Request, resp *rest func (h *tenantHandler) HandlePriceInfoQuery(req *restful.Request, resp *restful.Response) { - var priceInfoResponse metering.PriceInfo - priceInfoResponse.Init() + var priceResponse metering.PriceResponse + priceResponse.Init() meterConfig, err := monitoring.LoadYaml() if err != nil { klog.Warning(err) - resp.WriteAsJson(priceInfoResponse) + resp.WriteAsJson(priceResponse) return } priceInfo := meterConfig.GetPriceInfo() - priceInfoResponse.Currency = priceInfo.CurrencyUnit - priceInfoResponse.CpuPerCorePerHour = priceInfo.CpuPerCorePerHour - priceInfoResponse.MemPerGigabytesPerHour = priceInfo.MemPerGigabytesPerHour - priceInfoResponse.IngressNetworkTrafficPerGiagabytesPerHour = priceInfo.IngressNetworkTrafficPerGiagabytesPerHour - priceInfoResponse.EgressNetworkTrafficPerGiagabytesPerHour = priceInfo.EgressNetworkTrafficPerGigabytesPerHour - priceInfoResponse.PvcPerGigabytesPerHour = priceInfo.PvcPerGigabytesPerHour + priceResponse.RetentionDay = meterConfig.RetentionDay + priceResponse.Currency = priceInfo.CurrencyUnit + priceResponse.CpuPerCorePerHour = priceInfo.CpuPerCorePerHour + priceResponse.MemPerGigabytesPerHour = priceInfo.MemPerGigabytesPerHour + priceResponse.IngressNetworkTrafficPerMegabytesPerHour = priceInfo.IngressNetworkTrafficPerMegabytesPerHour + priceResponse.EgressNetworkTrafficPerMegabytesPerHour = priceInfo.EgressNetworkTrafficPerMegabytesPerHour + priceResponse.PvcPerGigabytesPerHour = priceInfo.PvcPerGigabytesPerHour - resp.WriteAsJson(priceInfoResponse) + resp.WriteAsJson(priceResponse) return } diff --git a/pkg/kapis/tenant/v1alpha2/register.go b/pkg/kapis/tenant/v1alpha2/register.go index 233f20214..353068db3 100644 --- a/pkg/kapis/tenant/v1alpha2/register.go +++ b/pkg/kapis/tenant/v1alpha2/register.go @@ -334,6 +334,7 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s Param(ws.PathParameter("namespace", "Namespace name.").DataType("string").Required(false)). Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both workspace CPU usage and memory usage: `meter_pod_cpu_usage|meter_pod_memory_usage_wo_cache`.").DataType("string").Required(false)). Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)). + Param(ws.QueryParameter("cluster", "Cluster name").DataType("string").Required(false)). Doc("get current metering hierarchies info in last one hour"). Writes(metering.ResourceStatistic{}). Returns(http.StatusOK, api.StatusOK, metering.ResourceStatistic{})) diff --git a/pkg/models/metering/type.go b/pkg/models/metering/type.go index 8e3ff1631..8414593f2 100644 --- a/pkg/models/metering/type.go +++ b/pkg/models/metering/type.go @@ -1,12 +1,17 @@ package metering type PriceInfo struct { - Currency string `json:"currency" description:"currency"` - CpuPerCorePerHour float64 `json:"cpu_per_core_per_hour,omitempty" description:"cpu price"` - MemPerGigabytesPerHour float64 `json:"mem_per_gigabytes_per_hour,omitempty" description:"mem price"` - IngressNetworkTrafficPerGiagabytesPerHour float64 `json:"ingress_network_traffic_per_giagabytes_per_hour,omitempty" description:"ingress price"` - EgressNetworkTrafficPerGiagabytesPerHour float64 `json:"egress_network_traffic_per_gigabytes_per_hour,omitempty" description:"egress price"` - PvcPerGigabytesPerHour float64 `json:"pvc_per_gigabytes_per_hour,omitempty" description:"pvc price"` + Currency string `json:"currency" description:"currency"` + CpuPerCorePerHour float64 `json:"cpu_per_core_per_hour,omitempty" description:"cpu price"` + MemPerGigabytesPerHour float64 `json:"mem_per_gigabytes_per_hour,omitempty" description:"mem price"` + IngressNetworkTrafficPerMegabytesPerHour float64 `json:"ingress_network_traffic_per_megabytes_per_hour,omitempty" description:"ingress price"` + EgressNetworkTrafficPerMegabytesPerHour float64 `json:"egress_network_traffic_per_megabytes_per_hour,omitempty" description:"egress price"` + PvcPerGigabytesPerHour float64 `json:"pvc_per_gigabytes_per_hour,omitempty" description:"pvc price"` +} + +type PriceResponse struct { + RetentionDay string `json:"retention_day"` + PriceInfo `json:",inline"` } // currently init method fill illegal value to hint that metering config file was not mounted yet @@ -14,8 +19,8 @@ func (p *PriceInfo) Init() { p.Currency = "" p.CpuPerCorePerHour = -1 p.MemPerGigabytesPerHour = -1 - p.IngressNetworkTrafficPerGiagabytesPerHour = -1 - p.EgressNetworkTrafficPerGiagabytesPerHour = -1 + p.IngressNetworkTrafficPerMegabytesPerHour = -1 + p.EgressNetworkTrafficPerMegabytesPerHour = -1 p.PvcPerGigabytesPerHour = -1 } @@ -47,46 +52,105 @@ func (ps *PodsStats) Set(podName, meterName string, value float64) { } } -type AppStatistic struct { - CPUUsage float64 `json:"cpu_usage" description:"cpu_usage"` - MemoryUsageWoCache float64 `json:"memory_usage_wo_cache" description:"memory_usage_wo_cache"` - NetBytesTransmitted float64 `json:"net_bytes_transmitted" description:"net_bytes_transmitted"` - NetBytesReceived float64 `json:"net_bytes_received" description:"net_bytes_received"` - PVCBytesTotal float64 `json:"pvc_bytes_total" description:"pvc_bytes_total"` - Services map[string]*ServiceStatistic `json:"services" description:"services"` +type OpenPitrixStatistic struct { + AppStatistic } -func (as *AppStatistic) GetServiceStats(name string) *ServiceStatistic { - if as.Services == nil { - as.Services = make(map[string]*ServiceStatistic) +type AppStatistic struct { + CPUUsage float64 `json:"cpu_usage" description:"cpu_usage"` + MemoryUsageWoCache float64 `json:"memory_usage_wo_cache" description:"memory_usage_wo_cache"` + NetBytesTransmitted float64 `json:"net_bytes_transmitted" description:"net_bytes_transmitted"` + NetBytesReceived float64 `json:"net_bytes_received" description:"net_bytes_received"` + PVCBytesTotal float64 `json:"pvc_bytes_total" description:"pvc_bytes_total"` + Deploys map[string]*DeploymentStatistic `json:"deployments" description:"deployment statistic"` + Statefulsets map[string]*StatefulsetStatistic `json:"statefulsets" description:"statefulset statistic"` + Daemonsets map[string]*DaemonsetStatistic `json:"daemonsets" description:"daemonsets statistics"` +} + +func (as *AppStatistic) GetDeployStats(name string) *DeploymentStatistic { + if as.Deploys == nil { + as.Deploys = make(map[string]*DeploymentStatistic) } - if as.Services[name] == nil { - as.Services[name] = &ServiceStatistic{} + if as.Deploys[name] == nil { + as.Deploys[name] = &DeploymentStatistic{} } - return as.Services[name] + return as.Deploys[name] +} + +func (as *AppStatistic) GetDaemonStats(name string) *DaemonsetStatistic { + if as.Daemonsets == nil { + as.Daemonsets = make(map[string]*DaemonsetStatistic) + } + if as.Daemonsets[name] == nil { + as.Daemonsets[name] = &DaemonsetStatistic{} + } + return as.Daemonsets[name] +} + +func (as *AppStatistic) GetStatefulsetStats(name string) *StatefulsetStatistic { + if as.Statefulsets == nil { + as.Statefulsets = make(map[string]*StatefulsetStatistic) + } + if as.Statefulsets[name] == nil { + as.Statefulsets[name] = &StatefulsetStatistic{} + } + return as.Statefulsets[name] } func (as *AppStatistic) Aggregate() { - if as.Services == nil { + if as.Deploys == nil && as.Statefulsets == nil && as.Daemonsets == nil { return } - // remove duplicate pods which were selected by different svc - podsMap := make(map[string]struct{}) - for _, svcObj := range as.Services { - for podName, podObj := range svcObj.Pods { - if _, ok := podsMap[podName]; ok { - continue - } else { - podsMap[podName] = struct{}{} - } - as.CPUUsage += podObj.CPUUsage - as.MemoryUsageWoCache += podObj.MemoryUsageWoCache - as.NetBytesTransmitted += podObj.NetBytesTransmitted - as.NetBytesReceived += podObj.NetBytesReceived - as.PVCBytesTotal += podObj.PVCBytesTotal + // aggregate deployment stats + for _, deployObj := range as.Deploys { + for _, podObj := range deployObj.Pods { + deployObj.CPUUsage += podObj.CPUUsage + deployObj.MemoryUsageWoCache += podObj.MemoryUsageWoCache + deployObj.NetBytesTransmitted += podObj.NetBytesTransmitted + deployObj.NetBytesReceived += podObj.NetBytesReceived + deployObj.PVCBytesTotal += podObj.PVCBytesTotal } + as.CPUUsage += deployObj.CPUUsage + as.MemoryUsageWoCache += deployObj.MemoryUsageWoCache + as.NetBytesTransmitted += deployObj.NetBytesTransmitted + as.NetBytesReceived += deployObj.NetBytesReceived + as.PVCBytesTotal += deployObj.PVCBytesTotal } + + // aggregate statfulset stats + for _, statfulObj := range as.Statefulsets { + for _, podObj := range statfulObj.Pods { + statfulObj.CPUUsage += podObj.CPUUsage + statfulObj.MemoryUsageWoCache += podObj.MemoryUsageWoCache + statfulObj.NetBytesTransmitted += podObj.NetBytesTransmitted + statfulObj.NetBytesReceived += podObj.NetBytesReceived + statfulObj.PVCBytesTotal += podObj.PVCBytesTotal + } + as.CPUUsage += statfulObj.CPUUsage + as.MemoryUsageWoCache += statfulObj.MemoryUsageWoCache + as.NetBytesTransmitted += statfulObj.NetBytesTransmitted + as.NetBytesReceived += statfulObj.NetBytesReceived + as.PVCBytesTotal += statfulObj.PVCBytesTotal + } + + // aggregate daemonset stats + for _, daemonsetObj := range as.Daemonsets { + for _, podObj := range daemonsetObj.Pods { + daemonsetObj.CPUUsage += podObj.CPUUsage + daemonsetObj.MemoryUsageWoCache += podObj.MemoryUsageWoCache + daemonsetObj.NetBytesTransmitted += podObj.NetBytesTransmitted + daemonsetObj.NetBytesReceived += podObj.NetBytesReceived + daemonsetObj.PVCBytesTotal += podObj.PVCBytesTotal + } + as.CPUUsage += daemonsetObj.CPUUsage + as.MemoryUsageWoCache += daemonsetObj.MemoryUsageWoCache + as.NetBytesTransmitted += daemonsetObj.NetBytesTransmitted + as.NetBytesReceived += daemonsetObj.NetBytesReceived + as.PVCBytesTotal += daemonsetObj.PVCBytesTotal + } + + return } type ServiceStatistic struct { @@ -251,13 +315,28 @@ func (ds *DaemonsetStatistic) Aggregate() { } type ResourceStatistic struct { - Apps map[string]*AppStatistic `json:"apps" description:"app statistic"` - Services map[string]*ServiceStatistic `json:"services" description:"service statistic"` + // openpitrix statistic + OpenPitrixs map[string]*OpenPitrixStatistic `json:"openpitrixs" description:"openpitrix statistic"` + + // app crd statistic + Apps map[string]*AppStatistic `json:"apps" description:"app statistic"` + + // k8s workload only which exclude app and op Deploys map[string]*DeploymentStatistic `json:"deployments" description:"deployment statistic"` Statefulsets map[string]*StatefulsetStatistic `json:"statefulsets" description:"statefulset statistic"` Daemonsets map[string]*DaemonsetStatistic `json:"daemonsets" description:"daemonsets statistics"` } +func (rs *ResourceStatistic) GetOpenPitrixStats(name string) *OpenPitrixStatistic { + if rs.OpenPitrixs == nil { + rs.OpenPitrixs = make(map[string]*OpenPitrixStatistic) + } + if rs.OpenPitrixs[name] == nil { + rs.OpenPitrixs[name] = &OpenPitrixStatistic{} + } + return rs.OpenPitrixs[name] +} + func (rs *ResourceStatistic) GetAppStats(name string) *AppStatistic { if rs.Apps == nil { rs.Apps = make(map[string]*AppStatistic) @@ -268,16 +347,6 @@ func (rs *ResourceStatistic) GetAppStats(name string) *AppStatistic { return rs.Apps[name] } -func (rs *ResourceStatistic) GetServiceStats(name string) *ServiceStatistic { - if rs.Services == nil { - rs.Services = make(map[string]*ServiceStatistic) - } - if rs.Services[name] == nil { - rs.Services[name] = &ServiceStatistic{} - } - return rs.Services[name] -} - func (rs *ResourceStatistic) GetDeployStats(name string) *DeploymentStatistic { if rs.Deploys == nil { rs.Deploys = make(map[string]*DeploymentStatistic) diff --git a/pkg/models/monitoring/monitoring.go b/pkg/models/monitoring/monitoring.go index 258e7e884..d0a6d8b70 100644 --- a/pkg/models/monitoring/monitoring.go +++ b/pkg/models/monitoring/monitoring.go @@ -59,7 +59,7 @@ type MonitoringOperator interface { // meter GetNamedMetersOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) (Metrics, error) GetNamedMeters(metrics []string, time time.Time, opt monitoring.QueryOption) (Metrics, error) - GetAppComponentsMap(ns string, apps []string) map[string][]string + GetAppWorkloads(ns string, apps []string) map[string][]string GetSerivePodsMap(ns string, services []string) map[string][]string } @@ -432,10 +432,10 @@ func (mo monitoringOperator) GetNamedMetersOverTime(meters []string, start, end } // query time range: (start, end], so here we need to exclude start itself. - if start.Add(step).After(end) { + if start.Add(time.Hour).After(end) { start = end } else { - start = start.Add(step) + start = start.Add(time.Hour) } var opts []monitoring.QueryOption @@ -444,10 +444,10 @@ func (mo monitoringOperator) GetNamedMetersOverTime(meters []string, start, end opts = append(opts, monitoring.MeterOption{ Start: start, End: end, - Step: step, + Step: time.Hour, }) - ress := mo.prometheus.GetNamedMetersOverTime(meters, start, end, step, opts) + ress := mo.prometheus.GetNamedMetersOverTime(meters, start, end, time.Hour, opts) sMap := generateScalingFactorMap(step) for i, _ := range ress { @@ -471,7 +471,7 @@ func (mo monitoringOperator) GetNamedMeters(meters []string, time time.Time, opt return metersPerHour, nil } -func (mo monitoringOperator) GetAppComponentsMap(ns string, apps []string) map[string][]string { +func (mo monitoringOperator) GetAppWorkloads(ns string, apps []string) map[string][]string { componentsMap := make(map[string][]string) applicationList := []*appv1beta1.Application{} @@ -584,7 +584,7 @@ func (mo monitoringOperator) GetSerivePodsMap(ns string, services []string) map[ svcSelector := svcObj.Spec.Selector if len(svcSelector) == 0 { - return svcPodsMap + continue } svcLabels := labels.Set{} diff --git a/pkg/models/monitoring/utils.go b/pkg/models/monitoring/utils.go index 6cd6425cf..5f6ab87c2 100644 --- a/pkg/models/monitoring/utils.go +++ b/pkg/models/monitoring/utils.go @@ -71,12 +71,12 @@ var MeterResourceMap = map[string]int{ } type PriceInfo struct { - CpuPerCorePerHour float64 `json:"cpuPerCorePerHour" yaml:"cpuPerCorePerHour"` - MemPerGigabytesPerHour float64 `json:"memPerGigabytesPerHour" yaml:"memPerGigabytesPerHour"` - IngressNetworkTrafficPerGiagabytesPerHour float64 `json:"ingressNetworkTrafficPerGiagabytesPerHour" yaml:"ingressNetworkTrafficPerGiagabytesPerHour"` - EgressNetworkTrafficPerGigabytesPerHour float64 `json:"egressNetworkTrafficPerGigabytesPerHour" yaml:"egressNetworkTrafficPerGigabytesPerHour"` - PvcPerGigabytesPerHour float64 `json:"pvcPerGigabytesPerHour" yaml:"pvcPerGigabytesPerHour"` - CurrencyUnit string `json:"currencyUnit" yaml:"currencyUnit"` + CpuPerCorePerHour float64 `json:"cpuPerCorePerHour" yaml:"cpuPerCorePerHour"` + MemPerGigabytesPerHour float64 `json:"memPerGigabytesPerHour" yaml:"memPerGigabytesPerHour"` + IngressNetworkTrafficPerMegabytesPerHour float64 `json:"ingressNetworkTrafficPerMegabytesPerHour" yaml:"ingressNetworkTrafficPerGiagabytesPerHour"` + EgressNetworkTrafficPerMegabytesPerHour float64 `json:"egressNetworkTrafficPerMegabytesPerHour" yaml:"egressNetworkTrafficPerGigabytesPerHour"` + PvcPerGigabytesPerHour float64 `json:"pvcPerGigabytesPerHour" yaml:"pvcPerGigabytesPerHour"` + CurrencyUnit string `json:"currencyUnit" yaml:"currencyUnit"` } type Billing struct { @@ -84,7 +84,8 @@ type Billing struct { } type MeterConfig struct { - Billing Billing `json:"billing" yaml:"billing"` + RetentionDay string `json:"retentionDay" yaml:"retentionDay"` + Billing Billing `json:"billing" yaml:"billing"` } func (mc MeterConfig) GetPriceInfo() PriceInfo { @@ -197,11 +198,11 @@ func getFeeWithMeterName(meterName string, sum float64) float64 { case METER_RESOURCE_TYPE_NET_INGRESS: // unit: Megabyte, precision: 1 sum = math.Round(sum / 1048576) - return priceInfo.IngressNetworkTrafficPerGiagabytesPerHour * sum + return priceInfo.IngressNetworkTrafficPerMegabytesPerHour * sum case METER_RESOURCE_TYPE_NET_EGRESS: - // unit: Megabyte, precision: + // unit: Megabyte, precision: 1 sum = math.Round(sum / 1048576) - return priceInfo.EgressNetworkTrafficPerGigabytesPerHour * sum + return priceInfo.EgressNetworkTrafficPerMegabytesPerHour * sum case METER_RESOURCE_TYPE_PVC: // unit: Gigabyte, precision: 0.1 sum = math.Round(sum/1073741824*10) / 10 @@ -217,37 +218,56 @@ func updateMetricStatData(metric monitoring.Metric, scalingMap map[string]float6 metricData := metric.MetricData for index, metricValue := range metricData.MetricValues { - var points []monitoring.Point + // calulate min, max, avg value first, then squash points with factor if metricData.MetricType == monitoring.MetricTypeMatrix { - points = metricValue.Series + metricData.MetricValues[index].MinValue = getMinPointValue(metricValue.Series) + metricData.MetricValues[index].MaxValue = getMaxPointValue(metricValue.Series) + metricData.MetricValues[index].AvgValue = getAvgPointValue(metricValue.Series) } else { - points = append(points, *metricValue.Sample) + metricData.MetricValues[index].MinValue = (*metricValue.Sample)[1] + metricData.MetricValues[index].MaxValue = (*metricValue.Sample)[1] + metricData.MetricValues[index].AvgValue = (*metricValue.Sample)[1] } + // squash points if step is more than one hour and calculate sum and fee var factor float64 = 1 if scalingMap != nil { factor = scalingMap[metricName] } + metricData.MetricValues[index].Series = squashPoints(metricData.MetricValues[index].Series, int(factor)) - if len(points) == 1 { - sample := points[0] - sum := sample[1] * factor - metricData.MetricValues[index].MinValue = sample[1] - metricData.MetricValues[index].MaxValue = sample[1] - metricData.MetricValues[index].AvgValue = sample[1] + if metricData.MetricType == monitoring.MetricTypeMatrix { + sum := getSumPointValue(metricData.MetricValues[index].Series) metricData.MetricValues[index].SumValue = sum metricData.MetricValues[index].Fee = getFeeWithMeterName(metricName, sum) } else { - sum := getSumPointValue(points) * factor - metricData.MetricValues[index].MinValue = getMinPointValue(points) - metricData.MetricValues[index].MaxValue = getMaxPointValue(points) - metricData.MetricValues[index].AvgValue = getAvgPointValue(points) + sum := (*metricValue.Sample)[1] metricData.MetricValues[index].SumValue = sum metricData.MetricValues[index].Fee = getFeeWithMeterName(metricName, sum) } + metricData.MetricValues[index].CurrencyUnit = getCurrencyUnit() metricData.MetricValues[index].ResourceUnit = getResourceUnit(metricName) } return metricData } + +func squashPoints(input []monitoring.Point, factor int) (output []monitoring.Point) { + + if factor <= 0 { + klog.Errorln("factor should be positive") + return nil + } + + for i := 0; i < len(input); i++ { + + if i%factor == 0 { + output = append([]monitoring.Point{input[len(input)-1-i]}, output...) + } else { + output[0] = output[0].Add(input[len(input)-1-i]) + } + } + + return output +} diff --git a/pkg/models/tenant/metering.go b/pkg/models/tenant/metering.go index 427845305..2f129762e 100644 --- a/pkg/models/tenant/metering.go +++ b/pkg/models/tenant/metering.go @@ -598,7 +598,7 @@ func (t *tenantOperator) processApplicationMetersQuery(meters []string, q QueryO klog.Error(err.Error()) return } - componentsMap := t.mo.GetAppComponentsMap(aso.NamespaceName, aso.Applications) + componentsMap := t.mo.GetAppWorkloads(aso.NamespaceName, aso.Applications) for k, _ := range componentsMap { opt := monitoring.ApplicationOption{ @@ -698,90 +698,24 @@ func (t *tenantOperator) transformMetricData(metrics monitoringmodel.Metrics) me return podsStats } -func (t *tenantOperator) classifyPodStats(user user.Info, ns string, podsStats metering.PodsStats) (resourceStats metering.ResourceStatistic, err error) { - - if err = t.updateServicesStats(user, ns, podsStats, &resourceStats); err != nil { +func (t *tenantOperator) classifyPodStats(user user.Info, cluster, ns string, podsStats metering.PodsStats) (resourceStats metering.ResourceStatistic, err error) { + // classify pod stats into following 3 levels under spedified namespace and user info + // 1. project -> workload(deploy, sts, ds) -> pod + // 2. project -> app -> workload(deploy, sts, ds) -> pod + // 3. project -> op -> workload(deploy, sts, ds) -> pod + if err = t.updateDeploysStats(user, cluster, ns, podsStats, &resourceStats); err != nil { return } - - if err = t.updateDeploysStats(user, ns, podsStats, &resourceStats); err != nil { + if err = t.updateDaemonsetsStats(user, cluster, ns, podsStats, &resourceStats); err != nil { return } - - if err = t.updateDaemonsetsStats(user, ns, podsStats, &resourceStats); err != nil { - return - } - - if err = t.updateStatefulsetsStats(user, ns, podsStats, &resourceStats); err != nil { + if err = t.updateStatefulsetsStats(user, cluster, ns, podsStats, &resourceStats); err != nil { return } return } -func (t *tenantOperator) updateServicesStats(user user.Info, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error { - - svcList, err := t.listServices(user, ns) - if err != nil { - return err - } - - for _, svc := range svcList.Items { - if svc.Annotations[constants.ApplicationReleaseName] != "" && - svc.Annotations[constants.ApplicationReleaseNS] != "" && - t.isOpNamespace(ns) { - // for op svc - // currently we do NOT include op svc - continue - } else { - appName, nameOK := svc.Labels[constants.ApplicationName] - appVersion, versionOK := svc.Labels[constants.ApplicationVersion] - - svcPodsMap := t.mo.GetSerivePodsMap(ns, []string{svc.Name}) - pods := svcPodsMap[svc.Name] - - if nameOK && versionOK { - // for app crd svc - for _, pod := range pods { - podStat := podsStats[pod] - if podStat == nil { - klog.Warningf("%v not found", pod) - continue - } - - appFullName := appName + ":" + appVersion - if err := resourceStats.GetAppStats(appFullName).GetServiceStats(svc.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } - } - } else { - // for k8s svc - for _, pod := range pods { - if err := resourceStats.GetServiceStats(svc.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } - } - } - } - } - - // aggregate svc data - for _, app := range resourceStats.Apps { - for _, svc := range app.Services { - svc.Aggregate() - } - app.Aggregate() - } - - for _, svc := range resourceStats.Services { - svc.Aggregate() - } - - return nil -} - func (t *tenantOperator) listServices(user user.Info, ns string) (*corev1.ServiceList, error) { svcScope := request.NamespaceScope @@ -817,7 +751,11 @@ func (t *tenantOperator) listServices(user user.Info, ns string) (*corev1.Servic return svcs, nil } -func (t *tenantOperator) updateDeploysStats(user user.Info, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error { +// updateDeploysStats will update deployment field in resource stats struct with pod stats data and deployments will be classified into 3 classes: +// 1. openpitrix deployments +// 2. app deployments +// 3. k8s deploymnets +func (t *tenantOperator) updateDeploysStats(user user.Info, cluster, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error { deployList, err := t.listDeploys(user, ns) if err != nil { return err @@ -825,44 +763,63 @@ func (t *tenantOperator) updateDeploysStats(user user.Info, ns string, podsStats for _, deploy := range deployList.Items { - if deploy.Annotations[constants.ApplicationReleaseName] != "" && - deploy.Annotations[constants.ApplicationReleaseNS] != "" && - t.isOpNamespace(ns) { - // for op deploy - // currently we do NOT include op deploy - continue - } else { - _, appNameOK := deploy.Labels[constants.ApplicationName] - _, appVersionOK := deploy.Labels[constants.ApplicationVersion] + pods, err := t.listPods(user, ns, deploy.Spec.Selector) + if err != nil { + klog.Error(err) + return err + } - pods, err := t.listPods(user, ns, deploy.Spec.Selector) - if err != nil { - klog.Error(err) - return err + if ok, _ := t.isOpenPitrixComponent(cluster, ns, "deployment", deploy.Name); ok { + // TODO: for op deployment + continue + } else if ok, appName := t.isAppComponent(ns, "deployment", deploy.Name); ok { + // for app deployment + for _, pod := range pods { + podsStat := podsStats[pod] + if podsStat == nil { + klog.Warningf("%v not found", pod) + continue + } + + if err := resourceStats.GetAppStats(appName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat); err != nil { + klog.Error(err) + return err + } } - if appNameOK && appVersionOK { - // for app crd svc - continue - } else { - // for k8s svc - for _, pod := range pods { - if err := resourceStats.GetDeployStats(deploy.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + } else { + // for k8s deployment only + for _, pod := range pods { + if err := resourceStats.GetDeployStats(deploy.Name).SetPodStats(pod, podsStats[pod]); err != nil { + klog.Error(err) + return err } } } } + // TODO: op aggregate for deployment components + for _, op := range resourceStats.OpenPitrixs { + op.Aggregate() + } + + // app aggregate for deployment components + for _, app := range resourceStats.Apps { + app.Aggregate() + } + + // k8s aggregate for deployment components for _, deploy := range resourceStats.Deploys { deploy.Aggregate() } return nil } -func (t *tenantOperator) updateDaemonsetsStats(user user.Info, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error { +// updateDaemonsetsStats will update daemonsets field in resource stats struct with pod stats data and daemonsets will be classified into 3 classes: +// 1. openpitrix daemonsets +// 2. app daemonsets +// 3. k8s daemonsets +func (t *tenantOperator) updateDaemonsetsStats(user user.Info, cluster, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error { daemonsetList, err := t.listDaemonsets(user, ns) if err != nil { return err @@ -870,59 +827,81 @@ func (t *tenantOperator) updateDaemonsetsStats(user user.Info, ns string, podsSt for _, daemonset := range daemonsetList.Items { - if daemonset.Annotations["meta.helm.sh/release-name"] != "" && - daemonset.Annotations["meta.helm.sh/release-namespace"] != "" && - t.isOpNamespace(ns) { - // for op deploy - // currently we do NOT include op deploy + pods, err := t.listPods(user, ns, daemonset.Spec.Selector) + if err != nil { + klog.Error(err) + return err + } + + if ok, _ := t.isOpenPitrixComponent(cluster, ns, "daemonset", daemonset.Name); ok { + // TODO: for op daemonset continue - } else { - appName := daemonset.Labels[constants.ApplicationName] - appVersion := daemonset.Labels[constants.ApplicationVersion] - - pods, err := t.listPods(user, ns, daemonset.Spec.Selector) - if err != nil { - klog.Error(err) - return err + } else if ok, appName := t.isAppComponent(ns, "daemonset", daemonset.Name); ok { + // for app daemonset + for _, pod := range pods { + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // app field(create if not existed) -> statefulsets field(create if not existed) -> pod + if err := resourceStats.GetAppStats(appName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil { + klog.Error(err) + return err + } } - - if appName != "" && appVersion != "" { - // for app crd svc - continue - } else { - // for k8s svc - for _, pod := range pods { - if err := resourceStats.GetDaemonsetStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + } else { + // for k8s daemonset + for _, pod := range pods { + if err := resourceStats.GetDaemonsetStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil { + klog.Error(err) + return err } } } } + // here pod stats and level struct are ready + + // TODO: op aggregate for daemonset components + for _, op := range resourceStats.OpenPitrixs { + op.Aggregate() + } + + // app aggregate for daemonset components + for _, app := range resourceStats.Apps { + app.Aggregate() + } + + // k8s aggregate for daemonset components for _, daemonset := range resourceStats.Daemonsets { daemonset.Aggregate() } return nil } -func (t *tenantOperator) isOpNamespace(ns string) bool { - - nsObj, err := t.k8sclient.CoreV1().Namespaces().Get(context.Background(), ns, metav1.GetOptions{}) - if err != nil { - return false - } - - ws := nsObj.Labels[constants.WorkspaceLabelKey] - - if len(ws) != 0 && ws != "system-workspace" { - return true - } - return false +// TODO: include op metering part +func (t *tenantOperator) isOpenPitrixComponent(cluster, ns, kind, componentName string) (bool, string) { + return false, "" } -func (t *tenantOperator) updateStatefulsetsStats(user user.Info, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error { +func (t *tenantOperator) isAppComponent(ns, kind, componentName string) (bool, string) { + + appWorkloads := t.mo.GetAppWorkloads(ns, nil) + + for appName, cList := range appWorkloads { + for _, component := range cList { + if component == fmt.Sprintf("%s:%s", strings.Title(kind), componentName) { + return true, appName + } + } + } + + return false, "" +} + +// updateStatefulsetsStats will update statefulsets field in resource stats struct with pod stats data and statefulsets will be classified into 3 classes: +// 1. openpitrix statefulsets +// 2. app statefulsets +// 3. k8s statefulsets +func (t *tenantOperator) updateStatefulsetsStats(user user.Info, cluster, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error { statefulsetsList, err := t.listStatefulsets(user, ns) if err != nil { return err @@ -930,37 +909,51 @@ func (t *tenantOperator) updateStatefulsetsStats(user user.Info, ns string, pods for _, statefulset := range statefulsetsList.Items { - if statefulset.Annotations[constants.ApplicationReleaseName] != "" && - statefulset.Annotations[constants.ApplicationReleaseNS] != "" && - t.isOpNamespace(ns) { - // for op deploy - // currently we do NOT include op deploy + // query pod list under the statefulset within the namespace + pods, err := t.listPods(user, ns, statefulset.Spec.Selector) + if err != nil { + klog.Error(err) + return err + } + + if ok, _ := t.isOpenPitrixComponent(cluster, ns, "statefulset", statefulset.Name); ok { + // TODO: for op statefulset continue - } else { - appName := statefulset.Labels[constants.ApplicationName] - appVersion := statefulset.Labels[constants.ApplicationVersion] - - pods, err := t.listPods(user, ns, statefulset.Spec.Selector) - if err != nil { - klog.Error(err) - return err + } else if ok, appName := t.isAppComponent(ns, "daemonset", statefulset.Name); ok { + // for app statefulset + for _, pod := range pods { + // aggregate order is from bottom(pods) to top(app), we should create outer field if not exists + // and then set pod stats data, the direction is as follows: + // app field(create if not existed) -> statefulsets field(create if not existed) -> pod + if err := resourceStats.GetAppStats(appName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil { + klog.Error(err) + return err + } } - - if appName != "" && appVersion != "" { - // for app crd svc - continue - } else { - // for k8s svc - for _, pod := range pods { - if err := resourceStats.GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil { - klog.Error(err) - return err - } + } else { + // for k8s statefulset + for _, pod := range pods { + // same as above, the direction is similar: + // k8s field(create if not existed) -> statefulsets field(create if not existed) -> pod + if err := resourceStats.GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil { + klog.Error(err) + return err } } } } + // TODO: op aggregate for statefulset components + for _, op := range resourceStats.OpenPitrixs { + op.Aggregate() + } + + // app aggregate for statefulset components + for _, app := range resourceStats.Apps { + app.Aggregate() + } + + // k8s aggregate for statefulset components for _, statefulset := range resourceStats.Statefulsets { statefulset.Aggregate() } @@ -1048,6 +1041,16 @@ func (t *tenantOperator) listDeploys(user user.Info, ns string) (*appv1.Deployme return deploys, nil } +func (t *tenantOperator) getAppNameFromLabels(labels map[string]string) string { + appName := labels[constants.ApplicationName] + appVersion := labels[constants.ApplicationVersion] + if appName == "" || appVersion == "" { + return "" + } + + return fmt.Sprintf("%s:%s", appName, appVersion) +} + func (t *tenantOperator) listDaemonsets(user user.Info, ns string) (*appv1.DaemonSetList, error) { dsScope := request.NamespaceScope diff --git a/pkg/models/tenant/tenant.go b/pkg/models/tenant/tenant.go index 2d5808057..0c018ef2a 100644 --- a/pkg/models/tenant/tenant.go +++ b/pkg/models/tenant/tenant.go @@ -24,6 +24,8 @@ import ( "strings" "time" + "kubesphere.io/kubesphere/pkg/models/openpitrix" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -104,9 +106,15 @@ type tenantOperator struct { lo logging.LoggingOperator auditing auditing.Interface mo monitoring.MonitoringOperator + opRelease openpitrix.ReleaseInterface } func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Client, auditingclient auditingclient.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) Interface { + var openpitrixRelease openpitrix.ReleaseInterface + if ksclient != nil { + openpitrixRelease = openpitrix.NewOpenpitrixOperator(informers, ksclient, nil) + } + return &tenantOperator{ am: am, authorizer: authorizer, @@ -117,6 +125,7 @@ func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ks lo: logging.NewLoggingOperator(loggingClient), auditing: auditing.NewEventsOperator(auditingclient), mo: monitoring.NewMonitoringOperator(monitoringclient, nil, k8sclient, informers, resourceGetter), + opRelease: openpitrixRelease, } } @@ -989,7 +998,7 @@ func (t *tenantOperator) MeteringHierarchy(user user.Info, queryParam *meteringv podsStats := t.transformMetricData(res) // classify pods stats - resourceStats, err := t.classifyPodStats(user, queryParam.NamespaceName, podsStats) + resourceStats, err := t.classifyPodStats(user, "", queryParam.NamespaceName, podsStats) if err != nil { klog.Error(err) return metering.ResourceStatistic{}, err diff --git a/pkg/simple/client/alerting/rule_client.go b/pkg/simple/client/alerting/rule_client.go index a51275653..05a07715c 100644 --- a/pkg/simple/client/alerting/rule_client.go +++ b/pkg/simple/client/alerting/rule_client.go @@ -7,8 +7,8 @@ import ( "net/http" "github.com/pkg/errors" + "github.com/prometheus/client_golang/api" ) -import "github.com/prometheus/client_golang/api" const ( apiPrefix = "/api/v1" diff --git a/pkg/simple/client/metering/options.go b/pkg/simple/client/metering/options.go new file mode 100644 index 000000000..d95d9f619 --- /dev/null +++ b/pkg/simple/client/metering/options.go @@ -0,0 +1,9 @@ +package metering + +type Options struct { + Enable bool `json:"enable" yaml:"enable"` +} + +func NewMeteringOptions() *Options { + return &Options{} +} diff --git a/pkg/simple/client/monitoring/query_options.go b/pkg/simple/client/monitoring/query_options.go index 75b9afeb3..7def5aeca 100644 --- a/pkg/simple/client/monitoring/query_options.go +++ b/pkg/simple/client/monitoring/query_options.go @@ -30,6 +30,7 @@ const ( LevelWorkspace LevelNamespace LevelApplication + LevelOpenpitrix LevelWorkload LevelService LevelPod @@ -150,6 +151,7 @@ func (aso ApplicationsOption) Apply(o *QueryOptions) { return } +// ApplicationsOption & OpenpitrixsOption share the same ApplicationOption struct type ApplicationOption struct { NamespaceName string Application string diff --git a/pkg/simple/client/monitoring/types.go b/pkg/simple/client/monitoring/types.go index 954e6d62d..353058a75 100644 --- a/pkg/simple/client/monitoring/types.go +++ b/pkg/simple/client/monitoring/types.go @@ -20,11 +20,9 @@ import ( "errors" "fmt" "strconv" - "strings" "time" jsoniter "github.com/json-iterator/go" - "github.com/jszwec/csvutil" ) const ( @@ -46,41 +44,11 @@ type Metric struct { type MetricValues []MetricValue -func (m MetricValues) MarshalCSV() ([]byte, error) { - - var ret []string - for _, v := range m { - tmp, err := v.MarshalCSV() - if err != nil { - return nil, err - } - - ret = append(ret, string(tmp)) - } - - return []byte(strings.Join(ret, "||")), nil -} - type MetricData struct { MetricType string `json:"resultType,omitempty" description:"result type, one of matrix, vector" csv:"metric_type"` MetricValues `json:"result,omitempty" description:"metric data including labels, time series and values" csv:"metric_values"` } -func (m MetricData) MarshalCSV() ([]byte, error) { - var ret []byte - - for _, v := range m.MetricValues { - tmp, err := csvutil.Marshal(&v) - if err != nil { - return nil, err - } - - ret = append(ret, tmp...) - } - - return ret, nil -} - // The first element is the timestamp, the second is the metric value. // eg, [1585658599.195, 0.528] type Point [2]float64 @@ -104,41 +72,6 @@ type MetricValue struct { CurrencyUnit string `json:"currency_unit"` } -func (mv MetricValue) MarshalCSV() ([]byte, error) { - // metric value format: - // target,stats value(include fees),exported_value,exported_values - // for example: - // {workspace:demo-ws},,2021-02-23 01:00:00 AM 0|2021-02-23 02:00:00 AM 0|... - var metricValueCSVTemplate = "{%s},unit:%s|min:%.3f|max:%.3f|avg:%.3f|sum:%.3f|fee:%.2f %s,%s,%s" - - var targetList []string - for k, v := range mv.Metadata { - targetList = append(targetList, fmt.Sprintf("%s=%s", k, v)) - } - - exportedSampleStr := "" - if mv.ExportSample != nil { - exportedSampleStr = mv.ExportSample.Format() - } - - exportedSeriesStrList := []string{} - for _, v := range mv.ExportedSeries { - exportedSeriesStrList = append(exportedSeriesStrList, v.Format()) - } - - return []byte(fmt.Sprintf(metricValueCSVTemplate, - strings.Join(targetList, "|"), - mv.ResourceUnit, - mv.MinValue, - mv.MaxValue, - mv.AvgValue, - mv.SumValue, - mv.Fee, - mv.CurrencyUnit, - exportedSampleStr, - exportedSeriesStrList)), nil -} - func (mv *MetricValue) TransferToExportedMetricValue() { if mv.Sample != nil { @@ -167,6 +100,10 @@ func (p Point) transferToExported() ExportPoint { return ExportPoint{p[0], p[1]} } +func (p Point) Add(other Point) Point { + return Point{p[0], p[1] + other[1]} +} + // MarshalJSON implements json.Marshaler. It will be called when writing JSON to HTTP response // Inspired by prometheus/client_golang func (p Point) MarshalJSON() ([]byte, error) { @@ -214,6 +151,14 @@ func (p *Point) UnmarshalJSON(b []byte) error { return nil } +type CSVPoint struct { + MetricName string `csv:"metric_name"` + Selector string `csv:"selector"` + Time string `csv:"time"` + Value string `csv:"value"` + ResourceUnit string `csv:"unit"` +} + type ExportPoint [2]float64 func (p ExportPoint) Timestamp() string { @@ -227,3 +172,13 @@ func (p ExportPoint) Value() float64 { func (p ExportPoint) Format() string { return p.Timestamp() + " " + strconv.FormatFloat(p.Value(), 'f', -1, 64) } + +func (p ExportPoint) TransformToCSVPoint(metricName string, selector string, resourceUnit string) CSVPoint { + return CSVPoint{ + MetricName: metricName, + Selector: selector, + Time: p.Timestamp(), + Value: strconv.FormatFloat(p.Value(), 'f', -1, 64), + ResourceUnit: resourceUnit, + } +} diff --git a/tools/cmd/doc-gen/main.go b/tools/cmd/doc-gen/main.go index 6429ba928..3d4f2234c 100644 --- a/tools/cmd/doc-gen/main.go +++ b/tools/cmd/doc-gen/main.go @@ -125,7 +125,7 @@ func generateSwaggerJson() []byte { urlruntime.Must(devopsv1alpha2.AddToContainer(container, informerFactory.KubeSphereSharedInformerFactory(), &fakedevops.Devops{}, nil, clientsets.KubeSphere(), fakes3.NewFakeS3(), "", nil)) urlruntime.Must(devopsv1alpha3.AddToContainer(container, &fakedevops.Devops{}, clientsets.Kubernetes(), clientsets.KubeSphere(), informerFactory.KubeSphereSharedInformerFactory(), informerFactory.KubernetesSharedInformerFactory())) urlruntime.Must(iamv1alpha2.AddToContainer(container, nil, nil, group.New(informerFactory, clientsets.KubeSphere(), clientsets.Kubernetes()), nil)) - urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory)) + urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory, nil)) urlruntime.Must(openpitrixv1.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil)) urlruntime.Must(openpitrixv2.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil)) urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes()))