未验证 提交 287b8b9b 编写于 作者: A Ax1an 提交者: GitHub

Add syncBulkActions config to avoid the large amount of metrics data written...

Add syncBulkActions config to avoid the large amount of metrics data written ES in a single sync bulk request. (#5699)
上级 3fba656a
......@@ -91,6 +91,7 @@ Release Notes.
* Fix end time bug in the query process.
* Fix `Exporter INCREMENT mode` is not working.
* Fix an error when executing startup.bat when the log directory exists
* Add syncBulkActions configuration to set up the batch size of the metrics persistent.
#### UI
* Add browser dashboard.
......
......@@ -87,6 +87,7 @@
* Fix end time bug in the query process.
* Fix `Exporter INCREMENT mode` is not working.
* Fix an error when executing startup.bat when the log directory exists
* Add syncBulkActions configuration to set up the batch size of the metrics persistent.
#### UI
* Add browser dashboard.
......
......@@ -63,7 +63,8 @@ storage:
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
......
......@@ -85,7 +85,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | superDatasetDayStep | Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 |
| - | - | superDatasetIndexShardsFactor | Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 |
| - | - | superDatasetIndexReplicasNumber | Represent the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 |
| - | - | bulkActions| Bulk size of the batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000|
| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000|
| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution. | SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000|
| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10|
| - | - | concurrentRequests| The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
| - | - | resultWindowMaxSize | The max size of dataset when OAP loading cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
......@@ -108,7 +109,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | superDatasetDayStep | Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 |
| - | - | superDatasetIndexShardsFactor | Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 |
| - | - | superDatasetIndexReplicasNumber | Represent the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 |
| - | - | bulkActions| Bulk size of the batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000|
| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000|
| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution. | SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000|
| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10|
| - | - | concurrentRequests| The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 |
| - | - | resultWindowMaxSize | The max size of dataset when OAP loading cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000|
......
......@@ -118,7 +118,8 @@ storage:
superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
......@@ -142,7 +143,8 @@ storage:
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
......
......@@ -52,7 +52,8 @@ storage:
indexShardsNumber: ${ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
bulkSize: ${ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
......
......@@ -52,6 +52,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
private int superDatasetIndexShardsFactor = 5;
private int indexRefreshInterval = 2;
private int bulkActions = 2000;
private int syncBulkActions = 50000;
private int flushInterval = 10;
private int concurrentRequests = 2;
/**
......
......@@ -158,7 +158,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
);
this.registerServiceImplementation(
IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getSyncBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(
......
......@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
......@@ -37,12 +39,14 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
private BulkProcessor bulkProcessor;
private final int bulkActions;
private final int syncBulkActions;
private final int flushInterval;
private final int concurrentRequests;
public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int flushInterval, int concurrentRequests) {
public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int syncBulkActions, int flushInterval, int concurrentRequests) {
super(client);
this.bulkActions = bulkActions;
this.syncBulkActions = syncBulkActions;
this.flushInterval = flushInterval;
this.concurrentRequests = concurrentRequests;
}
......@@ -59,16 +63,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
@Override
public void synchronous(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isNotEmpty(prepareRequests)) {
BulkRequest request = new BulkRequest();
List<List<PrepareRequest>> partitions = Lists.partition(prepareRequests, syncBulkActions);
for (List<PrepareRequest> partition : partitions) {
BulkRequest request = new BulkRequest();
for (PrepareRequest prepareRequest : prepareRequests) {
if (prepareRequest instanceof InsertRequest) {
request.add((IndexRequest) prepareRequest);
} else {
request.add((UpdateRequest) prepareRequest);
for (PrepareRequest prepareRequest : partition) {
if (prepareRequest instanceof InsertRequest) {
request.add((IndexRequest) prepareRequest);
} else {
request.add((UpdateRequest) prepareRequest);
}
}
getClient().synchronousBulk(request);
}
getClient().synchronousBulk(request);
}
}
}
......@@ -156,7 +156,7 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
);
this.registerServiceImplementation(
IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(),
IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getSyncBulkActions(),
config.getFlushInterval(), config.getConcurrentRequests()
));
this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
......
......@@ -29,7 +29,8 @@ storage:
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
......@@ -49,7 +50,8 @@ storage:
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册