From 667fa49101905bb0d49f2ddf35616c76019fc423 Mon Sep 17 00:00:00 2001 From: wizardforcel <562826179@qq.com> Date: Sat, 23 Feb 2019 23:10:49 +0800 Subject: [PATCH] 2019-02-23 23:10:49 --- docs/1.7-SNAPSHOT/26.md | 124 +++++----- docs/1.7-SNAPSHOT/43.md | 134 +++++----- docs/1.7-SNAPSHOT/46.md | 537 +++++++++++++++++++++++++--------------- docs/1.7-SNAPSHOT/50.md | 21 +- docs/1.7-SNAPSHOT/51.md | 125 ++++++---- docs/1.7-SNAPSHOT/60.md | 100 -------- docs/1.7-SNAPSHOT/61.md | 50 ---- docs/1.7-SNAPSHOT/73.md | 242 ++++-------------- 8 files changed, 600 insertions(+), 733 deletions(-) diff --git a/docs/1.7-SNAPSHOT/26.md b/docs/1.7-SNAPSHOT/26.md index 094c9a2..c76ce88 100644 --- a/docs/1.7-SNAPSHOT/26.md +++ b/docs/1.7-SNAPSHOT/26.md @@ -20,7 +20,7 @@ --- -转型:**映射** DataStream→DataStream +转换:**映射** DataStream→DataStream 描述:采用一个数据元并生成一个数据元。一个map函数,它将输入流的值加倍: @@ -39,7 +39,7 @@ dataStream.map(new MapFunction<Integer, Integer>() { --- -转型:**FlatMap** DataStream→DataStream +转换:**FlatMap** DataStream→DataStream 描述:采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数: @@ -60,7 +60,7 @@ dataStream.flatMap(new FlatMapFunction<String, String>() { --- -转型:**Filter** DataStream→DataStream +转换:**Filter** DataStream→DataStream 描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器: @@ -78,7 +78,7 @@ dataStream.filter(new FilterFunction<Integer>() { --- -转型:**KeyBy** DataStream→KeyedStream +转换:**KeyBy** DataStream→KeyedStream 描述:逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,_keyBy()_是使用散列分区实现的。[指定键](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)有不同的方法。此转换返回_KeyedStream_,其中包括使用[被Keys化状态](https://flink.sojb.cn/dev/stream/state/state.html#keyed-state)所需的_KeyedStream_。 @@ -97,7 +97,7 @@ dataStream.keyBy(0) // Key by the first element of a Tuple --- -转型:**Reduce** KeyedStream→DataStream +转换:**Reduce** KeyedStream→DataStream 描述:被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值。 @@ -118,7 +118,7 @@ keyedStream.reduce(new ReduceFunction<Integer>() { --- -转型:**折叠** KeyedStream→DataStream +转换:**折叠** KeyedStream→DataStream 描述:具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值。 @@ -139,7 +139,7 @@ DataStream<String> result = --- -转型:**聚合** KeyedStream→DataStream +转换:**聚合** KeyedStream→DataStream 描述:在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。 @@ -161,7 +161,7 @@ keyedStream.maxBy("key"); --- -转型:**Window** KeyedStream→WindowedStream +转换:**Window** KeyedStream→WindowedStream 描述:可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。有关[窗口](windows.html)的完整说明,请参见windows。 @@ -174,7 +174,7 @@ dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Las --- -转型:**WindowAll** DataStream→AllWindowedStream +转换:**WindowAll** DataStream→AllWindowedStream 描述:Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。有关[窗口](windows.html)的完整说明,请参见windows。**警告:**在许多情况下,这**是非并行**转换。所有记录将收集在windowAll 算子的一个任务中。 @@ -187,7 +187,7 @@ dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 se --- -转型:**Window Apply** WindowedStream→DataStream AllWindowedStream→DataStream +转换:**Window Apply** WindowedStream→DataStream AllWindowedStream→DataStream 描述:将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。**注意:**如果您正在使用windowAll转换,则需要使用AllWindowFunction。 @@ -224,7 +224,7 @@ allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, --- -转型:**Window Reduce** WindowedStream→DataStream +转换:**Window Reduce** WindowedStream→DataStream 描述:将函数缩减函数应用于窗口并返回缩小的值。 @@ -241,7 +241,7 @@ windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() --- -转型:**Window Fold** WindowedStream→DataStream +转换:**Window Fold** WindowedStream→DataStream 描述:将函数折叠函数应用于窗口并返回折叠值。示例函数应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”: @@ -258,7 +258,7 @@ windowedStream.fold("start", new FoldFunction<Integer, String>() { --- -转型:**Windows上的聚合** WindowedStream→DataStream +转换:**Windows上的聚合** WindowedStream→DataStream 描述:聚合窗口的内容。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。 @@ -280,7 +280,7 @@ windowedStream.maxBy("key"); --- -转型:**Union** DataStream *→DataStream +转换:**Union** DataStream *→DataStream 描述:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。 @@ -293,7 +293,7 @@ dataStream.union(otherStream1, otherStream2, ...); --- -转型:**Window Join** DataStream,DataStream→DataStream +转换:**Window Join** DataStream,DataStream→DataStream 描述:在给定Keys和公共窗口上连接两个数据流。 @@ -309,7 +309,7 @@ dataStream.join(otherStream) --- -转型:**Interval Join** KeyedStream,KeyedStream→DataStream +转换:**Interval Join** KeyedStream,KeyedStream→DataStream 描述:在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound @@ -328,7 +328,7 @@ keyedStream.intervalJoin(otherKeyedStream) --- -转型:**Window CoGroup** DataStream,DataStream→DataStream +转换:**Window CoGroup** DataStream,DataStream→DataStream 描述:在给定Keys和公共窗口上对两个数据流进行Cogroup。 @@ -344,7 +344,7 @@ dataStream.coGroup(otherStream) --- -转型:**连接** DataStream,DataStream→ConnectedStreams +转换:**连接** DataStream,DataStream→ConnectedStreams 描述:“连接”两个保存其类型的数据流。连接允许两个流之间的共享状态。 @@ -360,7 +360,7 @@ ConnectedStreams<Integer, String> connectedStreams = someStream.connect(ot --- -转型:**CoMap,CoFlatMap** ConnectedStreams→DataStream +转换:**CoMap,CoFlatMap** ConnectedStreams→DataStream 描述:类似于连接数据流上的map和flatMap @@ -397,7 +397,7 @@ connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() --- -转型:**拆分** DataStream→SplitStream +转换:**拆分** DataStream→SplitStream 描述:根据某些标准将流拆分为两个或更多个流。 @@ -422,7 +422,7 @@ SplitStream<Integer> split = someDataStream.split(new OutputSelector<In --- -转型:**选择** SplitStream→DataStream +转换:**选择** SplitStream→DataStream 描述:从拆分流中选择一个或多个流。 @@ -438,7 +438,7 @@ DataStream<Integer> all = split.select("even","odd"); --- -转型:**迭代** DataStream→IterativeStream→DataStream +转换:**迭代** DataStream→IterativeStream→DataStream 描述:通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅[迭代](#iterations)。 @@ -465,7 +465,7 @@ DataStream<Long> output = iterationBody.filter(new FilterFunction<Long& --- -转型:**提取时间戳** DataStream→DataStream +转换:**提取时间戳** DataStream→DataStream 描述:从记录中提取时间戳,以便使用使用事件时间语义的窗口。查看[活动时间](https://flink.sojb.cn/dev/event_time.html)。 @@ -480,7 +480,7 @@ stream.assignTimestamps (new TimeStampExtractor() {...}); --- -转型:**Map** DataStream → DataStream +转换:**Map** DataStream → DataStream 描述:Takes one element and produces one element. A map function that doubles the values of the input stream: @@ -493,7 +493,7 @@ dataStream.map { x => x * 2 } --- -转型:**FlatMap** DataStream → DataStream +转换:**FlatMap** DataStream → DataStream 描述:Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words: @@ -506,7 +506,7 @@ dataStream.flatMap { str => str.split(" ") } --- -转型:**Filter** DataStream → DataStream +转换:**Filter** DataStream → DataStream 描述:Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values: @@ -519,7 +519,7 @@ dataStream.filter { _ != 0 } --- -转型:**KeyBy** DataStream → KeyedStream +转换:**KeyBy** DataStream → KeyedStream 描述:Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See [keys](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys) on how to specify keys. This transformation returns a KeyedStream. @@ -532,7 +532,7 @@ dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key --- -转型:**Reduce** KeyedStream → DataStream +转换:**Reduce** KeyedStream → DataStream 描述:A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. @@ -546,7 +546,7 @@ keyedStream.reduce { _ + _ } --- -转型:**Fold** KeyedStream → DataStream +转换:**Fold** KeyedStream → DataStream 描述:A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. @@ -562,7 +562,7 @@ val result: DataStream[String] = --- -转型:**Aggregations** KeyedStream → DataStream +转换:**Aggregations** KeyedStream → DataStream 描述:Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). @@ -584,7 +584,7 @@ keyedStream.maxBy("key") --- -转型:**Window** KeyedStream → WindowedStream +转换:**Window** KeyedStream → WindowedStream 描述:Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a description of windows. @@ -597,7 +597,7 @@ dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last --- -转型:**WindowAll** DataStream → AllWindowedStream +转换:**WindowAll** DataStream → AllWindowedStream 描述:Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a complete description of windows.**WARNING:** This is in many cases a **non-parallel** transformation. All records will be gathered in one task for the windowAll operator. @@ -610,7 +610,7 @@ dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 sec --- -转型:**Window Apply** WindowedStream → DataStream AllWindowedStream → DataStream | Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.**Note:** If you are using a windowAll transformation, you need to use an AllWindowFunction instead. +转换:**Window Apply** WindowedStream → DataStream AllWindowedStream → DataStream | Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.**Note:** If you are using a windowAll transformation, you need to use an AllWindowFunction instead. ``` @@ -623,7 +623,7 @@ windowedStream.apply { WindowFunction } --- -转型:**Window Reduce** WindowedStream → DataStream +转换:**Window Reduce** WindowedStream → DataStream 描述:Applies a functional reduce function to the window and returns the reduced value. @@ -636,7 +636,7 @@ windowedStream.reduce { _ + _ } --- -转型:**Window Fold** WindowedStream → DataStream +转换:**Window Fold** WindowedStream → DataStream 描述:Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5": @@ -650,7 +650,7 @@ val result: DataStream[String] = --- -转型:**Aggregations on windows** WindowedStream → DataStream +转换:**Aggregations on windows** WindowedStream → DataStream 描述:Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). @@ -672,7 +672,7 @@ windowedStream.maxBy("key") --- -转型:**Union** DataStream* → DataStream +转换:**Union** DataStream* → DataStream 描述:Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream. @@ -685,7 +685,7 @@ dataStream.union(otherStream1, otherStream2, ...) --- -转型:**Window Join** DataStream,DataStream → DataStream +转换:**Window Join** DataStream,DataStream → DataStream 描述:Join two data streams on a given key and a common window. @@ -701,7 +701,7 @@ dataStream.join(otherStream) --- -转型:**Window CoGroup** DataStream,DataStream → DataStream +转换:**Window CoGroup** DataStream,DataStream → DataStream 描述:Cogroups two data streams on a given key and a common window. @@ -717,7 +717,7 @@ dataStream.coGroup(otherStream) --- -转型:**Connect** DataStream,DataStream → ConnectedStreams +转换:**Connect** DataStream,DataStream → ConnectedStreams 描述:"Connects" two data streams retaining their types, allowing for shared state between the two streams. @@ -733,7 +733,7 @@ val connectedStreams = someStream.connect(otherStream) --- -转型:**CoMap, CoFlatMap** ConnectedStreams → DataStream +转换:**CoMap, CoFlatMap** ConnectedStreams → DataStream 描述:Similar to map and flatMap on a connected data stream @@ -753,7 +753,7 @@ connectedStreams.flatMap( --- -转型:**Split** DataStream → SplitStream +转换:**Split** DataStream → SplitStream 描述:Split the stream into two or more streams according to some criterion. @@ -772,7 +772,7 @@ val split = someDataStream.split( --- -转型:**Select** SplitStream → DataStream +转换:**Select** SplitStream → DataStream 描述:Select one or more streams from a split stream. @@ -787,7 +787,7 @@ val all = split.select("even","odd") --- -转型:**Iterate** DataStream → IterativeStream → DataStream +转换:**Iterate** DataStream → IterativeStream → DataStream 描述:Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](#iterations) for a complete description. @@ -805,7 +805,7 @@ initialStream.iterate { --- -转型:**Extract Timestamps** DataStream → DataStream +转换:**Extract Timestamps** DataStream → DataStream 描述:Extracts timestamps from records in order to work with windows that use event time semantics. See [Event Time](https://flink.sojb.cn/dev/event_time.html). @@ -837,7 +837,7 @@ is not supported by the API out-of-the-box. To use this feature, you should use --- -转型:**Project** DataStream→DataStream +转换:**Project** DataStream→DataStream 描述:从元组中选择字段的子集 @@ -863,7 +863,7 @@ Flink还通过以下函数对转换后的精确流分区进行低级控制(如 --- -转型:**自定义分区** DataStream→DataStream +转换:**自定义分区** DataStream→DataStream 描述:使用用户定义的分区程序为每个数据元选择目标任务。 @@ -877,7 +877,7 @@ dataStream.partitionCustom(partitioner, 0); --- -转型:**随机分区** DataStream→DataStream +转换:**随机分区** DataStream→DataStream 描述:根据均匀分布随机分配数据元。 @@ -890,7 +890,7 @@ dataStream.shuffle(); --- -转型:**Rebalance (循环分区)** DataStream→DataStream +转换:**Rebalance (循环分区)** DataStream→DataStream 描述:分区数据元循环,每个分区创建相等的负载。在存在数据偏斜时用于性能优化。 @@ -903,7 +903,7 @@ dataStream.rebalance(); --- -转型:**重新调整** DataStream→DataStream +转换:**重新调整** DataStream→DataStream 描述:分区数据元,循环,到下游 算子操作的子集。如果您希望拥有管道,例如,从源的每个并行实例扇出到多个映射器的子集以分配负载但又不希望发生rebalance()会产生完全Rebalance ,那么这非常有用。这将仅需要本地数据传输而不是通过网络传输数据,具体取决于其他配置值,例如TaskManagers的插槽数。上游 算子操作发送数据元的下游 算子操作的子集取决于上游和下游 算子操作的并行度。例如,如果上游 算子操作具有并行性2并且下游 算子操作具有并行性6,则一个上游 算子操作将分配元件到三个下游 算子操作,而另一个上游 算子操作将分配到其他三个下游 算子操作。另一方面,如果下游 算子操作具有并行性2而上游 算子操作具有并行性6,则三个上游 算子操作将分配到一个下游 算子操作,而其他三个上游 算子操作将分配到另一个下游 算子操作。在不同并行度不是彼此的倍数的情况下,一个或多个下游 算子操作将具有来自上游 算子操作的不同数量的输入。请参阅此图以获取上例中连接模式的可视化:![数据流中的检查点障碍](../img/rescale.svg) @@ -916,7 +916,7 @@ dataStream.rescale(); --- -转型:**广播** DataStream→DataStream +转换:**广播** DataStream→DataStream 描述:向每个分区广播数据元。 @@ -931,7 +931,7 @@ dataStream.broadcast(); --- -转型:**Custom partitioning** DataStream → DataStream +转换:**Custom partitioning** DataStream → DataStream 描述:Uses a user-defined Partitioner to select the target task for each element. @@ -945,7 +945,7 @@ dataStream.partitionCustom(partitioner, 0) --- -转型:**Random partitioning** DataStream → DataStream +转换:**Random partitioning** DataStream → DataStream 描述:Partitions elements randomly according to a uniform distribution. @@ -958,7 +958,7 @@ dataStream.shuffle() --- -转型:**Rebalancing (Round-robin partitioning)** DataStream → DataStream +转换:**Rebalancing (Round-robin partitioning)** DataStream → DataStream 描述:Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. @@ -971,7 +971,7 @@ dataStream.rebalance() --- -转型:**Rescaling** DataStream → DataStream +转换:**Rescaling** DataStream → DataStream 描述:Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4 then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operations.In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.</p> Please see this figure for a visualization of the connection pattern in the above example: </p>![Checkpoint barriers in data streams](../img/rescale.svg) @@ -984,7 +984,7 @@ dataStream.rescale() --- -转型:**Broadcasting** DataStream → DataStream +转换:**Broadcasting** DataStream → DataStream 描述:Broadcasts elements to every partition. @@ -1013,7 +1013,7 @@ dataStream.broadcast() --- -转型:开始新的链条 +转换:开始新的链条 描述:从这个 算子开始,开始一个新的链。两个映射器将被链接,并且过滤器将不会链接到第一个映射器。 @@ -1026,7 +1026,7 @@ someStream.filter(...).map(...).startNewChain().map(...); --- -转型:禁用链接 +转换:禁用链接 描述:不要链接Map 算子 @@ -1039,7 +1039,7 @@ someStream.map(...).disableChaining(); --- -转型:设置插槽共享组 +转换:设置插槽共享组 描述:设置 算子操作的插槽共享组。Flink将把具有相同插槽共享组的 算子操作放入同一个插槽,同时保持其他插槽中没有插槽共享组的 算子操作。这可用于隔离插槽。如果所有输入 算子操作都在同一个插槽共享组中,则插槽共享组将继承输入 算子操作。默认插槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)将 算子操作显式放入此组中。 @@ -1054,7 +1054,7 @@ someStream.filter(...).slotSharingGroup("name"); --- -转型:Start new chain +转换:Start new chain 描述:Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper. @@ -1067,7 +1067,7 @@ someStream.filter(...).map(...).startNewChain().map(...) --- -转型:Disable chaining +转换:Disable chaining 描述:Do not chain the map operator @@ -1080,7 +1080,7 @@ someStream.map(...).disableChaining() --- -转型:Set slot sharing group +转换:Set slot sharing group 描述:Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default"). diff --git a/docs/1.7-SNAPSHOT/43.md b/docs/1.7-SNAPSHOT/43.md index 76e24ca..a4d0c66 100644 --- a/docs/1.7-SNAPSHOT/43.md +++ b/docs/1.7-SNAPSHOT/43.md @@ -167,12 +167,12 @@ Python API在Windows / Linux / OSX系统上进行了测试。 本节简要概述了可用的转换。该[转换文档](dataset_transformations.html)与示例全部转换的完整描述。 -| 转换 | 描述 | -| --- | --- | -| **Map** -PythonDataStream→PythonDataStream | 采用一个数据元并生成一个数据元。 +--- + +转换:**Map** PythonDataStream→PythonDataStream + +描述:采用一个数据元并生成一个数据元。 -<figure class="highlight"> ``` class Doubler(MapFunction): @@ -182,13 +182,14 @@ class Doubler(MapFunction): data_stream.map(Doubler()) ``` -</figure> - | -| **FlatMap** -PythonDataStream→PythonDataStream | 采用一个数据元并生成零个,一个或多个数据元。 -<figure class="highlight"> +--- + +转换:**FlatMap** PythonDataStream→PythonDataStream + +描述:采用一个数据元并生成零个,一个或多个数据元。 + ``` class Tokenizer(FlatMapFunction): @@ -198,13 +199,14 @@ class Tokenizer(FlatMapFunction): data_stream.flat_map(Tokenizer()) ``` -</figure> - | -| **Filter** -PythonDataStream→PythonDataStream | 计算每个数据元的布尔函数,并保存函数返回true的数据元。 -<figure class="highlight"> +--- + +转换:**Filter** PythonDataStream→PythonDataStream + +描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。 + ``` class GreaterThen1000(FilterFunction): @@ -214,13 +216,14 @@ class GreaterThen1000(FilterFunction): data_stream.filter(GreaterThen1000()) ``` -</figure> - | -| **KeyBy** -PythonDataStream→PythonKeyedStream | 逻辑上将流分区为不相交的分区,每个分区包含相同Keys的数据元。在内部,这是通过散列分区实现的。见[键](/dev/api_concepts#specifying-keys)如何指定键。此转换返回PythonKeyedDataStream。 -<figure class="highlight"> +--- + +转换:**KeyBy** PythonDataStream→PythonKeyedStream + +描述:逻辑上将流分区为不相交的分区,每个分区包含相同Keys的数据元。在内部,这是通过散列分区实现的。见[键](/dev/api_concepts#specifying-keys)如何指定键。此转换返回PythonKeyedDataStream。 + ``` class Selector(KeySelector): @@ -230,13 +233,14 @@ class Selector(KeySelector): data_stream.key_by(Selector()) // Key by field "someKey" ``` -</figure> - | -| **Reduce** -PythonKeyedStream→PythonDataStream | 被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值。 -<figure class="highlight"> +--- + +转换:**Reduce** PythonKeyedStream→PythonDataStream + +描述:被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值。 + ``` class Sum(ReduceFunction): @@ -248,13 +252,14 @@ class Sum(ReduceFunction): data.reduce(Sum()) ``` -</figure> - | -| **Window** -PythonKeyedStream→PythonWindowedStream | 可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。有关[窗口](windows.html)的完整说明,请参见windows。 -<figure class="highlight"> +--- + +转换:**Window** PythonKeyedStream→PythonWindowedStream + +描述:可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。有关[窗口](windows.html)的完整说明,请参见windows。 + ``` keyed_stream.count_window(10, 5) # Last 10 elements, sliding (jumping) by 5 elements @@ -264,13 +269,14 @@ keyed_stream.time_window(milliseconds(30)) # Last 30 milliseconds of data keted_stream.time_window(milliseconds(100), milliseconds(20)) # Last 100 milliseconds of data, sliding (jumping) by 20 milliseconds ``` -</figure> - | -| **Window Apply** -PythonWindowedStream→PythonDataStream | 将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。 -<figure class="highlight"> +--- + +转换:**Window Apply** PythonWindowedStream→PythonDataStream + +描述:将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。 + ``` class WindowSum(WindowFunction): @@ -283,13 +289,14 @@ class WindowSum(WindowFunction): windowed_stream.apply(WindowSum()) ``` -</figure> - | -| **Window Reduce** -PythonWindowedStream→PythonDataStream | 将函数缩减函数应用于窗口并返回缩小的值。 -<figure class="highlight"> +--- + +转换:**Window Reduce** PythonWindowedStream→PythonDataStream + +描述:将函数缩减函数应用于窗口并返回缩小的值。 + ``` class Sum(ReduceFunction): @@ -301,25 +308,27 @@ class Sum(ReduceFunction): windowed_stream.reduce(Sum()) ``` -</figure> - | -| **Union** -PythonDataStream *→PythonDataStream | 两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。 -<figure class="highlight"> +--- + +转换:**Union** PythonDataStream *→PythonDataStream + +描述:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。 + ``` data_stream.union(other_stream1, other_stream2, ...); ``` -</figure> - | -| **Split** -PythonDataStream→PythonSplitStream | 根据某些标准将流拆分为两个或更多个流。 -<figure class="highlight"> +--- + +转换:**Split** PythonDataStream→PythonSplitStream + +描述:根据某些标准将流拆分为两个或更多个流。 + ``` class StreamSelector(OutputSelector): @@ -329,13 +338,14 @@ class StreamSelector(OutputSelector): splited_stream = data_stream.split(StreamSelector()) ``` -</figure> - | -| **Select** -SplitStream→DataStream | 从拆分流中选择一个或多个流。 -<figure class="highlight"> +--- + +转换:**Select** SplitStream→DataStream + +描述:从拆分流中选择一个或多个流。 + ``` even_data_stream = splited_stream.select("even") @@ -343,13 +353,14 @@ odd_data_stream = splited_stream.select("odd") all_data_stream = splited_stream.select("even", "odd") ``` -</figure> - | -| **Iterate** -PythonDataStream→PythonIterativeStream→PythonDataStream | 通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅[迭代](#iterations)。 -<figure class="highlight"> +--- + +转换:**Iterate** PythonDataStream→PythonIterativeStream→PythonDataStream + +描述:通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅[迭代](#iterations)。 + ``` class MinusOne(MapFunction): @@ -371,9 +382,8 @@ iteration.close_with(feedback) output = iteration_body.filter(LessEquelToZero()) ``` -</figure> - | + ## 将函数传递给Flink diff --git a/docs/1.7-SNAPSHOT/46.md b/docs/1.7-SNAPSHOT/46.md index 9654791..1ad3e80 100644 --- a/docs/1.7-SNAPSHOT/46.md +++ b/docs/1.7-SNAPSHOT/46.md @@ -84,11 +84,14 @@ object WordCount { * [**Java**](#tab_java_1) * [**Scala**](#tab_scala_1) -| 转换 | 描述 | -| --- | --- | -| **Map** | 采用一个数据元并生成一个数据元。 -<figure class="highlight"> + +--- + +转换:**Map** + +描述:采用一个数据元并生成一个数据元。 + ``` data.map(new MapFunction<String, Integer>() { @@ -96,12 +99,14 @@ data.map(new MapFunction<String, Integer>() { }); ``` -</figure> - | -| **FlatMap** | 采用一个数据元并生成零个,一个或多个数据元。 -<figure class="highlight"> +--- + +转换:**FlatMap** + +描述:采用一个数据元并生成零个,一个或多个数据元。 + ``` data.flatMap(new FlatMapFunction<String, String>() { @@ -113,12 +118,14 @@ data.flatMap(new FlatMapFunction<String, String>() { }); ``` -</figure> - | -| **MapPartition** | 在单个函数调用中转换并行分区。该函数将分区作为`Iterable`流来获取,并且可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。 -<figure class="highlight"> +--- + +转换:**MapPartition** + +描述:在单个函数调用中转换并行分区。该函数将分区作为`Iterable`流来获取,并且可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。 + ``` data.mapPartition(new MapPartitionFunction<String, Long>() { @@ -132,13 +139,15 @@ data.mapPartition(new MapPartitionFunction<String, Long>() { }); ``` -</figure> - | -| **Filter** | 计算每个数据元的布尔函数,并保存函数返回true的数据元。 + +--- + +转换:**Filter** + +描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。 **重要信息:**系统假定该函数不会修改应用谓词的数据元。违反此假设可能会导致错误的结果。 -<figure class="highlight"> ``` data.filter(new FilterFunction<Integer>() { @@ -146,12 +155,14 @@ data.filter(new FilterFunction<Integer>() { }); ``` -</figure> - | -| **Reduce** | 通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**Reduce** + +描述:通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce可以应用于完整数据集或分组数据集。 + ``` data.reduce(new ReduceFunction<Integer> { @@ -159,12 +170,14 @@ data.reduce(new ReduceFunction<Integer> { }); ``` -</figure> 如果将reduce应用于分组数据集,则可以通过提供`CombineHint`to 来指定运行时执行reduce的组合阶段的方式 `setCombineHint`。在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如1/10)。 | -| **ReduceGroup** | 将一组数据元组合成一个或多个数据元。ReduceGroup可以应用于完整数据集或分组数据集。 +--- + +转换:**ReduceGroup** + +描述:将一组数据元组合成一个或多个数据元。ReduceGroup可以应用于完整数据集或分组数据集。 -<figure class="highlight"> ``` data.reduceGroup(new GroupReduceFunction<Integer, Integer> { @@ -178,46 +191,50 @@ data.reduceGroup(new GroupReduceFunction<Integer, Integer> { }); ``` -</figure> - | -| **Aggregate** | 将一组值聚合为单个值。聚合函数可以被认为是内置的reduce函数。聚合可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**Aggregate** + +描述:将一组值聚合为单个值。聚合函数可以被认为是内置的reduce函数。聚合可以应用于完整数据集或分组数据集。 + ``` Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2); ``` -</figure> 您还可以使用简写语法进行最小,最大和总和聚合。 -<figure class="highlight"> ``` Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2); ``` -</figure> - | -| **Distinct** | 返回数据集的不同数据元。它相对于数据元的所有字段或字段子集从输入DataSet中删除重复条目。 -<figure class="highlight"> +--- + +转换:**Distinct** + +描述:返回数据集的不同数据元。它相对于数据元的所有字段或字段子集从输入DataSet中删除重复条目。 + ``` data.distinct(); ``` -</figure> 使用reduce函数实现Distinct。您可以通过提供`CombineHint`to 来指定运行时执行reduce的组合阶段的方式 `setCombineHint`。在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如1/10)。 | -| **Join** | 通过创建在其键上相等的所有数据元对来连接两个数据集。可选地使用JoinFunction将数据元对转换为单个数据元,或使用FlatJoinFunction将数据元对转换为任意多个(包括无)数据元。请参阅[键部分](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)以了解如何定义连接键。 +--- + +转换:**Join** + +描述:通过创建在其键上相等的所有数据元对来连接两个数据集。可选地使用JoinFunction将数据元对转换为单个数据元,或使用FlatJoinFunction将数据元对转换为任意多个(包括无)数据元。请参阅[键部分](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)以了解如何定义连接键。 -<figure class="highlight"> ``` result = input1.join(input2) @@ -225,12 +242,10 @@ result = input1.join(input2) .equalTo(1); // key of the second input (tuple field 1) ``` -</figure> 您可以通过_Join Hints_指定运行时执行连接的方式。提示描述了通过分区或广播进行连接,以及它是使用基于排序还是基于散列的算法。有关可能的提示和示例的列表,请参阅“ [转换指南”](dataset_transformations.html#join-algorithm-hints)。 如果未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。 -<figure class="highlight"> ``` // This executes a join by broadcasting the first data set @@ -239,12 +254,14 @@ result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST) .where(0).equalTo(1); ``` -</figure> 请注意,连接转换仅适用于等连接。其他连接类型需要使用OuterJoin或CoGroup表示。 | -| **OuterJoin** | 在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有数据元对。此外,如果在另一侧没有找到匹配的Keys,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配数据元对(或一个数据元和`null`另一个输入的值)被赋予JoinFunction以将数据元对转换为单个数据元,或者转换为FlatJoinFunction以将数据元对转换为任意多个(包括无)数据元。请参阅[键部分](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)以了解如何定义连接键。 +--- + +转换:**OuterJoin** + +描述:在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有数据元对。此外,如果在另一侧没有找到匹配的Keys,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配数据元对(或一个数据元和`null`另一个输入的值)被赋予JoinFunction以将数据元对转换为单个数据元,或者转换为FlatJoinFunction以将数据元对转换为任意多个(包括无)数据元。请参阅[键部分](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)以了解如何定义连接键。 -<figure class="highlight"> ``` input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins @@ -260,12 +277,14 @@ input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or ful }); ``` -</figure> - | -| **CoGroup** | reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅[keys部分](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)以了解如何定义coGroup键。 -<figure class="highlight"> +--- + +转换:**CoGroup** + +描述:reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅[keys部分](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)以了解如何定义coGroup键。 + ``` data1.coGroup(data2) @@ -278,12 +297,14 @@ data1.coGroup(data2) }); ``` -</figure> - | -| **Cross** | 构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用CrossFunction将数据元对转换为单个数据元 -<figure class="highlight"> +--- + +转换:**Cross** + +描述:构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用CrossFunction将数据元对转换为单个数据元 + ``` DataSet<Integer> data1 = // [...] @@ -291,12 +312,14 @@ DataSet<String> data2 = // [...] DataSet<Tuple2<Integer, String>> result = data1.cross(data2); ``` -</figure> 注:交叉是一个潜在的**非常**计算密集型 算子操作它甚至可以挑战大的计算集群!建议使用_crossWithTiny()_和_crossWithHuge()_来提示系统的DataSet大小。 | -| **Union** | 生成两个数据集的并集。 +--- + +转换:**Union** + +描述:生成两个数据集的并集。 -<figure class="highlight"> ``` DataSet<String> data1 = // [...] @@ -304,12 +327,14 @@ DataSet<String> data2 = // [...] DataSet<String> result = data1.union(data2); ``` -</figure> - | -| **Rebalance** | 均匀地Rebalance 数据集的并行分区以消除数据偏差。只有类似Map的转换可能会遵循Rebalance 转换。 -<figure class="highlight"> +--- + +转换:**Rebalance** + +描述:均匀地Rebalance 数据集的并行分区以消除数据偏差。只有类似Map的转换可能会遵循Rebalance 转换。 + ``` DataSet<String> in = // [...] @@ -317,12 +342,14 @@ DataSet<String> result = in.rebalance() .map(new Mapper()); ``` -</figure> - | -| **Hash-Partition** | 散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。 -<figure class="highlight"> +--- + +转换:**Hash-Partition** + +描述:散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。 + ``` DataSet<Tuple2<String,Integer>> in = // [...] @@ -330,12 +357,14 @@ DataSet<Integer> result = in.partitionByHash(0) .mapPartition(new PartitionMapper()); ``` -</figure> - | -| **Range-Partition** | Range-Partition给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。 -<figure class="highlight"> +--- + +转换:**Range-Partition** + +描述:Range-Partition给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。 + ``` DataSet<Tuple2<String,Integer>> in = // [...] @@ -343,25 +372,29 @@ DataSet<Integer> result = in.partitionByRange(0) .mapPartition(new PartitionMapper()); ``` -</figure> - | -| **Custom Partitioning** | 手动指定数据分区。 + +--- + +转换:**Custom Partitioning** + +描述:手动指定数据分区。 _注意_:此方法仅适用于单个字段键。 -<figure class="highlight"> ``` DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key) ``` -</figure> - | -| **Sort Partition** | 本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接sortPartition()调用来完成对多个字段的排序。 -<figure class="highlight"> +--- + +转换:**Sort Partition** + +描述:本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接sortPartition()调用来完成对多个字段的排序。 + ``` DataSet<Tuple2<String,Integer>> in = // [...] @@ -369,12 +402,14 @@ DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING) .mapPartition(new PartitionMapper()); ``` -</figure> - | -| **First-n** | 返回数据集的前n个(任意)数据元。First-n可以应用于常规数据集,分组数据集或分组排序数据集。分组键可以指定为键选择器函数或字段位置键。 -<figure class="highlight"> +--- + +转换:**First-n** + +描述:返回数据集的前n个(任意)数据元。First-n可以应用于常规数据集,分组数据集或分组排序数据集。分组键可以指定为键选择器函数或字段位置键。 + ``` DataSet<Tuple2<String,Integer>> in = // [...] @@ -389,31 +424,35 @@ DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0) .first(3); ``` -</figure> - | + * * * 以下转换可用于元组的数据集: -| 转换 | 描述 | -| --- | --- | -| **project** | 从元组中选择字段的子集 -<figure class="highlight"> + +--- + +转换:**project** + +描述:从元组中选择字段的子集 + ``` DataSet<Tuple3<Integer, Double, String>> in = // [...] DataSet<Tuple2<String, Integer>> out = in.project(2,0); ``` -</figure> - | -| **MinBy / MaxBy** | 从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。用于比较的字段必须是有效的关键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**MinBy / MaxBy** + +描述:从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。用于比较的字段必须是有效的关键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。 + ``` DataSet<Tuple3<Integer, Double, String>> in = // [...] @@ -424,126 +463,140 @@ DataSet<Tuple3<Integer, Double, String>> out2 = in.groupBy(2) .minBy(1); ``` -</figure> - | -| 转换 | 描述 | -| --- | --- | -| **Map** | 采用一个元素并生成一个元素。 -<figure class="highlight"> + + +--- + +转换:**Map** + +描述:采用一个元素并生成一个元素。 + ``` data.map { x => x.toInt } ``` -</figure> - | -| **FlatMap** | 采用一个元素并生成零个,一个或多个元素。 -<figure class="highlight"> +--- + +转换:**FlatMap** + +描述:采用一个元素并生成零个,一个或多个元素。 + ``` data.flatMap { str => str.split(" ") } ``` -</figure> - | -| **MapPartition** | 在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,并可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和先前的 算子操作。 -<figure class="highlight"> +--- + +转换:**MapPartition** + +描述:在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,并可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和先前的 算子操作。 + ``` data.mapPartition { in => in map { (_, 1) } } ``` -</figure> - | -| **Filter** | 计算每个元素的布尔函数,并保存函数返回true的元素。 + +--- + +转换:**Filter** + +描述:计算每个元素的布尔函数,并保存函数返回true的元素。 **重要信息:**系统假定该函数不会修改应用谓词的元素。违反此假设可能会导致错误的结果。 -<figure class="highlight"> ``` data.filter { _ > 1000 } ``` -</figure> - | -| **Reduce** | 通过将两个元素重复组合成一个元素,将一组元素组合成一个元素。Reduce可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**Reduce** + +描述:通过将两个元素重复组合成一个元素,将一组元素组合成一个元素。Reduce可以应用于完整数据集或分组数据集。 + ``` data.reduce { _ + _ } ``` -</figure> - | -| **ReduceGroup** | 将一组元素组合成一个或多个元素。ReduceGroup可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**ReduceGroup** + +描述:将一组元素组合成一个或多个元素。ReduceGroup可以应用于完整数据集或分组数据集。 + ``` data.reduceGroup { elements => elements.sum } ``` -</figure> - | -| **Aggregate** | 将一组值聚合为单个值。聚合函数可以被认为是内置的reduce函数。聚合可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**Aggregate** + +描述:将一组值聚合为单个值。聚合函数可以被认为是内置的reduce函数。聚合可以应用于完整数据集或分组数据集。 + ``` val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2) ``` -</figure> 您还可以使用简写语法进行最小,最大和总和聚合。 -<figure class="highlight"> ``` val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, String, Double)] = input.sum(0).min(2) ``` -</figure> - | -| **Distinct** | 返回数据集的不同元素。它相对于元素的所有字段或字段子集从输入DataSet中删除重复条目。 -<figure class="highlight"> +--- + +转换:**Distinct** + +描述:返回数据集的不同元素。它相对于元素的所有字段或字段子集从输入DataSet中删除重复条目。 + ``` data.distinct() ``` -</figure> - | -| **Join** | 通过创建在其键上相等的所有元素对来连接两个数据集。可选地使用JoinFunction将元素对转换为单个元素,或使用FlatJoinFunction将元素对转换为任意多个(包括无)元素。请参阅[键部分](//ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys)以了解如何定义连接键。 -<figure class="highlight"> +--- + +转换:**Join** + +描述:通过创建在其键上相等的所有元素对来连接两个数据集。可选地使用JoinFunction将元素对转换为单个元素,或使用FlatJoinFunction将元素对转换为任意多个(包括无)元素。请参阅[键部分](//ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys)以了解如何定义连接键。 + ``` // In this case tuple fields are used as keys. "0" is the join field on the first tuple // "1" is the join field on the second tuple. val result = input1.join(input2).where(0).equalTo(1) ``` -</figure> 您可以通过_Join Hints_指定运行时执行连接的方式。提示描述了通过分区或广播进行连接,以及它是使用基于排序还是基于散列的算法。有关可能的提示和示例的列表,请参阅“ [转换指南”](dataset_transformations.html#join-algorithm-hints)。 如果未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。 -<figure class="highlight"> ``` // This executes a join by broadcasting the first data set @@ -551,12 +604,14 @@ val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, S .where(0).equalTo(1) ``` -</figure> 请注意,连接转换仅适用于等连接。其他连接类型需要使用OuterJoin或CoGroup表示。 | -| **OuterJoin** | 在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有元素对。此外,如果在另一侧没有找到匹配的密钥,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配元素对(或一个元素和另一个输入的`null`值)被赋予JoinFunction以将元素对转换为单个元素,或者给予FlatJoinFunction以将元素对转换为任意多个(包括无)元素。请参阅[键部分](//ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys)以了解如何定义连接键。 +--- + +转换:**OuterJoin** + +描述:在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有元素对。此外,如果在另一侧没有找到匹配的密钥,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配元素对(或一个元素和另一个输入的`null`值)被赋予JoinFunction以将元素对转换为单个元素,或者给予FlatJoinFunction以将元素对转换为任意多个(包括无)元素。请参阅[键部分](//ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys)以了解如何定义连接键。 -<figure class="highlight"> ``` val joined = left.leftOuterJoin(right).where(0).equalTo(1) { @@ -566,102 +621,120 @@ val joined = left.leftOuterJoin(right).where(0).equalTo(1) { } ``` -</figure> - | -| **CoGroup** | reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅[keys部分](//ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys)以了解如何定义coGroup键。 -<figure class="highlight"> +--- + +转换:**CoGroup** + +描述:reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅[keys部分](//ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys)以了解如何定义coGroup键。 + ``` data1.coGroup(data2).where(0).equalTo(1) ``` -</figure> - | -| **Cross** | 构建两个输入的笛卡尔积(交叉乘积),创建所有元素对。可选择使用CrossFunction将元素对转换为单个元素 -<figure class="highlight"> +--- + +转换:**Cross** + +描述:构建两个输入的笛卡尔积(交叉乘积),创建所有元素对。可选择使用CrossFunction将元素对转换为单个元素 + ``` val data1: DataSet[Int] = // [...] val data2: DataSet[String] = // [...] val result: DataSet[(Int, String)] = data1.cross(data2) ``` -</figure> 注:交叉是一个潜在的**非常**计算密集型 算子操作它甚至可以挑战大的计算集群!建议使用_crossWithTiny()_和_crossWithHuge()_来提示系统的DataSet大小。 | -| **Union** | 生成两个数据集的并集。 +--- + +转换:**Union** + +描述:生成两个数据集的并集。 -<figure class="highlight"> ``` data.union(data2) ``` -</figure> - | -| **Rebalance** | 均匀地Rebalance 数据集的并行分区以消除数据偏差。只有类似Map的转换可能会遵循Rebalance 转换。 -<figure class="highlight"> +--- + +转换:**Rebalance** + +描述:均匀地Rebalance 数据集的并行分区以消除数据偏差。只有类似Map的转换可能会遵循Rebalance 转换。 + ``` val data1: DataSet[Int] = // [...] val result: DataSet[(Int, String)] = data1.rebalance().map(...) ``` -</figure> - | -| **Hash-Partition** | 散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。 -<figure class="highlight"> +--- + +转换:**Hash-Partition** + +描述:散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。 + ``` val in: DataSet[(Int, String)] = // [...] val result = in.partitionByHash(0).mapPartition { ... } ``` -</figure> - | -| **Range-Partition** | Range-Partition给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。 -<figure class="highlight"> +--- + +转换:**Range-Partition** + +描述:Range-Partition给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。 + ``` val in: DataSet[(Int, String)] = // [...] val result = in.partitionByRange(0).mapPartition { ... } ``` -</figure> - | -| **Custom Partitioning** | 手动指定数据分区。 + +--- + +转换:**Custom Partitioning** + +描述:手动指定数据分区。 _注意_:此方法仅适用于单个字段键。 -<figure class="highlight"> ``` val in: DataSet[(Int, String)] = // [...] val result = in .partitionCustom(partitioner: Partitioner[K], key) ``` -</figure> - | -| **Sort Partition** | 本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接sortPartition()调用来完成对多个字段的排序。 -<figure class="highlight"> +--- + +转换:**Sort Partition** + +描述:本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接sortPartition()调用来完成对多个字段的排序。 + ``` val in: DataSet[(Int, String)] = // [...] val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... } ``` -</figure> - | -| **First-n** | 返回数据集的前n个(任意)元素。First-n可以应用于常规数据集,分组数据集或分组排序数据集。可以将分组键指定为键选择器函数,元组位置或案例类字段。 -<figure class="highlight"> +--- + +转换:**First-n** + +描述:返回数据集的前n个(任意)元素。First-n可以应用于常规数据集,分组数据集或分组排序数据集。可以将分组键指定为键选择器函数,元组位置或案例类字段。 + ``` val in: DataSet[(Int, String)] = // [...] @@ -670,19 +743,21 @@ val in: DataSet[(Int, String)] = // [...] // grouped-sorted data set val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3) ``` -</figure> - | + * * * 以下转换可用于元组的数据集: -| 转换 | 描述 | -| --- | --- | -| **MinBy / MaxBy** | 从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。用于比较的字段必须是有效的关键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。 -<figure class="highlight"> + +--- + +转换:**MinBy / MaxBy** + +描述:从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。用于比较的字段必须是有效的关键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。 + ``` val in: DataSet[(Int, Double, String)] = // [...] @@ -693,9 +768,8 @@ val out2: DataSet[(Int, Double, String)] = in.groupBy(2) .minBy(1) ``` -</figure> - | + 通过匿名模式匹配从元组,案例类和集合中提取,如下所示: @@ -960,12 +1034,33 @@ Flink目前支持输入文件的透明解压缩,如果它们标有适当的文 下表列出了当前支持的压缩方法。 -| 压缩方法 | 文件扩展名 | 可并行 | -| --- | --- | --- | -| **DEFLATE** | `.deflate` | no / not | -| **GZip** | `.gz`, `.gzip` | no / not | -| **Bzip2** | `.bz2` | no / not | -| **XZ** | `.xz` | no / not | +--- + +转换:压缩方法 + +描述:文件扩展名 | 可并行 | + +描述:--- | --- | +--- + +转换:**DEFLATE** + +描述:`.deflate` | no / not | +--- + +转换:**GZip** + +描述:`.gz`, `.gzip` | no / not | +--- + +转换:**Bzip2** + +描述:`.bz2` | no / not | +--- + +转换:**XZ** + +描述:`.xz` | no / not | ## 数据接收 @@ -1313,13 +1408,37 @@ Flink的DataSet API具有两种模式,这些模式在Flink的运行时创建 默认情况下,Flink在禁用对象重用模式下运行。此模式可确保函数始终在函数调用中接收新的输入对象。禁用对象重用模式可提供更好的保证,并且使用起来更安全。但是,它带来了一定的处理开销,可能会导致更高的Java垃圾回收活动。下表说明了用户函数如何在禁用对象重用模式下访问输入和输出对象。 -| Operation | 保证和限制 | -| --- | --- | -| **读取输入对象** | 在方法调用中,保证输入对象的值不会改变。这包括由Iterable提供的对象。例如,收集由List或Map中的Iterable提供的输入对象是安全的。请注意,在保存方法调用后,可以修改对象。在函数调用中记住对象是**不安全的**。 | -| **修改输入对象** | 您可以修改输入对象。 | -| **发射输入对象** | 您可以发出输入对象。输入对象的值在发出后可能已更改。在输出对象后,读取它是**不安全的**。 | -| **读取输出对象** | 提供给收集器或作为方法结果返回的对象可能已更改其值。读取输出对象是**不安全的**。 | -| **修改输出对象** | 您可以在发射对象后对其进行修改并再次发射。 | +--- + +转换:Operation + +描述:保证和限制 | + +--- + +转换:**读取输入对象** + +描述:在方法调用中,保证输入对象的值不会改变。这包括由Iterable提供的对象。例如,收集由List或Map中的Iterable提供的输入对象是安全的。请注意,在保存方法调用后,可以修改对象。在函数调用中记住对象是**不安全的**。 | +--- + +转换:**修改输入对象** + +描述:您可以修改输入对象。 | +--- + +转换:**发射输入对象** + +描述:您可以发出输入对象。输入对象的值在发出后可能已更改。在输出对象后,读取它是**不安全的**。 | +--- + +转换:**读取输出对象** + +描述:提供给收集器或作为方法结果返回的对象可能已更改其值。读取输出对象是**不安全的**。 | +--- + +转换:**修改输出对象** + +描述:您可以在发射对象后对其进行修改并再次发射。 | **禁用对象重用(默认)模式的编码指南:** @@ -1330,14 +1449,42 @@ Flink的DataSet API具有两种模式,这些模式在Flink的运行时创建 在对象重用启用模式下,Flink的运行时最小化对象实例化的数量。这可以提高性能并可以ReduceJava垃圾收集压力。通过调用激活对象重用启用模式`ExecutionConfig.enableObjectReuse()`。下表说明了用户函数如何在对象重用启用模式下访问输入和输出对象。 -| Operation | 保证和限制 | -| --- | --- | -| **读取作为常规方法参数接收的输入对象** | 在常规方法参数中接收的输入对象不会在函数调用中修改。在离开方法调用后,可以修改对象。在函数调用中记住对象是**不安全的**。 | -| **读取从Iterable参数接收的输入对象** | 从Iterable接收的输入对象仅在调用next()方法之前有效。Iterable或Iterator可以多次为同一个对象实例提供服务。记住从Iterable接收的输入对象是**不安全**的,例如,将它们放在List或Map中。 | -| **修改输入对象** | 除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(重用)的输入对象外,您**不能**修改输入对象。 | -| **发射输入对象** | 除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(重用)的输入对象外, 您**不能**发出输入对象。 | -| **读取输出对象** | 提供给收集器或作为方法结果返回的对象可能已更改其值。读取输出对象是**不安全的**。 | -| **修改输出对象** | 您可以修改输出对象并再次发出。 | +--- + +转换:Operation + +描述:保证和限制 | + +--- + +转换:**读取作为常规方法参数接收的输入对象** + +描述:在常规方法参数中接收的输入对象不会在函数调用中修改。在离开方法调用后,可以修改对象。在函数调用中记住对象是**不安全的**。 | +--- + +转换:**读取从Iterable参数接收的输入对象** + +描述:从Iterable接收的输入对象仅在调用next()方法之前有效。Iterable或Iterator可以多次为同一个对象实例提供服务。记住从Iterable接收的输入对象是**不安全**的,例如,将它们放在List或Map中。 | +--- + +转换:**修改输入对象** + +描述:除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(重用)的输入对象外,您**不能**修改输入对象。 | +--- + +转换:**发射输入对象** + +描述:除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(重用)的输入对象外, 您**不能**发出输入对象。 | +--- + +转换:**读取输出对象** + +描述:提供给收集器或作为方法结果返回的对象可能已更改其值。读取输出对象是**不安全的**。 | +--- + +转换:**修改输出对象** + +描述:您可以修改输出对象并再次发出。 | **启用对象重用的编码指南:** diff --git a/docs/1.7-SNAPSHOT/50.md b/docs/1.7-SNAPSHOT/50.md index ef7e988..49a5b7a 100644 --- a/docs/1.7-SNAPSHOT/50.md +++ b/docs/1.7-SNAPSHOT/50.md @@ -17,24 +17,11 @@ Flink程序通过定义**步进函数**并将其嵌入到特殊的迭代 算子 | --- | --- | --- | | **迭代输入** | **部分解决方案** | **工作集**和**解决方案集** | | **步函数** | 任意数据流 | -| **状态更新** | 下**一部分解决方案** | - -* 下一个工作集 -* **对解决方案集的更改** - - | +| **状态更新** | 下**一部分解决方案** | 下一个工作集 | +| | | **对解决方案集的更改** | | **迭代结果** | 最后部分解决方案 | 上次迭代后的解决方案设置状态 | -| **终止** | - -* **最大迭代次数**(默认) -* 自定义聚合器收敛 - - | - -* **最大迭代次数或空工作集**(默认) -* 自定义聚合器收敛 - - | +| **终止** | **最大迭代次数**(默认) | **最大迭代次数或空工作集**(默认) | +| | 自定义聚合器收敛 | 自定义聚合器收敛 | ## 迭代 算子 diff --git a/docs/1.7-SNAPSHOT/51.md b/docs/1.7-SNAPSHOT/51.md index 917c9b0..dea9c0a 100644 --- a/docs/1.7-SNAPSHOT/51.md +++ b/docs/1.7-SNAPSHOT/51.md @@ -123,69 +123,81 @@ Python API在安装了Python 2.7或3.4的Linux / Windows系统上进行了测试 本节简要概述了可用的转换。该[转换文档](dataset_transformations.html)与示例全部转换的完整描述。 - -| 转型 | 描述 | -| --- | --- | -| **Map** | 采用一个数据元并生成一个数据元。 -<figure class="highlight"> + +--- + +转换:**Map** + +描述:采用一个数据元并生成一个数据元。 + ``` data.map(lambda x: x * 2) ``` -</figure> - | -| **FlatMap** | 采用一个数据元并生成零个,一个或多个数据元。 -<figure class="highlight"> +--- + +转换:**FlatMap** + +描述:采用一个数据元并生成零个,一个或多个数据元。 + ``` data.flat_map( lambda x,c: [(1,word) for word in line.lower().split() for line in x]) ``` -</figure> - | -| **MapPartition** | 在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,并可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。 -<figure class="highlight"> +--- + +转换:**MapPartition** + +描述:在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,并可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。 + ``` data.map_partition(lambda x,c: [value * 2 for value in x]) ``` -</figure> - | -| **Filter** | 计算每个数据元的布尔函数,并保存函数返回true的数据元。 -<figure class="highlight"> +--- + +转换:**Filter** + +描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。 + ``` data.filter(lambda x: x > 1000) ``` -</figure> - | -| **Reduce** | 通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**Reduce** + +描述:通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce可以应用于完整数据集或分组数据集。 + ``` data.reduce(lambda x,y : x + y) ``` -</figure> - | -| **ReduceGroup** | 将一组数据元组合成一个或多个数据元。ReduceGroup可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**ReduceGroup** + +描述:将一组数据元组合成一个或多个数据元。ReduceGroup可以应用于完整数据集或分组数据集。 + ``` class Adder(GroupReduceFunction): @@ -197,12 +209,14 @@ class Adder(GroupReduceFunction): data.reduce_group(Adder()) ``` -</figure> - | -| **骨料** | 在数据集或数据集的每个组中的所有元组的一个字段上执行内置 算子操作(sum,min,max)。聚合可以应用于完整数据集或分组数据集。 -<figure class="highlight"> +--- + +转换:**骨料** + +描述:在数据集或数据集的每个组中的所有元组的一个字段上执行内置 算子操作(sum,min,max)。聚合可以应用于完整数据集或分组数据集。 + ``` # This code finds the sum of all of the values in the first field and the maximum of all of the values in the second field @@ -218,12 +232,14 @@ data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1) data.sum(0).and_agg(Aggregation.Max, 1) ``` -</figure> - | -| **Join** | 通过创建在其键上相等的所有数据元对来连接两个数据集。(可选)使用JoinFunction将数据元对转换为单个数据元。见[键](#specifying-keys)如何定义连接Keys。 -<figure class="highlight"> +--- + +转换:**Join** + +描述:通过创建在其键上相等的所有数据元对来连接两个数据集。(可选)使用JoinFunction将数据元对转换为单个数据元。见[键](#specifying-keys)如何定义连接Keys。 + ``` # In this case tuple fields are used as keys. @@ -241,53 +257,60 @@ data.sum(0).and_agg(Aggregation.Max, 1) result = input1.join(input2).where(0).equal_to(1) ``` -</figure> - | -| **CoGroup** | reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。见[键](#specifying-keys)如何定义CoGroup键。 -<figure class="highlight"> +--- + +转换:**CoGroup** + +描述:reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。见[键](#specifying-keys)如何定义CoGroup键。 + ``` data1.co_group(data2).where(0).equal_to(1) ``` -</figure> - | -| **交叉** | 构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用CrossFunction将数据元对转换为单个数据元。 -<figure class="highlight"> +--- + +转换:**交叉** + +描述:构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用CrossFunction将数据元对转换为单个数据元。 + ``` result = data1.cross(data2) ``` -</figure> - | -| **Union** | 生成两个数据集的并集。 -<figure class="highlight"> +--- + +转换:**Union** + +描述:生成两个数据集的并集。 + ``` data.union(data2) ``` -</figure> - | -| **ZipWithIndex** | 为每个数据元分配连续索引。有关详细信息,请参阅[Zip数据元指南](zip_elements_guide.html#zip-with-a-dense-index)。 -<figure class="highlight"> +--- + +转换:**ZipWithIndex** + +描述:为每个数据元分配连续索引。有关详细信息,请参阅[Zip数据元指南](zip_elements_guide.html#zip-with-a-dense-index)。 + ``` data.zip_with_index() ``` -</figure> - | + ## 指定Keys diff --git a/docs/1.7-SNAPSHOT/60.md b/docs/1.7-SNAPSHOT/60.md index 2ff88b0..76d2765 100644 --- a/docs/1.7-SNAPSHOT/60.md +++ b/docs/1.7-SNAPSHOT/60.md @@ -127,73 +127,61 @@ Table API支持以下 算子操作。请注意,并非所有 算子操作都可 | **Scan** 批量 流 | 与SQL查询中的FROM子句类似。执行已注册表的扫描。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); ``` -</figure> | | **Select** 批量 流 | 与SQL SELECT语句类似。执行选择 算子操作。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); Table result = orders.select("a, c as d"); ``` -</figure> 您可以使用star(`*`)作为通配符,选择表中的所有列。 -<figure class="highlight"> ``` Table result = orders.select("*"); ``` -</figure> | | **As** 批量 流 | 重命名字段。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); Table result = orders.as("x, y, z, t"); ``` -</figure> | | **Where / Filter** Batch Streaming | 与SQL WHERE子句类似。过滤掉未通过过滤谓词的行。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); Table result = orders.where("b === 'red'"); ``` -</figure> 要么 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); Table result = orders.filter("a % 2 === 0"); ``` -</figure> | @@ -202,73 +190,61 @@ Table result = orders.filter("a % 2 === 0"); | **Scan** Batch Streaming | Similar to the FROM clause in a SQL query. Performs a scan of a registered table. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") ``` -</figure> | | **Select** Batch Streaming | Similar to a SQL SELECT statement. Performs a select operation. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") val result = orders.select('a, 'c as 'd) ``` -</figure> You can use star (`*`) to act as a wild card, selecting all of the columns in the table. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") val result = orders.select('*) ``` -</figure> | | **As** Batch Streaming | Renames fields. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't) ``` -</figure> | | **Where / Filter** Batch Streaming | Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") val result = orders.filter('a % 2 === 0) ``` -</figure> or -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") val result = orders.where('b === "red") ``` -</figure> | @@ -283,20 +259,17 @@ val result = orders.where('b === "red") 批处理 流 结果更新 | 与SQL GROUP BY子句类似。使用以下运行的聚合 算子对分组键上的行进行分组,以按组聚合行。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); Table result = orders.groupBy("a").select("a, b.sum as d"); ``` -</figure> **注意:**对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | | **GroupBy窗口聚合** 批量 流 | 组和聚合组[窗口](#group-windows)上的表以及可能的一个或多个分组键。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); @@ -306,13 +279,11 @@ Table result = orders .select("a, w.start, w.end, w.rowtime, b.sum as d"); // access window properties and aggregate ``` -</figure> | | **Over Windows聚合** 流 | 与SQL OVER子句类似。基于前一行和后一行的窗口(范围)计算每行的窗口聚合。有关更多详细信息,请参阅[over windows部分](#over-windows)。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); @@ -327,21 +298,18 @@ Table result = orders .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate ``` -</figure> **注意:**必须在同一窗口中定义所有聚合,即相同的分区,排序和范围。目前,仅支持具有PRREDING(UNBOUNDED和有界)到CURRENT ROW范围的窗口。尚不支持使用FOLLOWING的范围。必须在单个[时间属性](streaming.html#time-attributes)上指定ORDER BY 。 | | **Distinct** 批量 流 结果更新 | 与SQL DISTINCT子句类似。返回具有不同值组合的记录。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); Table result = orders.distinct(); ``` -</figure> **注意:**对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同字段的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | @@ -351,20 +319,17 @@ Table result = orders.distinct(); Batch Streaming Result Updating | Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys with a following running aggregation operator to aggregate rows group-wise. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") val result = orders.groupBy('a).select('a, 'b.sum as 'd) ``` -</figure> **Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Streaming Concepts](streaming.html) for details. | | **GroupBy Window Aggregation** Batch Streaming | Groups and aggregates a table on a [group window](#group-windows) and possibly one or more grouping keys. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") @@ -374,13 +339,11 @@ val result: Table = orders .select('a, w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate ``` -</figure> | | **Over Window Aggregation** Streaming | Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the [over windows section](#over-windows) for more details. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") @@ -395,20 +358,17 @@ val result: Table = orders .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate ``` -</figure> **Note:** All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single [time attribute](streaming.html#time-attributes). | | **Distinct** Batch | Similar to a SQL DISTINCT clause. Returns records with distinct value combinations. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") val result = orders.distinct() ``` -</figure> **Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Streaming Concepts](streaming.html) for details. | @@ -422,7 +382,6 @@ val result = orders.distinct() | **Inner Join** 批量 流 | 与SQL JOIN子句类似。关联两张桌子。两个表必须具有不同的字段名称,并且必须通过连接 算子或使用where或filter 算子定义至少一个相等连接谓词。 -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c"); @@ -430,13 +389,11 @@ Table right = tableEnv.fromDataSet(ds2, "d, e, f"); Table result = left.join(right).where("a = d").select("a, b, e"); ``` -</figure> **注意:**对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | | **Outer Join** 批处理 流 结果更新 | 与SQL LEFT / RIGHT / FULL OUTER JOIN子句类似。关联两张桌子。两个表必须具有不同的字段名称,并且必须至少定义一个等于连接谓词。 -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c"); @@ -447,7 +404,6 @@ Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e"); Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e"); ``` -</figure> **注意:**对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | | **Time-windowed Join** @@ -456,7 +412,6 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e"); * `ltime === rtime` * `ltime >= rtime && ltime < rtime + 10.minutes` -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime"); @@ -467,13 +422,11 @@ Table result = left.join(right) .select("a, b, e, ltime"); ``` -</figure> | | **TableFunction Inner Join** 批量 流 | 使用表函数的结果连接表。左(外)表的每一行与表函数的相应调用产生的所有行连接。如果其表函数调用返回空结果,则删除左(外)表的一行。 -<figure class="highlight"> ``` // register function @@ -487,13 +440,11 @@ Table result = orders .select("a, b, s, t, v"); ``` -</figure> | | **TableFunction Left Outer Join** Batch Streaming | 使用表函数的结果连接表。左(外)表的每一行与表函数的相应调用产生的所有行连接。如果表函数调用返回空结果,则保存相应的外部行,并使用空值填充结果。**注意:**目前,左外连接的表函数的谓词只能是空或文字`true`。 -<figure class="highlight"> ``` // register function @@ -507,7 +458,6 @@ Table result = orders .select("a, b, s, t, v"); ``` -</figure> | @@ -516,7 +466,6 @@ Table result = orders | **Inner Join** Batch Streaming | Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through join operator or using a where or filter operator. -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c) @@ -524,13 +473,11 @@ val right = ds2.toTable(tableEnv, 'd, 'e, 'f) val result = left.join(right).where('a === 'd).select('a, 'b, 'e) ``` -</figure> **Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Streaming Concepts](streaming.html) for details. | | **Outer Join** Batch Streaming Result Updating | Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined. -<figure class="highlight"> ``` val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c) @@ -541,7 +488,6 @@ val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e) val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e) ``` -</figure> **Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Streaming Concepts](streaming.html) for details. | | **Time-windowed Join** @@ -550,7 +496,6 @@ Batch Streaming | **Note:** Time-windowed joins are a subset of regular joins th * `'ltime === 'rtime` * `'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes` -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime) @@ -561,13 +506,11 @@ val result = left.join(right) .select('a, 'b, 'e, 'ltime) ``` -</figure> | | **TableFunction Inner Join** Batch Streaming | Joins a table with a the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped, if its table function call returns an empty result. -<figure class="highlight"> ``` // instantiate function val split: TableFunction[_] = new MySplitUDTF() @@ -577,13 +520,11 @@ Batch Streaming | Joins a table with a the results of a table function. Each row .select('a, 'b, 's, 't, 'v) ``` -</figure> | | **TableFunction Left Outer Join** Batch Streaming | Joins a table with a the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.**Note:** Currently, the predicate of a table function left outer join can only be empty or literal `true`. -<figure class="highlight"> ``` // instantiate function val split: TableFunction[_] = new MySplitUDTF() @@ -593,7 +534,6 @@ Batch Streaming | Joins a table with a the results of a table function. Each row .select('a, 'b, 's, 't, 'v) ``` -</figure> | @@ -607,7 +547,6 @@ Batch Streaming | Joins a table with a the results of a table function. Each row | **Union** 批次 | 与SQL UNION子句类似。联合两个表删除了重复记录。两个表必须具有相同的字段类型。 -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c"); @@ -615,13 +554,11 @@ Table right = tableEnv.fromDataSet(ds2, "a, b, c"); Table result = left.union(right); ``` -</figure> | | **UnionAll** Batch Streaming | 与SQL UNION ALL子句类似。工会两张桌子。两个表必须具有相同的字段类型。 -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c"); @@ -629,13 +566,11 @@ Table right = tableEnv.fromDataSet(ds2, "a, b, c"); Table result = left.unionAll(right); ``` -</figure> | | **Intersect** 批次 | 类似于SQL INTERSECT子句。Intersect返回两个表中存在的记录。如果一个或两个表不止一次出现记录,则只返回一次,即结果表没有重复记录。两个表必须具有相同的字段类型。 -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c"); @@ -643,13 +578,11 @@ Table right = tableEnv.fromDataSet(ds2, "d, e, f"); Table result = left.intersect(right); ``` -</figure> | | **IntersectAll** Batch | 类似于SQL INTERSECT ALL子句。IntersectAll返回两个表中存在的记录。如果两个表中的记录多次出现,则返回的值与两个表中的记录一样多,即生成的表可能具有重复记录。两个表必须具有相同的字段类型。 -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c"); @@ -657,13 +590,11 @@ Table right = tableEnv.fromDataSet(ds2, "d, e, f"); Table result = left.intersectAll(right); ``` -</figure> | | **Minus** 批 | 与SQL EXCEPT子句类似。减号返回左表中右表中不存在的记录。左表中的重复记录只返回一次,即删除重复项。两个表必须具有相同的字段类型。 -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c"); @@ -671,13 +602,11 @@ Table right = tableEnv.fromDataSet(ds2, "a, b, c"); Table result = left.minus(right); ``` -</figure> | | **MinusAll** Batch | 类似于SQL EXCEPT ALL子句。MinusAll返回右表中不存在的记录。在左表中出现n次并在右表中出现m次的记录返回(n-m)次,即,删除右表中存在的重复次数。两个表必须具有相同的字段类型。 -<figure class="highlight"> ``` Table left = tableEnv.fromDataSet(ds1, "a, b, c"); @@ -685,13 +614,11 @@ Table right = tableEnv.fromDataSet(ds2, "a, b, c"); Table result = left.minusAll(right); ``` -</figure> | | **In** 批量 流中 | 与SQL IN子句类似。如果表达式存在于给定的表子查询中,则返回true。子查询表必须包含一列。此列必须与表达式具有相同的数据类型。 -<figure class="highlight"> ``` Table left = ds1.toTable(tableEnv, "a, b, c"); @@ -705,7 +632,6 @@ tableEnv.registerTable("RightTable", right); Table result = left.select("a, b, c").where("a.in(RightTable)"); ``` -</figure> **注意:**对于流式查询, 算子操作将在连接和组 算子操作中重写。计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | @@ -714,7 +640,6 @@ Table result = left.select("a, b, c").where("a.in(RightTable)"); | **Union** Batch | Similar to a SQL UNION clause. Unions two tables with duplicate records removed, both tables must have identical field types. -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c) @@ -722,13 +647,11 @@ val right = ds2.toTable(tableEnv, 'a, 'b, 'c) val result = left.union(right) ``` -</figure> | | **UnionAll** Batch Streaming | Similar to a SQL UNION ALL clause. Unions two tables, both tables must have identical field types. -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c) @@ -736,13 +659,11 @@ val right = ds2.toTable(tableEnv, 'a, 'b, 'c) val result = left.unionAll(right) ``` -</figure> | | **Intersect** Batch | Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types. -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c) @@ -750,13 +671,11 @@ val right = ds2.toTable(tableEnv, 'e, 'f, 'g) val result = left.intersect(right) ``` -</figure> | | **IntersectAll** Batch | Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types. -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c) @@ -764,13 +683,11 @@ val right = ds2.toTable(tableEnv, 'e, 'f, 'g) val result = left.intersectAll(right) ``` -</figure> | | **Minus** Batch | Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types. -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c) @@ -778,13 +695,11 @@ val right = ds2.toTable(tableEnv, 'a, 'b, 'c) val result = left.minus(right) ``` -</figure> | | **MinusAll** Batch | Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types. -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c) @@ -792,13 +707,11 @@ val right = ds2.toTable(tableEnv, 'a, 'b, 'c) val result = left.minusAll(right) ``` -</figure> | | **In** Batch Streaming | Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression. -<figure class="highlight"> ``` val left = ds1.toTable(tableEnv, 'a, 'b, 'c) @@ -806,7 +719,6 @@ val right = ds2.toTable(tableEnv, 'a) val result = left.select('a, 'b, 'c).where('a.in(right)) ``` -</figure> **Note:** For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Streaming Concepts](streaming.html) for details. | @@ -820,20 +732,17 @@ val result = left.select('a, 'b, 'c).where('a.in(right)) | **Order By** 批次 | 与SQL ORDER BY子句类似。返回跨所有并行分区全局排序的记录。 -<figure class="highlight"> ``` Table in = tableEnv.fromDataSet(ds, "a, b, c"); Table result = in.orderBy("a.asc"); ``` -</figure> | | **Offset & Fetch** 批次 | 类似于SQL OFFSET和FETCH子句。偏移和提取限制从排序结果返回的记录数。Offset和Fetch在技术上是Order By 算子的一部分,因此必须以它为前缀。 -<figure class="highlight"> ``` Table in = tableEnv.fromDataSet(ds, "a, b, c"); @@ -848,7 +757,6 @@ Table result2 = in.orderBy("a.asc").offset(3); Table result3 = in.orderBy("a.asc").offset(10).fetch(5); ``` -</figure> | @@ -857,20 +765,17 @@ Table result3 = in.orderBy("a.asc").offset(10).fetch(5); | **Order By** Batch | Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions. -<figure class="highlight"> ``` val in = ds.toTable(tableEnv, 'a, 'b, 'c) val result = in.orderBy('a.asc) ``` -</figure> | | **Offset & Fetch** Batch | Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it. -<figure class="highlight"> ``` val in = ds.toTable(tableEnv, 'a, 'b, 'c) @@ -882,7 +787,6 @@ val in = ds.toTable(tableEnv, 'a, 'b, 'c) // skips the first 10 records and returns the next 5 records from the sorted result val result3: Table = in.orderBy('a.asc).offset(10).fetch(5) ``` -</figure> | @@ -896,14 +800,12 @@ val in = ds.toTable(tableEnv, 'a, 'b, 'c) | **Insert** 批量 流处理 | 类似于SQL查询中的INSERT INTO子句。执行插入已注册的输出表。输出表必须在TableEnvironment中[注册](common.html#register-a-tablesink)(请参阅[注册TableSink](common.html#register-a-tablesink))。此外,已注册表的模式必须与查询的模式匹配。 -<figure class="highlight"> ``` Table orders = tableEnv.scan("Orders"); orders.insertInto("OutOrders"); ``` -</figure> | @@ -912,14 +814,12 @@ orders.insertInto("OutOrders"); | **Insert Into** Batch Streaming | Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.Output tables must be registered in the TableEnvironment (see [Register a TableSink](common.html#register-a-tablesink)). Moreover, the schema of the registered table must match the schema of the query. -<figure class="highlight"> ``` val orders: Table = tableEnv.scan("Orders") orders.insertInto("OutOrders") ``` -</figure> | diff --git a/docs/1.7-SNAPSHOT/61.md b/docs/1.7-SNAPSHOT/61.md index 208f9c6..89736ce 100644 --- a/docs/1.7-SNAPSHOT/61.md +++ b/docs/1.7-SNAPSHOT/61.md @@ -190,7 +190,6 @@ Flink SQL对类似于Java标识符(表,属性,函数名)使用词法策 | **Scan/Select/As** 批量 流 | -<figure class="highlight"> ``` SELECT * FROM Orders @@ -198,13 +197,11 @@ SELECT * FROM Orders SELECT a, c AS d FROM Orders ``` -</figure> | | **Where / Filter** Batch Streaming | -<figure class="highlight"> ``` SELECT * FROM Orders WHERE b = 'red' @@ -212,19 +209,16 @@ SELECT * FROM Orders WHERE b = 'red' SELECT * FROM Orders WHERE a % 2 = 0 ``` -</figure> | | **User-defined Scalar Functions (Scalar UDF)** 批量 流 | UDF必须在TableEnvironment中注册。有关如何指定和注册标量UDF的详细信息,请参阅[UDF文档](udfs.html)。 -<figure class="highlight"> ``` SELECT PRETTY_PRINT(user) FROM Orders ``` -</figure> | @@ -236,7 +230,6 @@ SELECT PRETTY_PRINT(user) FROM Orders 批处理 流 结果更新 | **注意:**流表上的GroupBy会生成更新结果。有关详细信息,请参阅[Streaming Concepts](streaming.html)页面。 -<figure class="highlight"> ``` SELECT a, SUM(b) as d @@ -244,13 +237,11 @@ FROM Orders GROUP BY a ``` -</figure> | | **GroupBy窗口聚合** 批量 流 | 使用组窗口计算每个组的单个结果行。有关详细信息,请参阅[GroupWindows](#group-windows)部分。 -<figure class="highlight"> ``` SELECT user, SUM(amount) @@ -258,13 +249,11 @@ FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user ``` -</figure> | | **Over Window聚合** 流 | **注意:**必须在同一窗口中定义所有聚合,即相同的分区,排序和范围。目前,仅支持具有PRREDING(UNBOUNDED和有界)到CURRENT ROW范围的窗口。尚不支持使用FOLLOWING的范围。必须在单个[时间属性](streaming.html#time-attributes)上指定ORDER BY[](streaming.html#time-attributes) -<figure class="highlight"> ``` SELECT COUNT(amount) OVER ( @@ -281,26 +270,22 @@ WINDOW w AS ( ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) ``` -</figure> | | **Distinct** 批量 流 结果更新 | -<figure class="highlight"> ``` SELECT DISTINCT users FROM Orders ``` -</figure> **注意:**对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同字段的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | | **分组集,汇总,多维数据集** 批量 | -<figure class="highlight"> ``` SELECT SUM(amount) @@ -308,13 +293,11 @@ FROM Orders GROUP BY GROUPING SETS ((user), (product)) ``` -</figure> | | **Having** 批量 流 | -<figure class="highlight"> ``` SELECT SUM(amount) @@ -323,13 +306,11 @@ GROUP BY users HAVING SUM(amount) > 50 ``` -</figure> | | **用户定义的聚合函数(UDAGG)** 批量 流 | UDAGG必须在TableEnvironment中注册。有关如何指定和注册UDAGG的详细信息,请参阅[UDF文档](udfs.html)。 -<figure class="highlight"> ``` SELECT MyAggregate(amount) @@ -337,7 +318,6 @@ FROM Orders GROUP BY users ``` -</figure> | @@ -348,20 +328,17 @@ GROUP BY users | **内部Equi-join** 批量 流 | 目前,仅支持等连接,即具有至少一个带有等式谓词的连接条件的连接。不支持任意交叉或theta连接。**注意:**连接顺序未优化。表按照FROM子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,这些表不受支持并且会导致查询失败。 -<figure class="highlight"> ``` SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id ``` -</figure> **注意:**对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | | **外部Equi-join** 批量 流 结果更新 | 目前,仅支持等连接,即具有至少一个带有等式谓词的连接条件的连接。不支持任意交叉或theta连接。**注意:**连接顺序未优化。表按照FROM子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,这些表不受支持并且会导致查询失败。 -<figure class="highlight"> ``` SELECT * @@ -374,7 +351,6 @@ SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id ``` -</figure> **注意:**对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | | **Time-windowed Join** @@ -384,7 +360,6 @@ FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id * `ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE` * `ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND` -<figure class="highlight"> ``` SELECT * @@ -393,44 +368,37 @@ WHERE o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime ``` -</figure> 如果订单在收到订单后四小时发货,上面的示例将关联所有订单及其相应的货件。 | | **将数组扩展为关系** Batch Streaming | 尚未支持UnANDing WITH ORDINALITY。 -<figure class="highlight"> ``` SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) ``` -</figure> | | **关联用户定义的表函数(UDTF)** 批量 流 | UDTF必须在TableEnvironment中注册。有关如何指定和注册UDTF的详细信息,请参阅[UDF文档](udfs.html)。内部联接 -<figure class="highlight"> ``` SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag ``` -</figure> 左外连接 -<figure class="highlight"> ``` SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE ``` -</figure> **注意:**目前,`TRUE`对于横向表,只支持左外连接的谓词作为谓词。 | @@ -441,7 +409,6 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE | **Union** 批次 | -<figure class="highlight"> ``` SELECT * @@ -452,13 +419,11 @@ FROM ( ) ``` -</figure> | | **UnionAll** Batch Streaming | -<figure class="highlight"> ``` SELECT * @@ -469,13 +434,11 @@ FROM ( ) ``` -</figure> | | **Intersect/ Except**批量 | -<figure class="highlight"> ``` SELECT * @@ -486,9 +449,7 @@ FROM ( ) ``` -</figure> -<figure class="highlight"> ``` SELECT * @@ -499,13 +460,11 @@ FROM ( ) ``` -</figure> | | **IN** 批量 流中 | 如果表达式存在于给定的表子查询中,则返回true。子查询表必须包含一列。此列必须与表达式具有相同的数据类型。 -<figure class="highlight"> ``` SELECT user, amount @@ -515,13 +474,11 @@ WHERE product IN ( ) ``` -</figure> **注意:**对于流式查询, 算子操作将在连接和组 算子操作中重写。计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | | **Exists** 批量 流 | 如果子查询至少返回一行,则返回true。仅在可以在连接和组 算子操作中重写 算子操作时才支持。 -<figure class="highlight"> ``` SELECT user, amount @@ -531,7 +488,6 @@ WHERE product EXISTS ( ) ``` -</figure> **注意:**对于流式查询, 算子操作将在连接和组 算子操作中重写。计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅[Streaming Concepts](streaming.html)。 | @@ -542,7 +498,6 @@ WHERE product EXISTS ( | **Order By** 批量 流 | **注意:**流式查询的结果必须主要按升序[时间属性](streaming.html#time-attributes)排序。支持其他排序属性。 -<figure class="highlight"> ``` SELECT * @@ -550,13 +505,11 @@ FROM Orders ORDER BY orderTime ``` -</figure> | | **Limit** 批次 | -<figure class="highlight"> ``` SELECT * @@ -564,7 +517,6 @@ FROM Orders LIMIT 3 ``` -</figure> | @@ -575,7 +527,6 @@ LIMIT 3 | **Insert** 批量 流处理 | 输出表必须在TableEnvironment中[注册](common.html#register-a-tablesink)(请参阅[注册TableSink](common.html#register-a-tablesink))。此外,已注册表的模式必须与查询的模式匹配。 -<figure class="highlight"> ``` INSERT INTO OutputTable @@ -583,7 +534,6 @@ SELECT users, tag FROM Orders ``` -</figure> | diff --git a/docs/1.7-SNAPSHOT/73.md b/docs/1.7-SNAPSHOT/73.md index 3ec1157..97e281a 100644 --- a/docs/1.7-SNAPSHOT/73.md +++ b/docs/1.7-SNAPSHOT/73.md @@ -376,11 +376,13 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition * [**Java**](#tab_java_7) * [**Scala**](#tab_scala_7) -| 模式 算子操作 | 描述 | -| --- | --- | -| **其中(条件)** | 定义当前模式的条件。要匹配模式,事件必须满足条件。多个连续的where()子句导致其条件为AND: -<figure class="highlight"> +--- + +模式算子操作:`where(condition)` + +描述:定义当前模式的条件。要匹配模式,事件必须满足条件。多个连续的where()子句导致其条件为AND: + ``` pattern.where(new IterativeCondition<Event>() { @@ -391,12 +393,14 @@ pattern.where(new IterativeCondition<Event>() { }); ``` -</figure> - | -| **或(条件)** | 添加与现有条件进行OR运算的新条件。只有在至少通过其中一个条件时,事件才能匹配该模式: -<figure class="highlight"> +--- + +模式算子操作:`or(condition)` + +描述:添加与现有条件进行OR运算的新条件。只有在至少通过其中一个条件时,事件才能匹配该模式: + ``` pattern.where(new IterativeCondition<Event>() { @@ -412,12 +416,14 @@ pattern.where(new IterativeCondition<Event>() { }); ``` -</figure> - | -| **直到(条件)** | 指定循环模式的停止条件。意味着如果匹配给定条件的事件发生,则不再接受该模式中的事件。仅适用于 `oneOrMore()`**注意:**它允许在基于事件的条件下清除相应模式的状态。 -<figure class="highlight"> +--- + +模式算子操作:`until(condition)` + +描述:指定循环模式的停止条件。意味着如果匹配给定条件的事件发生,则不再接受该模式中的事件。仅适用于 `oneOrMore()`**注意:**它允许在基于事件的条件下清除相应模式的状态。 + ``` pattern.oneOrMore().until(new IterativeCondition<Event>() { @@ -428,201 +434,97 @@ pattern.oneOrMore().until(new IterativeCondition<Event>() { }); ``` -</figure> - | -| **亚型(子类)** | 定义当前模式的子类型条件。如果事件属于此子类型,则事件只能匹配该模式: -<figure class="highlight"> +--- -``` -pattern.subtype(SubEvent.class); -``` +模式算子操作:`subtype(subClass)` -</figure> +描述:定义当前模式的子类型条件。如果事件属于此子类型,则事件只能匹配该模式: - | -| **一个或多个()** | 指定此模式至少发生一次匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅[连续](#consecutive_java)。**注意:**建议使用`until()`或`within()`启用状态清除 -<figure class="highlight"> - -``` -pattern.oneOrMore(); ``` - -</figure> - - | -| **timesOrMore(#times)** | 指定此模式至少需要**#times**出现匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅[连续](#consecutive_java)。 - -<figure class="highlight"> - -``` -pattern.timesOrMore(2); -``` - -</figure> - - | -| **次(#ofTimes)** | 指定此模式需要匹配事件的确切出现次数。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅[连续](#consecutive_java)。 - -<figure class="highlight"> - -``` -pattern.times(2); +pattern.subtype(SubEvent.class); ``` -</figure> - | -| **次(#fromTimes,#toTimes)** | 指定此模式期望在匹配事件的**#fromTimes** 和**#toTimes**之间出现。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅[连续](#consecutive_java)。 -<figure class="highlight"> +--- -``` -pattern.times(2, 4); -``` - -</figure> +模式算子操作:`oneOrMore()` - | -| **Optional()** | 指定此模式是可选的,即根本不会发生。这适用于所有上述量词。 +描述:指定此模式至少发生一次匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅[连续](#consecutive_java)。**注意:**建议使用`until()`或`within()`启用状态清除 -<figure class="highlight"> ``` -pattern.oneOrMore().optional(); -``` - -</figure> - - | -| **贪婪()** | 指定此模式是贪婪的,即它将尽可能多地重复。这仅适用于量词,目前不支持组模式。 - -<figure class="highlight"> - -``` -pattern.oneOrMore().greedy(); +pattern.oneOrMore(); ``` -</figure> - - | - -| Pattern Operation | Description | -| --- | --- | -| **where(condition)** | Defines a condition for the current pattern. To match the pattern, an event must satisfy the condition. Multiple consecutive where() clauses lead to their conditions being ANDed: -<figure class="highlight"> -``` -pattern.where(event => ... /* some condition */) -``` +--- -</figure> +模式算子操作:`timesOrMore(#times)` - | -| **or(condition)** | Adds a new condition which is ORed with an existing one. An event can match the pattern only if it passes at least one of the conditions: +描述:指定此模式至少需要**#times**出现匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅[连续](#consecutive_java)。 -<figure class="highlight"> ``` -pattern.where(event => ... /* some condition */) - .or(event => ... /* alternative condition */) +pattern.timesOrMore(2); ``` -</figure> - | -| **until(condition)** | Specifies a stop condition for looping pattern. Meaning if event matching the given condition occurs, no more events will be accepted into the pattern.Applicable only in conjunction with `oneOrMore()`**NOTE:** It allows for cleaning state for corresponding pattern on event-based condition. -<figure class="highlight"> +--- -``` -pattern.oneOrMore().until(event => ... /* some condition */) -``` +模式算子操作:`次(#ofTimes)` -</figure> - - | -| **subtype(subClass)** | Defines a subtype condition for the current pattern. An event can only match the pattern if it is of this subtype: +描述:指定此模式需要匹配事件的确切出现次数。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅[连续](#consecutive_java)。 -<figure class="highlight"> ``` -pattern.subtype(classOf[SubEvent]) +pattern.times(2); ``` -</figure> - - | -| **oneOrMore()** | Specifies that this pattern expects at least one occurrence of a matching event.By default a relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity see [consecutive](#consecutive_scala).**NOTE:** It is advised to use either `until()` or `within()` to enable state clearing -<figure class="highlight"> -``` -pattern.oneOrMore() -``` +--- -</figure> +模式算子操作:`times(#fromTimes,#toTimes)` - | -| **timesOrMore(#times)** | Specifies that this pattern expects at least **#times** occurrences of a matching event.By default a relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity see [consecutive](#consecutive_scala). +描述:指定此模式期望在匹配事件的**#fromTimes** 和**#toTimes**之间出现。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅[连续](#consecutive_java)。 -<figure class="highlight"> ``` -pattern.timesOrMore(2) +pattern.times(2, 4); ``` -</figure> - - | -| **times(#ofTimes)** | Specifies that this pattern expects an exact number of occurrences of a matching event.By default a relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity see [consecutive](#consecutive_scala). -<figure class="highlight"> -``` -pattern.times(2) -``` +--- -</figure> +模式算子操作:`Optional()` - | -| **times(#fromTimes, #toTimes)** | Specifies that this pattern expects occurrences between **#fromTimes** and **#toTimes** of a matching event.By default a relaxed internal contiguity (between subsequent events) is used. For more info on internal contiguity see [consecutive](#consecutive_java). +描述:指定此模式是可选的,即根本不会发生。这适用于所有上述量词。 -<figure class="highlight"> ``` -pattern.times(2, 4) +pattern.oneOrMore().optional(); ``` -</figure> - | -| **optional()** | Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all aforementioned quantifiers. -<figure class="highlight"> +--- -``` -pattern.oneOrMore().optional() -``` +模式算子操作:`greedy()` -</figure> +描述:指定此模式是贪婪的,即它将尽可能多地重复。这仅适用于量词,目前不支持组模式。 - | -| **greedy()** | Specifies that this pattern is greedy, i.e. it will repeat as many as possible. This is only applicable to quantifiers and it does not support group pattern currently. - -<figure class="highlight"> ``` -pattern.oneOrMore().greedy() +pattern.oneOrMore().greedy(); ``` -</figure> - - | - ### 结合模式 现在你已经看到了单个模式的样子,现在是时候看看如何将它们组合成一个完整的模式序列。 @@ -761,7 +663,6 @@ next.within(Time.seconds(10)) | --- | --- | | **连续的()** | 与匹配事件一起使用`oneOrMore()`并`times()`强制执行严格的连续性,即任何不匹配的数据元都会中断匹配(如`next()`)。如果不应用,则使用放松的连续性(如`followedBy()`)。例如: -<figure class="highlight"> ``` Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @@ -784,12 +685,10 @@ Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { }); ``` -</figure> 将为输入序列生成以下匹配项:CD A1 A2 A3 D A4 B.连续申请:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B}没有连续申请:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B} | | **allowCombinations()** | 与匹配事件一起使用`oneOrMore()`并且`times()`在匹配事件之间施加非确定性松弛连续性(如`followedByAny()`)。如果不应用,则使用放松的连续性(如`followedBy()`)。例如: -<figure class="highlight"> ``` Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @@ -812,7 +711,6 @@ Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { }); ``` -</figure> 将为输入序列生成以下匹配项:CD A1 A2 A3 D A4 B.启用组合:{C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}未启用组合:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B} | @@ -820,7 +718,6 @@ Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { | --- | --- | | **consecutive()** | Works in conjunction with `oneOrMore()` and `times()` and imposes strict contiguity between the matching events, i.e. any non-matching element breaks the match (as in `next()`).If not applied a relaxed contiguity (as in `followedBy()`) is used.E.g. a pattern like: -<figure class="highlight"> ``` Pattern.begin("start").where(_.getName().equals("c")) @@ -829,12 +726,10 @@ Pattern.begin("start").where(_.getName().equals("c")) .followedBy("end1").where(_.getName().equals("b")) ``` -</figure> Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 Bwith consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B} | | **allowCombinations()** | Works in conjunction with `oneOrMore()` and `times()` and imposes non-deterministic relaxed contiguity between the matching events (as in `followedByAny()`).If not applied a relaxed contiguity (as in `followedBy()`) is used.E.g. a pattern like: -<figure class="highlight"> ``` Pattern.begin("start").where(_.getName().equals("c")) @@ -843,7 +738,6 @@ Pattern.begin("start").where(_.getName().equals("c")) .followedBy("end1").where(_.getName().equals("b")) ``` -</figure> Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 Bwith combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B} | @@ -908,18 +802,15 @@ val start: Pattern[Event, _] = Pattern.begin( | --- | --- | | **开始(#NAME)** | 定义一个起始模式: -<figure class="highlight"> ``` Pattern<Event, ?> start = Pattern.<Event>begin("start"); ``` -</figure> | | **开始(#pattern_sequence)** | 定义一个起始模式: -<figure class="highlight"> ``` Pattern<Event, ?> start = Pattern.<Event>begin( @@ -927,23 +818,19 @@ Pattern<Event, ?> start = Pattern.<Event>begin( ); ``` -</figure> | | **下一个(#NAME)** | 添加新模式。匹配事件必须直接接替先前的匹配事件(严格连续性): -<figure class="highlight"> ``` Pattern<Event, ?> next = start.next("middle"); ``` -</figure> | | **下一个(#pattern_sequence)** | 添加新模式。一系列匹配事件必须直接接替先前的匹配事件(严格连续性): -<figure class="highlight"> ``` Pattern<Event, ?> next = start.next( @@ -951,23 +838,19 @@ Pattern<Event, ?> next = start.next( ); ``` -</figure> | | **followedBy(#NAME)** | 添加新模式。匹配事件和先前匹配事件(轻松连续)之间可能发生其他事件: -<figure class="highlight"> ``` Pattern<Event, ?> followedBy = start.followedBy("middle"); ``` -</figure> | | **followedBy(#pattern_sequence)** | 添加新模式。在一系列匹配事件和先前匹配事件(轻松连续)之间可能发生其他事件: -<figure class="highlight"> ``` Pattern<Event, ?> followedBy = start.followedBy( @@ -975,23 +858,19 @@ Pattern<Event, ?> followedBy = start.followedBy( ); ``` -</figure> | | **followedByAny(#NAME)** | 添加新模式。匹配事件和先前匹配事件之间可能发生其他事件,并且将针对每个备选匹配事件(非确定性放松连续性)呈现替代匹配: -<figure class="highlight"> ``` Pattern<Event, ?> followedByAny = start.followedByAny("middle"); ``` -</figure> | | **followedByAny(#pattern_sequence)** | 添加新模式。在一系列匹配事件和先前匹配事件之间可能发生其他事件,并且将针对匹配事件的每个替代序列(非确定性松弛邻接)呈现替代匹配: -<figure class="highlight"> ``` Pattern<Event, ?> followedByAny = start.followedByAny( @@ -999,40 +878,33 @@ Pattern<Event, ?> followedByAny = start.followedByAny( ); ``` -</figure> | | **notNext()** | 添加新的负面模式。匹配(否定)事件必须直接成功执行先前的匹配事件(严格连续性)才能丢弃部分匹配: -<figure class="highlight"> ``` Pattern<Event, ?> notNext = start.notNext("not"); ``` -</figure> | | **notFollowedBy()** | 添加新的负面模式。即使在匹配(否定)事件和先前匹配事件(松弛连续性)之间发生其他事件,也将丢弃部分匹配事件序列: -<figure class="highlight"> ``` Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not"); ``` -</figure> | | **内(时间)** | 定义事件序列与模式匹配的最大时间间隔。如果未完成的事件序列超过此时间,则将其丢弃: -<figure class="highlight"> ``` pattern.within(Time.seconds(10)); ``` -</figure> | @@ -1040,18 +912,15 @@ pattern.within(Time.seconds(10)); | --- | --- | | **begin(#name)** | Defines a starting pattern: -<figure class="highlight"> ``` val start = Pattern.begin[Event]("start") ``` -</figure> | | **begin(#pattern_sequence)** | Defines a starting pattern: -<figure class="highlight"> ``` val start = Pattern.begin( @@ -1059,23 +928,19 @@ val start = Pattern.begin( ) ``` -</figure> | | **next(#name)** | Appends a new pattern. A matching event has to directly succeed the previous matching event (strict contiguity): -<figure class="highlight"> ``` val next = start.next("middle") ``` -</figure> | | **next(#pattern_sequence)** | Appends a new pattern. A sequence of matching events have to directly succeed the previous matching event (strict contiguity): -<figure class="highlight"> ``` val next = start.next( @@ -1083,23 +948,19 @@ val next = start.next( ) ``` -</figure> | | **followedBy(#name)** | Appends a new pattern. Other events can occur between a matching event and the previous matching event (relaxed contiguity) : -<figure class="highlight"> ``` val followedBy = start.followedBy("middle") ``` -</figure> | | **followedBy(#pattern_sequence)** | Appends a new pattern. Other events can occur between a sequence of matching events and the previous matching event (relaxed contiguity) : -<figure class="highlight"> ``` val followedBy = start.followedBy( @@ -1107,23 +968,19 @@ val followedBy = start.followedBy( ) ``` -</figure> | | **followedByAny(#name)** | Appends a new pattern. Other events can occur between a matching event and the previous matching event, and alternative matches will be presented for every alternative matching event (non-deterministic relaxed contiguity): -<figure class="highlight"> ``` val followedByAny = start.followedByAny("middle") ``` -</figure> | | **followedByAny(#pattern_sequence)** | Appends a new pattern. Other events can occur between a sequence of matching events and the previous matching event, and alternative matches will be presented for every alternative sequence of matching events (non-deterministic relaxed contiguity): -<figure class="highlight"> ``` val followedByAny = start.followedByAny( @@ -1131,40 +988,33 @@ val followedByAny = start.followedByAny( ) ``` -</figure> | | **notNext()** | Appends a new negative pattern. A matching (negative) event has to directly succeed the previous matching event (strict contiguity) for the partial match to be discarded: -<figure class="highlight"> ``` val notNext = start.notNext("not") ``` -</figure> | | **notFollowedBy()** | Appends a new negative pattern. A partial matching event sequence will be discarded even if other events occur between the matching (negative) event and the previous matching event (relaxed contiguity): -<figure class="highlight"> ``` val notFollowedBy = start.notFollowedBy("not") ``` -</figure> | | **within(time)** | Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded: -<figure class="highlight"> ``` pattern.within(Time.seconds(10)) ``` -</figure> | -- GitLab