grpc kafka redis 杂记

0x00 关于

最近一个项目使用的技术栈有点乱,也爬了不少坑, 记录备忘一下.

处理流程:

  • 在springboot项目中监听kafka数据
  • 在springboot中, 使用grpc调用golang的grpc服务程序,作格式转换
  • 把处理结果合并后, 推送给远程redis

0x01 关于grpc代码的生成

经过实践, 发现网上提供的 pom.xml 插入大量代码,使用插件来生成 proto 代码的方式并不优雅.

笔者认为, 采用命令行的方式来生成代码更为简洁灵活高效.

(这里就不细说插件方式生成的代码 是在classes中,而不是放到src下, 更不想说, proto内容更新后, mvn compile各种失败带来的痛苦)

cd prj\src\main
protoc.exe -I=proto --grpc-java_out=.\java proto\lua.proto
protoc.exe -I=proto --java_out=.\java proto\lua.proto
  • pom.xml java grpc 依赖的库:
        1.23.0
        3.9.0
        1.8
        1.8
    

    
        
            io.grpc
            grpc-netty-shaded
            ${grpc.version}
            runtime
        
        
            io.grpc
            grpc-protobuf
            ${grpc.version}
        
        
            io.grpc
            grpc-stub
            ${grpc.version}
        

        
            com.google.protobuf
            protobuf-java-util
            ${protobuf.version}
        
    

0x02 如何让Kafka每次接入时消费最新数据

给项目配置了kafka参数 : auto-offset-reset: latest
但是, 项目重启后, 还是会从上一次消费结束的地方接着消息.

如何解决这个问题呢? 方法如下:

@KafkaListener 所在类, 继承一下 ConsumerSeekAware

重点是现在的 seekToEnd 方法

@Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
    }

    @Override
    public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map assignments, ConsumerSeekCallback callback) {
    }

0x03 增强Kafka消费能力

在实测过程中, 发现消费能力有问题, 导致kafka的消费队列lag很大.

开始以为是向远程 redis单次写数据耗时太大, 从而优化成 leftPushAll 方法.

但问题依旧, 后来经同事提醒, 可能是带宽问题.

果然, 提一下带宽,问题立即解决.