spring-cloud-gateway过滤器实践

概述

这里是 SpringCloud Gateway 实践的第一篇,主要讲过滤器的相关实现。Spring-Cloud-Gateway 是以 WebFlux 为基础的响应式架构设计, 是异步非阻塞式的,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。

本篇将基于 spring-cloud-gateway 简介 基础环境进行改造。

工作原理

Spring-Cloud-Gateway 基于过滤器实现,同 zuul 类似,有 prepost 两种方式的 filter,分别处理 前置逻辑后置逻辑 。客户端的请求先经过 pre 类型的 filter,然后将请求转发到具体的业务服务,收到业务服务的响应之后,再经过 post 类型的 filter 处理,最后返回响应到客户端。

过滤器执行流程如下, order 越大,优先级越低

接下来我们来验证下 filter 执行顺序。

这里创建 3 个过滤器,分别配置不同的优先级

@Slf4j
public class AFilter implements GlobalFilter {
    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("AFilter前置逻辑");
        return chain.filter(exchange).then(Mono.fromRunnable(() -> {
            log.info("AFilter后置逻辑");
        }));
    }
}

@Slf4j
public class BFilter implements GlobalFilter {
    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("BFilter前置逻辑");
        return chain.filter(exchange).then(Mono.fromRunnable(() -> {
            log.info("BFilter后置逻辑");
        }));
    }
}

@Slf4j
public class CFilter implements GlobalFilter {

    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("CFilter前置逻辑");
        return chain.filter(exchange).then(Mono.fromRunnable(() -> {
            log.info("CFilter后置逻辑");
        }));
    }
}

@Configuration
public class FilterConfig {

    @Bean
    @Order(-1)
    public GlobalFilter a() {
        return new AFilter();
    }

    @Bean
    @Order(0)
    public GlobalFilter b() {
        return new BFilter();
    }

    @Bean
    @Order(1)
    public GlobalFilter c() {
        return new CFilter();
    }
}
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1

curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1

查看网关输出日志

2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter前置逻辑
2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter前置逻辑
2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter前置逻辑

2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter后置逻辑
2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter后置逻辑
2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter后置逻辑

自定义过滤器

现在假设我们要统计某个服务的响应时间,我们可以在代码中

long beginTime = System.currentTimeMillis();
// do something...
long elapsed = System.currentTimeMillis() - beginTime;
log.info("elapsed: {}ms", elapsed);

每次都要这么写是不是很烦?Spring 告诉我们有个东西叫 AOP。但是我们是微服务啊,在每个服务里都写也很烦。这时候就该网关的过滤器登台表演了。

自定义过滤器需要实现 GatewayFilterOrdered 。其中 GatewayFilter 中的这个方法就是用来实现你的自定义的逻辑的

Mono filter(ServerWebExchange exchange, GatewayFilterChain chain);

Ordered 中的 int getOrder() 方法是来给过滤器设定优先级别的,值越大则优先级越低。

好了,让我们来撸代码吧.

/**
 * 此过滤器功能为计算请求完成时间
 */
public class ElapsedFilter implements GatewayFilter, Ordered {

    private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";

    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
        return chain.filter(exchange).then(
                Mono.fromRunnable(() -> {
                    Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
                    if (startTime != null) {
                        System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");
                    }
                })
        );
    }

    /*
     *过滤器存在优先级,order越大,优先级越低
     */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}

我们在请求刚刚到达时,往 ServerWebExchange 中放入了一个属性 elapsedTimeBegin ,属性值为当时的毫秒级时间戳。然后在请求执行结束后,又从中取出我们之前放进去的那个时间戳,与当前时间的差值即为该请求的耗时。因为这是与业务无关的日志所以将 Ordered 设为 Integer.MAX_VALUE 以降低优先级。

现在再来看我们之前的问题:怎么来区分是 “pre” 还是 “post” 呢?其实就是 chain.filter(exchange) 之前的就是 “pre” 部分,之后的也就是 then 里边的是 “post” 部分。

创建好 Filter 之后我们将它添加到我们的 Filter Chain 里边

@Configuration
public class FilterConfig {


    /**
     * http://localhost:8100/filter/provider
     * @param builder
     * @return
     */
    @Bean
    public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
        // @formatter:off
        // 可以对比application.yml中关于路由转发的配置
        return builder.routes()
                .route(r -> r.path("/filter/**")
                        .filters(f -> f.stripPrefix(1)
                                .filter(new ElapsedFilter()))
                        .uri("lb://idc-cloud-provider")
                        .order(0)
                        .id("filter")
                )
                .build();
        // @formatter:on
    }

}

