深入 Spring Boot(十三):整合 Kafka 详解
- Kafka
- 整合Kafka
- 小结
Kafka
Kafka是Apache组织下的一个分布式流处理平台,它具有以下三个功能特性:
- 作为消息系统,发布和订阅流式的记录,这个与消息队列或者企业消息系统类似。
- 作为存储系统,储存流式的记录,并且有较好的容错性。
- 作为流处理,在流式记录产生时就进行实时处理。
Kafka可用于构建以下两大类别的应用:
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据,相当于消息队列。
- 构建实时流式应用程序,对这些流数据进行转换或者影,也就是流处理。
Kafka的内容比较多,这里只简单介绍相关基本概念,更多kafka知识请浏览
http://kafka.apache.org/
。
topic
topic直译为主题,在kafka中就是数据主题,是数据记录发布的地方,可用来区分数据、业务系统。
producer
producer就是生产者,在kafka中Producer API允许一个应用程序发布一串流式的数据到一个或者多个topic。
consumer
consumer就是消费者,在kafka中Consumer API允许一个应用程序订阅一个或多个topic ,并且对发布给他们的流式数据进行处理。
Stream Processors
kafka中的Connector API允许构建并运行可重用的生产者或者消费者,将topics连接到已存在的应用程序或者数据系统,例如连接到一个关系型数据库,捕捉表的内容变更。
整合Kafka
使用IDEA新建项目,选择maven管理依赖和构建项目,在pom.xml中添加spring-boot-starter和spring-kafka依赖配置,项目中会使用单元测试检查整合是否正确,所以需要添加spring-boot-starter-test依赖,pom.xml详细内容如下。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.kafka demo 1.0-SNAPSHOT
org.springframework.boot spring-boot-starter-parent 2.2.0.RELEASE
org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka 2.3.1.RELEASE org.slf4j jcl-over-slf4j 1.7.28
org.springframework.boot spring-boot-maven-plugin
在resources目录下新增application.properties,并在其中配置生产者和消费者的相关参数,application.properties中参数会在应用启动时被加载解析并初始化,更多生产者和消费者的参数配置请查阅官方文档。
# kafka server的地址,如果有多个,使用逗号分割 spring.kafka.bootstrap-servers=127.0.0.1:9092 # 生产者发送失败时,重试次数 spring.kafka.producer.retries=0 # 生产者消息key和消息value的序列化处理类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 默认消费者group id spring.kafka.consumer.group-id=testGroup # 消费者消息key和消息value的序列化处理类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
项目目录结构如下图所示。
DemoApplication.java
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
ProducerService.java
public interface ProducerService { void send(String topic, String msg); }
ProducerServiceImpl.java
import com.kafka.demo.service.ProducerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.lang.Nullable; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @Service("ProducerService") public class ProducerServiceImpl implements ProducerService { private Logger log = LoggerFactory.getLogger(ProducerServiceImpl.class); @Autowired private KafkaTemplate kafkaTemplate;
@Override public void send(String topic, String msg) { ListenableFuture future = kafkaTemplate.send(topic, msg); future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable throwable) { log.info("send failure"); }
@Override public void onSuccess(@Nullable Object o) { log.info("send success"); } }); } }
ConsumerService.java
public interface ConsumerService { void onReceived(String msg); }
ConsumerServiceImpl.java
import com.kafka.demo.service.ConsumerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class ConsumerServiceImpl implements ConsumerService { private Logger log = LoggerFactory.getLogger(ConsumerServiceImpl.class); @KafkaListener(topics="test")
@Override public void onReceived(String msg) { log.info("receive msg=" + msg); } }
DemoApplicationTest.java
import com.kafka.demo.service.ProducerService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest(classes={com.kafka.demo.DemoApplication.class}) public class DemoApplicationTest {
@Autowired private ProducerService producerService;
@Test public void test(){ producerService.send("test", "hello world"); } }
运行单元测试之前,需要下载并启动Kafka服务器。
进入
http://kafka.apache.org/downloads
下载最新版本并解压。 压缩包中Kafka脚本在Unix和Windows平台是不同的,下面使用到的相关命令,如果在Unix平台下请使用bin/,如果在Windows平台下请使用bin\windows\,并且脚本扩展名分别为.bat和.sh。因为kafka使用zookeeper来实现动态的集群扩展,所以要先启动zookeeper,使用如下命令:
bin/zookeeper-server-start.sh config/zookeeper.properties
然后使用如下命令启动kafka:
bin/kafka-server-start.sh config/server.properties
使用如下命令创建一个名为”test”的topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
上面各步骤完成后,即可执行单元测试验证了。
小结
本文通读下来,你会发现整合kafka很简单,添加kafka依赖、使用KafkaTemplate、使用
@KafkaListener注解就完成了,其实是SpringBoot在背后默默的做了很多工作,如果想深入了解这部分工作做了什么,入口
就是@SpringBootApplication注解。
@SpringBootApplication是一个组合注解,它包含了@SpringBootConfiguration、
@EnableAutoConfiguration和@ComponentScan等注解,通过这三个注解实现了bean的配置和加载。
深入
@EnableAutoConfiguration注解源码,你会发现加载了KafkaAutoConfiguration,在这里加载并实例化了kafka相关的类。
为了更方便的技术交流,建了一个微信群,加博主微信wind7rui,邀你进群!
END
如果觉得有收获,记得关注、点赞、转发。