提交 285ad0a3 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

1. Fixed the OverViewLayerQuery#getTopNSlowService method bug which is not...

1. Fixed the OverViewLayerQuery#getTopNSlowService method bug which is not order by average response time. (#913)

2. Fixed the OverViewLayerQuery#getTopNApplicationThroughput method bug which is not order by TPS.
3. Fixed the ApplicationQuery#getSlowService method bug which is not order by average response time.
4. Fixed the ApplicationQuery#getServerThroughput method bug which is not order by TPS.

#907
上级 e05d9e81
......@@ -18,10 +18,8 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -35,13 +33,10 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
/**
* @author peng-yongsheng
......@@ -52,8 +47,6 @@ public class ApplicationMetricEsUIDAO extends EsDAO implements IApplicationMetri
super(client);
}
private static final String AVG_TPS = "avg_tps";
@Override
public List<ApplicationTPS> getTopNApplicationThroughput(Step step, long startTimeBucket, long endTimeBucket,
int betweenSecond, int topN, MetricSource metricSource) {
......@@ -70,36 +63,37 @@ public class ApplicationMetricEsUIDAO extends EsDAO implements IApplicationMetri
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.COLUMN_APPLICATION_ID).field(ApplicationMetricTable.COLUMN_APPLICATION_ID).size(topN);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.COLUMN_APPLICATION_ID).field(ApplicationMetricTable.COLUMN_APPLICATION_ID).size(2000);
aggregationBuilder.subAggregation(AggregationBuilders.sum(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS).field(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
Map<String, String> bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS, ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
bucketsPathsMap.put(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
String idOrCode = "(params." + ApplicationMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")"
+ " / "
+ "(" + betweenSecond + ")";
Script script = new Script(idOrCode);
aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script));
searchRequestBuilder.addAggregation(aggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<ApplicationTPS> applicationTPSs = new LinkedList<>();
Terms applicationIdTerms = searchResponse.getAggregations().get(ApplicationMetricTable.COLUMN_APPLICATION_ID);
applicationIdTerms.getBuckets().forEach(applicationIdTerm -> {
int applicationId = applicationIdTerm.getKeyAsNumber().intValue();
Sum callSum = applicationIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
long calls = (long)callSum.getValue();
int callsPerSec = (int)(betweenSecond == 0 ? 0 : calls / betweenSecond);
ApplicationTPS applicationTPS = new ApplicationTPS();
InternalSimpleValue simpleValue = applicationIdTerm.getAggregations().get(AVG_TPS);
applicationTPS.setApplicationId(applicationId);
applicationTPS.setCallsPerSec((int)simpleValue.getValue());
applicationTPS.setCallsPerSec(callsPerSec);
applicationTPSs.add(applicationTPS);
});
return applicationTPSs;
applicationTPSs.sort((first, second) -> first.getCallsPerSec() > second.getCallsPerSec() ? -1 : 1);
if (applicationTPSs.size() <= topN) {
return applicationTPSs;
} else {
List<ApplicationTPS> newCollection = new LinkedList<>();
for (int i = 0; i < topN; i++) {
newCollection.add(applicationTPSs.get(i));
}
return newCollection;
}
}
@Override
......
......@@ -18,15 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetricTable;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.storage.ui.server.AppServerInfo;
......@@ -40,26 +39,22 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
/**
* @author peng-yongsheng
*/
public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO {
private static final String AVG_TPS = "avg_tps";
public InstanceMetricEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
int secondBetween, int topN, MetricSource metricSource) {
@Override public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket,
long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
......@@ -76,36 +71,36 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(topN);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(2000);
aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
Map<String, String> bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_CALLS);
bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
String idOrCode = "(params." + InstanceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")"
+ " / "
+ "( " + secondBetween + " )";
Script script = new Script(idOrCode);
aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script));
searchRequestBuilder.addAggregation(aggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<AppServerInfo> appServerInfos = new LinkedList<>();
Terms serviceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID);
serviceIdTerms.getBuckets().forEach(serviceIdTerm -> {
int instanceId = serviceIdTerm.getKeyAsNumber().intValue();
Terms instanceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID);
instanceIdTerms.getBuckets().forEach(instanceIdTerm -> {
int instanceId = instanceIdTerm.getKeyAsNumber().intValue();
Sum callSum = instanceIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
long calls = (long)callSum.getValue();
int callsPerSec = (int)(secondBetween == 0 ? 0 : calls / secondBetween);
AppServerInfo appServerInfo = new AppServerInfo();
InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_TPS);
appServerInfo.setId(instanceId);
appServerInfo.setCallsPerSec((int)simpleValue.getValue());
appServerInfo.setCallsPerSec(callsPerSec);
appServerInfos.add(appServerInfo);
});
return appServerInfos;
appServerInfos.sort((first, second) -> first.getCallsPerSec() > second.getCallsPerSec() ? -1 : 1);
if (appServerInfos.size() <= topN) {
return appServerInfos;
} else {
List<AppServerInfo> newCollection = new LinkedList<>();
for (int i = 0; i < topN; i++) {
newCollection.add(appServerInfos.get(i));
}
return newCollection;
}
}
@Override public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
......
......@@ -19,10 +19,10 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
......@@ -43,21 +43,19 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
/**
* @author peng-yongsheng
*/
public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
private static final String AVG_DURATION = "avg_duration";
public ServiceMetricEsUIDAO(ElasticSearchClient client) {
super(client);
}
......@@ -177,8 +175,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
@Override
public List<ServiceMetric> getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
Integer topN,
MetricSource metricSource) {
Integer topN, MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
......@@ -193,43 +190,30 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
boolQuery.must().add(QueryBuilders.termQuery(ServiceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue()));
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ServiceMetricTable.COLUMN_SERVICE_ID).field(ServiceMetricTable.COLUMN_SERVICE_ID).size(topN);
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM));
Map<String, String> bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_CALLS, ServiceMetricTable.COLUMN_TRANSACTION_CALLS);
bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM);
bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM);
String idOrCode = "(params." + ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM + " - params." + ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM + ")"
+ " / "
+ "(params." + ServiceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")";
Script script = new Script(idOrCode);
aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_DURATION, bucketsPathsMap, script));
searchRequestBuilder.addAggregation(aggregationBuilder);
searchRequestBuilder.setSize(topN * 60);
searchRequestBuilder.addSort(SortBuilders.fieldSort(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION).order(SortOrder.DESC));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<ServiceMetric> serviceMetrics = new LinkedList<>();
Terms serviceIdTerms = searchResponse.getAggregations().get(ServiceMetricTable.COLUMN_SERVICE_ID);
serviceIdTerms.getBuckets().forEach(serviceIdTerm -> {
int serviceId = serviceIdTerm.getKeyAsNumber().intValue();
SearchHit[] searchHits = searchResponse.getHits().getHits();
ServiceMetric serviceMetric = new ServiceMetric();
InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_DURATION);
Sum calls = serviceIdTerm.getAggregations().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS);
Set<Integer> serviceIds = new HashSet<>();
List<ServiceMetric> serviceMetrics = new LinkedList<>();
for (SearchHit searchHit : searchHits) {
int serviceId = ((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
if (!serviceIds.contains(serviceId)) {
ServiceMetric serviceMetric = new ServiceMetric();
serviceMetric.setId(serviceId);
serviceMetric.setCalls(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
serviceMetric.setAvgResponseTime(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
serviceMetrics.add(serviceMetric);
serviceIds.add(serviceId);
}
if (topN == serviceIds.size()) {
break;
}
}
serviceMetric.setCalls((long)calls.getValue());
serviceMetric.setId(serviceId);
serviceMetric.setAvgResponseTime((int)simpleValue.getValue());
serviceMetrics.add(serviceMetric);
});
return serviceMetrics;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册