# 分布式数据
## 依赖
为了使用分布式数据(`Distributed Data`),你需要将以下依赖添加到你的项目中:
```xml
com.typesafe.akka
akka-distributed-data_2.12
2.5.22
dependencies {
compile group: 'com.typesafe.akka', name: 'akka-distributed-data_2.12', version: '2.5.22'
}
libraryDependencies += "com.typesafe.akka" %% "akka-distributed-data" % "2.5.22"
```
## 示例项目
你可以下载「[Distributed Data](https://developer.lightbend.com/start/?group=akka&project=akka-samples-distributed-data-java)」示例项目来看看分布式数据是如何在实践中应用的。
## 简介
当需要在 Akka 集群中的节点之间共享数据时,Akka 分布式数据非常有用。通过提供类似 API 的键值存储的 Actor 访问数据。键是具有数据值类型信息的唯一标识符。这些值是无冲突的复制数据类型(`Conflict Free Replicated Data Types (CRDTs)`)。
所有数据条目都通过直接复制和基于`gossip`的协议传播到集群中的所有节点或具有特定角色的节点。你可以对读写的一致性级别进行细粒度控制。
自然`CRDTs`可以在不协调的情况下从任何节点执行更新。来自不同节点的并发更新将由单调合并函数(`monotonic merge function`)自动解决,所有数据类型都必须提供该函数。状态变化总是收敛的。为计数器、集合、映射和寄存器提供了几种有用的数据类型,你还可以实现自己的自定义数据类型。
它最终是一致的,旨在提供高读写可用性(分区容限),低延迟。请注意,在最终一致的系统中,读取可能会返回过期的值。
## 使用 Replicator
`akka.cluster.ddata.Replicator` Actor 提供了与数据交互的 API。`Replicator` Actor 必须在集群中的每个节点上启动,或者在标记有特定角色的节点组上启动。它与运行在其他节点上的具有相同路径(而不是地址)的其他`Replicator`实例通信。为了方便起见,它可以与`akka.cluster.ddata.DistributedData`扩展一起使用,但也可以使用`Replicator.props`作为普通 Actor 启动。如果它是作为一个普通的 Actor 启动的,那么它必须在所有节点上以相同的名称、相同的路径启动。
状态为「[WeaklyUp](https://doc.akka.io/docs/akka/current/cluster-usage.html#weakly-up)」的集群成员将参与分布式数据。这意味着数据将通过后台`gossip`协议复制到`WeaklyUp`节点。请注意,如果一致性模式是从所有节点或大多数节点读/写,则它不会参与任何操作。`WeaklyUp`节点不算作集群的一部分。因此,就一致操作而言,3 个节点 + 5 个`WeaklyUp`的节点本质上是 3 个节点。
下面是一个 Actor 的示例,它将`tick`消息调度到自己,并为每个`tick`添加或删除`ORSet`(`observed-remove set`)中的元素。它还订阅了这一变化。
```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.cluster.Cluster;
import akka.cluster.ddata.DistributedData;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.ORSetKey;
import akka.cluster.ddata.Replicator;
import akka.cluster.ddata.Replicator.Changed;
import akka.cluster.ddata.Replicator.Subscribe;
import akka.cluster.ddata.Replicator.Update;
import akka.cluster.ddata.Replicator.UpdateResponse;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class DataBot extends AbstractActor {
private static final String TICK = "tick";
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
private final Cluster node = Cluster.get(getContext().getSystem());
private final Cancellable tickTask =
getContext()
.getSystem()
.scheduler()
.schedule(
Duration.ofSeconds(5),
Duration.ofSeconds(5),
getSelf(),
TICK,
getContext().getDispatcher(),
getSelf());
private final Key> dataKey = ORSetKey.create("key");
@SuppressWarnings("unchecked")
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, a -> a.equals(TICK), a -> receiveTick())
.match(
Changed.class,
c -> c.key().equals(dataKey),
c -> receiveChanged((Changed>) c))
.match(UpdateResponse.class, r -> receiveUpdateResponse())
.build();
}
private void receiveTick() {
String s = String.valueOf((char) ThreadLocalRandom.current().nextInt(97, 123));
if (ThreadLocalRandom.current().nextBoolean()) {
// add
log.info("Adding: {}", s);
Update> update =
new Update<>(dataKey, ORSet.create(), Replicator.writeLocal(), curr -> curr.add(node, s));
replicator.tell(update, getSelf());
} else {
// remove
log.info("Removing: {}", s);
Update> update =
new Update<>(
dataKey, ORSet.create(), Replicator.writeLocal(), curr -> curr.remove(node, s));
replicator.tell(update, getSelf());
}
}
private void receiveChanged(Changed> c) {
ORSet data = c.dataValue();
log.info("Current elements: {}", data.getElements());
}
private void receiveUpdateResponse() {
// ignore
}
@Override
public void preStart() {
Subscribe> subscribe = new Subscribe<>(dataKey, getSelf());
replicator.tell(subscribe, ActorRef.noSender());
}
@Override
public void postStop() {
tickTask.cancel();
}
}
```
### 更新
若要修改和复制数据值,请向本地`Replicator`发送一条`Replicator.Update`消息。
`Update`的`key`的当前数据值作为参数传递给`Update`的`modify`函数。函数应该返回数据的新值,然后根据给定的一致性级别复制该值。
`modify`函数由`Replicator` Actor 调用,因此必须是一个纯函数,只使用封闭范围中的数据参数和稳定字段。例如,它必须不访问封闭 Actor 的发送方(`getSender()`)引用。
由于`modify`函数通常不可序列化,因此只能从与`Replicator`运行在同一本地`ActorSystem`中的 Actor 发送`Update`。
你提供的写入一致性级别具有以下含义:
- `writeLocal`,该值将立即只被写入本地副本,然后通过`gossip`进行传播。
- `WriteTo(n)`,该值将立即写入至少`n`个副本,包括本地副本
- `WriteMajority`,该值将立即写入大多数副本,即至少`N/2 + 1`个副本,其中`N`是群集(或群集角色组)中的节点数
- `WriteAll`,该值将立即写入群集中的所有节点(或群集中角色组中的所有节点)。
当你指定在`x`个节点中写入`n`个节点时,更新将首先复制到`n`个节点。如果在超时的`1/5`之后没有足够的`Acks`,更新将复制到其他`n`个节点。如果剩余节点少于`n`个,则使用所有剩余节点。可访问节点比不可访问节点更受欢迎。
请注意,`WriteMajority`有一个`minCap`参数,该参数对于指定小集群以实现更好的安全性非常有用。
```java
class DemonstrateUpdate extends AbstractActor {
final SelfUniqueAddress node =
DistributedData.get(getContext().getSystem()).selfUniqueAddress();
final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
final Key counter1Key = PNCounterKey.create("counter1");
final Key> set1Key = GSetKey.create("set1");
final Key> set2Key = ORSetKey.create("set2");
final Key activeFlagKey = FlagKey.create("active");
@Override
public Receive createReceive() {
ReceiveBuilder b = receiveBuilder();
b.matchEquals(
"demonstrate update",
msg -> {
replicator.tell(
new Replicator.Update(
counter1Key,
PNCounter.create(),
Replicator.writeLocal(),
curr -> curr.increment(node, 1)),
getSelf());
final WriteConsistency writeTo3 = new WriteTo(3, Duration.ofSeconds(1));
replicator.tell(
new Replicator.Update>(
set1Key, GSet.create(), writeTo3, curr -> curr.add("hello")),
getSelf());
final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(5));
replicator.tell(
new Replicator.Update>(
set2Key, ORSet.create(), writeMajority, curr -> curr.add(node, "hello")),
getSelf());
final WriteConsistency writeAll = new WriteAll(Duration.ofSeconds(5));
replicator.tell(
new Replicator.Update(
activeFlagKey, Flag.create(), writeAll, curr -> curr.switchOn()),
getSelf());
});
return b.build();
}
}
```
作为`Update`的答复,如果在提供的超时内根据提供的一致性级别成功复制了值,则会向`Update`的发送方发送`Replicator.UpdateSuccess`。否则将返回`Replicator.UpdateFailure`子类。请注意,`Replicator.UpdateTimeout`回复并不意味着更新完全失败或已回滚。它可能仍然被复制到一些节点上,并最终通过`gossip`协议复制到所有节点上。
```java
b.match(
UpdateSuccess.class,
a -> a.key().equals(counter1Key),
a -> {
// ok
});
```
```java
b.match(
UpdateSuccess.class,
a -> a.key().equals(set1Key),
a -> {
// ok
})
.match(
UpdateTimeout.class,
a -> a.key().equals(set1Key),
a -> {
// write to 3 nodes failed within 1.second
});
```
你总会看到自己的写入。例如,如果发送两条`Update`消息更改同一个`key`的值,则第二条消息的`modify`函数将看到第一条`Update`消息执行的更改。
在`Update`消息中,你可以传递一个可选的请求上下文,`Replicator`不关心该上下文,但它包含在回复消息中。这是一种传递上下文信息(例如原始发送者)的方便方法,无需使用`ask`或维护本地相关数据结构。
```java
class DemonstrateUpdateWithRequestContext extends AbstractActor {
final Cluster node = Cluster.get(getContext().getSystem());
final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
final WriteConsistency writeTwo = new WriteTo(2, Duration.ofSeconds(3));
final Key counter1Key = PNCounterKey.create("counter1");
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
String.class,
a -> a.equals("increment"),
a -> {
// incoming command to increase the counter
Optional