说明
以下的方案仅仅是针对阿里云的kafka,因为我们阿里云的kafka没暴露zookeeper,无法通过spring cloud stream binder的方式连接,如果需要通过这种方式,zookeeper要确保是能连上的,否则只能通过以下的方式。
附: 这是官方文档 https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.2.RELEASE/reference/html/index.html
一 application.yaml的配置如下
1 | spring: |
二 rabbitmq与kafka的maven依赖如下
1 | <dependency> |
三 Java代码
rabbitmq示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public interface XxxChannel {
String INPUT = "xxx_input01";
String OUTPUT = "xxx_output01";
(XxxChannel.INPUT)
SubscribableChannel input();
(OUTPUT)
MessageChannel output();
}
(XxxChannel.class)
public class XxxConsumer {
(value = XxxChannel.INPUT)
public void handle(String msg) {
...
}
}
(XxxChannel.class)
public class XxxPublisher {
private XxxChannel xxxChannel;
/**
* 发送到mq
* @param msg
*/
public void send(String msg) {
//发送到mq
xxxChannel.output().send(MessageBuilder.withPayload(msg).build());
}
}
kafka示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13// 收
"kafka_xxx_envents") (topics =
public void receive(String msg) {
// kafka消息处理逻辑
}
// 发
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String msg) {
kafkaTemplate.send("kafka_xxx_envents", msg);
}