基于全局过滤器实现审计功能

// AdaptCachedBodyGlobalFilter

@Component
public class LogFilter implements GlobalFilter, Ordered {

    private Logger log = LoggerFactory.getLogger(LogFilter.class);

    private final ObjectMapper objectMapper = new ObjectMapper();
    private static final String START_TIME = "startTime";
    private static final List<HttpMessageReader> messageReaders = HandlerStrategies.withDefaults().messageReaders();

    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest request = exchange.getRequest();
        // 请求路径
        String path = request.getPath().pathWithinApplication().value();
        // 请求schema: http/https
        String scheme = request.getURI().getScheme();
        // 请求方法
        HttpMethod method = request.getMethod();
        // 路由服务地址
        URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        // 请求头
        HttpHeaders headers = request.getHeaders();
        // 设置startTime
        exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
        // 获取请求地址
        InetSocketAddress remoteAddress = request.getRemoteAddress();


        MultiValueMap formData = null;



        AccessRecord accessRecord = new AccessRecord();
        accessRecord.setPath(path);
        accessRecord.setSchema(scheme);
        accessRecord.setMethod(method.name());
        accessRecord.setTargetUri(targetUri.toString());
        accessRecord.setRemoteAddress(remoteAddress.toString());
        accessRecord.setHeaders(headers);

        if (method == HttpMethod.GET) {
            formData = request.getQueryParams();
            accessRecord.setFormData(formData);
            writeAccessRecord(accessRecord);
        }

        if (method == HttpMethod.POST) {
            Mono voidMono = null;
            if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {
                // JSON
                voidMono = readBody(exchange, chain, accessRecord);
            }

            if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) {
                // x-www-form-urlencoded
                voidMono = readFormData(exchange, chain, accessRecord);
            }

            if (voidMono != null) {
                return voidMono;
            }

        }

        return chain.filter(exchange);
    }

    private Mono readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
        return null;
    }

    private Mono readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {

        return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

            byte[] bytes = new byte[dataBuffer.readableByteCount()];
            dataBuffer.read(bytes);
            DataBufferUtils.release(dataBuffer);
            Flux cachedFlux = Flux.defer(() -> {
                DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                DataBufferUtils.retain(buffer);
                return Mono.just(buffer);
            });


            // 重写请求体,因为请求体数据只能被消费一次
            ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                @Override
                public Flux getBody() {
                    return cachedFlux;
                }
            };

            ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();

            return ServerRequest.create(mutatedExchange, messageReaders)
                    .bodyToMono(String.class)
                    .doOnNext(objectValue -> {
                        accessRecord.setBody(objectValue);
                        writeAccessRecord(accessRecord);
                    }).then(chain.filter(mutatedExchange));
        });
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    /**
     * TODO 异步日志
     * @param accessRecord
     */
    private void writeAccessRecord(AccessRecord accessRecord) {

        log.info("\n\n start------------------------------------------------- \n " +
                        "请求路径:{}\n " +
                        "scheme:{}\n " +
                        "请求方法:{}\n " +
                        "目标服务:{}\n " +
                        "请求头:{}\n " +
                        "远程IP地址:{}\n " +
                        "表单参数:{}\n " +
                        "请求体:{}\n " +
                        "end------------------------------------------------- \n ",
                accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody());
    }
}
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1

curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1

输出结果

start-------------------------------------------------
 请求路径:/provider1
 scheme:http
 请求方法:POST
 目标服务:http://192.168.124.5:2001/provider1
 请求头:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"]
 远程IP地址:/192.168.124.5:49969
 表单参数:null
 请求体:{"name":"admin"}
 end-------------------------------------------------

