Flink is able to process streaming data based on different notions of _time_.
Flink能够根据_time_的不同概念处理流式数据。
* _Processing time_ refers to the system time of the machine (also known as “wall-clock time”) that is executing the respective operation.
* _Event time_ refers to the processing of streaming data based on timestamps which are attached to each row. The timestamps can encode when an event happened.
* _Ingestion time_ is the time that events enter Flink; internally, it is treated similarly to event time.
For more information about time handling in Flink, see the introduction about [Event Time and Watermarks](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html).
有关Flink中时间处理的更多信息,请参阅有关 [Event Time and Watermarks](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html)的介绍。
This pages explains how time attributes can be defined for time-based operations in Flink’s Table API & SQL.
本页介绍了如何在Flink的Table API和SQL中为基于时间的操作定义时间属性。
## Introduction to Time Attributes
## 时间属性简介
Time-based operations such as windows in both the [Table API](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#group-windows) and [SQL](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#group-windows) require information about the notion of time and its origin. Therefore, tables can offer _logical time attributes_ for indicating time and accessing corresponding timestamps in table programs.
基于时间的操作,例如 [Table API](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#group-windows) 和 [SQL](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#group-windows) 中的窗口,需要有关时间概念及其来源的信息。因此,表可以提供 _logical time attributes_ 用于指示时间和访问表程序中的相应时间戳。
Time attributes can be part of every table schema. They are defined when creating a table from a `DataStream` or are pre-defined when using a `TableSource`. Once a time attribute has been defined at the beginning, it can be referenced as a field and can be used in time-based operations.
As long as a time attribute is not modified and is simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink’s time and watermarking system and thus can not be used for time-based operations anymore.
Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It neither requires timestamp extraction nor watermark generation.
There are two ways to define a processing time attribute.
有两种方法可以定义处理时间属性。
### During DataStream-to-Table Conversion
### 在 DataStream 到 Table 转换期间
The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
@@ -83,9 +83,9 @@ val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'u
### Using a TableSource
### 使用 TableSource
The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage.
Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment.
In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html)).
The event time attribute is defined with the `.rowtime` property during schema definition. [Timestamps and watermarks](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html) must have been assigned in the `DataStream` that is converted.
在模式定义期间使用 `.rowtime` 属性定义事件时间属性。[Timestamps and watermarks](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html) 必须在已转换的 `DataStream` 中分配。
There are two ways of defining the time attribute when converting a `DataStream` into a `Table`. Depending on whether the specified `.rowtime` field name exists in the schema of the `DataStream` or not, the timestamp field is either
// extract timestamp and assign watermarks based on knowledge of the stream val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
// declare an additional logical field as an event time attribute val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
// Option 2:
// Option 2:
// extract timestamp from first field, and assign watermarks based on knowledge of the stream val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
// Usage:
// Usage:
val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
```
### Using a TableSource
### 使用 TableSource
The event time attribute is defined by a `TableSource` that implements the `DefinedRowtimeAttributes` interface. The `getRowtimeAttributeDescriptors()` method returns a list of `RowtimeAttributeDescriptor` for describing the final name of a time attribute, a timestamp extractor to derive the values of the attribute, and the watermark strategy associated with the attribute.
Please make sure that the `DataStream` returned by the `getDataStream()` method is aligned with the defined time attribute. The timestamps of the `DataStream` (the ones which are assigned by a `TimestampAssigner`) are only considered if a `StreamRecordTimestamp` timestamp extractor is defined. Watermarks of a `DataStream` are only preserved if a `PreserveWatermarks` watermark strategy is defined. Otherwise, only the values of the `TableSource`’s rowtime attribute are relevant.
Joins are a common and well-understood operation in batch data processing to connect the rows of two relations. However, the semantics of joins on [dynamic tables](dynamic_tables.html) are much less obvious or even confusing.
Regular joins are the most generic type of join in which any new records or changes to either side of the join input are visible and are affecting the whole join result. For example, if there is a new record on the left side, it will be joined with all of the previous and future records on the right side.
@@ -22,13 +22,13 @@ ON Orders.productId = Product.id
These semantics allow for any kind of updating (insert, update, delete) input tables.
这些语义允许任何类型的更新(插入,更新,删除)输入表。
However, this operation has an important implication: it requires to keep both sides of the join input in Flink’s state forever. Thus, the resource usage will grow indefinitely as well, if one or both input tables are continuously growing.
A time-windowed join is defined by a join predicate, that checks if the [time attributes](time_attributes.html) of the input records are within certain time constraints, i.e., a time window.
Compared to a regular join operation, this kind of join only supports append-only tables with time attributes. Since time attributes are quasi-monontic increasing, Flink can remove old values from its state without affecting the correctness of the result.
A join with a temporal table joins an append-only table (left input/probe side) with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes. Please check the corresponding page for more information about [temporal tables](temporal_tables.html).
`Orders`is an append-only table that represents payments for the given `amount` and the given `currency`. For example at `10:15` there was an order for an amount of `2 Euro`.
`RatesHistory`represents an ever changing append-only table of currency exchange rates with respect to `Yen` (which has a rate of `1`). For example, the exchange rate for the period from `09:00` to `10:45` of `Euro` to `Yen` was `114`. From `10:45` to `11:15` it was `116`.
@@ -136,21 +136,21 @@ WHERE r.currency = o.currency
Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key.
In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation.
In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join. This again allows Flink to limit the number of elements that must be kept in the state.
Compared to [time-windowed joins](#time-windowed-joins), temporal table joins do not define a time window within which bounds the records will be joined. Records from the probe side are always joined with the build side’s version at the time specified by the time attribute. Thus, records on the build side might be arbitrarily old. As time passes, the previous and no longer needed versions of the record (for the given primary key) will be removed from the state.
与[time-windowed join](#time-windowed-joins)相比,时态表连接不定义时间窗口(时间窗口内的数据将会被join)。探针端的记录始终与 time 属性指定的构建端版本连接。因此,构建方面的记录可能是任意旧的。随着时间的推移,将从状态中删除先前和不再需要的记录版本(对于给定的主键)。
Such behaviour makes a temporal table join a good candidate to express stream enrichment in relational terms.
这种行为使得时态表加入了一个很好的候选者来表达关系术语中的流富集。
### Usage
### 用法
After [defining temporal table function](temporal_tables.html#defining-temporal-table-function), we can start using it. Temporal table functions can be used in the same way as normal table functions would be used.
The following code snippet solves our motivating problem of converting currencies from the `Orders` table:
以下代码片段解决了我们从 `订单` 表转换货币的问题:
...
...
@@ -186,23 +186,22 @@ val result = orders
**Note**: State retention defined in a [query configuration](query_configuration.html) is not yet implemented for temporal joins. This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table.
With a processing-time time attribute, it is impossible to pass _past_ time attributes as an argument to the temporal table function. By definition, it is always the current timestamp. Thus, invocations of a processing-time temporal table function will always return the latest known versions of the underlying table and any updates in the underlying history table will also immediately overwrite the current values.
Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. Updates of the build side will have no effect on previously emitted join results.
One can think about a processing-time temporal join as a simple `HashMap<K, V>` that stores all of the records from the build side. When a new record from the build side has the same key as some previous record, the old value is just simply overwritten. Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`.
With an event-time time attribute (i.e., a rowtime attribute), it is possible to pass _past_ time attributes to the temporal table function. This allows for joining the two tables at a common point in time.
Compared to processing-time temporal joins, the temporal table does not only keep the latest version (with respect to the defined primary key) of the build side records in the state but stores all versions (identified by time) since the last watermark.
For example, an incoming row with an event-time timestamp of `12:30:00` that is appended to the probe side table is joined with the version of the build side table at time `12:30:00` according to the [concept of temporal tables](temporal_tables.html). Thus, the incoming row is only joined with rows that have a timestamp lower or equal to `12:30:00` with applied updates according to the primary key until this point in time.
By definition of event time, [watermarks](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html) allow the join operation to move forward in time and discard versions of the build table that are no longer necessary because no incoming row with lower or equal timestamp is expected.
Temporal Tables represent a concept of a (parameterized) view on a changing history table that returns the content of a table at a specific point in time.
时态表表示改变的历史记录表上的(参数化)视图的概念,该表返回特定时间点的表的内容。
Flink can keep track of the changes applied to an underlying append-only table and allows for accessing the table’s content at a certain point in time within a query.
Flink 可以跟踪应用于追加表的更改,在查询中的特定时间点,允许访问表的内容。
## Motivation
## 动机
Let’s assume that we have the following table `RatesHistory`.
假设我们有下表 `RatesHistory`。
...
...
@@ -27,9 +27,9 @@ rowtime currency rate
`RatesHistory`represents an ever growing append-only table of currency exchange rates with respect to `Yen` (which has a rate of `1`). For example, the exchange rate for the period from `09:00` to `10:45` of `Euro` to `Yen` was `114`. From `10:45` to `11:15` it was `116`.
Given that we would like to output all current rates at the time `10:58`, we would need the following SQL query to compute a result table:
鉴于我们希望在 `10:58` 时输出所有当前汇率,我们需要以下 SQL 查询来计算结果表:
...
...
@@ -45,9 +45,9 @@ WHERE r.rowtime = (
The correlated subquery determines the maximum time for the corresponding currency that is lower or equal than the desired time. The outer query lists the rates that have a maximum timestamp.
相关子查询确定相应货币的最大时间小于或等于所需时间。外部查询列出具有最大时间戳的汇率。
The following table shows the result of such a computation. In our example, the update to `Euro` at `10:45` is taken into account, however, the update to `Euro` at `11:15` and the new entry of `Pounds` are not considered in the table’s version at time `10:58`.
The concept of _Temporal Tables_ aims to simplify such queries, speed up their execution, and reduce Flink’s state usage. A _Temporal Table_ is a parameterized view on an append-only table that interprets the rows of the append-only table as the changelog of a table and provides the version of that table at a specific point in time. Interpreting the append-only table as a changelog requires the specification of a primary key attribute and a timestamp attribute. The primary key determines which rows are overwritten and the timestamp determines the time during which a row is valid.
In order to access the data in a temporal table, one must pass a [time attribute](time_attributes.html) that determines the version of the table that will be returned. Flink uses the SQL syntax of [table functions](../udfs.html#table-functions) to provide a way to express it.
Once defined, a _Temporal Table Function_ takes a single time argument `timeAttribute` and returns a set of rows. This set contains the latest versions of the rows for all of the existing primary keys with respect to the given time attribute.
Assuming that we defined a temporal table function `Rates(timeAttribute)` based on `RatesHistory` table, we could query such a function in the following way:
**Note**: Currently, Flink doesn’t support directly querying the temporal table functions with a constant time attribute parameter. At the moment, temporal table functions can only be used in joins. The example above was used to provide an intuition about what the function `Rates(timeAttribute)` returns.
Line `(1)` creates a `rates`[temporal table function](#temporal-table-functions), which allows us to use the function `rates` in the [Table API](../tableApi.html#joins).
Line `(2)` registers this function under the name `Rates` in our table environment, which allows us to use the `Rates` function in [SQL](../sql.html#joins).
Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. In many cases, continuous queries on streaming input are capable of computing accurate results that are identical to offline computed results. However, this is not possible in general case because continuous queries have to restrict the size of the state they are maintaining in order to avoid to run out of storage and to be able to process unbounded streaming data over a long period of time. As a result, a continuous query might only be able to provide approximated results depending on the characteristics of the input data and the query itself.
Flink’s Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a `QueryConfig` object. The `QueryConfig` can be obtained from the `TableEnvironment` and is passed back when a `Table` is translated, i.e., when it is [transformed into a DataStream](../common.html#convert-a-table-into-a-datastream-or-dataset) or [emitted via a TableSink](../common.html#emit-a-table).
@@ -56,7 +56,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
"outputTable", // table name
Array[String](...), // field names
Array[TypeInformation[_]](...), // field types
sink) // table sink
sink) // table sink
// emit result Table via a TableSink result.insertInto("outputTable", qConfig)
// convert result Table into a DataStream[Row] val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
...
...
@@ -64,13 +64,13 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
In the following we describe the parameters of the `QueryConfig` and how they affect the accuracy and resource consumption of a query.
在下文中,我们将描述 `QueryConfig` 的参数以及它们如何影响查询的准确性和资源消耗。
## Idle State Retention Time
## 空闲状态保留时间
Many queries aggregate or join records on one or more key attributes. When such a query is executed on a stream, the continuous query needs to collect records or maintain partial results per key. If the key domain of the input stream is evolving, i.e., the active key values are changing over time, the continuous query accumulates more and more state as more and more distinct keys are observed. However, often keys become inactive after some time and their corresponding state becomes stale and useless.
For example the following query computes the number of clicks per session.
例如,以下查询计算每个会话(session)的单击次数。
...
...
@@ -80,18 +80,18 @@ SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
The `sessionId` attribute is used as a grouping key and the continuous query maintains a count for each `sessionId` it observes. The `sessionId` attribute is evolving over time and `sessionId` values are only active until the session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of `sessionId` and expects that every `sessionId` value can occur at any point of time. It maintains a count for each observed `sessionId` value. Consequently, the total state size of the query is continuously growing as more and more `sessionId` values are observed.
The _Idle State Retention Time_ parameters define for how long the state of a key is retained without being updated before it is removed. For the previous example query, the count of a `sessionId` would be removed as soon as it has not been updated for the configured period of time.
_空闲状态保留时间参数(Idle State Retention Time)_ 定义了在删除 key 之前保留 key 状态多长时间而不进行更新。对于前面的示例查询,只要在配置的时间段内没有更新 `sessionId`,就会删除它的计数。
By removing the state of a key, the continuous query completely forgets that it has seen this key before. If a record with a key, whose state has been removed before, is processed, the record will be treated as if it was the first record with the respective key. For the example above this means that the count of a `sessionId` would start again at `0`.
There are two parameters to configure the idle state retention time:
配置空闲状态保留时间有两个参数:
*The _minimum idle state retention time_ defines how long the state of an inactive key is at least kept before it is removed.
*The _maximum idle state retention time_ defines how long the state of an inactive key is at most kept before it is removed.
*_minimum idle state retention time_ 定义了非活动key的状态在被删除之前至少保持多长时间。
*_maximum idle state retention time_ 义了非活动key的状态在被删除之前最多保留多长时间。
The parameters are specified as follows:
参数规定如下:
...
...
@@ -114,5 +114,4 @@ val qConfig: StreamQueryConfig = ???
Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.