Soul 网关源码阅读(六)Sofa 请求处理概览

Soul 网关源码阅读(六)Sofa请求处理概览





简介



今天来探索一下Sofa请求处理流程,看看和前面的HTTP、Dubbo有什么异同



Sofa示例运行



PS:如果请求加上参数运行不成功,请更新最新版本,此问题在新版本中已经修复: https://blog.csdn.net/baidu_27627251/article/details/112726694   Add sofa param resolve service



首先运行下官方的Sofa示例,首先启动下mysql和zookeeper,这里使用docker启动:



docker run -dit --name zk -p 2181:2181 zookeepe
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest



然后运行Soul-admin,Soul-Bootst,在管理图界面起用sofa插件



运行官方示例:soul-examples –> soul-examples-sofa



这里有个坑,需要注意,启动后,bootstrap打印日志中没有sofa插件,请求一直失败



o.d.s.w.configuration.SoulConfiguration : load plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[sign] [org.dromara.soul.plugin.sign.SignPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[resilience4j] [org.dromara.soul.plugin.resilience4j.Resilience4JPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[alibaba-dubbo-body-param] [org.dromara.soul.plugin.alibaba.dubbo.param.BodyParamPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[dubbo] [org.dromara.soul.plugin.alibaba.dubbo.AlibabaDubboPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.alibaba.dubbo.response.DubboResponsePlugin]
```xml

    查看初始的plugins也是没有sofa

```java
public SoulWebHandler soulWebHandler(final ObjectProvider<List> plugins) {
List pluginList = plugins.getIfAvailable(Collections::emptyList);
final List soulPlugins = pluginList.stream()
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
return new SoulWebHandler(soulPlugins);
}



经过探索和老哥的讨论,发现是每天起用sofa的相关依赖



我们在Bootstrap的pom.xml文件中添加下面的依赖,然后重启





com.alipay.sofa
sofa-rpc-all
5.7.6


org.apache.curator
curator-client
4.0.1


org.apache.curator
curator-framework
4.0.1


org.apache.curator
curator-recipes
4.0.1


org.dromara
soul-spring-boot-starter-plugin-sofa
${project.version}




然后查看日志打印,出现了sofa相关的插件



o.d.s.w.configuration.SoulConfiguration : load plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[sign] [org.dromara.soul.plugin.sign.SignPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[resilience4j] [org.dromara.soul.plugin.resilience4j.Resilience4JPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[sofa-body-param] [org.dromara.soul.plugin.sofa.param.BodyParamPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[dubbo] [org.dromara.soul.plugin.alibaba.dubbo.AlibabaDubboPlugin]
// 新出现的sofa相关的
o.d.s.w.configuration.SoulConfiguration : load plugin:[sofa] [org.dromara.soul.plugin.sofa.SofaPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.alibaba.dubbo.response.DubboResponsePlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]
// 新出现的sofa相关的
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.sofa.response.SofaResponsePlugin]



日志中还打印了成功加载sofa相关的metadata



o.d.s.p.s.cache.ApplicationConfigCache : init sofa reference success there meteData is :MetaData
o.d.s.p.s.cache.ApplicationConfigCache : init sofa reference success there meteData is :MetaData
o.d.s.p.s.cache.ApplicationConfigCache : init sofa reference success there meteData is :MetaData



访问链接:  http://localhost:9195/sofa/findAll  ,成功返回如下请求



{
"code": 200,
"message": "Access to success!",
"data": {
"name": "hello world Soul Sofa , findAll",
"id": "998932133"
}
}



源码解析



PS:Debug时间过程,会导致超时,这是正常的



首先找到我们非常熟悉的切入点函数: SoulWebHandler ,在下面的方法中打上断点,然后逐步进入每个plugin观察其行为



public Mono execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}



GlobalPlugin



进入其中,执行处理逻辑,通过上篇的分析,我们知道大致作用是将请求类型放入exchange中



SignPlugin/WafPlugin/RateLimiterPlugin/HystrixPlugin/Resilience4JPlugin



进入其中,但plugin没有起用,不执行逻辑



DividePlugin/WebClientPlugin/WebSocketPlugin



通过类型判断,跳过,不执行



BodyParamPlugin



这个plugin在dubbo的时候也是要执行,我们来看看它干了写啥事。从下面逻辑中大概能看出先判断是否符合执行条件,然后将请求地址替换成真实的后端地址



public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
final ServerHttpRequest request = exchange.getRequest();
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
// 判断类型是不是sofa
if (Objects.nonNull(soulContext) && RpcTypeEnum.SOFA.getName().equals(soulContext.getRpcType())) {
MediaType mediaType = request.getHeaders().getContentType();
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
return body(exchange, serverRequest, chain);
}
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
return formData(exchange, serverRequest, chain);
}
// 进行路径替换,换成后端服务器的
return query(exchange, serverRequest, chain);
}
return chain.execute(exchange);
}

private Mono query(final ServerWebExchange exchange, final ServerRequest serverRequest, final SoulPluginChain chain) {
exchange.getAttributes().put(Constants.SOFA_PARAMS,
HttpParamConverter.ofString(() -> serverRequest.uri().getQuery()));
return chain.execute(exchange);
}



这里有个非常有趣的现象,我们第四篇分析中,dubbo也走了一模一样的类,在上面函数逻辑中,我们看出它并不能兼容dubbo,那dubbo是如何走这个类的呢?



通过调试我们发现,当同时启动dubbo和sofa的时候,会生成两个BodyParamPlugin,名称是一模一样的,但里面的判断类型换了,很神奇,猜测这个类是动态生成之类的手段,这里先不探索了,可以后面研究研究



AlibabaDubboPlugin



通过类型判断,跳过



SofaPlugin



这个从名字就看出来是核心类,我们看看它具体干了啥。通过下面注释的地方,可以看出和dubbo请求的非常的相像。进行路由匹配,成功后rpc调用,获得结果后放入exchange中



# AbstractSoulPlugin
public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
final Collection selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
final SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
final List rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
// 判断是否有路由规则能匹配上
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// 匹配上后执行处理逻辑
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}

