This connector provides a Sink that writes partitioned files to any filesystem supported by [Hadoop FileSystem](http://hadoop.apache.org). To use this connector, add the following dependency to your project:
@@ -17,11 +17,12 @@ This connector provides a Sink that writes partitioned files to any filesystem s
Note that the streaming connectors are currently not part of the binary distribution. See [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for information about how to package the program with the libraries for cluster execution.
The bucketing behaviour as well as the writing can be configured but we will get to that later. This is how you can create a bucketing sink which by default, sinks to rolling files that are split by time:
The only required parameter is the base path where the buckets will be stored. The sink can be further configured by specifying a custom bucketer, writer and batch size.
唯一需要的参数是存储桶的基本路径。可以通过指定自定义存储区,写入器和批处理大小来进一步配置接收器。
By default the bucketing sink will split by the current system time when elements arrive and will use the datetime pattern `"yyyy-MM-dd--HH"` to name the buckets. This pattern is passed to `DateTimeFormatter` with the current system time and JVM’s default timezone to form a bucket path. Users can also specify a timezone for the bucketer to format bucket path. A new bucket will be created whenever a new date is encountered. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: each parallel instance of the sink will create its own part file and when part files get too big the sink will also create a new part file next to the others. When a bucket becomes inactive, the open part file will be flushed and closed. A bucket is regarded as inactive when it hasn’t been written to recently. By default, the sink checks for inactive buckets every minute, and closes any buckets which haven’t been written to for over a minute. This behaviour can be configured with `setInactiveBucketCheckInterval()` and `setInactiveBucketThreshold()` on a `BucketingSink`.
You can also specify a custom bucketer by using `setBucketer()` on a `BucketingSink`. If desired, the bucketer can use a property of the element or tuple to determine the bucket directory.
The default writer is `StringWriter`. This will call `toString()` on the incoming elements and write them to part files, separated by newline. To specify a custom writer use `setWriter()` on a `BucketingSink`. If you want to write Hadoop SequenceFiles you can use the provided `SequenceFileWriter` which can also be configured to use compression.
There are two configuration options that specify when a part file should be closed and a new one started:
有两个配置选项,指定何时应关闭零件文件以及何时启动一个新的零件文件:
* By setting a batch size (The default part file size is 384 MB)
* By setting a batch roll over time interval (The default roll over interval is `Long.MAX_VALUE`)
* By setting a batch size (The default part file size is 384 MB) 通过设置批处理大小(默认零件文件大小为384 MB)
* By setting a batch roll over time interval (The default roll over interval is `Long.MAX_VALUE`) 通过设置批次过渡时间间隔(默认过渡时间间隔为Long.MAX_VALUE)
A new part file is started when either of these two conditions is satisfied.
当满足这两个条件之一时,将启动一个新的零件文件。
Example:
...
...
@@ -90,7 +96,7 @@ input.addSink(sink)
This will create a sink that writes to bucket files that follow this schema:
这将创建一个接收器,该接收器写入遵循此架构的存储桶文件:
```
...
...
@@ -100,6 +106,7 @@ This will create a sink that writes to bucket files that follow this schema:
Where `date-time` is the string that we get from the date/time format, `parallel-task` is the index of the parallel sink instance and `count` is the running number of part files that were created because of the batch size or batch roll over interval.
For in-depth information, please refer to the JavaDoc for [BucketingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html).