seata TC 模块分析
在分析TC各模块之前,首先再回顾下seata的整个执行流程:
-
TM:事务的发起者。用来告诉TC,全局事务的开始,提交,回滚。
-
RM:具体的事务资源,每一个RM都会作为一个分支事务注册在TC。
-
TC:事务的协调者。也可以看做是seata-server,用于接收事务注册,提交和回滚。
为什么TC是seata核心呢?因为TC这个角色就好像上帝一样,协调控制TM、RM协同工作,TC一旦不好使,那么RM和TM就会出现问题,那必定会乱的一塌糊涂。
那么一个优秀的事务协调者应该具备哪些能力呢?
-
正确的协调:能正确的协调RM和TM接下来应该做什么,做错了应该怎么办,做对了应该怎么办。
-
高可用: 事务协调器在分布式事务中很重要,如果不能保证高可用,那么它也没有存在的必要了。
-
高性能:事务协调器的性能一定要高,如果事务协调器性能有瓶颈那么它所管理的RM和TM那么会经常遇到超时,从而引起回滚频繁。
-
高扩展性:这个特点是属于代码层面的,如果是一个优秀的框架,那么需要给使用方很多自定义扩展,比如服务注册/发现,读取配置等等。
TC整体设计
TC整体设计如上,各模块说明如下:
-
CoordinatorCore: 在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如是否commit,rollback等协调活动。
-
Store: 存储模块,用来将我们的数据持久化,防止重启或者宕机数据丢失。
-
Discovery: 服务注册/发现模块,用于将Server地址暴露给我们Client。
-
Config: 用来存储和查找我们服务端的配置。
-
Lock: 锁模块,用于给Seata提供全局锁的功能。
-
RPC: 用于和其它端通信。
-
HA-Cluster: 高可用集群,目前还没开源。
Discovery
Discovery模块就是服务发现模块,TC启动后需要将自己的信息注册到服务中心,这样才能暴露给其他使用者,Discovery接口定义如下:
public interface RegistryService { void register(InetSocketAddress address) throws Exception; void unregister(InetSocketAddress address) throws Exception; void subscribe(String cluster, T listener) throws Exception; void unsubscribe(String cluster, T listener) throws Exception; List lookup(String key) throws Exception; void close() throws Exception; }
上述方法看定义就能知道其作用,因此不在赘述。Discovery模块有多个实现类,如下图所示:
我们知道,服务注册到服务中心,一般需要与服务中心进行心跳保活,否则服务中心会将该服务信息给清除,一般服务中心的client jar包会集成对应的心跳能力。但是针对redis来说,该如何注册呢,下面就以redis作为示例来分析服务注册流程,对应类 RedisRegistryServiceImpl。
从源码来看,seata使用redis注册是使用的是hash字典结构,那么它怎么心跳的呢?准确来说,seata注册redis是没有心跳的,只使用到了redis channel作为通知机制来保证tc实例变化时的通知上下线能力。注意 redis channel只有在更改时的通知能力,因此tm/rm在启动时需要先从redis获取数据之后,然后再设置channel监听,seata对应逻辑在方法 io.seata.discovery.registry.redis.RedisRegistryServiceImpl#subscribe
中,严格来讲,在获取数据和设置channel监听之间,如果数据发生了变更,是存在更新丢失问题的,不过这种问题触发概率极地可以忽略,并且后续有更新时还可以再次获取得到新的数据。
@Override public void register(InetSocketAddress address) { NetUtil.validAddress(address); String serverAddr = NetUtil.toStringAddress(address); try (Jedis jedis = jedisPool.getResource()) { jedis.hset(getRedisRegistryKey(), serverAddr, ManagementFactory.getRuntimeMXBean().getName()); jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER); } } @Override public void unregister(InetSocketAddress address) { NetUtil.validAddress(address); String serverAddr = NetUtil.toStringAddress(address); try (Jedis jedis = jedisPool.getResource()) { jedis.hdel(getRedisRegistryKey(), serverAddr); jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.UN_REGISTER); } }
由于没有心跳能力,那就就需要在tc进程关闭(JVM关闭)时执行unregister逻辑,因此会添加对应的 ShutdownHook
钩子函数,其能保证在程序正常退出、System.out、Ctrl+C中断结束、系统关闭、OOM宕机、kill pid进程时被执行到,但是如果是执行 kill -9 pid
这种,是没有执行到钩子函数的。register是在Netty启动后进行注册的,对应的逻辑在方法 io.seata.core.rpc.netty.NettyServerBootstrap#start
中,这里不在赘述。
Config
配置模块是seata的基础模块,比如netty线程配置、session配置等,这些配置seata基本上都有默认配置。seata有一个关于配置的类Configuration:
public interface Configuration { int getInt(String dataId, int defaultValue); String getConfig(String dataId, long timeoutMills); boolean putConfig(String dataId, String content); boolean removeConfig(String dataId, long timeoutMills); boolean removeConfig(String dataId); void addConfigListener(String dataId, ConfigurationChangeListener listener); void removeConfigListener(String dataId, ConfigurationChangeListener listener); }
目前seata适配了多种配置中心,如下:
-
getInt/Long/Boolean/getConfig():通过dataId来获取对应的值,读取不到配置、异常或超时将返回参数中的默认值。
-
putConfig:用于添加配置。
-
removeConfig:删除一个配置。
-
add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更。
在Seata中需要配置registry.conf,来配置config.type :
config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "SEATA_GROUP" username = "" password = "" dataId = "seataServer.properties" } consul { serverAddr = "127.0.0.1:8500" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" apolloAccesskeySecret = "" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
Store
Store模块为seata的存储模块,主要存储session数据,存储类为 TransactionStoreManager
,其主要提供了读写session接口:
public interface TransactionStoreManager { boolean writeSession(LogOperation logOperation, SessionStorable session); GlobalSession readSession(String xid); GlobalSession readSession(String xid, boolean withBranchSessions); List readSession(SessionCondition sessionCondition); void shutdown(); enum LogOperation { /** * Global add log operation. */ GLOBAL_ADD((byte)1), /** * Global update log operation. */ GLOBAL_UPDATE((byte)2), /** * Global remove log operation. */ GLOBAL_REMOVE((byte)3), /** * Branch add log operation. */ BRANCH_ADD((byte)4), /** * Branch update log operation. */ BRANCH_UPDATE((byte)5), /** * Branch remove log operation. */ BRANCH_REMOVE((byte)6); private byte code; } }
存储模块目前支持file/db/redis存储,如果需要保证TC可用性建议将数据存储到DB中。
写入file文件流程如下:
-
加锁,开始将数据序列化,然后写入到currFileChannel
-
写入完成之后,判断是否需要创建新的存储文件
-
加锁,刷盘,刷盘有同步和异步2种方式,异步的话有flush线程来异步完成
public boolean writeSession(LogOperation logOperation, SessionStorable session) { writeSessionLock.lock(); long curFileTrxNum; try { if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) { return false; } lastModifiedTime = System.currentTimeMillis(); curFileTrxNum = FILE_TRX_NUM.incrementAndGet(); if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) { return saveHistory(); } } catch (Exception exx) { return false; } finally { writeSessionLock.unlock(); } flushDisk(curFileTrxNum, currFileChannel); return true; }
注意这里的刷盘模式默认为异步模式,为了数据安全性的话可以设置为同步刷盘,避免系统断电写入pagecache中未刷盘的数据丢失。
Lock
大家知道数据库实现隔离级别主要是通过锁来实现的,同样的在分布式事务框架Seata中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在Seata中可以保证隔离级别是读已提交,但是提供了达到读已提交隔离的手段。
Lock模块也就是Seata实现隔离级别的核心模块。在Lock模块中提供了一个接口用于管理我们的锁:
public interface Locker { boolean acquireLock(List rowLock) ; boolean releaseLock(List rowLock); boolean releaseLock(String xid, Long branchId); boolean releaseLock(String xid, List branchIds); boolean isLockable(List rowLock); void cleanAllLocks(); }
其中有三个方法:
-
acquireLock:用于对我们的BranchSession加锁,这里虽然是传的分支事务Session,实际上是对分支事务的资源加锁,成功返回true。
-
isLockable:根据事务ID,资源Id,锁住的Key来查询是否已经加锁。
-
cleanAllLocks:清除所有的锁。对于锁我们可以在本地实现,也可以通过redis或者mysql来帮助我们实现。官方默认提供了本地全局锁的实现:
public class FileLocker extends AbstractLocker { private static final int BUCKET_PER_TABLE = 128; private static final ConcurrentMap<String/* resourceId */, ConcurrentMap<String/* tableName */, ConcurrentMap>> LOCK_MAP = new ConcurrentHashMap();
在本地锁的实现中有两个常量需要关注:
-
BUCKET_PER_TABLE:用来定义每个table有多少个bucket,目的是为了后续对同一个表加锁的时候减少竞争。
-
LOCK_MAP:这个map从定义上来看非常复杂,里里外外套了很多层Map,这里用个表格具体说明一下:
可以看见实际上的加锁在bucketLockMap这个map中,这里具体的加锁方法比较简单就不作详细阐述,主要是逐步的找到bucketLockMap,然后将当前trascationId塞进去,如果这个主键当前有TranscationId,那么比较是否是自己,如果不是则加锁失败。
RPC
seata RPC通信基层基于netty来保证高性能,采用默认的配置netty线程池模型处理流程如下:
如果采用默认的基本配置那么会有一个Acceptor线程用于处理客户端的链接,会有cpu*2数量的NIO-Thread,在这个线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和TM注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为100,最大为500。
seata心跳是通过netty的IdleStateHandler来完成的,在Sever端对于写没有设置最大空闲时间,对于读设置了最大空闲时间,默认为15s(客户端默认写空闲为5s,发送ping消息),如果超过15s则会将链接断开,关闭资源。
在TC server侧的netty处理流程中,接收到数据首先进行解码,按照seata定义的固定协议格式进行,会将数据解码成 RpcMessage 消息,代码如下:
public class RpcMessage { private int id; private byte messageType; private byte codec; private byte compressor; private Map headMap = new HashMap(); private Object body;
后续seata的各种处理器的处理流程都是基于 RpcMessage 消息来的。
Coordinator Core
Coordinator Core的实现为DefaultCoordinator,其实TC中重要的协调类,负责分布式事务生命周期中的各种操作管理工作,其初始化代码如下:
// main方法中 DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer); coordinator.init(); nettyRemotingServer.setHandler(coordinator); // 初始化方法 public void init() { retryRollbacking.scheduleAtFixedRate(() -> { // xxx }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS); retryCommitting.scheduleAtFixedRate(() -> { // xxx }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); asyncCommitting.scheduleAtFixedRate(() -> { // xxx }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); timeoutCheck.scheduleAtFixedRate(() -> { // xxx }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS); undoLogDelete.scheduleAtFixedRate(() -> { // xxx }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS); }
其内部会初始化几种线程池来驱动分布式事务操作的进行,比如undoLog清理、commit/rollback重试等等。关于Coordinator这块业务逻辑较多,后续会专门写这块内容,本文就不在赘述了。
参考资料:
-
http://seata.io/zh-cn/blog/seata-analysis-java-server.html
-
https://blog.csdn.net/qq_26323323/article/details/89814410