[Working with State](state.html) describes operator state, which upon restore is either evenly distributed among the parallel tasks of an operator or unioned, with the whole state being used to initialize the restored parallel tasks.
A third type of supported _operator state_ is the _Broadcast State_. Broadcast state was introduced to support use cases where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest of operator states in that:
To show the provided APIs, we will start with an example before presenting their full functionality. As our running example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs of objects of the same color that follow a certain pattern, _e.g._ a rectangle followed by a triangle. We assume that the set of interesting patterns evolves over time.
In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other stream will contain the `Rules`.
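To make the running example concrete, the following plain-Java sketch models the matching logic for a single, fixed rule. It does not use Flink's API: the class name `PatternMatchSketch`, the enum values, and the field layout of `Item` and `Rule` are assumptions made for illustration, and the per-color map only stands in for what keyed state would hold in a real job.

```java
import java.util.EnumMap;
import java.util.Map;

/** Plain-Java sketch of the running example; it does not use Flink's API. */
final class PatternMatchSketch {
    enum Color { RED, GREEN, BLUE }
    enum Shape { RECTANGLE, TRIANGLE, CIRCLE }

    /** An element of the first stream (hypothetical field layout). */
    static final class Item {
        final Color color;
        final Shape shape;
        Item(Color color, Shape shape) { this.color = color; this.shape = shape; }
    }

    /** A pattern "first followed by second" (hypothetical field layout). */
    static final class Rule {
        final Shape first;
        final Shape second;
        Rule(Shape first, Shape second) { this.first = first; this.second = second; }
    }

    private final Rule rule;
    // Per color: how many items matching rule.first are waiting for rule.second.
    private final Map<Color, Integer> waiting = new EnumMap<>(Color.class);

    PatternMatchSketch(Rule rule) { this.rule = rule; }

    /** Returns the number of same-color pairs completed by this item. */
    int process(Item item) {
        int matches = 0;
        int pending = waiting.getOrDefault(item.color, 0);
        if (item.shape == rule.second && pending > 0) {
            matches = pending;   // every waiting item pairs with this one
            pending = 0;
        }
        // No else branch, so rules with first == second are also covered.
        if (item.shape == rule.first) {
            pending++;           // this item may start a new pair later
        }
        waiting.put(item.color, pending);
        return matches;
    }

    public static void main(String[] args) {
        PatternMatchSketch m =
            new PatternMatchSketch(new Rule(Shape.RECTANGLE, Shape.TRIANGLE));
        System.out.println(m.process(new Item(Color.RED, Shape.RECTANGLE)));  // 0
        System.out.println(m.process(new Item(Color.BLUE, Shape.TRIANGLE)));  // 0
        System.out.println(m.process(new Item(Color.RED, Shape.TRIANGLE)));   // 1
    }
}
```

Only the red triangle completes a pair, because the blue triangle has no earlier blue rectangle to pair with. This is why the `Item` stream is partitioned by color.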
Starting from the stream of `Items`, we just need to _key it_ by `Color`, as we want pairs of the same color. This will make sure that elements of the same color end up on the same physical machine.
Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast the stream of rules and ii) use the provided `MapStateDescriptor` to create the broadcast state where the rules will be stored.
Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to:
1. connect the two streams, and
2. specify our match detecting logic.
Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the non-broadcasted stream, with the `BroadcastStream` as an argument. This returns a `BroadcastConnectedStream`, on which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. The exact type of the function depends on the type of the non-broadcasted stream: if that stream is keyed, the function is a `KeyedBroadcastProcessFunction`; if it is non-keyed, the function is a `BroadcastProcessFunction`.
### BroadcastProcessFunction and KeyedBroadcastProcessFunction
As in the case of a `CoProcessFunction`, these functions have two process methods to implement: `processBroadcastElement()`, which is responsible for processing incoming elements in the broadcasted stream, and `processElement()`, which is used for the non-broadcasted one. `processBroadcastElement()` receives a `Context` and `processElement()` receives a `ReadOnlyContext`; both also receive the incoming element and a `Collector` for emitting results. The keyed variant is declared as `KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>` and additionally offers an `onTimer()` method.
The first thing to notice is that both functions require the implementation of the `processBroadcastElement()` method for processing elements on the broadcast side and the `processElement()` method for elements on the non-broadcasted side.
The difference lies in the type of access each one gives to the broadcast state. The broadcasted side has **read-write access** to it, while the non-broadcast side has **read-only access** (thus the names). The reason for this is that in Flink there is no cross-task communication. So, to guarantee that the contents in the Broadcast State are the same across all parallel instances of our operator, we give read-write access only to the broadcast side, which sees the same elements across all tasks, and we require the computation on each incoming element on that side to be identical across all tasks. Ignoring this rule would break the consistency guarantees of the state, leading to inconsistent and often difficult to debug results.
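The access rule above can be illustrated with a small plain-Java simulation. This is a sketch under stated assumptions, not Flink code: `BroadcastAccessSketch` is a hypothetical class standing in for one parallel task, a `HashMap` stands in for the broadcast state, and an unmodifiable view plays the role of the read-only access given to the non-broadcast side.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java simulation of one parallel task; it does not use Flink's API. */
final class BroadcastAccessSketch {
    // Stand-in for the task-local broadcast state (rule name -> rule text).
    private final Map<String, String> broadcastState = new HashMap<>();

    /** Broadcast side: read-write access, same deterministic update on every task. */
    void processBroadcastElement(String ruleName, String rule) {
        broadcastState.put(ruleName, rule);
    }

    /** Non-broadcast side: only an unmodifiable view of the state is handed out. */
    Map<String, String> readOnlyView() {
        return Collections.unmodifiableMap(broadcastState);
    }

    /** Returns true if writing through the read-only view is rejected. */
    static boolean viewRejectsWrites(BroadcastAccessSketch task) {
        try {
            task.readOnlyView().put("rogue", "update");
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        BroadcastAccessSketch task1 = new BroadcastAccessSketch();
        BroadcastAccessSketch task2 = new BroadcastAccessSketch();
        // Both tasks see the same broadcast elements and apply the same update.
        for (BroadcastAccessSketch t : new BroadcastAccessSketch[] {task1, task2}) {
            t.processBroadcastElement("r1", "RECTANGLE->TRIANGLE");
            t.processBroadcastElement("r2", "CIRCLE->CIRCLE");
        }
        System.out.println(task1.readOnlyView().equals(task2.readOnlyView()));
        System.out.println(viewRejectsWrites(task1));
    }
}
```

Because every simulated task applies the same update to the same broadcast elements, their states end up equal without any communication between them. A write from the non-broadcast side would be visible on one task only, which is exactly what the read-only restriction prevents.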
Finally, due to the fact that the `KeyedBroadcastProcessFunction` is operating on a keyed stream, it exposes some functionality which is not available to the `BroadcastProcessFunction`. That is:
1. The `ReadOnlyContext` in the `processElement()` method gives access to Flink’s underlying timer service, which allows registering event-time and/or processing-time timers. When a timer fires, `onTimer()` is invoked with an `OnTimerContext`, which exposes the same functionality as the `ReadOnlyContext` plus
   * the ability to ask whether the timer that fired was an event-time or a processing-time one, and
   * the ability to query the key associated with the timer.
2. The `Context` in the `processBroadcastElement()` method contains the method `applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)`. This allows registering a `KeyedStateFunction` to be **applied to all states of all keys** associated with the provided `stateDescriptor`.
**Attention:** Registering timers is only possible in the `processElement()` method of the `KeyedBroadcastProcessFunction`. It is not possible in the `processBroadcastElement()` method, as there is no key associated with the broadcasted elements.
## Important Considerations
After describing the offered APIs, this section focuses on the important things to keep in mind when using broadcast state. These are:
* **There is no cross-task communication:** As stated earlier, this is the reason why only the broadcast side of a `(Keyed)-BroadcastProcessFunction` can modify the contents of the broadcast state. In addition, the user has to make sure that all tasks modify the contents of the broadcast state in the same way for each incoming element. Otherwise, different tasks might have different contents, leading to inconsistent results.
* **Order of events in Broadcast State may differ across tasks:** Although broadcasting the elements of a stream guarantees that all elements will (eventually) go to all downstream tasks, elements may arrive in a different order at each task. So the state updates for each incoming element _MUST NOT depend on the ordering_ of the incoming events.
* **All tasks checkpoint their broadcast state:** Although all tasks have the same elements in their broadcast state when a checkpoint takes place (checkpoint barriers do not overtake elements), all tasks checkpoint their broadcast state, and not just one of them. This is a design decision to avoid having all tasks read from the same file during a restore (thus avoiding hotspots), although it comes at the expense of increasing the size of the checkpointed state by a factor of p (= parallelism). Flink guarantees that upon restoring/rescaling there will be **no duplicates** and **no missing data**. In case of recovery with the same or smaller parallelism, each task reads its checkpointed state. Upon scaling up, each task reads its own state, and the remaining tasks (`p_new` - `p_old`) read checkpoints of previous tasks in a round-robin manner.
* **No RocksDB state backend:** Broadcast state is kept in memory at runtime, and memory provisioning should be done accordingly. This holds for all operator states.
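The restore behavior described for checkpointed broadcast state can be sketched as a small plain-Java calculation. This is only an illustration under stated assumptions: `BroadcastRestoreSketch` and its methods are hypothetical names, and the concrete round-robin order (new task `i` reads the state of old task `i % p_old`) is an assumption that is consistent with the description above, not a statement about Flink's internals.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the restore assignment; it does not use Flink's API. */
final class BroadcastRestoreSketch {

    /** Index of the old task whose checkpointed broadcast state a new task reads. */
    static int sourceOf(int newTaskIndex, int oldParallelism) {
        // For newTaskIndex < oldParallelism this is the task's own state;
        // extra tasks wrap around the old tasks in round-robin order (assumed).
        return newTaskIndex % oldParallelism;
    }

    /** Source index for every task after restoring with newParallelism. */
    static List<Integer> assignment(int oldParallelism, int newParallelism) {
        List<Integer> sources = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            sources.add(sourceOf(i, oldParallelism));
        }
        return sources;
    }

    /** Every task checkpoints its own copy, so the size grows by a factor of p. */
    static long checkpointedBytes(long bytesPerTask, int parallelism) {
        return bytesPerTask * parallelism;
    }

    public static void main(String[] args) {
        System.out.println(assignment(2, 5));          // scaling up:   [0, 1, 0, 1, 0]
        System.out.println(assignment(4, 2));          // scaling down: [0, 1]
        System.out.println(checkpointedBytes(10, 4));  // 40
    }
}
```

Since all tasks hold the same broadcast state at checkpoint time, it does not matter which copy a new task reads. Spreading the reads across the old copies only avoids a single hot file.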