Unverified commit de975c7e, authored by wu-sheng, committed by GitHub

Remove synchronous persistence mechanism in API level and ElasticSearch implementation (#7293)

* Performance: remove the synchronous persistence mechanism from the batch ElasticSearch DAO, because the enhanced persistent session mechanism no longer requires data to be queryable immediately after insert and update.
* Performance: share the `flushInterval` setting between metrics and record data, now that the `synchronous persistence mechanism` has been removed. The record flush interval used to be hardcoded as 10s.
* Remove `syncBulkActions` in ElasticSearch storage option.
* Increase the default bulkActions (env `SW_STORAGE_ES_BULK_ACTIONS`) to 5000 (from 1000).
* Increase the flush interval of ElasticSearch indices to 15s (from 10s).

See these two references. According to them, **operations with the same `_index`, `_type` and `_id` within one bulk are applied in order**:
1. https://github.com/elastic/elasticsearch/issues/50199
2. https://discuss.elastic.co/t/order-of--bulk-request-operations/98124

Note that the order of different bulks is not guaranteed by the ElasticSearch cluster, so there is a risk of dirty writes. But considering that we set an over-20s period between flushes, while the index flush period is 10s, we should be safe.

We recommend a bulk size of 5000 and a 15s flush interval only. The persistent period has been set to 25s.
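The flush policy described above (flush when `bulkActions` requests accumulate, or when `flushInterval` elapses, whichever comes first) can be sketched in plain Java. This is an illustrative stand-alone model, not SkyWalking's code, which delegates this to Elasticsearch's `BulkProcessor`:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of a size-or-time triggered batch flusher, mirroring the
// bulkActions=5000 / flushInterval=15s policy described above.
// Illustrative only; class and method names are not SkyWalking's.
class BatchFlusher<T> {
    private final int bulkActions;          // flush once this many requests are buffered
    private final long flushIntervalMillis; // flush once this much time has elapsed
    private final List<T> buffer = new ArrayList<>();
    private long lastFlush = System.currentTimeMillis();
    private int flushCount = 0;

    BatchFlusher(int bulkActions, long flushIntervalMillis) {
        this.bulkActions = bulkActions;
        this.flushIntervalMillis = flushIntervalMillis;
    }

    synchronized void add(T request) {
        buffer.add(request);
        if (buffer.size() >= bulkActions) {
            flush();
        }
    }

    // In a real implementation, a timer thread calls this periodically.
    synchronized void tick(long now) {
        if (!buffer.isEmpty() && now - lastFlush >= flushIntervalMillis) {
            flush();
        }
    }

    private void flush() {
        // A real implementation would send the buffered requests as one bulk here.
        buffer.clear();
        lastFlush = System.currentTimeMillis();
        flushCount++;
    }

    synchronized int flushCount() {
        return flushCount;
    }
}
```

In the actual DAO, `getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests)` configures the same two triggers on the Elasticsearch client.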
Parent 982001c6
......@@ -72,18 +72,24 @@ Release Notes.
* Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages.
* Re-implement storage session mechanism, cached metrics are removed only according to their last access timestamp,
rather than first time. This makes sure hot data never gets removed unexpectedly.
* Support session expired threshold configurable.
* Fix InfluxDB storage-plugin Metrics#multiGet issue.
* Replace zuul proxy with spring cloud gateway 2.x. in webapp module.
* Upgrade etcd cluster coordinator and dynamic configuration to v3.x.
* Configuration: Allow to configure server maximum request header size.
* Configuration: Allow configuring server maximum request header size.
* Add thread state metric and class loaded info metric to JVMMetric.
* Performance: compile LAL DSL statically and run with type checked.
* Add pagination to event query protocol.
* Performance: optimize Envoy error logs persistence performance.
* Performance: remove the synchronous persistence mechanism from the batch ElasticSearch DAO, because the enhanced
  persistent session mechanism no longer requires data to be queryable immediately after insert and update.
* Performance: share the `flushInterval` setting between metrics and record data, now that the
  `synchronous persistence mechanism` has been removed. The record flush interval used to be hardcoded as 10s.
* Remove `syncBulkActions` in ElasticSearch storage option.
* Increase the default bulkActions (env `SW_STORAGE_ES_BULK_ACTIONS`) to 5000 (from 1000).
* Increase the flush interval of ElasticSearch indices to 15s (from 10s).
#### UI
* Fix the date component for log conditions.
* Fix selector keys for duplicate options.
* Add Python celery plugin.
......@@ -93,6 +99,7 @@ Release Notes.
* Fix chart types for setting metrics configure.
#### Documentation
* Add FAQ about `Elasticsearch exception type=version_conflict_engine_exception since 8.7.0`
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/90?closed=1)
......
......@@ -11,6 +11,7 @@ These are known and frequently asked questions about SkyWalking. We welcome you
* [Compiling issues on Mac's M1 chip](How-to-build-with-mac-m1.md)
## Runtime
* [Elasticsearch exception `type=version_conflict_engine_exception` since 8.7.0](es-version-conflict.md)
* [Version 8.x+ upgrade](v8-version-upgrade.md)
* [Why do metrics indexes with Hour and Day precisions stop updating after upgrade to 7.x?](Hour-Day-Metrics-Stopping.md)
* [Version 6.x upgrade](v6-version-upgrade.md)
......
# Elasticsearch exception `type=version_conflict_engine_exception` since 8.7.0
Since 8.7.0, we have made the following optimization to reduce Elasticsearch load.
```markdown
Performance: remove the synchronous persistence mechanism from the batch ElasticSearch DAO, because the enhanced
persistent session mechanism no longer requires data to be queryable immediately after insert and update.
```
Due to this, we flush the metrics into Elasticsearch without using `WriteRequest.RefreshPolicy.WAIT_UNTIL`. This dramatically reduces
the load of persistence work in the OAP server and the CPU load of Elasticsearch.
Meanwhile, there is a small chance you could see the following **WARN** messages in your logs.
```
{
"timeMillis": 1626247722647,
"thread": "I/O dispatcher 4",
"level": "WARN",
"loggerName": "org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient",
"message": "Bulk [70] executed with failures:[failure in bulk execution:\n[18875]: index [sw8_service_relation_client_side-20210714], type [_doc], id [20210714_b3BlcmF0aW9uLXJ1bGUtc2VydmVyQDExNDgx.1-bWFya2V0LXJlZmVycmFsLXNlcnZlckAxMDI1MQ==.1], message [[sw8_service_relation_client_side-20210714/D7qzncbeRq6qh2QF5MogTw][[sw8_service_relation_client_side-20210714][0]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[20210714_b3BlcmF0aW9uLXJ1bGUtc2VydmVyQDExNDgx.1-bWFya2V0LXJlZmVycmFsLXNlcnZlckAxMDI1MQ==.1]: version conflict, required seqNo [14012594], primary term [1]. current document has seqNo [14207928] and primary term [1]]]]]",
"endOfBatch": false,
"loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
"threadId": 44,
"threadPriority": 5,
"timestamp": "2021-07-14 15:28:42.647"
}
```
This does not affect the system much; it only implies a possibility of slightly inaccurate metrics. If it doesn't show up
frequently, you can ignore it directly.
If you see many logs like this, it is a signal that the flush period of your ElasticSearch indices can't
catch up with your setting, or that you have set `persistentPeriod` to less than the flush period.
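The `version_conflict_engine_exception` in the log above comes from Elasticsearch's optimistic concurrency control: every write to a document bumps its sequence number, and an update built against a stale sequence number is rejected. A toy model of that behavior (all names here are illustrative, not the Elasticsearch API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of Elasticsearch's seqNo-based optimistic concurrency control.
// An update that expects a stale seqNo is rejected with a version conflict,
// which is what the WARN log above reports.
class SeqNoStore {
    private final Map<String, Long> seqNo = new HashMap<>();

    // Plain index: always succeeds and bumps the document's seqNo.
    long index(String id) {
        return seqNo.merge(id, 1L, Long::sum);
    }

    // Conditional update: succeeds only if the caller's expected seqNo
    // matches the current one; otherwise it is a version conflict.
    boolean updateIfSeqNo(String id, long expectedSeqNo) {
        long current = seqNo.getOrDefault(id, 0L);
        if (current != expectedSeqNo) {
            return false; // version_conflict_engine_exception in real ES
        }
        seqNo.put(id, current + 1);
        return true;
    }
}
```

In the warning above, the update required seqNo 14012594 but the document had already advanced to 14207928, so the bulk item failed; the next persistence cycle simply rewrites the metric.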
......@@ -20,6 +20,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | gRPCSslCertChainPath| The file path of gRPC SSL cert chain| SW_CORE_GRPC_SSL_CERT_CHAIN_PATH| - |
| - | - | gRPCSslTrustedCAPath| The file path of gRPC trusted CA| SW_CORE_GRPC_SSL_TRUSTED_CA_PATH| - |
| - | - | downsampling| The activated level of down sampling aggregation | | Hour,Day|
| - | - | persistentPeriod| The execution period of the persistent timer. Unit is second. | | 25 |
| - | - | enableDataKeeperExecutor|Controller of TTL scheduler. Once disabled, TTL wouldn't work.|SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR|true|
| - | - | dataKeeperExecutePeriod|The execution period of TTL scheduler, unit is minute. Execution doesn't mean deleting data. The storage provider could override this, such as ElasticSearch storage.|SW_CORE_DATA_KEEPER_EXECUTE_PERIOD|5|
| - | - | recordDataTTL|The lifecycle of record data. Record data includes traces, top n sampled records, and logs. Unit is day. Minimal value is 2.|SW_CORE_RECORD_DATA_TTL|3|
......@@ -97,8 +98,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | superDatasetDayStep | Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 |
| - | - | superDatasetIndexShardsFactor | Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 |
| - | - | superDatasetIndexReplicasNumber | Represent the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 |
| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000|
| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10|
| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 5000|
| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 15|
| - | - | concurrentRequests| The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
| - | - | resultWindowMaxSize | The max size of dataset when OAP loading cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_ES_QUERY_MAX_SIZE | 5000 |
......@@ -122,8 +123,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | superDatasetDayStep | Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 |
| - | - | superDatasetIndexShardsFactor | Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 |
| - | - | superDatasetIndexReplicasNumber | Represent the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 |
| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000|
| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution. | SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000|
| - | - | bulkActions| Async bulk size of data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 5000|
| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10|
| - | - | concurrentRequests| The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
| - | - | resultWindowMaxSize | The max size of dataset when OAP loading cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
......
......@@ -141,8 +141,8 @@ storage:
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every 15 seconds regardless of the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
......@@ -169,8 +169,8 @@ storage:
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every 15 seconds regardless of the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
......@@ -250,9 +250,8 @@ storage:
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every 15 seconds regardless of the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
......
......@@ -64,7 +64,7 @@ public class CoreModuleConfig extends ModuleConfig {
* The period of doing data persistence. Unit is second.
*/
@Setter
private long persistentPeriod = 3;
private long persistentPeriod = 25;
private boolean enableDataKeeperExecutor = true;
......
......@@ -49,7 +49,7 @@ public class RecordPersistentWorker extends AbstractWorker<Record> {
public void in(Record record) {
try {
InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
batchDAO.asynchronous(insertRequest);
batchDAO.insert(insertRequest);
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
......
......@@ -23,25 +23,23 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
/**
* IBatchDAO provides two modes of data persistence supported by most databases, including synchronous and
* asynchronous.
* IBatchDAO provides two modes of data persistence supported by most databases, including pure insert and
* batch hybrid insert/update.
*/
public interface IBatchDAO extends DAO {
/**
Push data into the database in async mode. This method is driven by the streaming process. It does not
require the data to be queryable immediately after the method finishes.
*
All data is append-only; no modification happens.
*
* @param insertRequest data to insert.
*/
void asynchronous(InsertRequest insertRequest);
void insert(InsertRequest insertRequest);
/**
* Make all given PrepareRequest efficient in the sync mode. All requests could be confirmed by the database. All
* changes are required queryable after method returns.
Push the data collection into the database in async mode. This method is driven by the streaming process and does
not require the data to be queryable immediately after the method finishes.
*
* @param prepareRequests data to insert or update. No delete happens in streaming mode.
*/
void synchronous(List<PrepareRequest> prepareRequests);
void flush(List<PrepareRequest> prepareRequests);
}
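A minimal in-memory analog of the renamed contract (using simplified stand-ins for SkyWalking's request types, defined here only for illustration) shows how the two methods divide the work:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for SkyWalking's request types; illustrative only.
interface PrepareRequest {}
interface InsertRequest extends PrepareRequest {}

// In-memory analog of the renamed IBatchDAO contract: insert() takes
// append-only streaming data; flush() takes a mixed insert/update batch.
// Since 8.7.0 both paths are asynchronous in the ElasticSearch implementation.
class InMemoryBatchDAO {
    final List<PrepareRequest> inserted = new ArrayList<>();
    final List<PrepareRequest> flushed = new ArrayList<>();

    void insert(InsertRequest insertRequest) {
        inserted.add(insertRequest); // append-only, not immediately queryable
    }

    void flush(List<PrepareRequest> prepareRequests) {
        flushed.addAll(prepareRequests); // inserts and updates together
    }
}
```

In the real ElasticSearch DAO (see `BatchProcessEsDAO` below), both methods now feed the same `BulkProcessor` instead of `flush()` issuing a synchronous bulk.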
......@@ -161,7 +161,7 @@ public enum PersistenceTimer {
HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
try {
if (CollectionUtils.isNotEmpty(partition)) {
batchDAO.synchronous(partition);
batchDAO.flush(partition);
}
} catch (Throwable e) {
log.error(e.getMessage(), e);
......
......@@ -57,12 +57,12 @@ public class PersistenceTimerTest {
moduleConfig.setPersistentPeriod(Integer.MAX_VALUE);
IBatchDAO iBatchDAO = new IBatchDAO() {
@Override
public void asynchronous(InsertRequest insertRequest) {
public void insert(InsertRequest insertRequest) {
}
@Override
public void synchronous(List<PrepareRequest> prepareRequests) {
public void flush(final List<PrepareRequest> prepareRequests) {
synchronized (result) {
result.addAll(prepareRequests);
}
......
......@@ -547,6 +547,10 @@ public class ElasticSearchClient implements Client, HealthCheckable {
return response.getStatusLine().getStatusCode();
}
/**
* @since 8.7.0 SkyWalking doesn't use sync bulk anymore. This method is kept only for unexpected cases in the future.
*/
@Deprecated
public void synchronousBulk(BulkRequest request) {
request.timeout(TimeValue.timeValueMinutes(2));
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
......
......@@ -42,7 +42,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
*/
private int socketTimeout = 30000;
/**
* Since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES
* @since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES
* storage creates new indexes in every day.
*
* @since 7.0.0 dayStep represents how many days a single one index represents. Default is 1, meaning no difference
......@@ -64,8 +64,16 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
private int superDatasetIndexReplicasNumber = 0;
private int superDatasetIndexShardsFactor = 5;
private int indexRefreshInterval = 2;
private int bulkActions = 2000;
private int flushInterval = 10;
/**
* @since 8.7.0 This setting affects the flush policy of all traces/logs/metrics/metadata.
*/
private int bulkActions = 5000;
/**
* Period of flush, no matter `bulkActions` reached or not. Unit is second.
*
* @since 8.7.0 increase to 15s from 10s
*/
private int flushInterval = 15;
private int concurrentRequests = 2;
/**
* @since 7.0.0 This could be managed inside {@link #secretsManagementFile}
......
......@@ -19,22 +19,18 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchProcessEsDAO.class);
private BulkProcessor bulkProcessor;
private final int bulkActions;
private final int flushInterval;
......@@ -51,7 +47,7 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
}
@Override
public void asynchronous(InsertRequest insertRequest) {
public void insert(InsertRequest insertRequest) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
}
......@@ -60,17 +56,19 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
}
@Override
public void synchronous(List<PrepareRequest> prepareRequests) {
public void flush(List<PrepareRequest> prepareRequests) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
}
if (CollectionUtils.isNotEmpty(prepareRequests)) {
BulkRequest request = new BulkRequest();
for (PrepareRequest prepareRequest : prepareRequests) {
if (prepareRequest instanceof InsertRequest) {
request.add((IndexRequest) prepareRequest);
this.bulkProcessor.add((IndexRequest) prepareRequest);
} else {
request.add((UpdateRequest) prepareRequest);
this.bulkProcessor.add((UpdateRequest) prepareRequest);
}
}
getClient().synchronousBulk(request);
}
}
}
......@@ -162,9 +162,7 @@ public class StorageEsInstaller extends ModelInstaller {
setting.put("index.number_of_shards", model.isSuperDataset()
? config.getIndexShardsNumber() * config.getSuperDatasetIndexShardsFactor()
: config.getIndexShardsNumber());
setting.put("index.refresh_interval", model.isRecord()
? TimeValue.timeValueSeconds(10).toString()
: TimeValue.timeValueSeconds(config.getFlushInterval()).toString());
setting.put("index.refresh_interval", TimeValue.timeValueSeconds(config.getFlushInterval()).toString());
setting.put("analysis", getAnalyzerSetting(model.getColumns()));
if (!StringUtil.isEmpty(config.getAdvanced())) {
Map<String, Object> advancedSettings = gson.fromJson(config.getAdvanced(), Map.class);
......
......@@ -392,6 +392,10 @@ public class ElasticSearch7Client extends ElasticSearchClient {
return HttpStatus.SC_OK;
}
/**
* @since 8.7.0 SkyWalking doesn't use sync bulk anymore. This method is kept only for unexpected cases in the future.
*/
@Deprecated
@Override
public void synchronousBulk(BulkRequest request) {
request.timeout(TimeValue.timeValueMinutes(2));
......
......@@ -36,12 +36,12 @@ public class BatchDAO implements IBatchDAO {
}
@Override
public void asynchronous(InsertRequest insertRequest) {
public void insert(InsertRequest insertRequest) {
client.write(((InfluxInsertRequest) insertRequest).getPoint());
}
@Override
public void synchronous(List<PrepareRequest> prepareRequests) {
public void flush(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isEmpty(prepareRequests)) {
return;
}
......
......@@ -56,7 +56,7 @@ public class H2BatchDAO implements IBatchDAO {
}
@Override
public void synchronous(List<PrepareRequest> prepareRequests) {
public void flush(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isEmpty(prepareRequests)) {
return;
}
......@@ -81,7 +81,7 @@ public class H2BatchDAO implements IBatchDAO {
}
@Override
public void asynchronous(InsertRequest insertRequest) {
public void insert(InsertRequest insertRequest) {
this.dataCarrier.produce(insertRequest);
}
......@@ -100,7 +100,7 @@ public class H2BatchDAO implements IBatchDAO {
@Override
public void consume(List<PrepareRequest> prepareRequests) {
h2BatchDAO.synchronous(prepareRequests);
h2BatchDAO.flush(prepareRequests);
}
@Override
......
......@@ -158,10 +158,8 @@ public class MetricsQuery extends AbstractQuery<MetricsQuery> {
"envoy_worker_threads_max"
};
public static String METER_INSTANCE_PERSISTENCE_EXECUTE_PERCENTILE = "meter_oap_instance_persistence_execute_percentile";
public static String[] ALL_SO11Y_LABELED_METRICS = {
METER_INSTANCE_PERSISTENCE_EXECUTE_PERCENTILE
// Nothing to check for now.
};
private String id;
private String metricsName;
......