如何利用 Pravega 的状态同步器解决分布式一致性问题

Pravega 是一个开源的分布式流存储平台。其中,StateSynchronizer 组件以 stream 为基础,对外提供一致性状态共享服务。StateSynchronizer 允许一组进程同时读写同一共享状态而不必担心一致性问题。本文以实现一个简单的共享字典应用为示例,演示 StateSynchronizer 相关 API 的使用。

API 示例

示例实现 1:SharedConfig(共享配置)

在深入 StateSynchronizer 的使用细节之前,先让我们看一个使用 StateSynchronizer 的示例。本章节示例全部来自 Pravega 官方文档 [1]。该示例的所有源代码都可以从 Pravega-Samples 的 GitHub 仓库 [2] 下载获取。这个示例程序使用 StateSynchronizer 实现了 Java 的 Map (字典)数据结构,我们不妨将其称作“SharedMap(共享字典)”。我们以这个 SharedMap 为基础,实现了 SharedConfig。SharedConfig 允许一组进程一致性地读写一个共享的,由一组键 / 值对属性所构成的配置对象。除了示例代码之外,我们还提供了一个命令行小程序 SharedConfigCLI,让读者可以方便地体验一下这个 SharedConfig 应用。该示例的整体架构如图 1 所示。

命令行小程序 SharedConfigCLI 所支持的所有命令如下:

  • GET_ALL: 打印 SharedConfig 中的所有属性。

  • GET {key}: 打印给定键的属性值。

  • PUT {key}, {value}: 用给定的键 / 值对更新 SharedConfig。如果存在旧值,则同时将其打印。

  • PUT_IF_ABCENT {key}, {value}: 用给定的键 / 值对更新 SharedConfig,当且仅当该属性不存在。

  • REMOVE {key} [, {currentValue}]: 将给定属性从 SharedConfig 中移除。如果给出了{currentValue}值,则仅当该属性当前值匹配时才进行移除操作。

  • REPLACE {key}, {newValue}, [, {currentValue}]: 更新属性值。如果给出了{ currentValue }值,则仅当该属性当前值匹配时才进行更新操作。

  • CLEAR: 从 SharedConfig 中移除所有的键。

  • REFRESH: 强制从共享状态进行同步。

  • HELP: 打印可用命令列表。

  • QUIT: 退出程序。

在安装好上述名为“Pravega-Samples”的项目之后,请使用相同的 scope 名字和 stream 名字启动两个 SharedConfigCLI 的实例,这让我们可以模拟两个不同的进程通过协调它们本地的 SharedConfig 副本来操作同一共享状态。读者可以按如下步骤进行操作,以便初步体会一下这个 SharedConfig 是如何被不同进程协调的。

步骤 进程 1 进程 2 讨论
1 GET_ALL GET_ALL 两个进程都看见一个空的 SharedConfig。
2 PUT p1, v1 进程 1 添加名为 p1 的属性
3 GET p1 GET p1 进程 1 看见属性 p1 的值为 v1
进程 2 此时并不能看见属性 p1。为什么?因为进程 2 还没有从共享状态刷新它的本地副本。
4 REFRESH 将进程 2 的本地副本与共享状态同步。
5 GET p1 进程 2 现在可以看见进程 1 在步骤 2 中所做的改动了。
6 REPLACE p1, newVal, v1 进程 2 尝试更改 p1 的值,但使用的是一个条件更改,这意味着只有当 p1 的旧值为 v1 时该更改才能生效(此时确实如此)。
7 GET p1 p1 的值现在变成 newVal 了。
8 REPLACE p1, anotherVal, v1 进程 1 尝试使用和进程 2 在步骤 6 中所用相同的条件更改。这次的更改会失败,因为 p1 此时的值已不再是 v1 了。
9 GET p1 步骤 8 中的失败更改会导致进程 1 的共享状态本地副本被更新。p1 此时的值已是 newVal 了。

表 1 两个进程同时操作同一个 SharedConfig [1]

