22.md 9.2 KB
Newer Older
W
init  
wizardforcel 已提交
1 2


噼里啪啦嘣 已提交
3
# Checkpointing 检验指示
W
init  
wizardforcel 已提交
4

噼里啪啦嘣 已提交
5
弗林克中的每个函数和运算符都可以是*状态*(有关详细信息,请参阅[与状态一起工作](state.html)。状态函数存储数据跨单个元素/事件的处理,使状态成为任何类型的更精细操作的关键构建块。
W
init  
wizardforcel 已提交
6

噼里啪啦嘣 已提交
7
为了使状态容错,Flink需要到**检查点**状态。检查点允许Flink恢复流中的状态和位置,以赋予应用程序与无故障执行相同的语义。
W
init  
wizardforcel 已提交
8

噼里啪啦嘣 已提交
9
[关于流式容错的文档](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html)详细介绍了flink的流容错机制背后的技术。
W
init  
wizardforcel 已提交
10

噼里啪啦嘣 已提交
11
## Prerequisites 先决条件
W
init  
wizardforcel 已提交
12

噼里啪啦嘣 已提交
13
Flink的检查点机制与流和状态的持久存储交互。一般而言,它要求:
W
init  
wizardforcel 已提交
14

噼里啪啦嘣 已提交
15 16
*   a _persistent_ (或 _durable_ )数据源,可以重放特定时间量的记录。这样的源的示例是持久消息队列(例如,ApacheKafka、RabbitMQ、AmazonKinesis、Googlepubsub)或文件系统(例如,HDFS、S3、GFS、NFS、CEH等)。)。
*   用于状态的持久存储,通常是分布式文件系统(例如,HDFS、S3、GFS、NFS、CEH等)。)
W
init  
wizardforcel 已提交
17

噼里啪啦嘣 已提交
18
## Enabling and Configuring Checkpointing 启用和配置检查点
W
init  
wizardforcel 已提交
19

噼里啪啦嘣 已提交
20
默认情况下,禁用检查点。要启用检查点,请在`StreamExecutionEnvironment`上调用`enableCheckpointing(n)`,其中_n_是以毫秒为单位的检查点间隔。
W
init  
wizardforcel 已提交
21

噼里啪啦嘣 已提交
22
检查点操作的其他参数包括:
W
init  
wizardforcel 已提交
23

噼里啪啦嘣 已提交
24
*   _exactly-once vs. at-least-once_:您可以选择将模式传递到 `enableCheckpointing(n)`方法以在两个保证级别之间进行选择。对于大多数应用而言,精确-一次是优选的。至少-一次对于某些超低延迟(持续几毫秒)应用而言可能是相关的。
W
init  
wizardforcel 已提交
25

噼里啪啦嘣 已提交
26
*   _checkpoint timeout_: 进程检查点终止的时间,如果该检查点到那时还没有完成的话.
W
init  
wizardforcel 已提交
27

噼里啪啦嘣 已提交
28
*   _minimum time between checkpoints_: 为了确保流应用程序在检查点之间取得一定的进展,可以定义在检查点之间传递所需的时间。例如,如果将此值设置为 _5000_,则下一个检查点将在上一个检查点完成后5秒钟内启动,而不管检查点持续时间和检查点间隔如何。请注意,这意味着检查点间隔永远不会小于此参数。
W
init  
wizardforcel 已提交
29

噼里啪啦嘣 已提交
30
    通过定义“检查点之间的时间”通常比定义检查点间隔更容易配置应用程序,因为“检查点之间的时间”不容易受到检查点有时比平均时间长的影响(例如,如果目标存储系统暂时慢的话)。
W
init  
wizardforcel 已提交
31

噼里啪啦嘣 已提交
32
    注意,这个值还意味着并发检查点的数目是 _one_。
W
init  
wizardforcel 已提交
33

噼里啪啦嘣 已提交
34
*   _number of concurrent checkpoints_: 默认情况下,当另一个检查点仍在进行中时,系统不会触发另一个检查点。这可以确保拓扑不会花费太多时间在检查点上,并且不会在处理流方面取得进展。它可以允许多个重叠检查点,这对于具有一定处理延迟(例如,函数调用需要一定时间响应的外部服务)但仍然希望执行非常频繁的检查点(100毫秒)来重新处理故障时很少处理的管道来说是很有趣的。
W
init  
wizardforcel 已提交
35

噼里啪啦嘣 已提交
36
    当定义检查点之间的最小时间时,无法使用此选项。
W
init  
wizardforcel 已提交
37

噼里啪啦嘣 已提交
38
*   _externalized checkpoints_: 您可以在外部配置定期检查点。外部化检查点将其元数据写入永久存储,并在作业失败时自动清除。这样,如果您的作业失败,您将有一个检查点以恢复。[外部化检查点的部署说明](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/checkpoints.html#externalized-checkpoints)中有更多详细信息。
W
init  
wizardforcel 已提交
39

噼里啪啦嘣 已提交
40
*   _fail/continue task on checkpoint errors_: 这将确定如果任务的检查点过程执行中发生错误,任务是否会失败。这是默认行为。或者,当此操作被禁用时,任务将简单地将检查点拒绝给检查点协调器并继续运行。
W
init  
wizardforcel 已提交
41

W
wizardforcel 已提交
42

W
init  
wizardforcel 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```


W
wizardforcel 已提交
69 70


W
init  
wizardforcel 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88

```
val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms env.enableCheckpointing(1000)

// advanced options: 
// set mode to exactly-once (this is the default) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig.setCheckpointTimeout(60000)

// prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined. env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// allow only one checkpoint to be in progress at the same time env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
```

W
wizardforcel 已提交
89

W
init  
wizardforcel 已提交
90

噼里啪啦嘣 已提交
91
### Related Config Options 相关配置选项
W
init  
wizardforcel 已提交
92

噼里啪啦嘣 已提交
93
可以通过`conf/flink-conf.yaml`设置更多参数和/或默认值(请参阅[配置](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html)作为完整指南):
W
init  
wizardforcel 已提交
94 95 96 97 98 99 100

| Key | Default | Description |
| --- | --- | --- |
| 

##### state.backend

噼里啪啦嘣 已提交
101
 | (none) | 用于存储和检查点状态的状态后端。|
W
init  
wizardforcel 已提交
102 103 104 105
| 

##### state.backend.async

噼里啪啦嘣 已提交
106 107 108
 | true | 选项是否应在可能的情况下使用异步快照方法并进行配置。有些状态后端可能不支持异步快照,或者只支持异步快照,而忽略此选项。| 
 |
  
W
init  
wizardforcel 已提交
109 110
##### state.backend.fs.memory-threshold

噼里啪啦嘣 已提交
111
 | 1024 | 状态数据文件的最小大小。所有小于该状态块的状态块都内联存储在根检查点元数据文件中。 |
W
init  
wizardforcel 已提交
112 113 114 115
| 

##### state.backend.incremental

噼里啪啦嘣 已提交
116
 | false | 选项状态后端是否应创建增量检查点(如果可能)。对于增量检查点,仅存储与上一个检查点不同的值,而不是完整的检查点状态。某些状态后端可能不支持增量检查点并忽略此选项。|
W
init  
wizardforcel 已提交
117 118 119 120
| 

##### state.backend.local-recovery

噼里啪啦嘣 已提交
121
 | false | 选项状态后端是否应创建增量检查点(如果可能)。对于增量检查点,仅存储与上一个检查点不同的值,而不是完整的检查点状态。某些状态后端可能不支持增量检查点并忽略此选项。|
W
init  
wizardforcel 已提交
122 123 124 125
| 

##### state.checkpoints.dir

噼里啪啦嘣 已提交
126
 | (none) | 用于在FLink支持的文件系统中存储检查点数据文件和元数据的默认目录。必须从所有参与进程/节点访问存储路径(即,所有任务管理器和作业管理器)。|
W
init  
wizardforcel 已提交
127 128 129 130
| 

##### state.checkpoints.num-retained

噼里啪啦嘣 已提交
131
 | 1 | 要保留的已完成检查点的最大数量。 |
W
init  
wizardforcel 已提交
132 133 134 135
| 

##### state.savepoints.dir

噼里啪啦嘣 已提交
136
 | (none) | 保存点的默认目录。用于将保存点写入文件系统的状态后端(MemoryStateBackend、FsStateBackend、RocksDBStateBackend)。 |
W
init  
wizardforcel 已提交
137 138 139 140
| 

##### taskmanager.state.local.root-dirs

噼里啪啦嘣 已提交
141
 | (none) | 配置参数定义了用于存储用于本地恢复的基于文件的状态的根目录。本地恢复当前仅覆盖键控状态后端。当前,MemoryStateBackend不支持本地恢复并忽略此选项|
W
init  
wizardforcel 已提交
142

噼里啪啦嘣 已提交
143
## Selecting a State Backend 选择状态后端
W
init  
wizardforcel 已提交
144

噼里啪啦嘣 已提交
145
flink的[checkpoint机制](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html)存储定时器和有状态运算符的所有状态的一致快照,包括连接器、Windows和任何[用户定义的状态](state.html)。其中存储检查点(例如,JobManager内存、文件系统、数据库)取决于所配置的**状态后端**
W
init  
wizardforcel 已提交
146

噼里啪啦嘣 已提交
147
默认情况下,状态保存在TaskManager中的内存中,检查点存储在JobManager中的内存中。为了适当地保持大状态,Flink支持在其他状态后端存储和检查点状态的各种方法。可以通过`StreamExecutionEnvironment.setStateBackend(…)配置状态后端的选择。``
W
init  
wizardforcel 已提交
148

噼里啪啦嘣 已提交
149
有关作业范围和集群范围内配置的可用状态后端和选项的详细信息,请参见[State backends](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html)]。
W
init  
wizardforcel 已提交
150

噼里啪啦嘣 已提交
151
## State Checkpoints in Iterative Jobs 迭代作业中的状态检查点
W
init  
wizardforcel 已提交
152

噼里啪啦嘣 已提交
153
Flink目前只为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了在迭代程序上强制检查点,用户在启用检查点时需要设置一个特殊标志:‘env.enableCheckpoint(Interval,force=true)’。
W
init  
wizardforcel 已提交
154

噼里啪啦嘣 已提交
155
请注意,在循环边缘的飞行记录(以及与它们相关的状态更改)将在失败期间丢失。
W
init  
wizardforcel 已提交
156

噼里啪啦嘣 已提交
157
## Restart Strategies 重启策略
W
init  
wizardforcel 已提交
158

噼里啪啦嘣 已提交
159
Flink支持不同的重新启动策略,这些策略控制在发生故障时如何重新启动作业。有关更多信息,请参见[重新启动Strategies](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/restart_strategies.html).
W
init  
wizardforcel 已提交
160