From 6e48dca99a5910e311169c2e91354ec55ddc4300 Mon Sep 17 00:00:00 2001 From: Daming Date: Wed, 30 Dec 2020 19:54:33 +0800 Subject: [PATCH] Improvement Influxdb query performance (#6066) --- CHANGES.md | 1 + .../worker/MetricsPersistentWorker.java | 27 +++----- .../oap/server/core/storage/IMetricsDAO.java | 6 +- .../elasticsearch/base/MetricsEsDAO.java | 5 +- .../elasticsearch7/dao/MetricsEs7DAO.java | 12 ++-- .../plugin/influxdb/InfluxConstants.java | 2 +- .../plugin/influxdb/TableMetaInfo.java | 28 +++++--- .../plugin/influxdb/base/MetricsDAO.java | 68 +++++++++++++++---- .../plugin/influxdb/query/MetadataQuery.java | 10 +-- .../plugin/jdbc/h2/dao/H2MetricsDAO.java | 6 +- 10 files changed, 105 insertions(+), 60 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8f58b7456d..0668f16ed7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -39,6 +39,7 @@ Release Notes. * Fix searchService method error in storage-influxdb-plugin. * Add JavaScript component ID. * Fix CVE of UninstrumentedGateways in Dynamic Configuration activation. +* Improve query performance in storage-influxdb-plugin. #### UI * Fix un-removed tags in trace query. diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index edab874fe0..5e483c4012 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool; @@ -91,11 +92,12 @@ public class MetricsPersistentWorker extends PersistenceWorker { this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer()); MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) - .provider() - .getService(MetricsCreator.class); + .provider() + .getService(MetricsCreator.class); aggregationCounter = metricsCreator.createCounter( - "metrics_aggregation", "The number of rows in aggregation", - new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(model.getName(), "2", model.getDownsampling().getName()) + "metrics_aggregation", "The number of rows in aggregation", + new MetricsTag.Keys("metricName", "level", "dimensionality"), + new MetricsTag.Values(model.getName(), "2", model.getDownsampling().getName()) ); } @@ -209,18 +211,11 @@ public class MetricsPersistentWorker extends PersistenceWorker { context.clear(); } - List notInCacheIds = new ArrayList<>(); - for (Metrics metric : metrics) { - if (!context.containsKey(metric)) { - notInCacheIds.add(metric.id()); - } - } - - if (notInCacheIds.size() > 0) { - List metricsList = metricsDAO.multiGet(model, notInCacheIds); - for (Metrics metric : metricsList) { - context.put(metric, metric); - } + List noInCacheMetrics = metrics.stream() + .filter(m -> !context.containsKey(m)) + .collect(Collectors.toList()); + if (!noInCacheMetrics.isEmpty()) { + metricsDAO.multiGet(model, noInCacheMetrics).forEach(m -> context.put(m, m)); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java index c5add94ef2..c1ef378776 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java @@ -32,12 +32,12 @@ public interface IMetricsDAO extends DAO { /** * Read data from the storage by given IDs. * - * @param model target entity of this query. - * @param ids ID list. + * @param model target entity of this query. + * @param metrics metrics list. * @return the data of all given IDs. Only include existing data. Don't require to keep the same order of ids list. * @throws IOException when error occurs in data query. */ - List multiGet(Model model, List ids) throws IOException; + List multiGet(Model model, List metrics) throws IOException; /** * Transfer the given metrics to an executable insert statement. diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java index d78c960577..44d92b6305 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java @@ -41,8 +41,9 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO { } @Override - public List multiGet(Model model, List ids) throws IOException { - SearchResponse response = getClient().ids(model.getName(), ids.toArray(new String[0])); + public List multiGet(Model model, List metrics) throws IOException { + String[] ids = metrics.stream().map(Metrics::id).toArray(String[]::new); + SearchResponse response = getClient().ids(model.getName(), ids); List result = new ArrayList<>(response.getHits().getHits().length); for (int i = 0; i < response.getHits().getHits().length; i++) { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/dao/MetricsEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/dao/MetricsEs7DAO.java index bec72db7ef..1be9e5f6a8 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/dao/MetricsEs7DAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/dao/MetricsEs7DAO.java @@ -18,6 +18,9 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.dao; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -25,10 +28,6 @@ import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSear import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MetricsEsDAO; import org.elasticsearch.action.search.SearchResponse; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - public class MetricsEs7DAO extends MetricsEsDAO { MetricsEs7DAO(final ElasticSearchClient client, final StorageBuilder storageBuilder) { @@ -36,8 +35,9 @@ public class MetricsEs7DAO extends MetricsEsDAO { } @Override - public List multiGet(Model model, List ids) throws IOException { - SearchResponse response = getClient().ids(model.getName(), ids.toArray(new String[0])); + public List multiGet(Model model, List metrics) throws IOException { + String[] ids = metrics.stream().map(Metrics::id).toArray(String[]::new); + SearchResponse response = getClient().ids(model.getName(), ids); List result = new ArrayList<>(response.getHits().getHits().length); for (int i = 0; i < response.getHits().getHits().length; i++) { diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java index 3743a24b58..9c7d15a0c9 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java @@ -35,7 +35,7 @@ public interface InfluxConstants { String ID_COLUMN = "_id"; - String NAME = "_name"; + String NAME = "_name_"; String ENTITY_ID = "_entity_id"; diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java index 698ac97052..22bf7b9db0 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java @@ -54,17 +54,25 @@ public class TableMetaInfo { storageAndColumnMap.put(columnName.getStorageName(), columnName.getName()); }); + storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME); if (model.getName().endsWith("_traffic")) { - // instance_traffic name, service_id - // endpoint_traffic name, service_id - storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME); - if (InstanceTraffic.INDEX_NAME.equals(model.getName()) - || EndpointTraffic.INDEX_NAME.equals(model.getName())) { - storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID); - } else { - // service_traffic name, node_type, group - storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE); - storageAndTagMap.put(ServiceTraffic.GROUP, InfluxConstants.TagName.SERVICE_GROUP); + switch (model.getName()) { + // instance_traffic name, service_id + case InstanceTraffic.INDEX_NAME: { + storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME); + storageAndTagMap.put(InstanceTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID); + break; + } + // endpoint_traffic service_id + case EndpointTraffic.INDEX_NAME: { + storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID); + break; + } + // service_traffic name, group + case ServiceTraffic.INDEX_NAME: { + storageAndTagMap.put(ServiceTraffic.GROUP, InfluxConstants.TagName.SERVICE_GROUP); + storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE); + } } } else { diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java index 13121e56f7..27b3e1981f 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; -import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.io.IOException; @@ -26,8 +25,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; @@ -36,13 +40,15 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObje import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; +import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.TagName; import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo; +import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; -import org.influxdb.querybuilder.SelectQueryImpl; -import org.influxdb.querybuilder.WhereQueryImpl; +import org.influxdb.querybuilder.clauses.Clause; import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS; -import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains; +import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq; import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select; @Slf4j @@ -57,11 +63,46 @@ public class MetricsDAO implements IMetricsDAO { } @Override - public List multiGet(Model model, List ids) throws IOException { - final WhereQueryImpl query = select() - .raw(ALL_FIELDS) - .from(client.getDatabase(), model.getName()) - .where(contains("id", Joiner.on("|").join(ids))); + public List multiGet(Model model, List metrics) throws IOException { + final TableMetaInfo metaInfo = TableMetaInfo.get(model.getName()); + final String queryStr; + if (model.getName().endsWith("_traffic")) { + final Function clauseFunction; + switch (model.getName()) { + case EndpointTraffic.INDEX_NAME: { + clauseFunction = m -> eq(TagName.SERVICE_ID, ((EndpointTraffic) m).getServiceId()); + break; + } + case ServiceTraffic.INDEX_NAME: { + clauseFunction = m -> eq(TagName.NAME, ((ServiceTraffic) m).getName()); + break; + } + case InstanceTraffic.INDEX_NAME: { + clauseFunction = m -> eq(TagName.SERVICE_ID, ((InstanceTraffic) m).getServiceId()); + break; + } + default: + throw new IOException("Unknown metadata type, " + model.getName()); + } + queryStr = metrics.stream().map(m -> select().raw(ALL_FIELDS) + .from(client.getDatabase(), model.getName()) + .where(clauseFunction.apply(m)) + .and(eq(ID_COLUMN, m.id())) + .buildQueryString() + ).collect(Collectors.joining(";")); + } else { + queryStr = metrics.stream().map(m -> select().raw(ALL_FIELDS) + .from(client.getDatabase(), model.getName()) + .where(eq( + TagName.TIME_BUCKET, + String.valueOf(m.getTimeBucket()) + )) + .and(eq(ID_COLUMN, m.id())) + .buildQueryString() + ).collect(Collectors.joining(";")); + } + + final Query query = new Query(queryStr); QueryResult.Series series = client.queryForSingleSeries(query); if (log.isDebugEnabled()) { log.debug("SQL: {} result: {}", query.getCommand(), series); @@ -71,10 +112,8 @@ public class MetricsDAO implements IMetricsDAO { return Collections.emptyList(); } - final List metrics = Lists.newArrayList(); + final List newMetrics = Lists.newArrayList(); final List columns = series.getColumns(); - - final TableMetaInfo metaInfo = TableMetaInfo.get(model.getName()); final Map storageAndColumnMap = metaInfo.getStorageAndColumnMap(); series.getValues().forEach(values -> { @@ -88,11 +127,10 @@ public class MetricsDAO implements IMetricsDAO { data.put(storageAndColumnMap.get(columns.get(i)), value); } - metrics.add(storageBuilder.map2Data(data)); - + newMetrics.add(storageBuilder.map2Data(data)); }); - return metrics; + return newMetrics; } @Override diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java index 6499646af5..d54620b2a0 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java @@ -115,7 +115,7 @@ public class MetadataQuery implements IMetadataQueryDAO { .from(client.getDatabase(), ServiceTraffic.INDEX_NAME) .where(eq(TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()))); if (!Strings.isNullOrEmpty(keyword)) { - where.and(contains(ServiceTraffic.NAME, keyword)); + where.and(contains(NAME, keyword)); } return buildServices(where); } @@ -125,7 +125,7 @@ public class MetadataQuery implements IMetadataQueryDAO { final WhereQueryImpl whereQuery = select(ID_COLUMN, NAME, ServiceTraffic.GROUP) .from(client.getDatabase(), ServiceTraffic.INDEX_NAME) .where(eq(TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value()))); - whereQuery.and(eq(InfluxConstants.NAME, serviceCode)); + whereQuery.and(eq(NAME, serviceCode)); return buildServices(whereQuery).get(0); } @@ -139,7 +139,7 @@ public class MetadataQuery implements IMetadataQueryDAO { .from(client.getDatabase(), EndpointTraffic.INDEX_NAME) .where(eq(TagName.SERVICE_ID, String.valueOf(serviceId))); if (!Strings.isNullOrEmpty(keyword)) { - where.and(contains(EndpointTraffic.NAME, keyword.replaceAll("/", "\\\\/"))); + where.and(contains(NAME, keyword.replaceAll("/", "\\\\/"))); } where.limit(limit); @@ -172,8 +172,8 @@ public class MetadataQuery implements IMetadataQueryDAO { .from(InstanceTraffic.INDEX_NAME) .where() .and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket)) - .and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId)) - .groupBy(TagName.NAME, TagName.SERVICE_ID); + .and(eq(TagName.SERVICE_ID, serviceId)) + .groupBy(TagName.NAME); SelectQueryImpl query = select().column(ID_COLUMN) .column(NAME) diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java index 1435fc0bd6..74a896a143 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; @@ -40,8 +41,9 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO { } @Override - public List multiGet(Model model, List ids) throws IOException { - List storageDataList = getByIDs(h2Client, model.getName(), ids.toArray(new String[0]), storageBuilder); + public List multiGet(Model model, List metrics) throws IOException { + String[] ids = metrics.stream().map(Metrics::id).collect(Collectors.toList()).toArray(new String[] {}); + List storageDataList = getByIDs(h2Client, model.getName(), ids, storageBuilder); List result = new ArrayList<>(storageDataList.size()); for (StorageData storageData : storageDataList) { result.add((Metrics) storageData); -- GitLab