跟着 RocketMQ 学并发网络编程

预备知识

一 : 网络通信的一般形式

采用  BIO 通信模型  的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。我们一般通过在 while(true)  循环中服务端会调用  accept()  方法等待接收客户端的连接的方式监听请求,请求一旦接收到一个连接请求,就可以建立通信套接字在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,只能等待同当前连接的客户端的操作执行完成, 不过可以通过多线程来支持多个客户端的连接,每个客户端一个线程就是上图所示,但是这里有一个问题就是如果客户端的很多我们需要开启很多线程然后操作系统对线程的个数是限制的,以及在调度过程中很多线程也会导致上下文切换效率不高,所以该模型有的局限性,为了能够支持多个客户端进行通信我们就引入了多路复用的机制 

二: 单Reactor多线程模型

acceptor 是一个独立的模块在代码的实现他就是一个线程(当然也可以是一个线程池) 一个ThreadPool 线程池用来进行数据编解码 我们称作工作线程 这两个模块交由reactor 统一的管理与分发 ,reactor 负责监听客户端是否 可 read/write ,发现可以IO则交由 worker thread 去处理

三:多个reactor  模型

acceptor 仅仅负责监听是否有客户端连接上来,然后交给sub reactor ,sub reactor 负责监听已经连接过来的客户端的 是否可读 可写,当可以 IO 的时候,交给 work thread 进行处理 (netty 如果使用nio ,就是这种模型) 这种模型可以适配超大规模的客户端的连接

Rocket Mq 里面的netty 调用事件 

首先我们从Netty Remoting Client 出发

先看一下整个它的 继承图

我们直观的认为NettyRemotingClient 他会有一些基本的操作,发送给服务端的数据,在这个类图的的结构中也体现出来了 invokeAsync invokeSync 异步发送和同步发送 

同步调用 和异步调用这里有有些区别的,首先从函数的签名上

invokeSync   有返回值 他的类型是RemotingCommand

invokeAsync 返回值void ,但是入参有一个InvokeCallback 

异步调用有callback,我们仅仅需要在callback里面做我们的业务逻辑处理即可

而同步调用这里需要我们有点小的技巧

talk is cheap 直接看代码

从整个代码流程来看,在call 之前做了一些verify 还有就是一些before after 的记录 真正实现的是invokeSyncImpl 方法的调用

invokeSyncImpl 方法是在  NettyRemotingAbstract 里面,它是NettyRemotingClient 的父类  这里也透露了一些设计模式,就是把一些公共通用的代码可以向上他的父类进行抽象 (模板)

1 : 通过RemotingCommand 获取一个 opaque ,可以理解为每一次请于获取一个request id

2: 通过request id 和一些超时的参数 构造一个ResponseFuture 

3: 将ResponseFuture 放入到 一个reponseTable 中(他是一个concurrent  hash map),request id 作为key

4: 调用channel(netty 里面代表一个client 客户端 )的write flush 方法,同时在调用成功过后,更改一下 response future 的状态 成功 与否,然后在response table 移除

5: 调用ResponseFuture的waitResponse方法 (重点,程序会阻塞在这里)

6: 最后判断进行一个返回responseCommand

看一下  ResponseFuture waitResponse  的实现机制

里面有一个 countDownLatch.await() 方法的调用同时它的初始值仅仅为1

告诉我们这个需要有另外的线程或者其他流程需要  countDownLatch .countDown() 方法进行一个释放操作,当然也可以超时我们暂时先忽略. 

最后我们发现在  NettyRemotingAbstract 类中  processResponseCommand 方法调用了

responseFuture.putResponse() 方法, putResponse 方法就是调用了 countDownLatch.countDown 方法,从

processResponseCommand 这个方法的名字来看,我们便知道他是处理我们服务端到客户端的返回数据的方法

它里面的实现逻辑跟我们之前发送有一点类似

1: remoting command 里面获取 opaque (request id)

2: responseTable 里面获取 ResponseFuture通过 request id

3: 设置request future 里面的response command (返回的内容)

4: response Table 里面移除该request id

到这里,我们 remotion client 里面阻塞的方法可以进行继续往下运行了

client 与server 的时序图

1: 进行各种校验

2: 构造一个ResponseFuture 对象用于接收返回值

3: 将response future 放入到 response table 中

4: 通过channel 发送 request 到服务端

5: 调用response future 的 waitResponse 方法进行等待服务端返回结果

6: 服务端返回数据,调用NettyRemotingAbstract 的 processResponseCommand 方法用于接受response

7: 调用response future 的putResponse 唤醒 waitResponse

8: 最后得到结果,返回给NettyRemotingClient