未验证 提交 e5630899 编写于 作者: H Hper 提交者: GitHub

[Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush. (#10287)

上级 b991cfd1
......@@ -81,6 +81,7 @@
* Fix TCP service instances are lack of instance properties like `pod` and `namespace`, which causes Pod log not to work for TCP workloads.
* Add Python HBase happybase module component ID(94).
* Fix gRPC alarm cannot update settings from dynamic configuration source.
* Add `batchOfBytes` configuration to limit the size of bulk flush.
* Add Python Websocket module component ID(7018).
* [Optional] Optimize single trace query performance by customizing routing in ElasticSearch. SkyWalking trace segments and Zipkin spans are using trace ID for routing. This is OFF by default, controlled by `storage/elasticsearch/enableCustomRouting`.
* Enhance OAP HTTP server to support HTTPS
......
......@@ -106,6 +106,7 @@ The Configuration Vocabulary lists all available configurations provided by `app
| - | - | superDatasetIndexReplicasNumber | Represents the replicas number in the super size dataset record index. | SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER | 0 |
| - | - | indexTemplateOrder | The order of index template. | SW_STORAGE_ES_INDEX_TEMPLATE_ORDER | 0 |
| - | - | bulkActions | Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS | 5000 |
| - | - | batchOfBytes | A threshold to control the max body size of ElasticSearch Bulk flush. | SW_STORAGE_ES_BATCH_OF_BYTES | 10485760 (10m) |
| - | - | flushInterval | Period of flush (in seconds). Does not matter whether `bulkActions` is reached or not. | SW_STORAGE_ES_FLUSH_INTERVAL | 5 |
| - | - | concurrentRequests | The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS | 2 |
| - | - | resultWindowMaxSize | The maximum size of dataset when the OAP loads cache, such as network aliases. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000 |
......
......@@ -365,9 +365,11 @@ public class ElasticSearchClient implements Client, HealthCheckable {
public BulkProcessor createBulkProcessor(int bulkActions,
int flushInterval,
int concurrentRequests) {
int concurrentRequests,
int batchOfBytes) {
return BulkProcessor.builder()
.bulkActions(bulkActions)
.batchOfBytes(batchOfBytes)
.flushInterval(Duration.ofSeconds(flushInterval))
.concurrentRequests(concurrentRequests)
.build(es);
......
......@@ -27,6 +27,7 @@ import java.util.Optional;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
......@@ -205,7 +206,7 @@ public class ITElasticSearch {
@Test
public void bulk() {
BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 10, 2);
BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 10, 2, 5 * 1024 * 1024);
Map<String, String> source = new HashMap<>();
source.put("column1", "value1");
......@@ -221,6 +222,23 @@ public class ITElasticSearch {
bulkProcessor.flush();
}
@Test
public void bulkPer_1KB() {
BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 10, 2, 1024);
Map<String, String> source = new HashMap<>();
source.put("column1", RandomStringUtils.randomAlphanumeric(1024));
source.put("column2", "value2");
for (int i = 0; i < 100; i++) {
IndexRequestWrapper indexRequest = new IndexRequestWrapper(
"bulk_insert_test6", "type", String.valueOf(i), source);
bulkProcessor.add(indexRequest.getRequest());
}
bulkProcessor.flush();
}
@Test
public void timeSeriesOperate() {
final String indexName = "test_time_series_operate";
......
......@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
......@@ -36,7 +37,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.ElasticSearch;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
import org.apache.skywalking.library.elasticsearch.requests.factory.Codec;
import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
import static java.util.Objects.requireNonNull;
......@@ -50,23 +53,26 @@ public final class BulkProcessor {
private final Semaphore semaphore;
private final long flushInternalInMillis;
private volatile long lastFlushTS = 0;
private final int batchOfBytes;
public static BulkProcessorBuilder builder() {
return new BulkProcessorBuilder();
}
BulkProcessor(
final AtomicReference<ElasticSearch> es, final int bulkActions,
final Duration flushInterval, final int concurrentRequests) {
BulkProcessor(final AtomicReference<ElasticSearch> es,
final int bulkActions,
final Duration flushInterval,
final int concurrentRequests,
final int batchOfBytes) {
requireNonNull(flushInterval, "flushInterval");
this.es = requireNonNull(es, "es");
this.bulkActions = bulkActions;
this.batchOfBytes = batchOfBytes;
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
this.requests = new ArrayBlockingQueue<>(bulkActions + 1);
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
1, r -> {
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, r -> {
final Thread thread = new Thread(r);
thread.setName("ElasticSearch BulkProcessor");
return thread;
......@@ -129,52 +135,73 @@ public final class BulkProcessor {
final List<Holder> batch = new ArrayList<>(requests.size());
requests.drainTo(batch);
final CompletableFuture<Void> flush = doFlush(batch);
flush.whenComplete((ignored1, ignored2) -> semaphore.release());
flush.join();
final List<CompletableFuture<Void>> futures = doFlush(batch);
final CompletableFuture<Void> future = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));
future.whenComplete((v, t) -> semaphore.release());
future.join();
lastFlushTS = System.currentTimeMillis();
}
private CompletableFuture<Void> doFlush(final List<Holder> batch) {
private List<CompletableFuture<Void>> doFlush(final List<Holder> batch) {
log.debug("Executing bulk with {} requests", batch.size());
if (batch.isEmpty()) {
return CompletableFuture.completedFuture(null);
return Collections.emptyList();
}
final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
try {
final RequestFactory rf = v.requestFactory();
final List<byte[]> bs = new ArrayList<>();
for (final Holder holder : batch) {
bs.add(v.codec().encode(holder.request));
bs.add("\n".getBytes());
try {
int bufferOfBytes = 0;
Codec codec = es.get().version().get().codec();
final List<byte[]> bs = new ArrayList<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
List<ByteBuf> byteBufList = new ArrayList<>();
for (final Holder holder : batch) {
byte[] bytes = codec.encode(holder.request);
bs.add(bytes);
bs.add("\n".getBytes());
bufferOfBytes += bytes.length + 1;
if (bufferOfBytes >= batchOfBytes) {
final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
byteBufList.add(content);
bs.clear();
bufferOfBytes = 0;
}
}
if (CollectionUtils.isNotEmpty(bs)) {
final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
return es.get().client().execute(rf.bulk().bulk(content))
.aggregate().thenAccept(response -> {
final HttpStatus status = response.status();
if (status != HttpStatus.OK) {
throw new RuntimeException(response.contentUtf8());
}
});
} catch (Exception e) {
return Exceptions.throwUnsafely(e);
byteBufList.add(content);
}
});
future.whenComplete((ignored, exception) -> {
if (exception != null) {
batch.stream().map(it -> it.future)
.forEach(it -> it.completeExceptionally(exception));
log.error("Failed to execute requests in bulk", exception);
} else {
log.debug("Succeeded to execute {} requests in bulk", batch.size());
batch.stream().map(it -> it.future).forEach(it -> it.complete(null));
for (final ByteBuf content : byteBufList) {
CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
try {
final RequestFactory rf = v.requestFactory();
return es.get().client().execute(rf.bulk().bulk(content)).aggregate().thenAccept(response -> {
final HttpStatus status = response.status();
if (status != HttpStatus.OK) {
throw new RuntimeException(response.contentUtf8());
}
});
} catch (Exception e) {
return Exceptions.throwUnsafely(e);
}
});
future.whenComplete((ignored, exception) -> {
if (exception != null) {
batch.stream().map(it -> it.future)
.forEach(it -> it.completeExceptionally((Throwable) exception));
log.error("Failed to execute requests in bulk", exception);
} else {
log.debug("Succeeded to execute {} requests in bulk", batch.size());
batch.stream().map(it -> it.future).forEach(it -> it.complete(null));
}
});
futures.add(future);
}
});
return future;
return futures;
} catch (Exception e) {
log.error("Failed to execute requests in bulk", e);
return Collections.emptyList();
}
}
@RequiredArgsConstructor
......@@ -182,4 +209,5 @@ public final class BulkProcessor {
private final CompletableFuture<Void> future;
private final Object request;
}
}
......@@ -32,6 +32,7 @@ public final class BulkProcessorBuilder {
private int bulkActions = -1;
private Duration flushInterval;
private int concurrentRequests = 2;
private int batchOfBytes;
public BulkProcessorBuilder bulkActions(int bulkActions) {
checkArgument(bulkActions > 0, "bulkActions must be positive");
......@@ -39,6 +40,12 @@ public final class BulkProcessorBuilder {
return this;
}
public BulkProcessorBuilder batchOfBytes(int batchOfBytes) {
checkArgument(batchOfBytes > 0, "batchOfBytes must be positive");
this.batchOfBytes = batchOfBytes;
return this;
}
public BulkProcessorBuilder flushInterval(Duration flushInterval) {
this.flushInterval = requireNonNull(flushInterval, "flushInterval");
return this;
......@@ -52,6 +59,6 @@ public final class BulkProcessorBuilder {
public BulkProcessor build(AtomicReference<ElasticSearch> es) {
return new BulkProcessor(
es, bulkActions, flushInterval, concurrentRequests);
es, bulkActions, flushInterval, concurrentRequests, batchOfBytes);
}
}
......@@ -53,6 +53,7 @@ storage:
indexReplicasNumber: ${ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
batchOfBytes: ${SW_STORAGE_ES_BATCH_OF_BYTES:10485760} # A threshold to control the max body size of ElasticSearch Bulk flush.
syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
bulkSize: ${ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
......
......@@ -156,6 +156,7 @@ storage:
superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
indexTemplateOrder: ${SW_STORAGE_ES_INDEX_TEMPLATE_ORDER:0} # the order of index template
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
batchOfBytes: ${SW_STORAGE_ES_BATCH_OF_BYTES:10485760} # A threshold to control the max body size of ElasticSearch Bulk flush.
# flush the bulk every 5 seconds whatever the number of requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:5}
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
......
......@@ -88,6 +88,8 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
* @since 8.7.0 This setting affects all traces/logs/metrics/metadata flush policy.
*/
private int bulkActions = 5000;
private int batchOfBytes = 1024 * 1024 * 10;
/**
* Period of flush, no matter `bulkActions` reached or not.
* Unit is second.
......
......@@ -185,7 +185,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(
IBatchDAO.class,
new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests())
.getFlushInterval(), config.getConcurrentRequests(), config.getBatchOfBytes())
);
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(
......
......@@ -36,15 +36,18 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
private final int bulkActions;
private final int flushInterval;
private final int concurrentRequests;
private final int batchOfBytes;
public BatchProcessEsDAO(ElasticSearchClient client,
int bulkActions,
int flushInterval,
int concurrentRequests) {
int concurrentRequests,
int batchOfBytes) {
super(client);
this.bulkActions = bulkActions;
this.flushInterval = flushInterval;
this.concurrentRequests = concurrentRequests;
this.batchOfBytes = batchOfBytes;
}
@Override
......@@ -53,7 +56,7 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
synchronized (this) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(
bulkActions, flushInterval, concurrentRequests);
bulkActions, flushInterval, concurrentRequests, batchOfBytes);
}
}
}
......@@ -67,7 +70,7 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
synchronized (this) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(
bulkActions, flushInterval, concurrentRequests);
bulkActions, flushInterval, concurrentRequests, batchOfBytes);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册