# Elasticsearch Connector

This connector provides sinks that can request document actions to an [Elasticsearch](https://elastic.co/) index. To use this connector, add one of the following dependencies to your project, depending on the version of the Elasticsearch installation:

| Maven Dependency | Supported since | Elasticsearch version |
| --- | --- | --- |
| flink-connector-elasticsearch_2.11 | 1.0.0 | 1.x |
| flink-connector-elasticsearch2_2.11 | 1.0.0 | 2.x |
| flink-connector-elasticsearch5_2.11 | 1.3.0 | 5.x |
| flink-connector-elasticsearch6_2.11 | 1.6.0 | 6 and later versions |

Note that the streaming connectors are currently not part of the binary distribution. See [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for information about how to package the program with the libraries for cluster execution.

## Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). Make sure to set and remember a cluster name. This must be set when creating an `ElasticsearchSink` for requesting document actions against your cluster.

## Elasticsearch Sink

The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an Elasticsearch cluster.

The example below shows how to configure and create a sink:
```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<TransportAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
```
```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
```
```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// use an ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(...);
        restClientBuilder.setMaxRetryTimeoutMillis(...);
        restClientBuilder.setPathPrefix(...);
        restClientBuilder.setHttpClientConfigCallback(...);
    }
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
```
```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.transport.TransportAddress

import java.net.InetAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
```
```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.net.InetAddress
import java.net.InetSocketAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
```
```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.util.ArrayList
import java.util.List

val input: DataStream[String] = ...

val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      Requests.indexRequest()
              .index("my-index")
              .`type`("my-type")
              .source(json)
    }

    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }
)

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1)

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
  restClientBuilder => {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
)

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build)
```
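The examples above issue exactly one `IndexRequest` per incoming element, but an `ElasticsearchSinkFunction` may emit other action types as well. The following is a minimal sketch (not part of the original examples; the index, type, document id, and field names are hypothetical) that upserts documents with an `UpdateRequest`:

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.update.UpdateRequest;

import java.util.HashMap;
import java.util.Map;

public class UpsertSinkFunction implements ElasticsearchSinkFunction<String> {

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        // Hypothetical example: use the element itself as the document id and
        // create the document if it does not exist yet (upsert semantics).
        UpdateRequest request = new UpdateRequest("my-index", "my-type", element)
                .doc(json)
                .docAsUpsert(true);

        indexer.add(request);
    }
}
```

A `DeleteRequest` can be passed to the indexer in the same way via `indexer.add(...)`.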
For Elasticsearch versions that still use the now deprecated `TransportClient` to communicate with the Elasticsearch cluster (i.e., versions 5.x and below), note how a `Map` of `String`s is used to configure the `ElasticsearchSink`. This config map is forwarded directly when creating the internally used `TransportClient`. The configuration keys are documented in the Elasticsearch documentation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html). Especially important is the `cluster.name` parameter, which must correspond to the name of your cluster.

For Elasticsearch 6.x and above, the `RestHighLevelClient` is used internally for cluster communication. By default, the connector uses the default configuration of the REST client. To provide custom configuration for the REST client, users can supply a `RestClientFactory` implementation when setting up the `ElasticsearchSink.Builder` that builds the sink.

Also note that the examples only demonstrate performing a single index request for each incoming element. Generally, the `ElasticsearchSinkFunction` can be used to perform multiple requests of different types (e.g., `DeleteRequest`, `UpdateRequest`, etc.), as in the upsert sketch above.

Internally, each parallel instance of the Flink Elasticsearch Sink uses a `BulkProcessor` to send action requests to the cluster. This buffers elements before sending them to the cluster in bulk. The `BulkProcessor` executes bulk requests one at a time, i.e. there will be no two concurrent flushes of the buffered actions in progress.

### Elasticsearch Sinks and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting for all pending action requests in the `BulkProcessor` at the time of checkpoints. This effectively ensures that all requests issued before a checkpoint was triggered have been successfully acknowledged by Elasticsearch before proceeding to process more records sent to the sink.

More details on checkpoints and fault tolerance are in the [fault tolerance docs](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html).

To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
```
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
```
**NOTE**: Users can disable flushing if they wish to do so, by calling **disableFlushOnCheckpoint()** on the created **ElasticsearchSink**. Be aware that this essentially means the sink will no longer provide any strong delivery guarantees, even with checkpointing enabled for the topology.

### Communication using Embedded Node (only for Elasticsearch 1.x)

For Elasticsearch versions 1.x, communication using an embedded node is also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html) for information about the differences between communicating with Elasticsearch with an embedded node and a `TransportClient`.

Below is an example of how to create an `ElasticsearchSink` that uses an embedded node instead of a `TransportClient`:
```java
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
```
```scala
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
```
The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes.

### Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail due to a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to specify how request failures are handled, by simply implementing an `ActionRequestFailureHandler` and providing it to the constructor. Below is an example:
```java
DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
    }));
```
```scala
val input: DataStream[String] = ...

input.addSink(new ElasticsearchSink(
    config, transportAddresses,
    new ElasticsearchSinkFunction[String] {...},
    new ActionRequestFailureHandler {
        @throws(classOf[Throwable])
        override def onFailure(action: ActionRequest,
                failure: Throwable,
                restStatusCode: Int,
                indexer: RequestIndexer): Unit = {

            if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
                // full queue; re-add document for indexing
                indexer.add(action)
            } else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure
            }
        }
    }))
```
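If you only need to retry requests that were rejected because of a saturated queue, you do not have to implement a handler yourself: the connector ships a `RetryRejectedExecutionFailureHandler` (discussed below). The following minimal sketch, written in the same schematic style as the example above, plugs it into the 5.x-and-earlier constructor; the `util` package location of the handler class is an assumption:

```java
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    // connector-provided handler that re-adds requests rejected due to a full queue
    new RetryRejectedExecutionFailureHandler()));
```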
The custom handler in the example above lets the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink fails. If an `ActionRequestFailureHandler` is not provided to the constructor, the sink fails for any kind of error.

Note that `onFailure` is called for failures that still occur only after the `BulkProcessor` internally finishes all backoff retry attempts. By default, the `BulkProcessor` retries up to a maximum of 8 attempts with exponential backoff. For more information on the behaviour of the internal `BulkProcessor` and how to configure it, please see the following section.

By default, if a failure handler is not provided, the sink uses a `NoOpFailureHandler` that simply fails for all kinds of exceptions. The connector also provides a `RetryRejectedExecutionFailureHandler` implementation (used in the sketch above) that always re-adds requests that have failed due to queue capacity saturation.

**IMPORTANT**: Re-adding requests back to the internal **BulkProcessor** on failures will lead to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using **RetryRejectedExecutionFailureHandler**, checkpoints will need to wait until Elasticsearch node queues have enough capacity for all the pending requests. This also means that if re-added requests never succeed, the checkpoint will never finish.

**Failure handling for Elasticsearch 1.x**: For Elasticsearch 1.x, it is not feasible to match on the type of the failure, because the exact type cannot be retrieved through the older version's Java client APIs (the exceptions are general **Exception**s that only differ in the failure message). In this case, it is recommended to match on the provided REST status code.

### Configuring the Internal Bulk Processor

The internal `BulkProcessor` can be further configured for how buffered action requests are flushed, by setting the following values in the provided `Map<String, String>`:

* **bulk.flush.max.actions**: Maximum number of actions to buffer before flushing.
* **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
* **bulk.flush.interval.ms**: Interval at which to flush regardless of the number or size of buffered actions.

For versions 2.x and above, configuring how temporary request errors are retried is also supported:

* **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
* **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`.
* **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
* **bulk.flush.backoff.retries**: The number of backoff retries to attempt.

A short sketch that puts these keys together is included at the end of this page.

More information about Elasticsearch can be found [here](https://elastic.co).

## Packaging the Elasticsearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for further information).

Alternatively, you can put the connector’s jar file into Flink’s `lib/` folder to make it available system-wide, i.e. for all jobs being run.
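For reference, the following short sketch puts the bulk flush keys described above into the configuration `Map` used by the 5.x-and-earlier examples. The concrete values are arbitrary illustrations, not recommendations; for the 6.x connector, the corresponding settings are configured on the `ElasticsearchSink.Builder` instead (as shown earlier with `setBulkFlushMaxActions(1)`).

```java
import java.util.HashMap;
import java.util.Map;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");

// flush after 1000 buffered actions, 5 MB of buffered data, or 60 seconds -- whichever comes first
config.put("bulk.flush.max.actions", "1000");
config.put("bulk.flush.max.size.mb", "5");
config.put("bulk.flush.interval.ms", "60000");

// for 2.x and above: retry temporarily rejected bulk requests with exponential backoff
config.put("bulk.flush.backoff.enable", "true");
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
config.put("bulk.flush.backoff.delay", "50");
config.put("bulk.flush.backoff.retries", "8");
```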