接下来,我们来配置日志,方便日志系统提取日志。SpringBoot 默认的日志为 logback。



    

    
        
            
                %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable
            
        
    

    
        ${LOGS}/spring-boot-logger.log
        
            %d %p %C{1.} [%t] %m%n
        

        
            
            ${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log
            
            
                10MB
            
        
    

    
    
        <!---->
        
    

    
    
        
        
    

这样 console 和日志目录下就都有日志了。

自定义过滤器工厂

如果你看过静态路由的配置,你应该对如下配置有印象。

filters:
  - StripPrefix=1
  - AddResponseHeader=X-Response-Default-Foo, Default-Bar

StripPrefixAddResponseHeader 这两个实际上是两个过滤器工厂(GatewayFilterFactory),用这种配置的方式更灵活方便。

我们就将之前的那个 ElapsedFilter 改造一下,让它能接收一个 boolean 类型的参数,来决定是否将请求参数也打印出来。

public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory {

    private static final Log log = LogFactory.getLog(GatewayFilter.class);
    private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";
    private static final String KEY = "withParams";


    public List shortcutFieldOrder() {
        return Arrays.asList(KEY);
    }

    public ElapsedGatewayFilterFactory() {
        super(Config.class);
    }


    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
            return chain.filter(exchange).then(
                    Mono.fromRunnable(() -> {
                        Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
                        if (startTime != null) {
                            StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())
                                    .append(": ")
                                    .append(System.currentTimeMillis() - startTime)
                                    .append("ms");
                            if (config.isWithParams()) {
                                sb.append(" params:").append(exchange.getRequest().getQueryParams());
                            }
                            log.info(sb.toString());
                        }
                    })
            );
        };
    }


    public static class Config {

        private boolean withParams;

        public boolean isWithParams() {
            return withParams;
        }

        public void setWithParams(boolean withParams) {
            this.withParams = withParams;
        }

    }
}

过滤器工厂的顶级接口是 GatewayFilterFactory ,我们可以直接继承它的两个抽象类来简化开发 AbstractGatewayFilterFactoryAbstractNameValueGatewayFilterFactory ,这两个抽象类的区别就是前者接收一个参数(像 StripPrefix 和我们创建的这种),后者接收两个参数(像 AddResponseHeader )。

GatewayFilter apply(Config config) 方法内部实际上是创建了一个 GatewayFilter 的匿名类,具体实现和之前的几乎一样,就不解释了。

静态内部类 Config 就是为了接收那个 boolean 类型的参数服务的,里边的变量名可以随意写,但是要重写 List shortcutFieldOrder() 这个方法。

这里注意一下,一定要调用一下父类的构造器把 Config 类型传过去,否则会报 ClassCastException

public ElapsedGatewayFilterFactory() {
    super(Config.class);
}

工厂类我们有了,再把它注册到 Spring 当中

@Bean
public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() {
    return new ElapsedGatewayFilterFactory();
}

然后添加配置(主要改动在 default-filters 配置)

server:
  port: 2000
spring:
  application:
    name: idc-gateway
  redis:
    host: localhost
    port: 6379
    timeout: 6000ms  # 连接超时时长(毫秒)
    jedis:
      pool:
        max-active: 1000  # 连接池最大连接数(使用负值表示没有限制)
        max-wait: -1ms      # 连接池最大阻塞等待时间(使用负值表示没有限制)
        max-idle: 10      # 连接池中的最大空闲连接
        min-idle: 5       # 连接池中的最小空闲连接
  cloud:
    consul:
      host: localhost
      port: 8500
    gateway:
      discovery:
        locator:
          enabled: true
          # 修改在这里。gateway可以通过开启以下配置来打开根据服务的serviceId来匹配路由,默认是大写
      default-filters:
        - Elapsed=true
      routes:
        - id: provider  # 路由 ID,保持唯一
          uri: lb://idc-provider1 # uri指目标服务地址,lb代表从注册中心获取服务
          predicates: # 路由条件。Predicate 接受一个输入参数,返回一个布尔值结果。该接口包含多种默认方法来将 Predicate 组合成其他复杂的逻辑(比如:与,或,非)
            - Path=/p/**
          filters:
            - StripPrefix=1 # 过滤器StripPrefix,作用是去掉请求路径的最前面n个部分截取掉。StripPrefix=1就代表截取路径的个数为1,比如前端过来请求/test/good/1/view,匹配成功后,路由到后端的请求路径就会变成http://localhost:8888/good/1/view

结语

本文到此结束。关于 Webflux 的学习刚入门,觉得可以像 Rxjava 那样在 onNext 中拿到异步数据,然而在 post 获取 body 中没生效。经测试可知 getBody 获得的数据输出为 null,而自己通过 Flux.create 创建的数据可以在订阅者中获取到。此处还有待研究,希望抛砖引玉,大家有研究出来的不吝赐教。同时,希望大家关注公众号【当我遇上你】。

参考