读者可以重复类似的操作序列来探索 PUT_IF_ABCENT 和其它修改共享状态操作的语义。该示例背后蕴含的基本思想是:所有对 SharedConfig 的更改操作只有当它们作用于最新的状态值时才能成功。此处,我们使用乐观的并发控制在 SharedConfig 对象的不同消费者之间达到了高效的一致性。读者还可以创建多个 SharedConfig 状态对象同时运行,每个 SharedConfig 对象个体都使用基于不同 stream 的独立的 StateSynchronizer。

示例实现 2:SharedMap(共享字典)的基本结构

在上一个示例中,我们其实已经使用到了 SharedMap。StateSynchronizer 可以用于实现几乎所有数据结构的共享版本。或许你的应用只需要共享一个简单的整数计数器,那么我们可以用 StateSynchronizer 实现一个共享计数器。又或许你需要共享的数据是一个包含当前集群里所有服务器的一个集合,那么我们可以用 StateSynchronizer 实现一个共享集合。还有许许多多类似这样的可能性。现在,让我们详细讨论一下如何基于 StateSynchronizer 实现 SharedMap 以便进行对象共享。

创建 StateSynchronizer

StateSynchronizer 也是 Pravega 客户端的类型之一,类似 EventStreamReader 或者 EventStreamWriterStateSynchronizer 通过 ClientFactory 接口创建。每个 StateSynchronizer 在其 scope 内必须具有唯一的名字。 SynchronizerConfig 可用于定制 StateSynchronizer 的行为(尽管在当前版本中,StateSynchronizer 尚不支持自定义行为)。 StateSynchronizer 利用 Java 的泛型机制,允许开发者指定基于特定类型的 StateSynchronizer

共享 / 同步状态 StateT

在设计一个使用 StateSynchronizer 的应用时,开发者必须确定需要被同步(共享)的状态类型。我们需要共享一个 Map ?一个 Set ?还是一个普通的 POJO 类?也就是说,我们需要共享什么样的数据结构。这将定义出 StateSynchronizer 的核心类型,即 StateSynchronizer 接口上的泛型类型 StateTStateT 对象可以是任何 Java 对象,只要它实现了 Pravega 的 Revisioned 接口。 Revisioned 是很简单的一个接口,它允许 Pravega 能够比较任意两个不同的 StateT 对象。

在我们的例子中,SharedMap 才是 StateSynchronizer 的真正使用者。它定义了一个简单的 Map 接口,提供 get(key)set(key, value) 等操作,就像一个常见的 Map 对象那样。它同时也按 StateSynchronizer 的要求实现了 Revisioned 接口,并且使用一个简单的 ConcurrentHashMap 作为内部 Map 的实现。所以,在我们的这个例子中, StateT 就是 SharedStateMap

状态变更操作 Update 与 InitialUpdate

在一个 StateSynchronizer 应用中,除了类型 StateT 之外,还需要定义另外两个非常重要的类型: Update 类型和 InitialUpdate 类型。 Update 代表了持久化在 stream 中的“增量”或者说是更新对象。 InitialUpdate 则是一个特殊的起始更新对象,用于启动 StateSynchronizer。 UpdateInitialUpdate 都是基于泛型类型 StateT 定义的。

StateSynchronizer 使用单 segment 的 stream 来存储对共享对象的更新操作。以 UpdateInitialUpdate 形式存在的更新对象,根据当前本地的共享状态副本是否相对于 stream 处于最新状态,被写入 stream 之中。如果检测到该更新操作将会作用于在共享状态的一个早期版本,则更新操作不会被应用。StateSynchronizer 自身在本地内存中维护了一份共享状态的副本,并且保存了该副本相关的版本元信息。通过 getState() 方法可以获取该本地状态副本。本地的状态副本有可能已经过期,应用程序可以通过调用 fetchUpdates() 方法来刷新状态副本,该操作会从 stream 获取所有在该版本之后产生的更新操作。绝大多数来自应用程序的更新操作都通过 updateState() 方法进行。 updateState() 方法接受一个函数对象作为参数。该函数对象会被调用并传入最新的共享状态,而函数本身则负责确定有哪些更新将作用于该状态。

在我们的例子中, InitialUpdate 实现如下:

复制代码

/**
* Create a Map. This is used by StateSynchronizer to initialize shared state.
*/
privatestaticclassCreateStateimplementsInitialUpdate<SharedStateMap>,Serializable{
privatestaticfinallongserialVersionUID =1L;
privatefinalConcurrentHashMap impl;

publicCreateState(ConcurrentHashMap impl){
this.impl = impl;
}

@Override
publicSharedStateMapcreate(String scopedStreamName, Revision revision){
returnnewSharedStateMap(scopedStreamName, impl, revision);
}
}

源代码 1 用于产生起始状态的特殊更新操作 InitialUpdate [1]

如源码所示, CreateState 对象用于初始化 stream 中的共享对象,它将会创建一个新的,空 SharedStateMap 对象。读者可以把其他例子中的 InitialUpdate 想象成把计数器置 1,或者把一个集合初始化成仅包含数个固定元素。将诸如“initialize”和“update”的函数方法用类的形式来实现或许看起来有一些奇怪,但当你仔细思考其中的缘由时,你就会发现这其实非常合理。所有这些更新操作,例如 initialize 和 update,都需要被保存到 stream 中,因此它们必须被实现为可序列化的对象。我们必须允许一个客户端在任何时间点启动,计算出当前共享状态,并且随着更新操作(可能来自其它客户端)不断被写入到 stream 中,还能够维持当前最新状态。如果我们仅仅在 stream 中保存“最新的状态值”,那么就无法使用并发控制来提供并发读写的一致性了。

StateUpdate 抽象

Update 类型看起来就更加怪异了。对于 Map 数据结构来说,并非只有一种更新操作,而是有各种各样的更新操作:更新一个键 / 值对,更新一组键 / 值对,删除一个键 / 值对,删除所有键 / 值对等等。每一类这样的更新操作都必须由一个对应的类实现。为此,我们定义了一个名为 StateUpdate 的抽象类,而所有其它更新操作的实现类都将继承自这个抽象类:

复制代码

/**
* A base class for all updates to the shared state. This allows for several different types of updates.
*/
privatestaticabstractclassStateUpdateimplementsUpdate<SharedStateMap>,Serializable{
privatestaticfinallongserialVersionUID =1L;

@Override
publicSharedStateMapapplyTo(SharedStateMap oldState, Revision newRevision){
ConcurrentHashMap newState =newConcurrentHashMap(oldState.impl);
process(newState);
returnnewSharedStateMap(oldState.getScopedStreamName(), newState, newRevision);
}

publicabstractvoidprocess(ConcurrentHashMap updatableList);
}

源代码 2 所有更新操作的抽象基类 [1]

通过定义一个抽象类 StateUpdate ,我们可以基于该抽象类来定义其它的 Update 实现。抽象类实现了“ applyTo ”方法。StateSynchronizer 调用该方法将更新应用于当前的状态对象,并返回对应的更新后的状态对象。实际的更新工作是在旧状态的底层 Map 实现对象的一个拷贝上完成的: process() 方法作用于实现对象上,并返回 SharedStateMap 的一个新版本,而这个新版本则使用 process() 方法处理后的实现对象作为内部状态。抽象类所定义的 process() 方法正是更新操作真正进行的位置。该方法由具体的更新操作实现类负责实现,例如 SharedMap 上的 Put 更新操作和 PutAll 更新操作。

基于 StateUpdate 抽象的 Put 操作

以下是 SharedMap 上的 Put(key, value) 操作的实现:

复制代码

/**
* Add a key/value pair to the State.
*/
privatestaticclassPutextendsStateUpdate{
privatestaticfinallongserialVersionUID =1L;
privatefinalK key;
privatefinalV value;

publicPut(K key, V value){
this.key = key;
this.value = value;
}

@Override
publicvoidprocess(ConcurrentHashMap impl){
impl.put(key, value);
}
}

源代码 3 Put 更新操作的实现 [1]

此处, process() 方法用于将一个键 / 值对加入字典数据结构,如果对应的键已经存在,则更改对应的值。SharedMap 上的所有操作都是通过创建 StateUpdate 子类实例的方式进行的。

示例实现 3:在 SharedMap 上执行操作

SharedMap 的例子展示了 StateSynchronizer 上的典型操作。SharedMap 对外提供的 API 非常类似 Java 的 Map 接口。通过使用 StateUpdate 的各个子类进行状态变更操作,SharedMap 以操作 StateSynchronizer 的形式实现了 Map 上的各种操作。

