## Spring Cloud Stream
Spring Cloud Stream 是一个构建消息驱动微服务的框架,应用程序通过input通道或者output通道来与Spring Cloud Stream中binder交互,通过配置来binding. 而Spring Cloud Stream的binder负责与中间件交互, 消息的中间件有(RabbitMQ, Kafka, ActiveMQ).
本文以RabbitMQ作为中间件作为讲解(启动rabbitMQ: rabbitmq-plugins enable rabbitmq_management)

### 为什么要需要Stream呢? Stream的使用场景
a: 异步(消息的发送方, 只需要发送一条消息出去, 就可以不管了, 至于怎处理, 则交给消息的订阅者去处理)
b: 流量削峰(某宝, 某东大的电商平台, 需要stream去做削峰)
d: 应用解耦(发布者和订阅者相互间解耦)
d: 日志处理
### Spring Cloud Stream标准流程套路
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source/Sink:简单的可理解为参考对象是Spring Cloud Stream的自身,从Stream发布信息就是输出,接受消息就是输入

### 实战
Maven 依赖
```
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
```
消息生产(stream-server)
1.yml
```
server:
port: 9010
spring:
application:
name: stream-server
#rabbit相关配置 15672是web管理界面的端口,5672是MQ访问的端口
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: # 服务的整合处理
myoutput: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
binders: # 在此处配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于binding的整合
type: rabbit # 消息中间件类型
environment: # 设置rabbitMQ的相关环境配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
```
2.定义管道
```
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
/**
* 消息发送管道
*/
@Autowired
private MyProcessor processor;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
processor.output().send(MessageBuilder.withPayload(serial).build());
System.out.println("serial = " + serial);
return "ok";
}
}
```
2.发送消息
```
@RestController
@EnableBinding(MyProcessor.class)
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
```
消息消费(stream-consumer1/2)
进行监听
```
@Component
@EnableBinding(MyProcessor.class)
@Slf4j
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(MyProcessor.INPUT)
@SendTo(MyProcessor.CALLBACKINPUT)
public String process(String message){
log.info("消费者2号,------->接收到的消息: "+message+"\t port: "+serverPort);
return message;
}
@StreamListener(MyProcessor.CALLBACKINPUT)
public void callback(String message){
log.info("message has recived : {} ", message);
}
}
```
关于Processor类请看源码
## 获取源码公众号回复: Stream


Spring Could学习之 Stream