This page explains the use of Flink’s API for asynchronous I/O with external data stores. For users not familiar with asynchronous or event-driven programming, an article about Futures and event-driven programming may be useful preparation.
Note: Details about the design and implementation of the asynchronous I/O utility can be found in the proposal and design document [FLIP-12: Asynchronous I/O Design and Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).
When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that communication delay with the external system does not dominate the streaming application’s total work.
Naively accessing data in the external database, for example in a `MapFunction`, typically means **synchronous** interaction: A request is sent to the database and the `MapFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time.
Asynchronous interaction with the database means that a single parallel function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. In most cases, this leads to much higher streaming throughput.
_Note:_ Improving throughput by just scaling the `MapFunction` to a very high parallelism is possible in some cases as well, but usually comes at a very high resource cost: Having many more parallel `MapFunction` instances means more tasks, threads, Flink-internal network connections, network connections to the database, buffers, and general internal bookkeeping overhead.
As illustrated in the section above, implementing proper asynchronous I/O to a database (or key/value store) requires a client to that database that supports asynchronous requests. Many popular databases offer such a client.
In the absence of such a client, one can try and turn a synchronous client into a limited concurrent client by creating multiple clients and handling the synchronous calls with a thread pool. However, this approach is usually less efficient than a proper asynchronous client.
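The thread-pool workaround described above can be sketched with plain JDK classes. This is a minimal illustration, not a recommended production setup: `BlockingClient`, `PooledAsyncClient`, and the simulated 50 ms round trip are hypothetical names and values introduced only for this example. Each pool thread gets its own client instance, and callers receive a `CompletableFuture` instead of blocking.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical blocking client; stands in for any synchronous database driver.
class BlockingClient {
    String lookup(String key) {
        try {
            Thread.sleep(50); // simulated network round trip (assumed value)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "value-for-" + key;
    }
}

// Turns the blocking client into a limited concurrent client:
// at most `threads` lookups run at the same time.
class PooledAsyncClient implements AutoCloseable {
    // One client per pool thread, created lazily on first use.
    private final ThreadLocal<BlockingClient> clients =
            ThreadLocal.withInitial(BlockingClient::new);
    private final ExecutorService pool;

    PooledAsyncClient(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Returns immediately; the blocking call happens on a pool thread.
    CompletableFuture<String> lookupAsync(String key) {
        return CompletableFuture.supplyAsync(() -> clients.get().lookup(key), pool);
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}

class PooledAsyncClientDemo {
    static List<String> lookupAll(List<String> keys) {
        try (PooledAsyncClient client = new PooledAsyncClient(4)) {
            // Dispatch every request first, then wait for the responses.
            List<CompletableFuture<String>> pending =
                    keys.stream().map(client::lookupAsync).collect(Collectors.toList());
            return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupAll(List.of("a", "b", "c")));
    }
}
```

Every in-flight request still occupies one thread while it waits, which is the main reason this approach tends to be less efficient than a client that is asynchronous by design.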
Flink’s Async I/O API allows users to use asynchronous request clients with data streams. The API handles the integration with data streams, as well as handling order, event time, fault tolerance, etc.
Assuming one has an asynchronous client for the target database, three parts are needed to implement a stream transformation with asynchronous I/O against the database:
* An implementation of `AsyncFunction` that dispatches the requests
* A _callback_ that takes the result of the operation and hands it to the `ResultFuture`
* Applying the async I/O operation on a DataStream as a transformation
The following code example illustrates the basic pattern:
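A minimal, self-contained sketch of that pattern follows. So that it runs without Flink on the classpath, it declares local stand-ins named `AsyncFunction` and `ResultFuture` that only mirror the shape of the Flink interfaces; `FakeAsyncDatabaseClient`, `AsyncDatabaseRequest`, and `AsyncPatternDemo` are hypothetical names for this illustration. In a real job, the third part would be a call such as `AsyncDataStream.unorderedWait(...)` on a `DataStream` rather than the hand-written driver loop shown here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Local stand-in that mirrors the shape of Flink's ResultFuture (simplified assumption).
interface ResultFuture<OUT> {
    void complete(Collection<OUT> result);
    void completeExceptionally(Throwable error);
}

// Local stand-in that mirrors the shape of Flink's AsyncFunction.
interface AsyncFunction<IN, OUT> {
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
}

// Hypothetical asynchronous database client that returns a future per query.
class FakeAsyncDatabaseClient {
    CompletableFuture<String> query(String key) {
        return CompletableFuture.supplyAsync(() -> "row-for-" + key);
    }
}

// Part 1: an AsyncFunction implementation that dispatches the request.
class AsyncDatabaseRequest implements AsyncFunction<String, String> {
    private final FakeAsyncDatabaseClient client = new FakeAsyncDatabaseClient();

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // Send the request and return immediately; no waiting happens here.
        client.query(key).whenComplete((row, error) -> {
            // Part 2: the callback hands the result (or the failure) to the ResultFuture.
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(key + " -> " + row));
            }
        });
    }
}

// Part 3 (stand-in): a tiny driver that plays the role of the async operator.
class AsyncPatternDemo {
    static List<String> run(List<String> input) {
        AsyncFunction<String, String> function = new AsyncDatabaseRequest();
        List<CompletableFuture<Collection<String>>> pending = new ArrayList<>();
        for (String key : input) {
            CompletableFuture<Collection<String>> slot = new CompletableFuture<>();
            pending.add(slot);
            try {
                function.asyncInvoke(key, new ResultFuture<String>() {
                    @Override
                    public void complete(Collection<String> result) {
                        slot.complete(result);
                    }

                    @Override
                    public void completeExceptionally(Throwable error) {
                        slot.completeExceptionally(error);
                    }
                });
            } catch (Exception e) {
                slot.completeExceptionally(e);
            }
        }
        // Collect the results in input order once all requests were dispatched.
        List<String> output = new ArrayList<>();
        for (CompletableFuture<Collection<String>> slot : pending) {
            output.addAll(slot.join());
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("k1", "k2")));
    }
}
```

Note that `asyncInvoke` never blocks: all waiting is left to the client, and the only work done in the callback is completing the `ResultFuture`.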
The following two parameters control the asynchronous operations:
* **Timeout**: The timeout defines how long an asynchronous request may take before it is considered failed. This parameter guards against dead/failed requests.
* **Capacity**: This parameter defines how many asynchronous requests may be in progress at the same time. Even though the async I/O approach typically leads to much better throughput, the operator can still be the bottleneck in the streaming application. Limiting the number of concurrent requests ensures that the operator will not accumulate an ever-growing backlog of pending requests, but that it will trigger backpressure once the capacity is exhausted.
When an async I/O request times out, by default an exception is thrown and the job is restarted. If you want to handle timeouts, you can override the `AsyncFunction#timeout` method.
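To make the two parameters concrete, here is a small plain-JDK sketch of the idea, not Flink's actual implementation. `BoundedAsyncDispatcher` and its demo class are hypothetical names; the capacity is modeled with a `Semaphore` (the caller blocks once all permits are taken, which is a simple form of backpressure) and the timeout with `CompletableFuture.orTimeout`, which requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper that limits in-flight requests and fails slow ones.
class BoundedAsyncDispatcher<IN, OUT> {
    private final Function<IN, CompletableFuture<OUT>> request;
    private final Semaphore permits;      // capacity: max requests in progress
    private final long timeoutMillis;     // timeout: max duration of one request

    BoundedAsyncDispatcher(Function<IN, CompletableFuture<OUT>> request,
                           int capacity, long timeoutMillis) {
        this.request = request;
        this.permits = new Semaphore(capacity);
        this.timeoutMillis = timeoutMillis;
    }

    // Blocks the caller while `capacity` requests are already in flight.
    CompletableFuture<OUT> submit(IN input) {
        permits.acquireUninterruptibly();
        CompletableFuture<OUT> response;
        try {
            response = request.apply(input);
        } catch (RuntimeException e) {
            permits.release(); // do not leak the permit if dispatching fails
            throw e;
        }
        return response
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .whenComplete((result, error) -> permits.release());
    }

    int availableCapacity() {
        return permits.availablePermits();
    }
}

class BoundedAsyncDemo {
    // A request that never answers times out; the caller substitutes a fallback.
    static String timedOutLookup() {
        BoundedAsyncDispatcher<String, String> dispatcher =
                new BoundedAsyncDispatcher<String, String>(
                        key -> new CompletableFuture<String>(), 2, 50);
        return dispatcher.submit("k").exceptionally(error -> "fallback").join();
    }

    // Two pending requests exhaust a capacity of two.
    static int capacityAfterTwoPending() {
        BoundedAsyncDispatcher<String, String> dispatcher =
                new BoundedAsyncDispatcher<String, String>(
                        key -> new CompletableFuture<String>(), 2, 10_000);
        dispatcher.submit("a");
        dispatcher.submit("b");
        return dispatcher.availableCapacity();
    }
}
```

The `exceptionally(...)` fallback in `timedOutLookup` plays a role loosely comparable to overriding `AsyncFunction#timeout`: instead of letting the failure propagate, the timed-out request yields a substitute result.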
The concurrent requests issued by the `AsyncFunction` frequently complete in some undefined order, based on which request finished first. To control in which order the resulting records are emitted, Flink offers two modes:
* **Unordered**: Result records are emitted as soon as the asynchronous request finishes. The order of the records in the stream may differ after the async I/O operator from the order before it. This mode has the lowest latency and lowest overhead when used with _processing time_ as the basic time characteristic. Use `AsyncDataStream.unorderedWait(...)` for this mode.
* **Ordered**: In that case, the stream order is preserved. Result records are emitted in the same order as the asynchronous requests are triggered (the order of the operator's input records). To achieve that, the operator buffers a result record until all its preceding records are emitted (or timed out). This usually introduces some amount of extra latency and some overhead in checkpointing, because records or results are maintained in the checkpointed state for a longer time, compared to the unordered mode. Use `AsyncDataStream.orderedWait(...)` for this mode.
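The difference between the two modes can be illustrated with a short plain-JDK sketch. It is only a model of the emission rules, not of Flink's operator; `EmissionOrderSketch` is a hypothetical name, and the responses are completed by hand in the order c, a, b to simulate requests finishing out of order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class EmissionOrderSketch {
    final List<String> unorderedOutput = new ArrayList<>();
    final List<String> orderedOutput = new ArrayList<>();

    void run() {
        // Three in-flight requests, listed in input order.
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            inFlight.add(new CompletableFuture<String>());
        }

        // Unordered mode: each result is emitted as soon as its request finishes.
        for (CompletableFuture<String> request : inFlight) {
            request.thenAccept(unorderedOutput::add);
        }

        // The responses arrive out of order: third request first, then first, then second.
        inFlight.get(2).complete("c");
        inFlight.get(0).complete("a");
        inFlight.get(1).complete("b");

        // Ordered mode: results are taken in input order, so a finished result
        // stays buffered until all of its predecessors have been emitted.
        for (CompletableFuture<String> request : inFlight) {
            orderedOutput.add(request.join());
        }
    }

    public static void main(String[] args) {
        EmissionOrderSketch sketch = new EmissionOrderSketch();
        sketch.run();
        System.out.println("unordered: " + sketch.unorderedOutput); // [c, a, b]
        System.out.println("ordered:   " + sketch.orderedOutput);   // [a, b, c]
    }
}
```

In the ordered case the result `c` was available first but is emitted last, which is where the extra latency and the longer-lived buffered state come from.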
When the streaming application works with [event time](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html), watermarks will be handled correctly by the asynchronous I/O operator. That means concretely the following for the two order modes:
* **Unordered**: Watermarks do not overtake records and vice versa, meaning watermarks establish an _order boundary_. Records are emitted unordered only between watermarks. A record occurring after a certain watermark will be emitted only after that watermark was emitted. The watermark in turn will be emitted only after all result records from inputs before that watermark were emitted.
That means that in the presence of watermarks, the _unordered_ mode introduces some of the same latency and management overhead as the _ordered_ mode does. The amount of that overhead depends on the watermark frequency.
* **Ordered**: Order of watermarks and records is preserved, just like order between records is preserved. There is no significant change in overhead, compared to working with _processing time_.
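The order-boundary rule for the unordered mode can be modeled in a few lines of plain Java. This is a simplified, single-threaded sketch under stated assumptions, not Flink's internal data structure: `UnorderedWatermarkSketch` and its `Segment` class are hypothetical names, and watermarks are rendered as strings such as `WM(10)` purely for readability.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Not thread-safe: all calls are assumed to happen on one thread.
class UnorderedWatermarkSketch {
    // All records between two watermarks form one segment.
    private static final class Segment {
        int pending;                                      // requests not yet completed
        final List<String> buffered = new ArrayList<>();  // completed but held back
        Long closingWatermark;                            // null while the segment is open
    }

    private final Deque<Segment> segments = new ArrayDeque<>();
    final List<String> output = new ArrayList<>();

    UnorderedWatermarkSketch() {
        segments.add(new Segment());
    }

    void addRecord(CompletableFuture<String> result) {
        Segment segment = segments.peekLast();
        segment.pending++;
        result.thenAccept(value -> onComplete(segment, value));
    }

    void addWatermark(long timestamp) {
        segments.peekLast().closingWatermark = timestamp;
        segments.addLast(new Segment());
        drain();
    }

    private void onComplete(Segment segment, String value) {
        segment.pending--;
        if (segment == segments.peekFirst()) {
            output.add(value);           // oldest segment: emit right away, unordered
        } else {
            segment.buffered.add(value); // later segment: wait for the watermark
        }
        drain();
    }

    // Emits a watermark once every record before it has been emitted,
    // then releases the results buffered behind it.
    private void drain() {
        while (true) {
            Segment head = segments.peekFirst();
            if (head.pending > 0 || head.closingWatermark == null) {
                return;
            }
            output.add("WM(" + head.closingWatermark + ")");
            segments.removeFirst();
            Segment next = segments.peekFirst();
            output.addAll(next.buffered);
            next.buffered.clear();
        }
    }

    static List<String> demo() {
        UnorderedWatermarkSketch operator = new UnorderedWatermarkSketch();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        operator.addRecord(a);
        operator.addRecord(b);
        operator.addWatermark(10L);
        operator.addRecord(c);
        c.complete("c"); // finishes first but sits behind the watermark
        b.complete("b");
        a.complete("a");
        return operator.output; // [b, a, WM(10), c]
    }
}
```

Records `a` and `b` are emitted in completion order because they share a segment, while `c` is held back until the watermark has been emitted, even though its request finished first.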
Please recall that _Ingestion Time_ is a special case of _event time_ with automatically generated watermarks that are based on the source's processing time.
### Fault Tolerance Guarantees
The asynchronous I/O operator offers full exactly-once fault tolerance guarantees. It stores the records for in-flight asynchronous requests in checkpoints and restores/re-triggers the requests when recovering from a failure.
For implementations with _Futures_ that have an _Executor_ (or _ExecutionContext_ in Scala) for callbacks, we suggest using a `DirectExecutor`, because the callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands the result to the `ResultFuture`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction with the checkpoint bookkeeping happens in a dedicated thread pool anyway.
A `DirectExecutor` can be obtained via `org.apache.flink.runtime.concurrent.Executors.directExecutor()` or `com.google.common.util.concurrent.MoreExecutors.directExecutor()`.
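As an illustration of what a direct executor does, the following plain-JDK sketch uses `Runnable::run` as an `Executor`, which runs each task in the thread that submits it. `DirectExecutorSketch` and the thread name `db-client-thread` are hypothetical; the sketch only shows that the callback runs on the thread that completes the future, with no handover to another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class DirectExecutorSketch {
    // Runs every task immediately in the submitting thread.
    static final Executor DIRECT = Runnable::run;

    static String callbackThreadName() {
        CompletableFuture<String> response = new CompletableFuture<>();
        AtomicReference<String> callbackThread = new AtomicReference<>();

        // Register the callback with the direct executor before the response arrives.
        response.thenAcceptAsync(
                value -> callbackThread.set(Thread.currentThread().getName()),
                DIRECT);

        // Simulate the database client delivering the response on its own thread.
        Thread clientThread = new Thread(() -> response.complete("row"), "db-client-thread");
        clientThread.start();
        try {
            clientThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return callbackThread.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```

With a pooled executor in place of `DIRECT`, the same callback would be handed to a different thread first, which is the extra handover the tip above is meant to avoid.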
### Caveat
**The AsyncFunction is not called Multi-Threaded**
A common confusion that we want to explicitly point out here is that the `AsyncFunction` is not called in a multi-threaded fashion. There exists only one instance of the `AsyncFunction` and it is called sequentially for each record in the respective partition of the stream. Unless the `asyncInvoke(...)` method returns fast and relies on a callback (by the client), it will not result in proper asynchronous I/O.