# SofaPlugin
protected Mono doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
String body = exchange.getAttribute(Constants.SOFA_PARAMS);
SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
if (!checkMetaData(metaData)) {
assert metaData != null;
log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = SoulResultWrap.error(SoulResultEnum.SOFA_HAVE_BODY_PARAM.getCode(), SoulResultEnum.SOFA_HAVE_BODY_PARAM.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 这里得到结果,跟下去
final Mono result = sofaProxyService.genericInvoker(body, metaData, exchange);
return result.then(chain.execute(exchange));
}

# SofaProxyService
public Mono genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
// 根据请求路径,获得rpc中的consumer
ConsumerConfig reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterfaceId())) {
ApplicationConfigCache.getInstance().invalidate(metaData.getServiceName());
reference = ApplicationConfigCache.getInstance().initRef(metaData);
}
GenericService genericService = reference.refer();
Pair pair;
if (null == body || "".equals(body) || "{}".equals(body) || "null".equals(body)) {
pair = new ImmutablePair(new String[]{}, new Object[]{});
} else {
pair = sofaParamResolveService.buildParameter(body, metaData.getParameterTypes());
}
CompletableFuture future = new CompletableFuture();
RpcInvokeContext.getContext().setResponseCallback(new SofaResponseCallback() {
@Override
public void onAppResponse(final Object o, final String s, final RequestBase requestBase) {
future.complete(o);
}

@Override
public void onAppException(final Throwable throwable, final String s, final RequestBase requestBase) {
future.completeExceptionally(throwable);
}

@Override
public void onSofaException(final SofaRpcException e, final String s, final RequestBase requestBase) {
future.completeExceptionally(e);
}
});
// 通过函数名,能猜到是rpc调用,然后得到结果,并将结果放入exchange中
genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
return Mono.fromFuture(future.thenApply(ret -> {
if (Objects.isNull(ret)) {
ret = Constants.SOFA_RPC_RESULT_EMPTY;
}
exchange.getAttributes().put(Constants.SOFA_RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
})).onErrorMap(SoulException::new);
}



MonitorPlugin



不跳过,但插件没有开启



DubboResponsePlugin/WebClientResponsePlugin



通过类型判断,跳过执行



SofaResponsePlugin



通过上几篇分析和名字能猜出来是将响应返回给客户端的,通过下面代码的逻辑也可以看出



public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
// 从exchange中获取结果
final Object result = exchange.getAttribute(Constants.SOFA_RPC_RESULT);
if (Objects.isNull(result)) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result));
// 熟悉的返回响应的函数
return WebFluxResultUtils.result(exchange, success);
}));
}



总结



上面的plugin流程大致如下:



  • GlobalPlugin : 将请求类型置入

  • SignPlugin : 跳过不执行逻辑

  • WafPlugin : 跳过不执行逻辑

  • RateLimiterPlugin : 跳过不执行逻辑

  • HystrixPlugin : 跳过不执行逻辑

  • Resilience4JPlugin : 跳过不执行逻辑

  • DividePlugin : 跳过不执行逻辑

  • WebClientPlugin : 跳过不执行逻辑

  • WebSocketPlugin : 跳过不执行逻辑

  • BodyParamPlugin : 执行RPC的请求路径替换,替换成真实的服务器后端路径,作用类似于dividePlugin;不同rpc有相关的这个插件名,也就是会有多个BodyParamPlugin

  • AlibabaDubboPlugin : 跳过不执行逻辑

  • SofaPlugin : 发送请求到后台服务器,拿到结果,写入exchange

  • MonitorPlugin : 跳过不执行逻辑

  • DubboResponsePlugin : 跳过不执行逻辑

  • WebClientResponsePlugin : 跳过不执行逻辑

  • SofaResponsePlugin : 从exchange中拿到响应,发送给客户端



经过这几篇的分析,我们进一步优化我们对Soul网关的请求流程,大致如下:





更新了我们对处理流程中一些类的认知:



  • 通过上篇分析,得到GlobalPlugin的具体作用,是置入请求类型

  • BodyParamPlugin 作用类似于 dividePlugin,能进行路由匹配,匹配后将路径修改真实的后端服务器路径;并且能动态的生成同名的但针对不同rpc实现的plugin