Spark SQL 分布式事务处理能力的探索与实践

云湖湖导读:

在当今大数据时代,随着存储数据量的增长,对数据库插入与读取性能的要求越来越严苛。为了提升访问数据库的性能,已经有越来越多成熟的方案来并发访问数据库,Spark JDBC Datasource就是其中之一。而另一方面,并发访问数据库也会带来各种各样的问题,比如数据的保序、数据倾斜、并发一致性等问题。

本文将会从具体案例入手,通过以下3点来详细描述如何利用Spark来处理分布式事务,解决并发一致性问题。

1、概述事务

2、案例分析

3、解决方案

更多优质内容请关注微信公众号“智能数据湖”

作者:William

责编:云湖湖

1、概述

1.1

数据库事务

事务 可以简单理解为对数据库的一批操作,这批操作要么全部成功,要么全部失败回滚。 事务有4大特性ACID

原子性(Atomicity)

事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。

一致性(Consistency)

事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。

隔离性(Isolation)

多个事务并发执行时,一个事务的执行不应影响其他事务的执行。

持久性(Durability)

已被提交的事务对数据库的修改应该永久保存在数据库中。

以MySQL为例,一个简单的事务操作流程

1.2

分布式数据库事务

分布式事务是数据库事务的子类,在分布式系统与微服务架构盛行的今天,各个不同模块、服务之间进行数据库业务的操作,也是需要保证事务性质的。

现在,已经有很多分布式事务的解决方案,比如常见的2PC、3PC、MySQL的XA事务、SAGA等等。然而,各种方案都有各自的优缺点,面对分布式系统中的所有需求问题,至今是无法找到一个十全十美的解决方案。著名的CAP理论提到,在一个分布式系统中,最多只能满足C、A、P中的两个需求:

C:Consistency 一致性

同一数据的多个副本保持一致。

A:Availability 可用性

系统能够提供正常服务并能在需求的时间内返回结果。

P:Partition tolerance 分区容忍性

由于各种不可预知的问题,系统中必然不能一直使数据的各个副本保持同步,那么系统能够在可用的同时容忍数据不同步的极限称为分区容忍性。

由于至今没有有效的解法来否决CAP理论,因此在实际分布式的解决方案中,又引入了BASE理论来辅助解决分布式中的难题,该理论将会结合下面的实际案例分析来进行介绍。

1.3

Spark JDBC事务处理

Spark JDBC Datasource是Spark SQL用于访问关系型数据库的模块,其中自然运用到了事务逻辑。附录有详细源码分析。

2、案例分析

2.1

某企业需要定期将存放在HBase中的订单信息数据增量同步到PostGre数据库中,以便后续做BI报表展示。流程示意图如下:

该企业要求每隔1小时将HBase中新产生的数据导入到PostGre中,在下一个导入周期开始前完成,每个周期的数据量在10~100GB级别左右。其中每张表都有时间戳(Timestamp)字段来表示数据插入的时间。另外,该企业还要求导入过程中有失败重试机制,保证所有订单数据最终导入成功,并且不能在PostGre中产生重复的订单数据。

考虑到10~100GB级别数据需要在一个周期内导入完成,性能还是有一定的需求。选型中确定了使用SparkSQL的分布式并发处理能力来进行数据的导入。

2.2

Spark JDBC中的并发问题

Spark能够并发启动多个task来同时进行数据的导入,解决了性能问题。但是并发也会带来其他的问题。

Spark是具有事务处理的能力,但是每个task都是一个独立的事务。task执行成功之后,数据会永久写入SQL数据库。如果其他的task执行失败,不会影响到已经执行成功的task进行回滚,这时SQL数据库将会产生脏数据。对于用户来讲,实际上只是执行了一条“insert”语句,但是Spark内部启动了并发task,将用户的SQL语句拆分成了多个SQL语句进行处理,而用户是不会感知的。从用户的角度考虑,如果需要事务的能力,要么这一条“insert”语句执行成功,数据正确写入数据库,要么执行失败,SQL语句回滚,没有数据写入数据库中。因此,Spark的并发处理能力无法满足用户对于事务的需求。

图 用户的一条insert语句被拆分成多个insert语句并发执行,当task4写入失败后,其他task写入的数据变成脏数据

3、解决方案

3.1

单并发事务处理

思路

1.2中提到了CAP理论,当C、A、P三者选其二的时候,如果选择CA,那么需要放弃掉数据副本,也就是说系统中只有一份数据,那么就减少了因为同步而带来的一致性问题(实际上考虑到分布式系统的可靠性,数据是需要多副本存放的,一般的分布式系统都会选择P,并在CA中二选一)。

根据这个思想,当并发存在问题的时候,采用单并发从根源上直接抹去分布式事务能力,是可以解决问题的。在Spark中可以设置参数spark.default.parallelism和spark.sql.shuffle.partitions为1来控制task的个数。这样利用Spark JDBC的事务能力,当单task执行失败的时候进行回滚,数据库就不会产生脏数据了。

图 单task成功写入即完成任务

