SpringCloud学习记录 – Stream
2014 年 11 月 1 日
作者:朋也
日期:2021-01-20
版权声明:自由转载-非商用-非衍生-保持署名( 创意共享3.0许可证 )
Stream是啥?
Stream做的事跟hibernate一样,是个翻译器,hibernate适配不同的数据库,stream适配不同的队列
创建模块
创建一个stream-provider和两个 stream-consumer1, stream-consumer2 模块
stream-provider引入依赖
org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter-stream-rabbit <!-- org.springframework.cloud spring-cloud-starter-stream-kafka -->
两个消费者引入的依赖
org.springframework.cloud spring-cloud-starter-stream-rabbit <!-- org.springframework.cloud spring-cloud-starter-stream-kafka -->
配置
stream-provider模块的配置
server.port=18087 spring.application.name=stream-provider # rabbit 配置 spring.cloud.stream.binders.myMQ.type=rabbit #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.host=localhost #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.port=5672 #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.username=guest #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.password=guest # kafka 配置 #spring.cloud.stream.binders.myMQ.type=kafka #spring.cloud.stream.kafka.binder.brokers=localhost #spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.bindings.output.destination=stream-provider-test # 下面配置的myMQ是上面配置的binders spring.cloud.stream.bindings.output.binder=myMQ
stream-consumer1模块的配置
server.port=18088 spring.application.name=stream-consumer1 spring.cloud.stream.binders.myMQ.type=rabbit #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.host=localhost #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.port=5672 #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.username=guest #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.password=guest #spring.cloud.stream.binders.myMQ.type=kafka #spring.cloud.stream.kafka.binder.brokers=localhost #spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.bindings.input.destination=stream-provider-test spring.cloud.stream.bindings.input.binder=myMQ # 给消费者添加上组名,只要组名相同,消费消息的时候就不会重复 spring.cloud.stream.bindings.input.group=consumer-group-test
stream-consumer2模块的配置
server.port=18089 spring.application.name=stream-consumer1 spring.cloud.stream.binders.myMQ.type=rabbit #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.host=localhost #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.port=5672 #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.username=guest #spring.cloud.stream.binders.myMQ.environment.spring.rabbitmq.password=guest #spring.cloud.stream.binders.myMQ.type=kafka #spring.cloud.stream.kafka.binder.brokers=localhost #spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.bindings.input.destination=stream-provider-test spring.cloud.stream.bindings.input.binder=myMQ spring.cloud.stream.bindings.input.group=consumer-group-test
原链文接: https://tomoya92.github.io/2021/01/20/spring-cloud-stream/
提供者
消息提供者发送消息到队列中,使用 @EnableBinding
来开启消息功能
package com.example.springcloudtutorial; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.UUID; @EnableBinding(Processor.class) public class MessageProviderService { @Resource private MessageChannel output; public String send() { String uuid = UUID.randomUUID().toString(); Map payload = new HashMap(); payload.put("uuid", uuid); output.send(MessageBuilder.withPayload(payload).build()); return uuid; } }
写一个接口,调用一次发一条消息
package com.example.springcloudtutorial; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @SpringBootApplication @RestController public class StreamProviderApplication { @Resource private MessageProviderService messageProviderService; @GetMapping("/sendMsg") public ResponseEntity sendMsg() { String uuid = messageProviderService.send(); System.out.println("provider: " + uuid); return ResponseEntity.ok(uuid); } public static void main(String[] args) { SpringApplication.run(StreamProviderApplication.class, args); } }
消费者
同样使用 @EnableBinding
开启消息功能,同时使用 @StreamListener
来监听消费哪的消息
StreamConsumer1Application.java
package com.example.springcloudtutorial; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; import java.util.Map; @SpringBootApplication @EnableBinding(Processor.class) public class StreamConsumer1Application { @StreamListener(Processor.INPUT) public void receiveMsg(Message<Map> message) { Map payload = message.getPayload(); System.out.println("consumer2: " + payload.toString()); } public static void main(String[] args) { SpringApplication.run(StreamConsumer1Application.class, args); } }
StreamConsumer2Application.java
package com.example.springcloudtutorial; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.Message; import java.util.Map; @SpringBootApplication @EnableBinding(Processor.class) public class StreamConsumer2Application { @StreamListener(Processor.INPUT) public void receiveMsg(Message<Map> message) { Map payload = message.getPayload(); System.out.println("consumer2: " + payload.toString()); } public static void main(String[] args) { SpringApplication.run(StreamConsumer2Application.class, args); } }
总结
org.springframework.cloud.stream.annotation
这个包下的几个注解都标注了不推荐使用了,但官方文档上又没说明用哪个来代替