创建与初始化

以下源码将展示如何创建 SharedMap:

复制代码

/**
* Creates the shared state using a synchronizer based on the given stream name.
*
*@paramclientFactory - the Pravega ClientFactory to use to create the StateSynchronizer.
*@paramstreamManager - the Pravega StreamManager to use to create the Scope and the Stream used by the StateSynchronizer
*@paramscope - the Scope to use to create the Stream used by the StateSynchronizer.
*@paramname - the name of the Stream to be used by the StateSynchronizer.
*/
publicSharedMap(ClientFactory clientFactory, StreamManager streamManager, String scope, String name){
streamManager.createScope(scope);

StreamConfiguration streamConfig = StreamConfiguration.builder().scope(scope).streamName(name)
.scalingPolicy(ScalingPolicy.fixed(1))
.build();

streamManager.createStream(scope, name, streamConfig);

this.stateSynchronizer = clientFactory.createStateSynchronizer(name,
newJavaSerializer<StateUpdate>(),
newJavaSerializer<CreateState>(),
SynchronizerConfig.builder().build());

stateSynchronizer.initialize(newCreateState(newConcurrentHashMap()));
}

源代码 4 SharedMap 的创建 [1]

一个 SharedMap 实例是以定义 scope 和 stream 的形式创建出来的(在大多数情况下,对应的 scope 和 stream 可能已经存在,那么第 10-16 行通常为一个空操作)。 StateSynchronizer 对象本身是在第 18-21 行中由 ClientFactory 创建的,非常类似 reader 和 writer 的创建。注意,可以为 Update 对象和 InitialUpdate 对象分别制定各自的序列化器。目前, SynchronizerConfig 还仅仅只是一个空实现,因为 StateSynchronizer 在当前版本中尚无可用的配置项。

StateSynchronizer 提供一个以 InitialUpdate 对象作为参数的 initialize() 方法。该方法在 SharedMap 的构造函数中被调用,以保证共享状态被正确地初始化。注意,在大多数情况下, SharedMap 很可能是基于一个已经包含 SharedMap 共享状态的 stream 被创建。在这种情况下,调用 initialize() 方法也没有任何问题,因为 initialize() 方法不会修改 stream 中的共享状态。

读操作

所有的读操作,即不会改变共享状态的操作,例如: get(key)containsValue(value) 等,都是基于 StateSynchronizer 的本地状态副本的。所有的这些操作都使用 getState() 方法获取本地共享状态副本,然后在该副本上的进行读取操作。 StateSynchronizer 的本地状态副本可能过期。在这种情况下,SharedMap 的客户端可能需要使用 refresh() 方法来强制 StateSynchronizer 从 stream 上的最新共享状态刷新本地状态,而该刷新操作是通过 StateSynchronizer 对象的 fetchUpdates() 方法完成的。

注意,用存在过期状态的可能性来换取更快的响应,这是设计决策层面的一种权衡。当然,我们也可以很容易地将读操作实现为在读取本地共享状态副本之前总是先做一次刷新操作。如果开发者可以预见到共享状态上会存在频繁更新,那么这也不失为一种有效的策略。在我们的这个示例中,我们假设 SharedMap 对象会被频繁读取,但并不会被频繁更新,因此选择了直接读取本地状态副本。

写(更新)操作

正如我们先前讨论的那样,所有的写操作都是由 StateUpdate 类的各个具体子类实现的: clear() 操作使用 StateUpdate 的子类 Clear 来进行键 / 值对的删除,而 put() 操作则使用 Put 子类,如此种种。现在,让我们深入 put() 操作的实现来更加详细地讨论 StateSynchronizer 的编程。以下是 put() 操作的源码:

复制代码

/**
* Associates the specified value with the specified key in this map.
*
*@paramkey - the key at which the value should be found.
*@paramvalue - the value to be entered into the map.
*@return- the previous value (if it existed) for the given key or null if the key did not exist before this operation.
*/
publicVput(K key, V value){
finalAtomicReference oldValue =newAtomicReference(null);
stateSynchronizer.updateState((state, updates) -> {
oldValue.set(state.get(key));
updates.add(newPut(key,value));
});
returnoldValue.get();
}