图 单task失败事务回滚,数据库不会产生脏数据

优点

完全解决了Spark因并发导致的分布式事务功能缺失问题。

缺点

无法使用Spark的并行计算能力,性能直线下降,无法满足客户需求。

3.2

二阶段式:协调者与执行者实现

1.2中介绍过,业界已经有很多分布式事务的解决方案,在这里我们介绍一下比较简单的2PC如何在Spark中实现,其他的方案也都可以根据其思想结合Spark现有的执行模式来实现。

思路

先来介绍一下2PC的实现,在2PC的系统中需要有2个角色——协调者和参与者,协调者负责任务调度,参与者负责执行任务。2PC活动分为2个阶段——准备阶段和提交阶段。

准备阶段,协调者会发送确认信息询问参与者是否已准备好,具备事务提交能力;当协调者接收到所有参与者回复之后,进入提交阶段,如果所有参与者都准备好事务提交,则提交事务,如果有一个参与者返回无法进行事务提交,则所有参与者进行回滚。

图 2PC事务成功提交流程

图 2PC事务失败回滚流程

Spark中的Driver与Executor运行模式,正好填充了协调者与参与者的角色。Driver可以作为协调者,Executor中的每一个task可以作为参与者,这里还需要在Driver与task之间增加一个通信通道。准备阶段,Driver启动监听端口等待接收task反馈,各个task完成insert数据处理之后发送完成准备信息给Driver,并等待Driver下一步指令。提交阶段,Driver接受到所有task的准备信息之后,发送提交或者回滚指令。

图 Spark 2PC改造后实现分布式事务处理流程

在这里还需要打通2个难点。第一个问题是准备阶段,Driver在等待task反馈的时候,当task异常失败后有可能会无法反馈。这就需要Driver的监听与task状态检测机制关联,当遇到task异常退出的时候,Driver需要下发回滚指令,不能无止尽等待,防止Spark作业卡死。第二个问题是task是在队列等待下发的,当executor的资源不足以启动所有task的时候,会有部分task存在队列中,这时候driver等待所有task的反馈,启动中的task在等待driver下发新的指令,队列中未启动的task等待启动中的task完成任务释放资源,三方互相牵制,造成死循环。

图 当集群资源无法满足同时启动所有task的时候,造成死循环

这时需要通过Spark配置项来控制task并发数,确保executor有足够的资源能够一次性将所有并发task都启动。相关的配置项如下表所示。

表 控制task并发数的相关配置项

优点

既能满足简单的分布式事务需求,还能充分利用Spark的并行计算能力提升性能。

缺点

1. 需要精确控制task并发数,确保所有task能同时执行,否则会造成死循环。

2. 需要在Spark中实现分布式事务的逻辑,具有一定的工作量。

除了以上这些,2PC协议本身的缺陷在结合了Spark中的一些可靠性机制之后会有所减少(比如协调者挂死、参与者挂死等),但并不代表2PC的所有缺陷Spark都能规避。因此,对于一些有复杂分布式事务处理能力的需求,该方案还是不太满足。

3.3

最佳方案:可幂等的insert ignore模式

思路

1.2中我们遗留了一个BASE理论,先来做一下介绍。BASE理论是eBay架构师提出的,它是CAP理论的一个延伸。BASE理论由BA(Basically Available, 基本可用),S(Soft State),E(Eventual Consistency, 最终一致性)三部分组成,核心思想是即使无法做到强一致性,但应用可以采用适合的方式达到最终一致性。

从案例背景中,我们可以看出来,实际上用户需要的只是最终能够完成所有数据的同步,即所谓的最终一致性,因此我们可以结合Spark的重试机制与类似MySQL的insert ignore功能来实现。

先来介绍一下MySQL的insert ignore。当表中存在主键(例如ID),那么在使用insert ignore into语法时,如果遇到插入主键相同的情况下,将会忽略此次插入。Oracle有类似的功能,叫做merge into,有一点区别是MySQL遇到冲突时,忽略插入,而Oracle遇到冲突时是进行更新操作。PostGre没有直接提供类似功能,但是网上有很多关于PostGre如何实现insert ignore的方法,在这里就不做介绍了。而在案例中涉及到的PostGre,在华为云解决方案中采用的是DWS数据仓库服务,除了兼容PostGre还提供了Oracle的merge into语法来实现该功能。

因此,针对本文的案例,具体的方案是先对表设置主键,如果只是静态的订单状态等信息,则可以简单将订单ID作为主键,而有些涉及到订单操作信息的数据(订单ID会多次出现,记录用户每次对该订单的操作,例如创建订单、对订单支付、撤销订单等),则需要新增一个唯一的事件ID作为主键。

   表 订单操作信息表示例

表 增加事件ID作为主键后的订单操作信息表示例

然后在Spark中使用insert ignore的功能,这样如果遇到并发执行时某个task失败的情况,重新执行作业,会对已经插入的数据进行忽略,并不造成数据的重复。

图 使用insert ignore,当task执行失败时部分数据写入,REDO时忽略已写入数据

