Unverified commit 2dfa2489, authored by KubeSphere CI Bot, committed by GitHub

Merge pull request #3441 from yunkunrao/master

Update metering CSV export format and integrate the metering module.
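
For orientation before the diff: the change introduces a `metering` section in the KubeSphere server config and threads the KubeSphere clientset into the monitoring and metering API handlers. A minimal sketch of how the new option is toggled is shown below; the config import path is an assumption based on the usual package layout, everything else comes from the diff itself.

// Sketch only: enabling the new metering option on the server Config.
// The import path of the config package is assumed; Options, New and ToMap
// appear in the diff below.
package main

import (
	"kubesphere.io/kubesphere/pkg/apiserver/config"
	"kubesphere.io/kubesphere/pkg/simple/client/metering"
)

func main() {
	conf := config.New() // MeteringOptions defaults to metering.NewMeteringOptions()
	conf.MeteringOptions = &metering.Options{Enable: true}
	// ToMap runs stripEmptyOptions: with Enable left false, the metering
	// section is dropped entirely (see testMeteringConfig in the diff).
	conf.ToMap()
}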
......@@ -219,8 +219,8 @@ func (s *APIServer) installKubeSphereAPIs() {
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory))
urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.RuntimeCache))
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere()))
urlruntime.Must(meteringv1alpha1.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.RuntimeCache))
urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
urlruntime.Must(openpitrixv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))
urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
......
......@@ -35,6 +35,7 @@ import (
"kubesphere.io/kubesphere/pkg/simple/client/kubeedge"
"kubesphere.io/kubesphere/pkg/simple/client/ldap"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
"kubesphere.io/kubesphere/pkg/simple/client/metering"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring/prometheus"
"kubesphere.io/kubesphere/pkg/simple/client/multicluster"
"kubesphere.io/kubesphere/pkg/simple/client/network"
......@@ -101,6 +102,7 @@ type Config struct {
AlertingOptions *alerting.Options `json:"alerting,omitempty" yaml:"alerting,omitempty" mapstructure:"alerting"`
NotificationOptions *notification.Options `json:"notification,omitempty" yaml:"notification,omitempty" mapstructure:"notification"`
KubeEdgeOptions *kubeedge.Options `json:"kubeedge,omitempty" yaml:"kubeedge,omitempty" mapstructure:"kubeedge"`
MeteringOptions *metering.Options `json:"metering,omitempty" yaml:"metering,omitempty" mapstructure:"metering"`
}
// newConfig creates a default non-empty Config
......@@ -125,6 +127,7 @@ func New() *Config {
EventsOptions: events.NewEventsOptions(),
AuditingOptions: auditing.NewAuditingOptions(),
KubeEdgeOptions: kubeedge.NewKubeEdgeOptions(),
MeteringOptions: metering.NewMeteringOptions(),
}
}
......@@ -287,4 +290,8 @@ func (conf *Config) stripEmptyOptions() {
if conf.KubeEdgeOptions != nil && conf.KubeEdgeOptions.Endpoint == "" {
conf.KubeEdgeOptions = nil
}
if conf.MeteringOptions != nil && !conf.MeteringOptions.Enable {
conf.MeteringOptions = nil
}
}
......@@ -39,6 +39,7 @@ import (
"kubesphere.io/kubesphere/pkg/simple/client/kubeedge"
"kubesphere.io/kubesphere/pkg/simple/client/ldap"
"kubesphere.io/kubesphere/pkg/simple/client/logging"
"kubesphere.io/kubesphere/pkg/simple/client/metering"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring/prometheus"
"kubesphere.io/kubesphere/pkg/simple/client/multicluster"
"kubesphere.io/kubesphere/pkg/simple/client/network"
......@@ -170,6 +171,9 @@ func newTestConfig() (*Config, error) {
KubeEdgeOptions: &kubeedge.Options{
Endpoint: "http://edge-watcher.kubeedge.svc/api/",
},
MeteringOptions: &metering.Options{
Enable: false,
},
}
return conf, nil
}
......@@ -185,6 +189,13 @@ func saveTestConfig(t *testing.T, conf *Config) {
}
}
func testMeteringConfig(t *testing.T, conf *Config) {
conf.ToMap()
if conf.MeteringOptions != nil {
t.Fatalf("setting metering options failed")
}
}
func cleanTestConfig(t *testing.T) {
file := fmt.Sprintf("%s.yaml", defaultConfigurationName)
if _, err := os.Stat(file); os.IsNotExist(err) {
......@@ -214,4 +225,7 @@ func TestGet(t *testing.T) {
if diff := cmp.Diff(conf, conf2); diff != "" {
t.Fatal(diff)
}
testMeteringConfig(t, conf)
}
......@@ -22,6 +22,7 @@ import (
"github.com/emicklei/go-restful"
"k8s.io/client-go/kubernetes"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/informers"
monitorhle "kubesphere.io/kubesphere/pkg/kapis/monitoring/v1alpha3"
resourcev1alpha3 "kubesphere.io/kubesphere/pkg/models/resources/v1alpha3/resource"
......@@ -40,6 +41,6 @@ type meterHandler interface {
HandlePVCMetersQuery(req *restful.Request, resp *restful.Response)
}
func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, resourceGetter *resourcev1alpha3.ResourceGetter) meterHandler {
return monitorhle.NewHandler(k, m, nil, f, resourceGetter)
func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) meterHandler {
return monitorhle.NewHandler(k, m, nil, f, ksClient, resourceGetter)
}
......@@ -20,6 +20,8 @@ package v1alpha1
import (
"net/http"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"github.com/emicklei/go-restful"
restfulspec "github.com/emicklei/go-restful-openapi"
"k8s.io/apimachinery/pkg/runtime/schema"
......@@ -42,10 +44,10 @@ const (
var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha1"}
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, cache cache.Cache) error {
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, meteringClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface, cache cache.Cache) error {
ws := runtime.NewWebService(GroupVersion)
h := newHandler(k8sClient, meteringClient, factory, resourcev1alpha3.NewResourceGetter(factory, cache))
h := newHandler(k8sClient, meteringClient, factory, ksClient, resourcev1alpha3.NewResourceGetter(factory, cache))
ws.Route(ws.GET("/cluster").
To(h.HandleClusterMetersQuery).
......
......@@ -23,6 +23,9 @@ import (
"regexp"
"strings"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/models/openpitrix"
"github.com/emicklei/go-restful"
"k8s.io/client-go/kubernetes"
......@@ -34,12 +37,17 @@ import (
)
type handler struct {
k kubernetes.Interface
mo model.MonitoringOperator
k kubernetes.Interface
mo model.MonitoringOperator
opRelease openpitrix.ReleaseInterface
}
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, resourceGetter *resourcev1alpha3.ResourceGetter) *handler {
return &handler{k, model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter)}
func NewHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, ksClient versioned.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) *handler {
return &handler{
k: k,
mo: model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, resourceGetter),
opRelease: nil,
}
}
func (h handler) handleKubeSphereMetricsQuery(req *restful.Request, resp *restful.Response) {
......
......@@ -25,13 +25,14 @@ import (
"strings"
"time"
"kubesphere.io/kubesphere/pkg/api"
"github.com/jszwec/csvutil"
"github.com/emicklei/go-restful"
"github.com/pkg/errors"
corev1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"kubesphere.io/kubesphere/pkg/api"
model "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
......@@ -217,9 +218,14 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
return q, errors.New(fmt.Sprintf(ErrParameterNotfound, "namespace"))
}
application := []string{}
if len(r.applications) != 0 {
application = strings.Split(r.applications, "|")
}
q.option = monitoring.ApplicationsOption{
NamespaceName: r.namespaceName,
Applications: strings.Split(r.applications, "|"),
Applications: application,
StorageClassName: r.storageClassName, // metering pvc
}
q.namedMetrics = model.ApplicationMetrics
......@@ -247,10 +253,17 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
case monitoring.LevelService:
q.identifier = model.IdentifierService
svcs := []string{}
if len(r.services) != 0 {
svcs = strings.Split(r.services, "|")
}
q.option = monitoring.ServicesOption{
NamespaceName: r.namespaceName,
Services: strings.Split(r.services, "|"),
Services: svcs,
}
q.namedMetrics = model.ServiceMetrics
case monitoring.LevelContainer:
......@@ -379,9 +392,8 @@ func (h handler) makeQueryOptions(r reqParams, lvl monitoring.Level) (q queryOpt
return q, nil
}
func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
resp.Header().Set(restful.HEADER_ContentType, "text/plain")
resp.Header().Set("Content-Disposition", "attachment")
func exportMetrics(metrics model.Metrics) (*bytes.Buffer, error) {
var resBytes []byte
for i, _ := range metrics.Results {
ret := metrics.Results[i]
......@@ -390,14 +402,72 @@ func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
}
}
resBytes, err := csvutil.Marshal(metrics.Results)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
for _, metric := range metrics.Results {
metricName := metric.MetricName
var csvpoints []monitoring.CSVPoint
for _, metricVal := range metric.MetricValues {
var targetList []string
for k, v := range metricVal.Metadata {
targetList = append(targetList, fmt.Sprintf("%s=%s", k, v))
}
selector := strings.Join(targetList, "|")
var startTime, endTime string
if len(metricVal.ExportedSeries) > 0 {
startTime = metricVal.ExportedSeries[0].Timestamp()
endTime = metricVal.ExportedSeries[len(metricVal.ExportedSeries)-1].Timestamp()
}
statsTab := "\nmetric_name,selector,start_time,end_time,min,max,avg,sum,fee, currency_unit\n" +
fmt.Sprintf("%s,%s,%s,%s,%.2f,%.2f,%.2f,%.2f,%.2f,%s\n\n",
metricName,
selector,
startTime,
endTime,
metricVal.MinValue,
metricVal.MaxValue,
metricVal.AvgValue,
metricVal.SumValue,
metricVal.Fee,
metricVal.CurrencyUnit)
csvpoints = nil
resourceUnit := metricVal.ResourceUnit
for _, p := range metricVal.ExportedSeries {
csvpoints = append(csvpoints, p.TransformToCSVPoint(metricName, selector, resourceUnit))
}
dataTab, err := csvutil.Marshal(csvpoints)
if err != nil {
return nil, err
}
resBytes = append(resBytes, statsTab...)
resBytes = append(resBytes, dataTab...)
}
}
if len(resBytes) == 0 {
resBytes = []byte("no data")
}
output := new(bytes.Buffer)
_, err = output.Write(resBytes)
_, err := output.Write(resBytes)
if err != nil {
return nil, err
}
return output, nil
}
func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
resp.Header().Set(restful.HEADER_ContentType, "text/plain")
resp.Header().Set("Content-Disposition", "attachment")
output, err := exportMetrics(metrics)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
......@@ -408,5 +478,6 @@ func ExportMetrics(resp *restful.Response, metrics model.Metrics) {
api.HandleBadRequest(resp, nil, err)
return
}
return
}
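
For a single metric with two exported points, the body written by ExportMetrics would look roughly like the sample below. The values, the timestamp format (from ExportPoint.Timestamp), the currency unit (from getCurrencyUnit) and the resource unit (from getResourceUnit) are all illustrative; the second header row comes from the csv tags on CSVPoint.

metric_name,selector,start_time,end_time,min,max,avg,sum,fee,currency_unit
meter_pod_cpu_usage,namespace=default|pod=nginx-0,<start>,<end>,2.00,4.00,3.00,6.00,0.60,<currency>

metric_name,selector,time,value,unit
meter_pod_cpu_usage,namespace=default|pod=nginx-0,<start>,2,<unit>
meter_pod_cpu_usage,namespace=default|pod=nginx-0,<end>,4,<unit>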
......@@ -22,10 +22,14 @@ import (
"time"
"github.com/google/go-cmp/cmp"
fakesnapshot "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned/fake"
fakeistio "istio.io/client-go/pkg/clientset/versioned/fake"
corev1 "k8s.io/api/core/v1"
fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
fakeks "kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
"kubesphere.io/kubesphere/pkg/informers"
model "kubesphere.io/kubesphere/pkg/models/monitoring"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
......@@ -217,13 +221,78 @@ func TestParseRequestParams(t *testing.T) {
},
expectedErr: false,
},
{
params: reqParams{
time: "1585830000",
operation: OperationQuery,
},
lvl: monitoring.LevelApplication,
expectedErr: true,
},
{
params: reqParams{
start: "1585880000",
end: "1585830000",
operation: OperationQuery,
namespaceName: "default",
applications: "app1|app2",
},
lvl: monitoring.LevelApplication,
expectedErr: true,
},
{
params: reqParams{
start: "1585880000",
end: "1585830000",
operation: OperationQuery,
namespaceName: "default",
},
lvl: monitoring.LevelApplication,
expectedErr: true,
},
{
params: reqParams{
target: "meter_service_cpu_usage",
time: "1585880000",
operation: OperationQuery,
namespaceName: "default",
},
lvl: monitoring.LevelService,
expectedErr: true,
},
{
params: reqParams{
target: "meter_service_cpu_usage",
time: "1585880000",
operation: OperationQuery,
namespaceName: "default",
services: "svc1|svc2",
},
lvl: monitoring.LevelService,
expectedErr: true,
},
{
params: reqParams{
namespaceName: "default",
},
lvl: monitoring.LevelOpenpitrix,
expectedErr: true,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
client := fake.NewSimpleClientset(&tt.namespace)
fakeInformerFactory := informers.NewInformerFactories(client, nil, nil, nil, nil, nil)
handler := NewHandler(client, nil, nil, fakeInformerFactory, nil)
ksClient := fakeks.NewSimpleClientset()
istioClient := fakeistio.NewSimpleClientset()
snapshotClient := fakesnapshot.NewSimpleClientset()
apiextensionsClient := fakeapiextensions.NewSimpleClientset()
fakeInformerFactory := informers.NewInformerFactories(client, ksClient, istioClient, snapshotClient, apiextensionsClient, nil)
fakeInformerFactory.KubeSphereSharedInformerFactory()
handler := NewHandler(client, nil, nil, fakeInformerFactory, ksClient, nil)
result, err := handler.makeQueryOptions(tt.params, tt.lvl)
if err != nil {
if !tt.expectedErr {
......@@ -242,3 +311,71 @@ func TestParseRequestParams(t *testing.T) {
})
}
}
func TestExportMetrics(t *testing.T) {
fakeMetadata := map[string]string{
"k1": "v1",
"k2": "v2",
"k3": "v3",
}
fakeExportedSeries := []monitoring.ExportPoint{
{1616641733, 2},
{1616641800, 4},
}
tests := []struct {
metrics model.Metrics
expectedErr bool
}{
{
metrics: model.Metrics{
Results: []monitoring.Metric{
{
MetricName: "test",
MetricData: monitoring.MetricData{
MetricType: "",
MetricValues: []monitoring.MetricValue{
{
Metadata: fakeMetadata,
ExportedSeries: fakeExportedSeries,
},
},
},
},
},
},
expectedErr: false,
},
{
metrics: model.Metrics{
Results: []monitoring.Metric{
{
MetricName: "test",
MetricData: monitoring.MetricData{
MetricType: "",
MetricValues: []monitoring.MetricValue{
{
Metadata: fakeMetadata,
ExportedSeries: nil,
},
},
},
},
},
},
expectedErr: true,
},
{},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
_, err := exportMetrics(tt.metrics)
if err != nil && !tt.expectedErr {
t.Fatal("Failed to export metering metrics", err)
}
})
}
}
......@@ -32,6 +32,10 @@ func getMetricPosMap(metrics []monitoring.Metric) map[string]int {
return metricMap
}
func (h handler) getAppWorkloads(ns string, apps []string) map[string][]string {
return h.mo.GetAppWorkloads(ns, apps)
}
func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Response, q queryOptions) {
var metricMap = make(map[string]int)
var res model.Metrics
......@@ -43,13 +47,13 @@ func (h handler) handleApplicationMetersQuery(meters []string, resp *restful.Res
klog.Error("invalid application option")
return
}
componentsMap := h.mo.GetAppComponentsMap(aso.NamespaceName, aso.Applications)
appWorkloads := h.getAppWorkloads(aso.NamespaceName, aso.Applications)
for k, _ := range componentsMap {
for k, _ := range appWorkloads {
opt := monitoring.ApplicationOption{
NamespaceName: aso.NamespaceName,
Application: k,
ApplicationComponents: componentsMap[k],
ApplicationComponents: appWorkloads[k],
StorageClassName: aso.StorageClassName,
}
......
......@@ -20,6 +20,8 @@ package v1alpha3
import (
"net/http"
"kubesphere.io/kubesphere/pkg/client/clientset/versioned"
"github.com/emicklei/go-restful"
restfulspec "github.com/emicklei/go-restful-openapi"
"k8s.io/apimachinery/pkg/runtime/schema"
......@@ -39,10 +41,10 @@ const (
var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"}
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory) error {
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, ksClient versioned.Interface) error {
ws := runtime.NewWebService(GroupVersion)
h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, nil)
h := NewHandler(k8sClient, monitoringClient, metricsClient, factory, ksClient, nil)
ws.Route(ws.GET("/kubesphere").
To(h.handleKubeSphereMetricsQuery).
......
......@@ -55,8 +55,7 @@ type tenantHandler struct {
func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface,
evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client,
am am.AccessManagementInterface, authorizer authorizer.Authorizer,
monitoringclient monitoringclient.Interface,
resourceGetter *resourcev1alpha3.ResourceGetter) *tenantHandler {
monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) *tenantHandler {
return &tenantHandler{
tenant: tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter),
......
......@@ -64,25 +64,26 @@ func (h *tenantHandler) QueryMeteringsHierarchy(req *restful.Request, resp *rest
func (h *tenantHandler) HandlePriceInfoQuery(req *restful.Request, resp *restful.Response) {
var priceInfoResponse metering.PriceInfo
priceInfoResponse.Init()
var priceResponse metering.PriceResponse
priceResponse.Init()
meterConfig, err := monitoring.LoadYaml()
if err != nil {
klog.Warning(err)
resp.WriteAsJson(priceInfoResponse)
resp.WriteAsJson(priceResponse)
return
}
priceInfo := meterConfig.GetPriceInfo()
priceInfoResponse.Currency = priceInfo.CurrencyUnit
priceInfoResponse.CpuPerCorePerHour = priceInfo.CpuPerCorePerHour
priceInfoResponse.MemPerGigabytesPerHour = priceInfo.MemPerGigabytesPerHour
priceInfoResponse.IngressNetworkTrafficPerGiagabytesPerHour = priceInfo.IngressNetworkTrafficPerGiagabytesPerHour
priceInfoResponse.EgressNetworkTrafficPerGiagabytesPerHour = priceInfo.EgressNetworkTrafficPerGigabytesPerHour
priceInfoResponse.PvcPerGigabytesPerHour = priceInfo.PvcPerGigabytesPerHour
resp.WriteAsJson(priceInfoResponse)
priceResponse.RetentionDay = meterConfig.RetentionDay
priceResponse.Currency = priceInfo.CurrencyUnit
priceResponse.CpuPerCorePerHour = priceInfo.CpuPerCorePerHour
priceResponse.MemPerGigabytesPerHour = priceInfo.MemPerGigabytesPerHour
priceResponse.IngressNetworkTrafficPerMegabytesPerHour = priceInfo.IngressNetworkTrafficPerMegabytesPerHour
priceResponse.EgressNetworkTrafficPerMegabytesPerHour = priceInfo.EgressNetworkTrafficPerMegabytesPerHour
priceResponse.PvcPerGigabytesPerHour = priceInfo.PvcPerGigabytesPerHour
resp.WriteAsJson(priceResponse)
return
}
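
A response from HandlePriceInfoQuery then has the shape below; the field names follow the json tags on PriceResponse/PriceInfo further down in this diff, while the values and the retention-day format are illustrative only.

{
  "retention_day": "7d",
  "currency": "CNY",
  "cpu_per_core_per_hour": 0.1,
  "mem_per_gigabytes_per_hour": 0.05,
  "ingress_network_traffic_per_megabytes_per_hour": 0.001,
  "egress_network_traffic_per_megabytes_per_hour": 0.001,
  "pvc_per_gigabytes_per_hour": 0.02
}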
......@@ -334,6 +334,7 @@ func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8s
Param(ws.PathParameter("namespace", "Namespace name.").DataType("string").Required(false)).
Param(ws.QueryParameter("metrics_filter", "The metric name filter consists of a regexp pattern. It specifies which metric data to return. For example, the following filter matches both workspace CPU usage and memory usage: `meter_pod_cpu_usage|meter_pod_memory_usage_wo_cache`.").DataType("string").Required(false)).
Param(ws.QueryParameter("time", "A timestamp in Unix time format. Retrieve metric data at a single point in time. Defaults to now. Time and the combination of start, end, step are mutually exclusive.").DataType("string").Required(false)).
Param(ws.QueryParameter("cluster", "Cluster name").DataType("string").Required(false)).
Doc("get current metering hierarchies info in last one hour").
Writes(metering.ResourceStatistic{}).
Returns(http.StatusOK, api.StatusOK, metering.ResourceStatistic{}))
......
package metering
type PriceInfo struct {
Currency string `json:"currency" description:"currency"`
CpuPerCorePerHour float64 `json:"cpu_per_core_per_hour,omitempty" description:"cpu price"`
MemPerGigabytesPerHour float64 `json:"mem_per_gigabytes_per_hour,omitempty" description:"mem price"`
IngressNetworkTrafficPerGiagabytesPerHour float64 `json:"ingress_network_traffic_per_giagabytes_per_hour,omitempty" description:"ingress price"`
EgressNetworkTrafficPerGiagabytesPerHour float64 `json:"egress_network_traffic_per_gigabytes_per_hour,omitempty" description:"egress price"`
PvcPerGigabytesPerHour float64 `json:"pvc_per_gigabytes_per_hour,omitempty" description:"pvc price"`
Currency string `json:"currency" description:"currency"`
CpuPerCorePerHour float64 `json:"cpu_per_core_per_hour,omitempty" description:"cpu price"`
MemPerGigabytesPerHour float64 `json:"mem_per_gigabytes_per_hour,omitempty" description:"mem price"`
IngressNetworkTrafficPerMegabytesPerHour float64 `json:"ingress_network_traffic_per_megabytes_per_hour,omitempty" description:"ingress price"`
EgressNetworkTrafficPerMegabytesPerHour float64 `json:"egress_network_traffic_per_megabytes_per_hour,omitempty" description:"egress price"`
PvcPerGigabytesPerHour float64 `json:"pvc_per_gigabytes_per_hour,omitempty" description:"pvc price"`
}
type PriceResponse struct {
RetentionDay string `json:"retention_day"`
PriceInfo `json:",inline"`
}
// currently the Init method fills illegal values to hint that the metering config file was not mounted yet
......@@ -14,8 +19,8 @@ func (p *PriceInfo) Init() {
p.Currency = ""
p.CpuPerCorePerHour = -1
p.MemPerGigabytesPerHour = -1
p.IngressNetworkTrafficPerGiagabytesPerHour = -1
p.EgressNetworkTrafficPerGiagabytesPerHour = -1
p.IngressNetworkTrafficPerMegabytesPerHour = -1
p.EgressNetworkTrafficPerMegabytesPerHour = -1
p.PvcPerGigabytesPerHour = -1
}
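
A hedged sketch of how a consumer might use the sentinel written by Init; this is a hypothetical helper in the same package, not part of the change.

// Hypothetical helper: reports whether real pricing data is present rather
// than the -1/"" sentinels written by Init when no meter config is mounted.
func (p PriceInfo) mounted() bool {
	return p.Currency != "" && p.CpuPerCorePerHour >= 0
}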
......@@ -47,46 +52,105 @@ func (ps *PodsStats) Set(podName, meterName string, value float64) {
}
}
type OpenPitrixStatistic struct {
AppStatistic
}
type AppStatistic struct {
CPUUsage float64 `json:"cpu_usage" description:"cpu_usage"`
MemoryUsageWoCache float64 `json:"memory_usage_wo_cache" description:"memory_usage_wo_cache"`
NetBytesTransmitted float64 `json:"net_bytes_transmitted" description:"net_bytes_transmitted"`
NetBytesReceived float64 `json:"net_bytes_received" description:"net_bytes_received"`
PVCBytesTotal float64 `json:"pvc_bytes_total" description:"pvc_bytes_total"`
Services map[string]*ServiceStatistic `json:"services" description:"services"`
CPUUsage float64 `json:"cpu_usage" description:"cpu_usage"`
MemoryUsageWoCache float64 `json:"memory_usage_wo_cache" description:"memory_usage_wo_cache"`
NetBytesTransmitted float64 `json:"net_bytes_transmitted" description:"net_bytes_transmitted"`
NetBytesReceived float64 `json:"net_bytes_received" description:"net_bytes_received"`
PVCBytesTotal float64 `json:"pvc_bytes_total" description:"pvc_bytes_total"`
Deploys map[string]*DeploymentStatistic `json:"deployments" description:"deployment statistic"`
Statefulsets map[string]*StatefulsetStatistic `json:"statefulsets" description:"statefulset statistic"`
Daemonsets map[string]*DaemonsetStatistic `json:"daemonsets" description:"daemonsets statistics"`
}
func (as *AppStatistic) GetServiceStats(name string) *ServiceStatistic {
if as.Services == nil {
as.Services = make(map[string]*ServiceStatistic)
func (as *AppStatistic) GetDeployStats(name string) *DeploymentStatistic {
if as.Deploys == nil {
as.Deploys = make(map[string]*DeploymentStatistic)
}
if as.Services[name] == nil {
as.Services[name] = &ServiceStatistic{}
if as.Deploys[name] == nil {
as.Deploys[name] = &DeploymentStatistic{}
}
return as.Services[name]
return as.Deploys[name]
}
func (as *AppStatistic) GetDaemonStats(name string) *DaemonsetStatistic {
if as.Daemonsets == nil {
as.Daemonsets = make(map[string]*DaemonsetStatistic)
}
if as.Daemonsets[name] == nil {
as.Daemonsets[name] = &DaemonsetStatistic{}
}
return as.Daemonsets[name]
}
func (as *AppStatistic) GetStatefulsetStats(name string) *StatefulsetStatistic {
if as.Statefulsets == nil {
as.Statefulsets = make(map[string]*StatefulsetStatistic)
}
if as.Statefulsets[name] == nil {
as.Statefulsets[name] = &StatefulsetStatistic{}
}
return as.Statefulsets[name]
}
func (as *AppStatistic) Aggregate() {
if as.Services == nil {
if as.Deploys == nil && as.Statefulsets == nil && as.Daemonsets == nil {
return
}
// remove duplicate pods which were selected by different svc
podsMap := make(map[string]struct{})
for _, svcObj := range as.Services {
for podName, podObj := range svcObj.Pods {
if _, ok := podsMap[podName]; ok {
continue
} else {
podsMap[podName] = struct{}{}
}
as.CPUUsage += podObj.CPUUsage
as.MemoryUsageWoCache += podObj.MemoryUsageWoCache
as.NetBytesTransmitted += podObj.NetBytesTransmitted
as.NetBytesReceived += podObj.NetBytesReceived
as.PVCBytesTotal += podObj.PVCBytesTotal
// aggregate deployment stats
for _, deployObj := range as.Deploys {
for _, podObj := range deployObj.Pods {
deployObj.CPUUsage += podObj.CPUUsage
deployObj.MemoryUsageWoCache += podObj.MemoryUsageWoCache
deployObj.NetBytesTransmitted += podObj.NetBytesTransmitted
deployObj.NetBytesReceived += podObj.NetBytesReceived
deployObj.PVCBytesTotal += podObj.PVCBytesTotal
}
as.CPUUsage += deployObj.CPUUsage
as.MemoryUsageWoCache += deployObj.MemoryUsageWoCache
as.NetBytesTransmitted += deployObj.NetBytesTransmitted
as.NetBytesReceived += deployObj.NetBytesReceived
as.PVCBytesTotal += deployObj.PVCBytesTotal
}
// aggregate statefulset stats
for _, statfulObj := range as.Statefulsets {
for _, podObj := range statfulObj.Pods {
statfulObj.CPUUsage += podObj.CPUUsage
statfulObj.MemoryUsageWoCache += podObj.MemoryUsageWoCache
statfulObj.NetBytesTransmitted += podObj.NetBytesTransmitted
statfulObj.NetBytesReceived += podObj.NetBytesReceived
statfulObj.PVCBytesTotal += podObj.PVCBytesTotal
}
as.CPUUsage += statfulObj.CPUUsage
as.MemoryUsageWoCache += statfulObj.MemoryUsageWoCache
as.NetBytesTransmitted += statfulObj.NetBytesTransmitted
as.NetBytesReceived += statfulObj.NetBytesReceived
as.PVCBytesTotal += statfulObj.PVCBytesTotal
}
// aggregate daemonset stats
for _, daemonsetObj := range as.Daemonsets {
for _, podObj := range daemonsetObj.Pods {
daemonsetObj.CPUUsage += podObj.CPUUsage
daemonsetObj.MemoryUsageWoCache += podObj.MemoryUsageWoCache
daemonsetObj.NetBytesTransmitted += podObj.NetBytesTransmitted
daemonsetObj.NetBytesReceived += podObj.NetBytesReceived
daemonsetObj.PVCBytesTotal += podObj.PVCBytesTotal
}
as.CPUUsage += daemonsetObj.CPUUsage
as.MemoryUsageWoCache += daemonsetObj.MemoryUsageWoCache
as.NetBytesTransmitted += daemonsetObj.NetBytesTransmitted
as.NetBytesReceived += daemonsetObj.NetBytesReceived
as.PVCBytesTotal += daemonsetObj.PVCBytesTotal
}
return
}
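
Usage sketch for the reworked aggregation (same package; workload and pod names are hypothetical): pod stats are attached to the workload buckets first, then Aggregate rolls the workload totals up into the app totals.

// Sketch only, same package as AppStatistic; names are hypothetical.
func exampleAppAggregate(pods PodsStats) *AppStatistic {
	app := &AppStatistic{}
	_ = app.GetDeployStats("productpage").SetPodStats("productpage-abc12", pods["productpage-abc12"])
	_ = app.GetStatefulsetStats("details-db").SetPodStats("details-db-0", pods["details-db-0"])
	app.Aggregate() // app.CPUUsage etc. now hold the sums across all workloads
	return app
}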
type ServiceStatistic struct {
......@@ -251,13 +315,28 @@ func (ds *DaemonsetStatistic) Aggregate() {
}
type ResourceStatistic struct {
Apps map[string]*AppStatistic `json:"apps" description:"app statistic"`
Services map[string]*ServiceStatistic `json:"services" description:"service statistic"`
// openpitrix statistic
OpenPitrixs map[string]*OpenPitrixStatistic `json:"openpitrixs" description:"openpitrix statistic"`
// app crd statistic
Apps map[string]*AppStatistic `json:"apps" description:"app statistic"`
// k8s workload only which exclude app and op
Deploys map[string]*DeploymentStatistic `json:"deployments" description:"deployment statistic"`
Statefulsets map[string]*StatefulsetStatistic `json:"statefulsets" description:"statefulset statistic"`
Daemonsets map[string]*DaemonsetStatistic `json:"daemonsets" description:"daemonsets statistics"`
}
func (rs *ResourceStatistic) GetOpenPitrixStats(name string) *OpenPitrixStatistic {
if rs.OpenPitrixs == nil {
rs.OpenPitrixs = make(map[string]*OpenPitrixStatistic)
}
if rs.OpenPitrixs[name] == nil {
rs.OpenPitrixs[name] = &OpenPitrixStatistic{}
}
return rs.OpenPitrixs[name]
}
func (rs *ResourceStatistic) GetAppStats(name string) *AppStatistic {
if rs.Apps == nil {
rs.Apps = make(map[string]*AppStatistic)
......@@ -268,16 +347,6 @@ func (rs *ResourceStatistic) GetAppStats(name string) *AppStatistic {
return rs.Apps[name]
}
func (rs *ResourceStatistic) GetServiceStats(name string) *ServiceStatistic {
if rs.Services == nil {
rs.Services = make(map[string]*ServiceStatistic)
}
if rs.Services[name] == nil {
rs.Services[name] = &ServiceStatistic{}
}
return rs.Services[name]
}
func (rs *ResourceStatistic) GetDeployStats(name string) *DeploymentStatistic {
if rs.Deploys == nil {
rs.Deploys = make(map[string]*DeploymentStatistic)
......
......@@ -59,7 +59,7 @@ type MonitoringOperator interface {
// meter
GetNamedMetersOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) (Metrics, error)
GetNamedMeters(metrics []string, time time.Time, opt monitoring.QueryOption) (Metrics, error)
GetAppComponentsMap(ns string, apps []string) map[string][]string
GetAppWorkloads(ns string, apps []string) map[string][]string
GetSerivePodsMap(ns string, services []string) map[string][]string
}
......@@ -432,10 +432,10 @@ func (mo monitoringOperator) GetNamedMetersOverTime(meters []string, start, end
}
// query time range: (start, end], so here we need to exclude start itself.
if start.Add(step).After(end) {
if start.Add(time.Hour).After(end) {
start = end
} else {
start = start.Add(step)
start = start.Add(time.Hour)
}
var opts []monitoring.QueryOption
......@@ -444,10 +444,10 @@ func (mo monitoringOperator) GetNamedMetersOverTime(meters []string, start, end
opts = append(opts, monitoring.MeterOption{
Start: start,
End: end,
Step: step,
Step: time.Hour,
})
ress := mo.prometheus.GetNamedMetersOverTime(meters, start, end, step, opts)
ress := mo.prometheus.GetNamedMetersOverTime(meters, start, end, time.Hour, opts)
sMap := generateScalingFactorMap(step)
for i, _ := range ress {
......@@ -471,7 +471,7 @@ func (mo monitoringOperator) GetNamedMeters(meters []string, time time.Time, opt
return metersPerHour, nil
}
func (mo monitoringOperator) GetAppComponentsMap(ns string, apps []string) map[string][]string {
func (mo monitoringOperator) GetAppWorkloads(ns string, apps []string) map[string][]string {
componentsMap := make(map[string][]string)
applicationList := []*appv1beta1.Application{}
......@@ -584,7 +584,7 @@ func (mo monitoringOperator) GetSerivePodsMap(ns string, services []string) map[
svcSelector := svcObj.Spec.Selector
if len(svcSelector) == 0 {
return svcPodsMap
continue
}
svcLabels := labels.Set{}
......
......@@ -71,12 +71,12 @@ var MeterResourceMap = map[string]int{
}
type PriceInfo struct {
CpuPerCorePerHour float64 `json:"cpuPerCorePerHour" yaml:"cpuPerCorePerHour"`
MemPerGigabytesPerHour float64 `json:"memPerGigabytesPerHour" yaml:"memPerGigabytesPerHour"`
IngressNetworkTrafficPerGiagabytesPerHour float64 `json:"ingressNetworkTrafficPerGiagabytesPerHour" yaml:"ingressNetworkTrafficPerGiagabytesPerHour"`
EgressNetworkTrafficPerGigabytesPerHour float64 `json:"egressNetworkTrafficPerGigabytesPerHour" yaml:"egressNetworkTrafficPerGigabytesPerHour"`
PvcPerGigabytesPerHour float64 `json:"pvcPerGigabytesPerHour" yaml:"pvcPerGigabytesPerHour"`
CurrencyUnit string `json:"currencyUnit" yaml:"currencyUnit"`
CpuPerCorePerHour float64 `json:"cpuPerCorePerHour" yaml:"cpuPerCorePerHour"`
MemPerGigabytesPerHour float64 `json:"memPerGigabytesPerHour" yaml:"memPerGigabytesPerHour"`
IngressNetworkTrafficPerMegabytesPerHour float64 `json:"ingressNetworkTrafficPerMegabytesPerHour" yaml:"ingressNetworkTrafficPerGiagabytesPerHour"`
EgressNetworkTrafficPerMegabytesPerHour float64 `json:"egressNetworkTrafficPerMegabytesPerHour" yaml:"egressNetworkTrafficPerGigabytesPerHour"`
PvcPerGigabytesPerHour float64 `json:"pvcPerGigabytesPerHour" yaml:"pvcPerGigabytesPerHour"`
CurrencyUnit string `json:"currencyUnit" yaml:"currencyUnit"`
}
type Billing struct {
......@@ -84,7 +84,8 @@ type Billing struct {
}
type MeterConfig struct {
Billing Billing `json:"billing" yaml:"billing"`
RetentionDay string `json:"retentionDay" yaml:"retentionDay"`
Billing Billing `json:"billing" yaml:"billing"`
}
func (mc MeterConfig) GetPriceInfo() PriceInfo {
......@@ -197,11 +198,11 @@ func getFeeWithMeterName(meterName string, sum float64) float64 {
case METER_RESOURCE_TYPE_NET_INGRESS:
// unit: Megabyte, precision: 1
sum = math.Round(sum / 1048576)
return priceInfo.IngressNetworkTrafficPerGiagabytesPerHour * sum
return priceInfo.IngressNetworkTrafficPerMegabytesPerHour * sum
case METER_RESOURCE_TYPE_NET_EGRESS:
// unit: Megabyte, precision:
// unit: Megabyte, precision: 1
sum = math.Round(sum / 1048576)
return priceInfo.EgressNetworkTrafficPerGigabytesPerHour * sum
return priceInfo.EgressNetworkTrafficPerMegabytesPerHour * sum
case METER_RESOURCE_TYPE_PVC:
// unit: Gigabyte, precision: 0.1
sum = math.Round(sum/1073741824*10) / 10
......@@ -217,37 +218,56 @@ func updateMetricStatData(metric monitoring.Metric, scalingMap map[string]float6
metricData := metric.MetricData
for index, metricValue := range metricData.MetricValues {
var points []monitoring.Point
// calculate min, max, avg values first, then squash points with the factor
if metricData.MetricType == monitoring.MetricTypeMatrix {
points = metricValue.Series
metricData.MetricValues[index].MinValue = getMinPointValue(metricValue.Series)
metricData.MetricValues[index].MaxValue = getMaxPointValue(metricValue.Series)
metricData.MetricValues[index].AvgValue = getAvgPointValue(metricValue.Series)
} else {
points = append(points, *metricValue.Sample)
metricData.MetricValues[index].MinValue = (*metricValue.Sample)[1]
metricData.MetricValues[index].MaxValue = (*metricValue.Sample)[1]
metricData.MetricValues[index].AvgValue = (*metricValue.Sample)[1]
}
// squash points if step is more than one hour and calculate sum and fee
var factor float64 = 1
if scalingMap != nil {
factor = scalingMap[metricName]
}
metricData.MetricValues[index].Series = squashPoints(metricData.MetricValues[index].Series, int(factor))
if len(points) == 1 {
sample := points[0]
sum := sample[1] * factor
metricData.MetricValues[index].MinValue = sample[1]
metricData.MetricValues[index].MaxValue = sample[1]
metricData.MetricValues[index].AvgValue = sample[1]
if metricData.MetricType == monitoring.MetricTypeMatrix {
sum := getSumPointValue(metricData.MetricValues[index].Series)
metricData.MetricValues[index].SumValue = sum
metricData.MetricValues[index].Fee = getFeeWithMeterName(metricName, sum)
} else {
sum := getSumPointValue(points) * factor
metricData.MetricValues[index].MinValue = getMinPointValue(points)
metricData.MetricValues[index].MaxValue = getMaxPointValue(points)
metricData.MetricValues[index].AvgValue = getAvgPointValue(points)
sum := (*metricValue.Sample)[1]
metricData.MetricValues[index].SumValue = sum
metricData.MetricValues[index].Fee = getFeeWithMeterName(metricName, sum)
}
metricData.MetricValues[index].CurrencyUnit = getCurrencyUnit()
metricData.MetricValues[index].ResourceUnit = getResourceUnit(metricName)
}
return metricData
}
func squashPoints(input []monitoring.Point, factor int) (output []monitoring.Point) {
if factor <= 0 {
klog.Errorln("factor should be positive")
return nil
}
for i := 0; i < len(input); i++ {
if i%factor == 0 {
output = append([]monitoring.Point{input[len(input)-1-i]}, output...)
} else {
output[0] = output[0].Add(input[len(input)-1-i])
}
}
return output
}
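
A worked example of the squashing (same-package sketch, since squashPoints is unexported): with factor 2, the series is walked from the end, every second timestamp is kept and the values in between are summed.

// Sketch only (same package). For the input below squashPoints returns
// [{2, 3}, {4, 7}]: timestamps 2 and 4 are kept, values are summed pairwise.
func exampleSquashPoints() []monitoring.Point {
	in := []monitoring.Point{{1, 1}, {2, 2}, {3, 3}, {4, 4}}
	return squashPoints(in, 2)
}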
......@@ -598,7 +598,7 @@ func (t *tenantOperator) processApplicationMetersQuery(meters []string, q QueryO
klog.Error(err.Error())
return
}
componentsMap := t.mo.GetAppComponentsMap(aso.NamespaceName, aso.Applications)
componentsMap := t.mo.GetAppWorkloads(aso.NamespaceName, aso.Applications)
for k, _ := range componentsMap {
opt := monitoring.ApplicationOption{
......@@ -698,90 +698,24 @@ func (t *tenantOperator) transformMetricData(metrics monitoringmodel.Metrics) me
return podsStats
}
func (t *tenantOperator) classifyPodStats(user user.Info, ns string, podsStats metering.PodsStats) (resourceStats metering.ResourceStatistic, err error) {
if err = t.updateServicesStats(user, ns, podsStats, &resourceStats); err != nil {
func (t *tenantOperator) classifyPodStats(user user.Info, cluster, ns string, podsStats metering.PodsStats) (resourceStats metering.ResourceStatistic, err error) {
// classify pod stats into the following 3 levels under the specified namespace and user info
// 1. project -> workload(deploy, sts, ds) -> pod
// 2. project -> app -> workload(deploy, sts, ds) -> pod
// 3. project -> op -> workload(deploy, sts, ds) -> pod
if err = t.updateDeploysStats(user, cluster, ns, podsStats, &resourceStats); err != nil {
return
}
if err = t.updateDeploysStats(user, ns, podsStats, &resourceStats); err != nil {
if err = t.updateDaemonsetsStats(user, cluster, ns, podsStats, &resourceStats); err != nil {
return
}
if err = t.updateDaemonsetsStats(user, ns, podsStats, &resourceStats); err != nil {
return
}
if err = t.updateStatefulsetsStats(user, ns, podsStats, &resourceStats); err != nil {
if err = t.updateStatefulsetsStats(user, cluster, ns, podsStats, &resourceStats); err != nil {
return
}
return
}
func (t *tenantOperator) updateServicesStats(user user.Info, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error {
svcList, err := t.listServices(user, ns)
if err != nil {
return err
}
for _, svc := range svcList.Items {
if svc.Annotations[constants.ApplicationReleaseName] != "" &&
svc.Annotations[constants.ApplicationReleaseNS] != "" &&
t.isOpNamespace(ns) {
// for op svc
// currently we do NOT include op svc
continue
} else {
appName, nameOK := svc.Labels[constants.ApplicationName]
appVersion, versionOK := svc.Labels[constants.ApplicationVersion]
svcPodsMap := t.mo.GetSerivePodsMap(ns, []string{svc.Name})
pods := svcPodsMap[svc.Name]
if nameOK && versionOK {
// for app crd svc
for _, pod := range pods {
podStat := podsStats[pod]
if podStat == nil {
klog.Warningf("%v not found", pod)
continue
}
appFullName := appName + ":" + appVersion
if err := resourceStats.GetAppStats(appFullName).GetServiceStats(svc.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
}
} else {
// for k8s svc
for _, pod := range pods {
if err := resourceStats.GetServiceStats(svc.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
}
}
}
}
// aggregate svc data
for _, app := range resourceStats.Apps {
for _, svc := range app.Services {
svc.Aggregate()
}
app.Aggregate()
}
for _, svc := range resourceStats.Services {
svc.Aggregate()
}
return nil
}
func (t *tenantOperator) listServices(user user.Info, ns string) (*corev1.ServiceList, error) {
svcScope := request.NamespaceScope
......@@ -817,7 +751,11 @@ func (t *tenantOperator) listServices(user user.Info, ns string) (*corev1.Servic
return svcs, nil
}
func (t *tenantOperator) updateDeploysStats(user user.Info, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error {
// updateDeploysStats will update deployment field in resource stats struct with pod stats data and deployments will be classified into 3 classes:
// 1. openpitrix deployments
// 2. app deployments
// 3. k8s deployments
func (t *tenantOperator) updateDeploysStats(user user.Info, cluster, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error {
deployList, err := t.listDeploys(user, ns)
if err != nil {
return err
......@@ -825,44 +763,63 @@ func (t *tenantOperator) updateDeploysStats(user user.Info, ns string, podsStats
for _, deploy := range deployList.Items {
if deploy.Annotations[constants.ApplicationReleaseName] != "" &&
deploy.Annotations[constants.ApplicationReleaseNS] != "" &&
t.isOpNamespace(ns) {
// for op deploy
// currently we do NOT include op deploy
pods, err := t.listPods(user, ns, deploy.Spec.Selector)
if err != nil {
klog.Error(err)
return err
}
if ok, _ := t.isOpenPitrixComponent(cluster, ns, "deployment", deploy.Name); ok {
// TODO: for op deployment
continue
} else {
_, appNameOK := deploy.Labels[constants.ApplicationName]
_, appVersionOK := deploy.Labels[constants.ApplicationVersion]
} else if ok, appName := t.isAppComponent(ns, "deployment", deploy.Name); ok {
// for app deployment
for _, pod := range pods {
podsStat := podsStats[pod]
if podsStat == nil {
klog.Warningf("%v not found", pod)
continue
}
pods, err := t.listPods(user, ns, deploy.Spec.Selector)
if err != nil {
klog.Error(err)
return err
if err := resourceStats.GetAppStats(appName).GetDeployStats(deploy.Name).SetPodStats(pod, podsStat); err != nil {
klog.Error(err)
return err
}
}
if appNameOK && appVersionOK {
// for app crd svc
continue
} else {
// for k8s svc
for _, pod := range pods {
if err := resourceStats.GetDeployStats(deploy.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
} else {
// for k8s deployment only
for _, pod := range pods {
if err := resourceStats.GetDeployStats(deploy.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
}
}
}
// TODO: op aggregate for deployment components
for _, op := range resourceStats.OpenPitrixs {
op.Aggregate()
}
// app aggregate for deployment components
for _, app := range resourceStats.Apps {
app.Aggregate()
}
// k8s aggregate for deployment components
for _, deploy := range resourceStats.Deploys {
deploy.Aggregate()
}
return nil
}
func (t *tenantOperator) updateDaemonsetsStats(user user.Info, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error {
// updateDaemonsetsStats will update daemonsets field in resource stats struct with pod stats data and daemonsets will be classified into 3 classes:
// 1. openpitrix daemonsets
// 2. app daemonsets
// 3. k8s daemonsets
func (t *tenantOperator) updateDaemonsetsStats(user user.Info, cluster, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error {
daemonsetList, err := t.listDaemonsets(user, ns)
if err != nil {
return err
......@@ -870,59 +827,81 @@ func (t *tenantOperator) updateDaemonsetsStats(user user.Info, ns string, podsSt
for _, daemonset := range daemonsetList.Items {
if daemonset.Annotations["meta.helm.sh/release-name"] != "" &&
daemonset.Annotations["meta.helm.sh/release-namespace"] != "" &&
t.isOpNamespace(ns) {
// for op deploy
// currently we do NOT include op deploy
continue
} else {
appName := daemonset.Labels[constants.ApplicationName]
appVersion := daemonset.Labels[constants.ApplicationVersion]
pods, err := t.listPods(user, ns, daemonset.Spec.Selector)
if err != nil {
klog.Error(err)
return err
}
pods, err := t.listPods(user, ns, daemonset.Spec.Selector)
if err != nil {
klog.Error(err)
return err
if ok, _ := t.isOpenPitrixComponent(cluster, ns, "daemonset", daemonset.Name); ok {
// TODO: for op daemonset
continue
} else if ok, appName := t.isAppComponent(ns, "daemonset", daemonset.Name); ok {
// for app daemonset
for _, pod := range pods {
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// app field(create if not existed) -> daemonsets field(create if not existed) -> pod
if err := resourceStats.GetAppStats(appName).GetDaemonStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
}
if appName != "" && appVersion != "" {
// for app crd svc
continue
} else {
// for k8s svc
for _, pod := range pods {
if err := resourceStats.GetDaemonsetStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
} else {
// for k8s daemonset
for _, pod := range pods {
if err := resourceStats.GetDaemonsetStats(daemonset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
}
}
}
// here pod stats and level struct are ready
// TODO: op aggregate for daemonset components
for _, op := range resourceStats.OpenPitrixs {
op.Aggregate()
}
// app aggregate for daemonset components
for _, app := range resourceStats.Apps {
app.Aggregate()
}
// k8s aggregate for daemonset components
for _, daemonset := range resourceStats.Daemonsets {
daemonset.Aggregate()
}
return nil
}
func (t *tenantOperator) isOpNamespace(ns string) bool {
// TODO: include op metering part
func (t *tenantOperator) isOpenPitrixComponent(cluster, ns, kind, componentName string) (bool, string) {
return false, ""
}
nsObj, err := t.k8sclient.CoreV1().Namespaces().Get(context.Background(), ns, metav1.GetOptions{})
if err != nil {
return false
}
func (t *tenantOperator) isAppComponent(ns, kind, componentName string) (bool, string) {
ws := nsObj.Labels[constants.WorkspaceLabelKey]
appWorkloads := t.mo.GetAppWorkloads(ns, nil)
if len(ws) != 0 && ws != "system-workspace" {
return true
for appName, cList := range appWorkloads {
for _, component := range cList {
if component == fmt.Sprintf("%s:%s", strings.Title(kind), componentName) {
return true, appName
}
}
}
return false
return false, ""
}
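
Illustrative sketch of the matching convention (same package; app and workload names are hypothetical): for the comparison above to succeed, the component lists returned by GetAppWorkloads must use the strings.Title(kind)+":"+name form.

// Sketch only, same package as tenantOperator; values are hypothetical.
func exampleIsAppComponent(t *tenantOperator) {
	// Suppose t.mo.GetAppWorkloads("default", nil) returns
	//   map[string][]string{"bookinfo:v1": {"Deployment:productpage-v1"}}.
	ok, app := t.isAppComponent("default", "deployment", "productpage-v1")
	_ = ok  // true
	_ = app // "bookinfo:v1"
}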
func (t *tenantOperator) updateStatefulsetsStats(user user.Info, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error {
// updateStatefulsetsStats will update statefulsets field in resource stats struct with pod stats data and statefulsets will be classified into 3 classes:
// 1. openpitrix statefulsets
// 2. app statefulsets
// 3. k8s statefulsets
func (t *tenantOperator) updateStatefulsetsStats(user user.Info, cluster, ns string, podsStats metering.PodsStats, resourceStats *metering.ResourceStatistic) error {
statefulsetsList, err := t.listStatefulsets(user, ns)
if err != nil {
return err
......@@ -930,37 +909,51 @@ func (t *tenantOperator) updateStatefulsetsStats(user user.Info, ns string, pods
for _, statefulset := range statefulsetsList.Items {
if statefulset.Annotations[constants.ApplicationReleaseName] != "" &&
statefulset.Annotations[constants.ApplicationReleaseNS] != "" &&
t.isOpNamespace(ns) {
// for op deploy
// currently we do NOT include op deploy
continue
} else {
appName := statefulset.Labels[constants.ApplicationName]
appVersion := statefulset.Labels[constants.ApplicationVersion]
// query pod list under the statefulset within the namespace
pods, err := t.listPods(user, ns, statefulset.Spec.Selector)
if err != nil {
klog.Error(err)
return err
}
pods, err := t.listPods(user, ns, statefulset.Spec.Selector)
if err != nil {
klog.Error(err)
return err
if ok, _ := t.isOpenPitrixComponent(cluster, ns, "statefulset", statefulset.Name); ok {
// TODO: for op statefulset
continue
} else if ok, appName := t.isAppComponent(ns, "statefulset", statefulset.Name); ok {
// for app statefulset
for _, pod := range pods {
// aggregate order is from bottom(pods) to top(app), we should create outer field if not exists
// and then set pod stats data, the direction is as follows:
// app field(create if not existed) -> statefulsets field(create if not existed) -> pod
if err := resourceStats.GetAppStats(appName).GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
}
if appName != "" && appVersion != "" {
// for app crd svc
continue
} else {
// for k8s svc
for _, pod := range pods {
if err := resourceStats.GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
} else {
// for k8s statefulset
for _, pod := range pods {
// same as above, the direction is similar:
// k8s field(create if not existed) -> statefulsets field(create if not existed) -> pod
if err := resourceStats.GetStatefulsetStats(statefulset.Name).SetPodStats(pod, podsStats[pod]); err != nil {
klog.Error(err)
return err
}
}
}
}
// TODO: op aggregate for statefulset components
for _, op := range resourceStats.OpenPitrixs {
op.Aggregate()
}
// app aggregate for statefulset components
for _, app := range resourceStats.Apps {
app.Aggregate()
}
// k8s aggregate for statefulset components
for _, statefulset := range resourceStats.Statefulsets {
statefulset.Aggregate()
}
......@@ -1048,6 +1041,16 @@ func (t *tenantOperator) listDeploys(user user.Info, ns string) (*appv1.Deployme
return deploys, nil
}
func (t *tenantOperator) getAppNameFromLabels(labels map[string]string) string {
appName := labels[constants.ApplicationName]
appVersion := labels[constants.ApplicationVersion]
if appName == "" || appVersion == "" {
return ""
}
return fmt.Sprintf("%s:%s", appName, appVersion)
}
func (t *tenantOperator) listDaemonsets(user user.Info, ns string) (*appv1.DaemonSetList, error) {
dsScope := request.NamespaceScope
......
......@@ -24,6 +24,8 @@ import (
"strings"
"time"
"kubesphere.io/kubesphere/pkg/models/openpitrix"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
......@@ -104,9 +106,15 @@ type tenantOperator struct {
lo logging.LoggingOperator
auditing auditing.Interface
mo monitoring.MonitoringOperator
opRelease openpitrix.ReleaseInterface
}
func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface, evtsClient eventsclient.Client, loggingClient loggingclient.Client, auditingclient auditingclient.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer, monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter) Interface {
var openpitrixRelease openpitrix.ReleaseInterface
if ksclient != nil {
openpitrixRelease = openpitrix.NewOpenpitrixOperator(informers, ksclient, nil)
}
return &tenantOperator{
am: am,
authorizer: authorizer,
......@@ -117,6 +125,7 @@ func New(informers informers.InformerFactory, k8sclient kubernetes.Interface, ks
lo: logging.NewLoggingOperator(loggingClient),
auditing: auditing.NewEventsOperator(auditingclient),
mo: monitoring.NewMonitoringOperator(monitoringclient, nil, k8sclient, informers, resourceGetter),
opRelease: openpitrixRelease,
}
}
......@@ -989,7 +998,7 @@ func (t *tenantOperator) MeteringHierarchy(user user.Info, queryParam *meteringv
podsStats := t.transformMetricData(res)
// classify pods stats
resourceStats, err := t.classifyPodStats(user, queryParam.NamespaceName, podsStats)
resourceStats, err := t.classifyPodStats(user, "", queryParam.NamespaceName, podsStats)
if err != nil {
klog.Error(err)
return metering.ResourceStatistic{}, err
......
......@@ -7,8 +7,8 @@ import (
"net/http"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
)
import "github.com/prometheus/client_golang/api"
const (
apiPrefix = "/api/v1"
......
package metering
type Options struct {
Enable bool `json:"enable" yaml:"enable"`
}
func NewMeteringOptions() *Options {
return &Options{}
}
......@@ -30,6 +30,7 @@ const (
LevelWorkspace
LevelNamespace
LevelApplication
LevelOpenpitrix
LevelWorkload
LevelService
LevelPod
......@@ -150,6 +151,7 @@ func (aso ApplicationsOption) Apply(o *QueryOptions) {
return
}
// ApplicationsOption & OpenpitrixsOption share the same ApplicationOption struct
type ApplicationOption struct {
NamespaceName string
Application string
......
......@@ -20,11 +20,9 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/jszwec/csvutil"
)
const (
......@@ -46,41 +44,11 @@ type Metric struct {
type MetricValues []MetricValue
func (m MetricValues) MarshalCSV() ([]byte, error) {
var ret []string
for _, v := range m {
tmp, err := v.MarshalCSV()
if err != nil {
return nil, err
}
ret = append(ret, string(tmp))
}
return []byte(strings.Join(ret, "||")), nil
}
type MetricData struct {
MetricType string `json:"resultType,omitempty" description:"result type, one of matrix, vector" csv:"metric_type"`
MetricValues `json:"result,omitempty" description:"metric data including labels, time series and values" csv:"metric_values"`
}
func (m MetricData) MarshalCSV() ([]byte, error) {
var ret []byte
for _, v := range m.MetricValues {
tmp, err := csvutil.Marshal(&v)
if err != nil {
return nil, err
}
ret = append(ret, tmp...)
}
return ret, nil
}
// The first element is the timestamp, the second is the metric value.
// eg, [1585658599.195, 0.528]
type Point [2]float64
......@@ -104,41 +72,6 @@ type MetricValue struct {
CurrencyUnit string `json:"currency_unit"`
}
func (mv MetricValue) MarshalCSV() ([]byte, error) {
// metric value format:
// target,stats value(include fees),exported_value,exported_values
// for example:
// {workspace:demo-ws},,2021-02-23 01:00:00 AM 0|2021-02-23 02:00:00 AM 0|...
var metricValueCSVTemplate = "{%s},unit:%s|min:%.3f|max:%.3f|avg:%.3f|sum:%.3f|fee:%.2f %s,%s,%s"
var targetList []string
for k, v := range mv.Metadata {
targetList = append(targetList, fmt.Sprintf("%s=%s", k, v))
}
exportedSampleStr := ""
if mv.ExportSample != nil {
exportedSampleStr = mv.ExportSample.Format()
}
exportedSeriesStrList := []string{}
for _, v := range mv.ExportedSeries {
exportedSeriesStrList = append(exportedSeriesStrList, v.Format())
}
return []byte(fmt.Sprintf(metricValueCSVTemplate,
strings.Join(targetList, "|"),
mv.ResourceUnit,
mv.MinValue,
mv.MaxValue,
mv.AvgValue,
mv.SumValue,
mv.Fee,
mv.CurrencyUnit,
exportedSampleStr,
exportedSeriesStrList)), nil
}
func (mv *MetricValue) TransferToExportedMetricValue() {
if mv.Sample != nil {
......@@ -167,6 +100,10 @@ func (p Point) transferToExported() ExportPoint {
return ExportPoint{p[0], p[1]}
}
func (p Point) Add(other Point) Point {
return Point{p[0], p[1] + other[1]}
}
// MarshalJSON implements json.Marshaler. It will be called when writing JSON to HTTP response
// Inspired by prometheus/client_golang
func (p Point) MarshalJSON() ([]byte, error) {
......@@ -214,6 +151,14 @@ func (p *Point) UnmarshalJSON(b []byte) error {
return nil
}
type CSVPoint struct {
MetricName string `csv:"metric_name"`
Selector string `csv:"selector"`
Time string `csv:"time"`
Value string `csv:"value"`
ResourceUnit string `csv:"unit"`
}
type ExportPoint [2]float64
func (p ExportPoint) Timestamp() string {
......@@ -227,3 +172,13 @@ func (p ExportPoint) Value() float64 {
func (p ExportPoint) Format() string {
return p.Timestamp() + " " + strconv.FormatFloat(p.Value(), 'f', -1, 64)
}
func (p ExportPoint) TransformToCSVPoint(metricName string, selector string, resourceUnit string) CSVPoint {
return CSVPoint{
MetricName: metricName,
Selector: selector,
Time: p.Timestamp(),
Value: strconv.FormatFloat(p.Value(), 'f', -1, 64),
ResourceUnit: resourceUnit,
}
}
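
End-to-end sketch of how the exporter turns an exported series into CSV rows with csvutil; the metric name, selector and unit strings are illustrative.

// Sketch only: TransformToCSVPoint plus csvutil.Marshal, as used by
// exportMetrics in kapis/monitoring. Values are illustrative.
package main

import (
	"fmt"

	"github.com/jszwec/csvutil"
	"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)

func main() {
	series := []monitoring.ExportPoint{{1616641733, 2}, {1616641800, 4}}
	var rows []monitoring.CSVPoint
	for _, p := range series {
		rows = append(rows, p.TransformToCSVPoint("meter_pod_cpu_usage", "pod=nginx-0", "cores"))
	}
	out, err := csvutil.Marshal(rows) // header row is derived from the csv struct tags
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}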
......@@ -125,7 +125,7 @@ func generateSwaggerJson() []byte {
urlruntime.Must(devopsv1alpha2.AddToContainer(container, informerFactory.KubeSphereSharedInformerFactory(), &fakedevops.Devops{}, nil, clientsets.KubeSphere(), fakes3.NewFakeS3(), "", nil))
urlruntime.Must(devopsv1alpha3.AddToContainer(container, &fakedevops.Devops{}, clientsets.Kubernetes(), clientsets.KubeSphere(), informerFactory.KubeSphereSharedInformerFactory(), informerFactory.KubernetesSharedInformerFactory()))
urlruntime.Must(iamv1alpha2.AddToContainer(container, nil, nil, group.New(informerFactory, clientsets.KubeSphere(), clientsets.Kubernetes()), nil))
urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory))
urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory, nil))
urlruntime.Must(openpitrixv1.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil))
urlruntime.Must(openpitrixv2.AddToContainer(container, informerFactory, fake.NewSimpleClientset(), nil))
urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes()))
......