源代码 5 SharedMap 上 put 接口方法的实现 [1]

注意,传入给 StateSynchronizerupdateState() 方法的函数对象很可能被调用多次。将该函数作用于旧状态所得的结果只有当该状态是 stream 上的最新状态时才会被写出。如果由于竞争而导致乐观并发控制检查失败,那么操作会反复重试(从而导致传入的函数对象被调用多次)。在大多数情况下,只需要很少的几次重试就可以完成对应操作了。在某些情况下,开发者也可以选择在调用 updateState() 方法之前先手动调用一次 fetchUpdates() ,从 stream 上同步最新的共享状态到 StateSynchronizer 。这其实是一种优化手段,在更新操作的预期频繁程度和更新操作的高效性之间做出权衡。如果你能预见到会有大量更新,那么就在调用 updateState() 方法之前先调用一次 fetchUpdates() 。在我们的示例中,我们假设不会有太多更新,因此选择让 StateSynchronizer 自己处理状态刷新。

删除操作

我们选择在实现删除操作的同时,利用 StateSynchronizer 的 compact 特性。我们的策略是,每执行 5 次 remove() 操作后,以及每次 clear() 操作后,都自动进行一次 compact() 调用。我们甚至还可以选择在每执行 5 次 update() 操作后也进行一次 compct() 调用,但目前我们想仅把 compact() 操作运用在删除操作上。

读者可以把 compact() 操作想象成 StateSynchronizer 的某种形式的“垃圾清除”机制。在一定数量的更新操作被应用到共享状态之后,一种更有效的形式是再写入一个新的起始状态。这个新的起始状态可以看作是所有写入 stream 的更新操作的累积表示。通过这种方法,所有早于该 compact() 操作的数据现在都可以被忽略,并最终从 stream 中清除。

图 2 每个 compact() 操作都会创建一个新的起始状态 图片来自 [1]

如图 2 所示, compact() 操作的结果就是将一个新的起始状态 Initial2 写入 stream。现在,所有比 Change3 更早的数据,包括 Change3 自身,都已经无用了,可以从 stream 中被作为垃圾清除掉。

Pravega 系列文章计划

Pravega 根据 Apache 2.0 许可证开源, 0.4 版本 已于近日发布。我们欢迎对流式存储感兴趣的大咖们加入 Pravega 社区,与 Pravega 共同成长。本篇文章为 Pravega 系列第六篇,系列文章标题如下(标题根据需求可能会有更新):

  1. 实时流处理 (Streaming) 统一批处理 (Batch) 的最后一块拼图:Pravega

  2. 开源 Pravega 架构解析:如何通过分层解决流存储的三大挑战?

  3. Pravega 应用实战:为什么云原生特性对流存储至关重要

  4. “ToB” 产品必备特性: Pravega 的动态弹性伸缩

  5. 取代 ZooKeeper!高并发下的分布式一致性开源组件 StateSynchronizer

  6. 分布式一致性解决方案 – 状态同步器 (StateSynchronizer) API 示例

  7. Pravega 的仅一次语义及事务支持

  8. 与 Apache Flink 集成使用

作者简介

  • 蔡超前:华东理工大学计算机应用专业博士研究生,现就职于 Dell EMC,6 年搜索和分布式系统开发以及架构设计经验,现从事流存储和流搜索相关的设计与研发工作。

  • 滕昱:现就职于 Dell EMC 非结构化数据存储部门 (Unstructured Data Storage)团队并担任软件开发总监。2007 年加入 Dell EMC 以后一直专注于分布式存储领域。参加并领导了中国研发团队参与两代 Dell EMC 对象存储产品的研发工作并取得商业上成功。从 2017 年开始,兼任 Streaming 存储和实时计算系统的设计开发与领导工作。

参考文献

[1] “Working with Pravega: State Synchronizer,” Dell EMC, [Online]. Available: https://github.com/pravega/pravega/blob/master/documentation/src/docs/state-synchronizer.md .

[2] “StateSynchronizer Related Samples in Pravega-Samples GitHub Repository,” [Online]. Available: https://github.com/pravega/pravega-samples/tree/master/pravega-client-examples/src/main/java/io/pravega/example/statesynchronizer .

更多内容,请关注 AI 前线