优点

既解决了一致性问题,也能利用Spark的并行计算能力提升性能。

缺点

该方案使用的是最终一致性的思想,因此如果遇到强一致性的需求是无法满足的。

注释

1、 参考文献: 王能斌. 《数据库系统教程(上册)》. 电子工业出版社

2、 通过分析向数据库插入数据的源码,来介绍一下 Spark 中如何进行事务处理的(代码表可左右滑动):

// @param insertStmt是插入数据库的sql语句模板, 例如”insert into tablexx values(?, ?, ?…)”

// 根据rdd中的数据调用java jdbc batch接口将问号替换掉

// @param batchSize指定了每批次插入sql数据库的记录(行)数

// @param dialect为各种不同SQL数据库提供类型转换器

// @param isolationLevel事务的隔离级别, 同时也是用于判断是否开启事务的标志

def savePartition(

getConnection: () => Connection,

table: String,

iterator: Iterator[Row],

rddSchema: StructType,

insertStmt: String,

batchSize: Int,

dialect: JdbcDialect,

isolationLevel: Int): Iterator[Byte] = {

val conn = getConnection()

var committed = false

// 判断对方数据库是否支持相应的隔离级别

// JDBC接口中隔离级别一共有5种:(网上有很多关于该级别的解释)

// “NONE”

// “READ_UNCOMMITTED”

// “READ_COMMITTED”

// “REPEATABLE_READ”

// “SERIALIZABLE”

//

// isolationLevel为用户指定的隔离级别, spark sql中可以由参数isolationLevel来指定, 默认为”READ_UNCOMMITTED”

// finalIsolationLevel为分析对方数据库的元数据信息后, 选择的最终隔离级别

//

var finalIsolationLevel = Connection.TRANSACTION_NONE

if (isolationLevel != Connection.TRANSACTION_NONE) {

try {

// 获取数据库的元数据信息

val metadata = conn.getMetaData

// 判断是否支持事务

if (metadata.supportsTransactions()) {

// 若数据库支持用户指定的隔离级别,则将最终隔离级别调整为用户指定

// 否则的话使用数据库的默认隔离级别

val defaultIsolation = metadata.getDefaultTransactionIsolation

finalIsolationLevel = defaultIsolation

if (metadata.supportsTransactionIsolationLevel(isolationLevel)) {

finalIsolationLevel = isolationLevel

} else {

logWarning(s”Requested isolation level $isolationLevel is not supported; ” +

s”falling back to default isolation level $defaultIsolation”)

}

} else {

// 若数据库不支持事务, finalIsolationLevel为’NONE’, 即不开启事务

logWarning(s”Requested isolation level $isolationLevel, but transactions are unsupported”)

}

} catch {

case NonFatal(e) => logWarning(“Exception while detecting transaction support”, e)

}

}

// 若最终隔离级别不为’NONE’则开启事务

val supportsTransactions = finalIsolationLevel != Connection.TRANSACTION_NONE

try {

// 若开启事务, 则关闭jdbc接口中的自动提交功能, 并设置隔离级别

if (supportsTransactions) {

conn.setAutoCommit(false) // Everything in the same db transaction.

conn.setTransactionIsolation(finalIsolationLevel)

}

// 根据不同的数据库(mysql, postgre等)构造不同的setters, 用于将insert模板语句中的”?”填充成数据

val stmt = conn.prepareStatement(insertStmt)

val setters = rddSchema.fields.map(f => makeSetter(conn, dialect, f.dataType))

val nullTypes = rddSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType)

val numFields = rddSchema.fields.length

try {

var rowCount = 0

while (iterator.hasNext) {

val row = iterator.next()

var i = 0

// 根据字段个数逐行对insert语句进行填充

while (i < numFields) {

if (row.isNullAt(i)) {

stmt.setNull(i + 1, nullTypes(i))

} else {

setters(i).apply(stmt, row, i)

}

i = i + 1

}

stmt.addBatch()

rowCount += 1

// 当插入数据的行数达到batchSize(默认1000)之后, 进行插入

if (rowCount % batchSize == 0) {

stmt.executeBatch()

rowCount = 0

}

}

// 最后全部处理完之后, 将剩余的数据插入数据库

if (rowCount > 0) {

stmt.executeBatch()

}

} finally {

stmt.close()

}

// 若开启了事务, 执行完stmt.executeBatch()之后数据库是不会生效的, 需要执行提交事务

if (supportsTransactions) {

conn.commit()

}

committed = true

Iterator.empty

} catch {

} finally {

if (!committed) {

// 提交失败, 如果开启了事务则进行回滚

if (supportsTransactions) {

conn.rollback()

}

conn.close()

} else {

}

}

}

▼  每周二18点,不见不散 

延伸阅读

实时流计算和时空数据库助力用户IoT的无限可能

《Serverless计算这么强大》

《Spark SQL在HBase的查询性能优化》

地理时空大数据概论

咦,在看吗?点一下「在看」再走呗:point_down: