This connector provides sinks that can request document actions to an [Elasticsearch](https://elastic.co/) Index. To use this connector, add one of the following dependencies to your project, depending on the version of the Elasticsearch installation:
| Maven Dependency | Supported since | Elasticsearch version |
| --- | --- | --- |
...
...
| flink-connector-elasticsearch6_2.11 | 1.6.0 | 6 and later versions |
Note that the streaming connectors are currently not part of the binary distribution. See [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for information about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). Make sure to set and remember a cluster name. This must be set when creating an `ElasticsearchSink` for requesting document actions against your cluster.
The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an Elasticsearch cluster.
The example below shows how to configure and create a sink:
```
...
...
```
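For reference, a minimal sketch of such a sink for the Elasticsearch 6.x connector (`flink-connector-elasticsearch6`) might look like the following; the host address, index name, type, and the single-action flush setting are illustrative assumptions only:

```
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

// use the ElasticsearchSink.Builder to create the sink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        private IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                .index("my-index")   // assumed index name
                .type("my-type")     // assumed type name
                .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// emit each element immediately instead of buffering (illustrative value only)
esSinkBuilder.setBulkFlushMaxActions(1);

// build and attach the sink to the pipeline
input.addSink(esSinkBuilder.build());
```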
For Elasticsearch versions that still use the now deprecated `TransportClient` to communicate with the Elasticsearch cluster (i.e., versions 5.x and below), note how a `Map` of `String`s is used to configure the `ElasticsearchSink`. This config map will be forwarded directly when creating the internally used `TransportClient`. The configuration keys are documented in the Elasticsearch documentation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html). Especially important is the `cluster.name` parameter, which must correspond to the name of your cluster.
For Elasticsearch 6.x and above, the `RestHighLevelClient` is used internally for cluster communication. By default, the connector uses the default configurations for the REST client. To provide a custom configuration for the REST client, users can supply a `RestClientFactory` implementation when setting up the `ElasticsearchSink.Builder` that builds the sink.
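As a sketch, assuming the `esSinkBuilder` from the example above, a custom `RestClientFactory` could be supplied like this (the header and path prefix values are placeholders):

```
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;

esSinkBuilder.setRestClientFactory(restClientBuilder -> {
    // illustrative customizations of the underlying low-level REST client
    restClientBuilder.setDefaultHeaders(new Header[] {
        new BasicHeader("Content-Type", "application/json")
    });
    restClientBuilder.setPathPrefix("/es");
});
```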
Also note that the example only demonstrates performing a single index request for each incoming element. Generally, the `ElasticsearchSinkFunction` can be used to perform multiple requests of different types (e.g., `DeleteRequest`, `UpdateRequest`, etc.).
Internally, each parallel instance of the Flink Elasticsearch Sink uses a `BulkProcessor` to send action requests to the cluster. This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor` executes bulk requests one at a time, i.e. there will be no two concurrent flushes of the buffered actions in progress.
### Elasticsearch Sinks and Fault Tolerance
With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting for all pending action requests in the `BulkProcessor` at the time of checkpoints. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink.
More details on checkpoints and fault tolerance are in the [fault tolerance docs](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html).
To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
```
...
...
```
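A minimal sketch of enabling checkpointing (the 5000 ms interval is only an example value):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint every 5000 msecs
env.enableCheckpointing(5000);
```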
**NOTE**: Users can disable flushing if they wish to do so, by calling **disableFlushOnCheckpoint()** on the created **ElasticsearchSink**. Be aware that this essentially means the sink will no longer provide any strong delivery guarantees, even with checkpointing enabled for the topology.
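For illustration, assuming the `esSinkBuilder` and `input` stream from the earlier sketch, disabling flushing on checkpoints could look like:

```
ElasticsearchSink<String> esSink = esSinkBuilder.build();

// opt out of waiting for pending requests at checkpoint time;
// note that this weakens the delivery guarantees as described above
esSink.disableFlushOnCheckpoint();

input.addSink(esSink);
```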
### Communication using Embedded Node (only for Elasticsearch 1.x)
For Elasticsearch versions 1.x, communication using an embedded node is also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html) for information about the differences between communicating with Elasticsearch with an embedded node and a `TransportClient`.
### Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail due to a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to specify how request failures are handled, simply by implementing an `ActionRequestFailureHandler` and providing it to the constructor.
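A sketch of such a handler, assuming the 6.x builder (`esSinkBuilder`) from the earlier example and registered via its `setFailureHandler` method:

```
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action,
            Throwable failure,
            int restStatusCode,
            RequestIndexer indexer) throws Throwable {

        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            // full queue; re-add document for indexing
            indexer.add(action);
        } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
            // malformed document; simply drop the request without failing the sink
        } else {
            // for all other failures, fail the sink;
            // here the failure is simply rethrown, but users can also choose to throw custom exceptions
            throw failure;
        }
    }
});
```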
The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler` is not provided to the constructor, the sink will fail for any kind of error.
Note that `onFailure` is called only for failures that still occur after the `BulkProcessor` internally finishes all backoff retry attempts. By default, the `BulkProcessor` retries up to a maximum of 8 attempts with an exponential backoff. For more information on the behaviour of the internal `BulkProcessor` and how to configure it, please see the following section.
By default, if a failure handler is not provided, the sink uses a `NoOpFailureHandler` that simply fails for all kinds of exceptions. The connector also provides a `RetryRejectedExecutionFailureHandler` implementation that always re-adds requests that have failed due to queue capacity saturation.
**IMPORTANT**: Re-adding requests back to the internal **BulkProcessor** on failures will lead to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using **RetryRejectedExecutionFailureHandler**, checkpoints will need to wait until Elasticsearch node queues have enough capacity for all the pending requests. This also means that if re-added requests never succeed, the checkpoint will never finish.
**Failure handling for Elasticsearch 1.x**: For Elasticsearch 1.x, it is not feasible to match on the type of the failure, because the exact type cannot be retrieved through the older version's Java client APIs (thus, the types will be general **Exception**s that only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
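For example, inside `onFailure` one might branch on the status code rather than the exception type; the sketch below assumes that rejected executions surface as HTTP 429, which should be verified against your cluster:

```
// inside onFailure(...), for Elasticsearch 1.x:
if (restStatusCode == 429) {
    // request was rejected because the node queue was full; retry it
    indexer.add(action);
} else {
    // fail the sink for anything else
    throw failure;
}
```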
### Configuring the Internal Bulk Processor
The internal `BulkProcessor` can be further configured for its behaviour on how buffered action requests are flushed, by setting the following values in the provided `Map<String, String>`:
* **bulk.flush.max.actions**: Maximum amount of actions to buffer before flushing.
* **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
* **bulk.flush.interval.ms**: Interval at which to flush regardless of the amount or size of buffered actions.
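For example (illustrative values; this is the same `Map` that is passed to the sink's constructor for the `TransportClient`-based versions):

```
import java.util.HashMap;
import java.util.Map;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");   // must match your cluster's name
config.put("bulk.flush.max.actions", "1000");    // flush after 1000 buffered actions
config.put("bulk.flush.max.size.mb", "5");       // ...or after 5 MB of buffered data
config.put("bulk.flush.interval.ms", "60000");   // ...or at least once a minute
```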
For versions 2.x and above, configuring how temporary request errors are retried is also supported:
* **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
* **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`.
* **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
* **bulk.flush.backoff.retries**: The amount of backoff retries to attempt.
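A sketch of the corresponding entries, continuing the `config` map from above (values are illustrative):

```
config.put("bulk.flush.backoff.enable", "true");
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
config.put("bulk.flush.backoff.delay", "50");    // initial base delay between retries
config.put("bulk.flush.backoff.retries", "8");   // give up after 8 backoff attempts
```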
More information about Elasticsearch can be found [here](https://elastic.co).
## Packaging the Elasticsearch Connector into an Uber-Jar
For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for further information).