Unverified commit 6e48dca9, authored by Daming, committed by GitHub

Improve InfluxDB query performance (#6066)

Parent 8aeabfd5
......@@ -39,6 +39,7 @@ Release Notes.
* Fix searchService method error in storage-influxdb-plugin.
* Add JavaScript component ID.
* Fix CVE of UninstrumentedGateways in Dynamic Configuration activation.
* Improve query performance in storage-influxdb-plugin.
#### UI
* Fix un-removed tags in trace query.
......
......@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
......@@ -91,11 +92,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
.provider()
.getService(MetricsCreator.class);
aggregationCounter = metricsCreator.createCounter(
"metrics_aggregation", "The number of rows in aggregation",
new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(model.getName(), "2", model.getDownsampling().getName())
"metrics_aggregation", "The number of rows in aggregation",
new MetricsTag.Keys("metricName", "level", "dimensionality"),
new MetricsTag.Values(model.getName(), "2", model.getDownsampling().getName())
);
}
......@@ -209,18 +211,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
context.clear();
}
List<String> notInCacheIds = new ArrayList<>();
for (Metrics metric : metrics) {
if (!context.containsKey(metric)) {
notInCacheIds.add(metric.id());
}
}
if (notInCacheIds.size() > 0) {
List<Metrics> metricsList = metricsDAO.multiGet(model, notInCacheIds);
for (Metrics metric : metricsList) {
context.put(metric, metric);
}
List<Metrics> noInCacheMetrics = metrics.stream()
.filter(m -> !context.containsKey(m))
.collect(Collectors.toList());
if (!noInCacheMetrics.isEmpty()) {
metricsDAO.multiGet(model, noInCacheMetrics).forEach(m -> context.put(m, m));
}
}
......
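The hunk above replaces the explicit loop over `metrics` with a stream that collects the entries missing from the in-memory `context`, and passes the missing `Metrics` objects themselves (rather than their id strings) to a single `multiGet` call. A minimal standalone sketch of that cache-miss batching pattern; `CacheMissBatcher` and `Loader` are illustrative names, not the worker's actual API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class CacheMissBatcher<T> {
    interface Loader<E> {
        List<E> loadAll(List<E> missing); // stands in for metricsDAO.multiGet(model, missing)
    }

    private final Map<T, T> context = new HashMap<>();

    void warmUp(List<T> incoming, Loader<T> loader) {
        // keep only the entries not already cached in this persistence cycle
        List<T> notInCache = incoming.stream()
                                     .filter(t -> !context.containsKey(t))
                                     .collect(Collectors.toList());
        if (!notInCache.isEmpty()) {
            // one bulk read covers all misses, instead of one storage lookup each
            loader.loadAll(notInCache).forEach(t -> context.put(t, t));
        }
    }
}
```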
......@@ -32,12 +32,12 @@ public interface IMetricsDAO extends DAO {
/**
* Read data from the storage by given IDs.
*
* @param model target entity of this query.
* @param ids ID list.
* @param model target entity of this query.
* @param metrics metrics list.
* @return the data of all given IDs. Only include existing data. Don't require to keep the same order of ids list.
* @throws IOException when error occurs in data query.
*/
List<Metrics> multiGet(Model model, List<String> ids) throws IOException;
List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException;
/**
* Transfer the given metrics to an executable insert statement.
......
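The contract change above is what enables the InfluxDB optimization: `multiGet` now receives the `Metrics` objects, so a backend can either reduce them to ids (as the Elasticsearch and H2 DAOs do below) or use per-row columns such as the time bucket to build more selective queries (as the InfluxDB DAO does). A hedged sketch of the two strategies, with `MetricRow` as a hypothetical stand-in for `Metrics` and illustrative column names:

```java
import java.util.List;
import java.util.stream.Collectors;

interface MetricRow {
    String id();
    long getTimeBucket();
}

final class MultiGetStrategies {
    // ID-oriented stores (Elasticsearch, H2): collapse the rows back to ids.
    static String[] ids(List<? extends MetricRow> rows) {
        return rows.stream().map(MetricRow::id).toArray(String[]::new);
    }

    // Column-aware stores (InfluxDB): keep the whole row so each lookup can be
    // narrowed by an indexed column such as the time bucket.
    static List<String> predicates(List<? extends MetricRow> rows) {
        return rows.stream()
                   .map(r -> "_time_bucket = '" + r.getTimeBucket() + "' AND _id = '" + r.id() + "'")
                   .collect(Collectors.toList());
    }
}
```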
......@@ -41,8 +41,9 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
}
@Override
public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
SearchResponse response = getClient().ids(model.getName(), ids.toArray(new String[0]));
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
String[] ids = metrics.stream().map(Metrics::id).toArray(String[]::new);
SearchResponse response = getClient().ids(model.getName(), ids);
List<Metrics> result = new ArrayList<>(response.getHits().getHits().length);
for (int i = 0; i < response.getHits().getHits().length; i++) {
......
......@@ -18,6 +18,9 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.dao;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -25,10 +28,6 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MetricsEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class MetricsEs7DAO extends MetricsEsDAO {
MetricsEs7DAO(final ElasticSearchClient client, final StorageBuilder<Metrics> storageBuilder) {
......@@ -36,8 +35,9 @@ public class MetricsEs7DAO extends MetricsEsDAO {
}
@Override
public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
SearchResponse response = getClient().ids(model.getName(), ids.toArray(new String[0]));
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
String[] ids = metrics.stream().map(Metrics::id).toArray(String[]::new);
SearchResponse response = getClient().ids(model.getName(), ids);
List<Metrics> result = new ArrayList<>(response.getHits().getHits().length);
for (int i = 0; i < response.getHits().getHits().length; i++) {
......
......@@ -35,7 +35,7 @@ public interface InfluxConstants {
String ID_COLUMN = "_id";
String NAME = "_name";
String NAME = "_name_";
String ENTITY_ID = "_entity_id";
......
......@@ -54,17 +54,25 @@ public class TableMetaInfo {
storageAndColumnMap.put(columnName.getStorageName(), columnName.getName());
});
storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME);
if (model.getName().endsWith("_traffic")) {
// instance_traffic name, service_id
// endpoint_traffic name, service_id
storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME);
if (InstanceTraffic.INDEX_NAME.equals(model.getName())
|| EndpointTraffic.INDEX_NAME.equals(model.getName())) {
storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
} else {
// service_traffic name, node_type, group
storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
storageAndTagMap.put(ServiceTraffic.GROUP, InfluxConstants.TagName.SERVICE_GROUP);
switch (model.getName()) {
// instance_traffic name, service_id
case InstanceTraffic.INDEX_NAME: {
storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME);
storageAndTagMap.put(InstanceTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
break;
}
// endpoint_traffic service_id
case EndpointTraffic.INDEX_NAME: {
storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
break;
}
// service_traffic name, group
case ServiceTraffic.INDEX_NAME: {
storageAndTagMap.put(ServiceTraffic.GROUP, InfluxConstants.TagName.SERVICE_GROUP);
storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
}
}
} else {
......
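The registrations above declare which columns of each `*_traffic` measurement are stored as InfluxDB tags: `name` and `service_id` for instance traffic, `service_id` for endpoint traffic, `group` and `node_type` for service traffic. Tags are indexed by InfluxDB and cheap to filter on, which is what the `multiGet` queries later in this commit rely on; plain fields require a scan. An illustrative write showing the distinction; the tag and field names here are assumptions, not the plugin's actual constants:

```java
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.Point;

public final class TagVsFieldSketch {
    public static Point instanceTrafficPoint(String serviceId, String name, long timeBucket) {
        return Point.measurement("instance_traffic")
                    .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                    // tags are indexed, so WHERE clauses on them stay cheap
                    .tag("_service_id", serviceId)   // illustrative tag name
                    .tag("_name_", name)             // illustrative tag name
                    // fields are payload columns; filtering on them scans data
                    .addField("last_ping", timeBucket) // illustrative field name
                    .build();
    }
}
```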
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
......@@ -26,8 +25,13 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
......@@ -36,13 +40,15 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObje
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.TagName;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
......@@ -57,11 +63,46 @@ public class MetricsDAO implements IMetricsDAO {
}
@Override
public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
final WhereQueryImpl<SelectQueryImpl> query = select()
.raw(ALL_FIELDS)
.from(client.getDatabase(), model.getName())
.where(contains("id", Joiner.on("|").join(ids)));
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
final TableMetaInfo metaInfo = TableMetaInfo.get(model.getName());
final String queryStr;
if (model.getName().endsWith("_traffic")) {
final Function<Metrics, Clause> clauseFunction;
switch (model.getName()) {
case EndpointTraffic.INDEX_NAME: {
clauseFunction = m -> eq(TagName.SERVICE_ID, ((EndpointTraffic) m).getServiceId());
break;
}
case ServiceTraffic.INDEX_NAME: {
clauseFunction = m -> eq(TagName.NAME, ((ServiceTraffic) m).getName());
break;
}
case InstanceTraffic.INDEX_NAME: {
clauseFunction = m -> eq(TagName.SERVICE_ID, ((InstanceTraffic) m).getServiceId());
break;
}
default:
throw new IOException("Unknown metadata type, " + model.getName());
}
queryStr = metrics.stream().map(m -> select().raw(ALL_FIELDS)
.from(client.getDatabase(), model.getName())
.where(clauseFunction.apply(m))
.and(eq(ID_COLUMN, m.id()))
.buildQueryString()
).collect(Collectors.joining(";"));
} else {
queryStr = metrics.stream().map(m -> select().raw(ALL_FIELDS)
.from(client.getDatabase(), model.getName())
.where(eq(
TagName.TIME_BUCKET,
String.valueOf(m.getTimeBucket())
))
.and(eq(ID_COLUMN, m.id()))
.buildQueryString()
).collect(Collectors.joining(";"));
}
final Query query = new Query(queryStr);
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result: {}", query.getCommand(), series);
......@@ -71,10 +112,8 @@ public class MetricsDAO implements IMetricsDAO {
return Collections.emptyList();
}
final List<Metrics> metrics = Lists.newArrayList();
final List<Metrics> newMetrics = Lists.newArrayList();
final List<String> columns = series.getColumns();
final TableMetaInfo metaInfo = TableMetaInfo.get(model.getName());
final Map<String, String> storageAndColumnMap = metaInfo.getStorageAndColumnMap();
series.getValues().forEach(values -> {
......@@ -88,11 +127,10 @@ public class MetricsDAO implements IMetricsDAO {
data.put(storageAndColumnMap.get(columns.get(i)), value);
}
metrics.add(storageBuilder.map2Data(data));
newMetrics.add(storageBuilder.map2Data(data));
});
return metrics;
return newMetrics;
}
@Override
......
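This is the core of the performance change. The old implementation issued one SELECT whose WHERE clause was a regex `contains("id", "a|b|c|…")` over the whole measurement; the new one builds one small statement per metric, each constrained by an indexed tag (service_id, name, or time bucket) plus the `_id` column, and joins the statements with `;` so they travel as a single request. A compilable sketch of that batching, with an assumed time-bucket tag name (the real one comes from `InfluxConstants.TagName`) and `"*"` standing in for `InfluxConstants.ALL_FIELDS`:

```java
import java.util.List;
import java.util.stream.Collectors;
import org.influxdb.dto.Query;

import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;

public final class BatchedMultiGet {
    /** One row to read back: its id plus the indexed tag value to filter on. */
    public static final class RowKey {
        final String id;
        final String timeBucket;
        public RowKey(String id, String timeBucket) {
            this.id = id;
            this.timeBucket = timeBucket;
        }
    }

    /** Joins one SELECT per row into a single semicolon-separated statement. */
    public static Query build(String database, String measurement, List<RowKey> rows) {
        String statements = rows.stream()
            .map(r -> select().raw("*")                            // simplified projection
                              .from(database, measurement)
                              .where(eq("_time_bucket", r.timeBucket)) // assumed tag name
                              .and(eq("_id", r.id))                    // ID_COLUMN in this commit
                              .buildQueryString())
            .collect(Collectors.joining(";"));
        return new Query(statements);                              // one round trip for all rows
    }
}
```

InfluxDB executes the semicolon-separated statements in one round trip, and each statement only touches the series selected by its tag predicate instead of regex-matching ids across the whole measurement.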
......@@ -115,7 +115,7 @@ public class MetadataQuery implements IMetadataQueryDAO {
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
if (!Strings.isNullOrEmpty(keyword)) {
where.and(contains(ServiceTraffic.NAME, keyword));
where.and(contains(NAME, keyword));
}
return buildServices(where);
}
......@@ -125,7 +125,7 @@ public class MetadataQuery implements IMetadataQueryDAO {
final WhereQueryImpl<SelectQueryImpl> whereQuery = select(ID_COLUMN, NAME, ServiceTraffic.GROUP)
.from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
.where(eq(TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
whereQuery.and(eq(InfluxConstants.NAME, serviceCode));
whereQuery.and(eq(NAME, serviceCode));
return buildServices(whereQuery).get(0);
}
......@@ -139,7 +139,7 @@ public class MetadataQuery implements IMetadataQueryDAO {
.from(client.getDatabase(), EndpointTraffic.INDEX_NAME)
.where(eq(TagName.SERVICE_ID, String.valueOf(serviceId)));
if (!Strings.isNullOrEmpty(keyword)) {
where.and(contains(EndpointTraffic.NAME, keyword.replaceAll("/", "\\\\/")));
where.and(contains(NAME, keyword.replaceAll("/", "\\\\/")));
}
where.limit(limit);
......@@ -172,8 +172,8 @@ public class MetadataQuery implements IMetadataQueryDAO {
.from(InstanceTraffic.INDEX_NAME)
.where()
.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket))
.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId))
.groupBy(TagName.NAME, TagName.SERVICE_ID);
.and(eq(TagName.SERVICE_ID, serviceId))
.groupBy(TagName.NAME);
SelectQueryImpl query = select().column(ID_COLUMN)
.column(NAME)
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
......@@ -40,8 +41,9 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO {
}
@Override
public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
List<StorageData> storageDataList = getByIDs(h2Client, model.getName(), ids.toArray(new String[0]), storageBuilder);
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
String[] ids = metrics.stream().map(Metrics::id).collect(Collectors.toList()).toArray(new String[] {});
List<StorageData> storageDataList = getByIDs(h2Client, model.getName(), ids, storageBuilder);
List<Metrics> result = new ArrayList<>(storageDataList.size());
for (StorageData storageData : storageDataList) {
result.add((Metrics) storageData);
......
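A small style note, not part of the commit: the intermediate `List<String>` in the H2 hunk above can be skipped by collecting straight into an array, as the Elasticsearch DAOs do. A tiny illustrative sketch (`HasId` stands in for `Metrics::id`):

```java
import java.util.List;

final class IdExtraction {
    interface HasId { String id(); }    // stands in for Metrics::id

    // Skips the intermediate List<String> used in the hunk above.
    static String[] ids(List<? extends HasId> metrics) {
        return metrics.stream().map(HasId::id).toArray(String[]::new);
    }
}
```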