RabbitMQ
RabbitMQ是部署最广泛的开源消息代理。
RabbitMQ拥有成千上万的用户,是最受欢迎的开源消息代理之一。从T-Mobile 到Runtastic,RabbitMQ在全球范围内的小型初创企业和大型企业中都得到使用。
RabbitMQ 轻量,易于在本地(on-premises)和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合(federated)配置中,以满足大规模、高可用性的要求。
RabbitMQ可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具。
Spring Boot 整合 RabbitMQ
配置
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest
一对一:一个发送者一个接收者
队列配置
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the queue used by the one-to-one example.
 */
@Configuration
public class RabvitConfig {

    /**
     * Declares the queue bean named {@code queue}.
     *
     * @return a {@link Queue} whose name is {@code "hello"}
     */
    @Bean
    public Queue queue() {
        final String queueName = "hello";
        return new Queue(queueName);
    }
}
发送
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
 * Sender for the one-to-one example: pushes a timestamped greeting through the
 * injected {@link AmqpTemplate}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Builds {@code "hello---" + LocalDateTime.now()}, echoes it to standard output and
     * passes it to {@code convertAndSend} together with the name {@code "hello"}.
     */
    public void send() {
        final LocalDateTime now = LocalDateTime.now();
        final String payload = "hello---" + now;
        System.out.println("context = " + payload);
        amqpTemplate.convertAndSend("hello", payload);
    }
}
接收
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Receiver for the one-to-one example, registered as a listener on the {@code hello} queue.
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    /**
     * Message handler: prints the received text to standard output.
     *
     * @param message the text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "message = " + message;
        System.out.println(line);
    }
}
测试
import cn.sakura521.redis_demo.demo.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test for the one-to-one example.
 */
@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private HelloSender helloSender;

    /** Sends a single greeting through the injected {@link HelloSender}. */
    @Test
    void contextLoads() {
        this.helloSender.send();
    }
}
测试结果
context = hello---2020-06-14T15:41:22.320
message = hello---2020-06-14T15:41:22.320
一对多:一个发送者多个接收者
添加队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the queues used by the one-to-one and
 * one-to-many examples.
 */
@Configuration
public class RabvitConfig {

    /**
     * Declares the queue bean named {@code queue}.
     *
     * @return a {@link Queue} whose name is {@code "hello"}
     */
    @Bean
    public Queue queue() {
        final String queueName = "hello";
        return new Queue(queueName);
    }

    /**
     * Declares the queue bean named {@code queue2}.
     *
     * @return a {@link Queue} whose name is {@code "hello2"}
     */
    @Bean
    public Queue queue2() {
        final String queueName = "hello2";
        return new Queue(queueName);
    }
}
发送
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
 * Sender for the one-to-many example: pushes numbered messages through the
 * injected {@link AmqpTemplate}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code i} to its decimal text, logs it with a {@code "----send"} suffix and
     * passes it to {@code convertAndSend} together with the name {@code "hello2"}.
     *
     * @param i the sequence number to send
     */
    public void send2(int i) {
        final String payload = String.valueOf(i);
        System.out.println(payload + "----send");
        amqpTemplate.convertAndSend("hello2", payload);
    }
}
接收
两个接收者
接收者1
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * First of two listeners registered on the {@code hello2} queue.
 */
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1 {

    /**
     * Prints the received text, prefixed with this receiver's name.
     *
     * @param message the text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "HelloReceiver1 = " + message;
        System.out.println(line);
    }
}
接收者2
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Second of two listeners registered on the {@code hello2} queue.
 */
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2 {

    /**
     * Prints the received text, prefixed with this receiver's name.
     *
     * @param message the text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "HelloReceiver2 = " + message;
        System.out.println(line);
    }
}
测试
import cn.sakura521.redis_demo.demo.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test for the one-to-many example.
 */
@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private HelloSender helloSender;

    /** Sends the numbers 0 through 24, in order, via {@code HelloSender.send2}. */
    @Test
    void mangReceiver() {
        int sequence = 0;
        while (sequence < 25) {
            helloSender.send2(sequence);
            sequence++;
        }
    }
}
测试结果
多次运行测试,每次的输出顺序都不一样:同一队列上的两个接收者交替消费消息。下面的接收日志只记录到 16,可能是测试方法结束、应用上下文关闭时其余消息尚未被消费。
0----send
1----send
2----send
3----send
4----send
5----send
6----send
7----send
8----send
9----send
10----send
11----send
12----send
13----send
14----send
15----send
16----send
17----send
18----send
19----send
20----send
21----send
22----send
23----send
24----send
HelloReceiver1 = 0
HelloReceiver1 = 2
HelloReceiver1 = 4
HelloReceiver1 = 6
HelloReceiver2 = 1
HelloReceiver1 = 8
HelloReceiver2 = 3
HelloReceiver2 = 5
HelloReceiver2 = 7
HelloReceiver1 = 10
HelloReceiver1 = 12
HelloReceiver1 = 14
HelloReceiver1 = 16
多对多:多个发送者对多个接收者
更改上一个测试用例
测试
import cn.sakura521.redis_demo.demo.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test for the many-to-many example.
 */
@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private HelloSender helloSender;

    /**
     * Runs 25 rounds; each round first calls {@code send()} and then {@code send2(i)}
     * with the round index (0 through 24).
     */
    @Test
    void mangReceiver() {
        int round = 0;
        while (round < 25) {
            helloSender.send();
            helloSender.send2(round);
            round++;
        }
    }
}
测试结果
context = hello---2020-06-14T17:16:44.640
0----send
context = hello---2020-06-14T17:16:44.647
1----send
context = hello---2020-06-14T17:16:44.649
2----send
context = hello---2020-06-14T17:16:44.649
3----send
context = hello---2020-06-14T17:16:44.649
4----send
context = hello---2020-06-14T17:16:44.649
5----send
context = hello---2020-06-14T17:16:44.650
6----send
context = hello---2020-06-14T17:16:44.656
7----send
context = hello---2020-06-14T17:16:44.657
8----send
context = hello---2020-06-14T17:16:44.657
9----send
context = hello---2020-06-14T17:16:44.658
10----send
context = hello---2020-06-14T17:16:44.658
11----send
context = hello---2020-06-14T17:16:44.659
12----send
context = hello---2020-06-14T17:16:44.660
13----send
context = hello---2020-06-14T17:16:44.660
14----send
context = hello---2020-06-14T17:16:44.660
15----send
context = hello---2020-06-14T17:16:44.661
16----send
context = hello---2020-06-14T17:16:44.663
17----send
context = hello---2020-06-14T17:16:44.663
18----send
context = hello---2020-06-14T17:16:44.668
19----send
context = hello---2020-06-14T17:16:44.669
20----send
context = hello---2020-06-14T17:16:44.670
21----send
context = hello---2020-06-14T17:16:44.671
22----send
context = hello---2020-06-14T17:16:44.671
23----send
context = hello---2020-06-14T17:16:44.672
24----send
HelloReceiver2 = 1
HelloReceiver = hello---2020-06-14T17:16:44.640
HelloReceiver1 = 0
HelloReceiver2 = 3
HelloReceiver = hello---2020-06-14T17:16:44.647
HelloReceiver2 = 5
HelloReceiver1 = 2
HelloReceiver = hello---2020-06-14T17:16:44.649
HelloReceiver2 = 7
HelloReceiver1 = 4
HelloReceiver2 = 9
HelloReceiver = hello---2020-06-14T17:16:44.649
HelloReceiver1 = 6
HelloReceiver2 = 11
HelloReceiver1 = 8
HelloReceiver2 = 13
HelloReceiver1 = 10
HelloReceiver = hello---2020-06-14T17:16:44.649
HelloReceiver = hello---2020-06-14T17:16:44.649
HelloReceiver2 = 15
HelloReceiver = hello---2020-06-14T17:16:44.650
HelloReceiver = hello---2020-06-14T17:16:44.656
HelloReceiver1 = 12
HelloReceiver = hello---2020-06-14T17:16:44.657
HelloReceiver1 = 14
HelloReceiver = hello---2020-06-14T17:16:44.657
HelloReceiver = hello---2020-06-14T17:16:44.658
HelloReceiver = hello---2020-06-14T17:16:44.658
HelloReceiver = hello---2020-06-14T17:16:44.659
HelloReceiver = hello---2020-06-14T17:16:44.660
HelloReceiver = hello---2020-06-14T17:16:44.660
HelloReceiver = hello---2020-06-14T17:16:44.660
HelloReceiver = hello---2020-06-14T17:16:44.661
HelloReceiver = hello---2020-06-14T17:16:44.663
HelloReceiver1 = 16
HelloReceiver2 = 17
HelloReceiver1 = 18
HelloReceiver1 = 20
HelloReceiver1 = 22
HelloReceiver2 = 19
HelloReceiver = hello---2020-06-14T17:16:44.663
HelloReceiver1 = 24
HelloReceiver2 = 21
HelloReceiver = hello---2020-06-14T17:16:44.668
HelloReceiver2 = 23
HelloReceiver = hello---2020-06-14T17:16:44.669
HelloReceiver = hello---2020-06-14T17:16:44.670
HelloReceiver = hello---2020-06-14T17:16:44.671
HelloReceiver = hello---2020-06-14T17:16:44.671
HelloReceiver = hello---2020-06-14T17:16:44.672
发送对象
创建对象
首先创建对象User,并实现Serializable接口
import java.io.Serializable;
/**
 * Simple serializable value holder carrying a user name and a password.
 *
 * <p>The class is sent through a message queue as a serialized object, so the serial
 * version is pinned explicitly: without it the JVM derives one from the class shape,
 * and any unrelated edit to this class would make the sender and the receiver
 * incompatible.
 */
public class User implements Serializable {

    /** Explicit serial version so both ends of the queue agree on the class version. */
    private static final long serialVersionUID = 1L;

    private String username;
    private String password;

    /**
     * Creates a user.
     *
     * @param username the user name
     * @param password the password
     */
    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    /** @return the user name */
    public String getUsername() {
        return username;
    }

    /** @param username the new user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the password */
    public String getPassword() {
        return password;
    }

    /** @param password the new password */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Renders both fields as {@code User{username='...', password='...'}}.
     *
     * <p>NOTE(review): this prints the password in clear text; acceptable for the demo
     * output, but it should not be logged in real code.
     */
    @Override
    public String toString() {
        return "User{"
                + "username='" + username + '\''
                + ", password='" + password + '\''
                + '}';
    }
}
创建队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the queue used by the object-sending example.
 */
@Configuration
public class RabvitConfig {

    /**
     * Declares the queue bean named {@code queue3}.
     *
     * @return a {@link Queue} whose name is {@code "object_queue"}
     */
    @Bean
    public Queue queue3() {
        final String queueName = "object_queue";
        return new Queue(queueName);
    }
}
发送
import cn.sakura521.redis_demo.pojo.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Sender for the object example: pushes a {@code User} through the injected
 * {@link AmqpTemplate}.
 */
@Component
public class ObjectSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Logs the user and passes it to {@code convertAndSend} together with the name
     * {@code "object_queue"}.
     *
     * @param user the object to send
     */
    public void sendUser(User user) {
        final String line = "sendUser = " + user;
        System.out.println(line);
        amqpTemplate.convertAndSend("object_queue", user);
    }
}
接收
import cn.sakura521.redis_demo.pojo.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener registered on the {@code object_queue} queue that handles {@code User} payloads.
 */
@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver {

    /**
     * Prints the received user to standard output.
     *
     * @param user the object taken from the queue
     */
    @RabbitHandler
    public void objectReceiver(User user) {
        final String line = "objectReceiver = " + user;
        System.out.println(line);
    }
}
测试
import cn.sakura521.redis_demo.demo.HelloSender;
import cn.sakura521.redis_demo.demo.ObjectSender;
import cn.sakura521.redis_demo.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test for the object-sending example.
 */
@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private ObjectSender objectSender;

    /** Builds one {@code User} and sends it through the injected {@link ObjectSender}. */
    @Test
    void sendUser() {
        final User demoUser = new User("小坏孩", "123456");
        objectSender.sendUser(demoUser);
    }
}
测试结果
sendUser = User{username='小坏孩', password='123456'}
objectReceiver = User{username='小坏孩', password='123456'}
Topic Exchange
topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由地绑定不同的队列。
topic规则配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the topic-exchange example: two queues, one topic exchange
 * and one binding per queue.
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the first queue. */
    final static String message = "topic.message";

    /** Name of the second queue. */
    final static String messages = "topic.messages";

    /**
     * @return a {@link Queue} named {@code "topic.message"}
     */
    @Bean
    public Queue queueMsg() {
        return new Queue(message);
    }

    /**
     * @return a {@link Queue} named {@code "topic.messages"}
     */
    @Bean
    public Queue queueMsgs() {
        return new Queue(messages);
    }

    /**
     * Configures the TopicExchange and gives it the name {@code topicExchange}.
     *
     * @return the topic exchange
     */
    @Bean
    public TopicExchange exchange() {
        final String exchangeName = "topicExchange";
        return new TopicExchange(exchangeName);
    }

    /**
     * Binds a queue to the exchange with a routing key.
     *
     * @param queueMsg the queue to bind
     * @param exchange the exchange to bind it to
     * @return a binding with the routing key {@code "topic.message"}
     */
    @Bean
    public Binding bindingExchangeMessage(Queue queueMsg, TopicExchange exchange) {
        final Binding binding = BindingBuilder.bind(queueMsg).to(exchange).with("topic.message");
        return binding;
    }

    /**
     * Binds a queue to the exchange with a wildcard routing pattern.
     *
     * @param queueMsgs the queue to bind
     * @param exchange  the exchange to bind it to
     * @return a binding with the routing pattern {@code "topic.#"}
     */
    @Bean
    public Binding bindingExchangeMessages(Queue queueMsgs, TopicExchange exchange) {
        final Binding binding = BindingBuilder.bind(queueMsgs).to(exchange).with("topic.#");
        return binding;
    }
}
发送
消息发送者:都使用 topicExchange,但发送时指定不同的 routing_key
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Sender for the topic-exchange example: both methods publish to {@code topicExchange},
 * each with its own routing key.
 */
@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /** Logs and sends {@code "消息1"} with the routing key {@code "topic.message"}. */
    public void send1() {
        final String payload = "消息1";
        System.out.println("Sender1 = " + payload);
        amqpTemplate.convertAndSend("topicExchange", "topic.message", payload);
    }

    /** Logs and sends {@code "消息2"} with the routing key {@code "topic.messages"}. */
    public void send2() {
        final String payload = "消息2";
        System.out.println("Sender2 = " + payload);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", payload);
    }
}
接收
接收1
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener registered on the {@code topic.message} queue.
 */
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

    /**
     * Prints the received text, prefixed with the queue name.
     *
     * @param message the text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "topic.message = " + message;
        System.out.println(line);
    }
}
接收2
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener registered on the {@code topic.messages} queue.
 */
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    /**
     * Prints the received text, prefixed with the queue name.
     *
     * @param message the text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "topic.messages = " + message;
        System.out.println(line);
    }
}
测试
package cn.sakura521.redis_demo;
import cn.sakura521.redis_demo.demo.HelloSender;
import cn.sakura521.redis_demo.demo.ObjectSender;
import cn.sakura521.redis_demo.demo.TopicSender;
import cn.sakura521.redis_demo.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test for the topic-exchange example.
 */
@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private TopicSender topicSender;

    /** Calls {@code send1()}, prints a separator line, then calls {@code send2()}. */
    @Test
    void topicSender() {
        final String separator = "---------------------------------------";
        topicSender.send1();
        System.out.println(separator);
        topicSender.send2();
    }
}
测试结果
Sender1 = 消息1
---------------------------------------
Sender2 = 消息2
topic.messages = 消息1
topic.message = 消息1
topic.messages = 消息2
Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the fanout-exchange example: three queues, one fanout
 * exchange and one binding per queue.
 */
@Configuration
public class FanQutRabbitMq {

    /**
     * @return a {@link Queue} named {@code "fanout.A"}
     */
    @Bean
    public Queue AMessage() {
        final String queueName = "fanout.A";
        return new Queue(queueName);
    }

    /**
     * @return a {@link Queue} named {@code "fanout.B"}
     */
    @Bean
    public Queue BMessage() {
        final String queueName = "fanout.B";
        return new Queue(queueName);
    }

    /**
     * @return a {@link Queue} named {@code "fanout.C"}
     */
    @Bean
    public Queue CMessage() {
        final String queueName = "fanout.C";
        return new Queue(queueName);
    }

    /**
     * @return a {@link FanoutExchange} named {@code "fanoutExchange"}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        final String exchangeName = "fanoutExchange";
        return new FanoutExchange(exchangeName);
    }

    // Assign the exchange to each of the three queues: queues A, B and C are bound to the
    // fanout exchange, and whatever routing key the sender supplies is ignored.

    /**
     * @param AMessage       the queue to bind
     * @param fanoutExchange the exchange to bind it to
     * @return the binding between the two
     */
    @Bean
    public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(AMessage).to(fanoutExchange);
        return binding;
    }

    /**
     * @param BMessage       the queue to bind
     * @param fanoutExchange the exchange to bind it to
     * @return the binding between the two
     */
    @Bean
    public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(BMessage).to(fanoutExchange);
        return binding;
    }

    /**
     * @param CMessage       the queue to bind
     * @param fanoutExchange the exchange to bind it to
     * @return the binding between the two
     */
    @Bean
    public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(CMessage).to(fanoutExchange);
        return binding;
    }
}
发送
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Sender for the fanout-exchange example.
 */
@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Logs the fixed text {@code "hi,fanout.msg"} and sends it to {@code fanoutExchange}
     * with an empty routing key.
     */
    public void send() {
        final String payload = "hi,fanout.msg";
        System.out.println("Sender = " + payload);
        // Queues A, B and C are bound to the fanout exchange; whatever routing key the
        // sender supplies is ignored.
        amqpTemplate.convertAndSend("fanoutExchange", "", payload);
    }
}
接收
接收者A
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener registered on the {@code fanout.A} queue.
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    /**
     * Prints the received text, prefixed with the queue it came from.
     *
     * @param message the text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver form fanout.A: " + message;
        System.out.println(line);
    }
}
接收者B
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener registered on the {@code fanout.B} queue.
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    /**
     * Prints the received text, prefixed with the queue it came from.
     *
     * @param message the text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver form fanout.B: " + message;
        System.out.println(line);
    }
}
接收者C
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener registered on the {@code fanout.C} queue.
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    /**
     * Prints the received text, prefixed with the queue it came from.
     *
     * @param message the text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver form fanout.C: " + message;
        System.out.println(line);
    }
}
测试
package cn.sakura521.redis_demo;
import cn.sakura521.redis_demo.demo.FanoutSender;
import cn.sakura521.redis_demo.demo.HelloSender;
import cn.sakura521.redis_demo.demo.ObjectSender;
import cn.sakura521.redis_demo.demo.TopicSender;
import cn.sakura521.redis_demo.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test for the fanout-exchange example.
 */
@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private FanoutSender fanoutSender;

    /** Sends one message through the injected {@link FanoutSender}. */
    @Test
    void fanoutSender() {
        this.fanoutSender.send();
    }
}
测试结果
Sender = hi,fanout.msg
Receiver form fanout.B: hi,fanout.msg
Receiver form fanout.C: hi,fanout.msg
Receiver form fanout.A: hi,fanout.msg
最终全部测试用例
import cn.sakura521.redis_demo.demo.FanoutSender;
import cn.sakura521.redis_demo.demo.HelloSender;
import cn.sakura521.redis_demo.demo.ObjectSender;
import cn.sakura521.redis_demo.demo.TopicSender;
import cn.sakura521.redis_demo.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test class collecting every example in this tutorial, one test per scenario.
 */
@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private HelloSender helloSender;

    @Autowired
    private ObjectSender objectSender;

    @Autowired
    private TopicSender topicSender;

    @Autowired
    private FanoutSender fanoutSender;

    /** One-to-one: sends a single greeting. */
    @Test
    void contextLoads() {
        this.helloSender.send();
    }

    /**
     * Many-to-many: runs 25 rounds; each round first calls {@code send()} and then
     * {@code send2(i)} with the round index (0 through 24).
     */
    @Test
    void mangReceiver() {
        int round = 0;
        while (round < 25) {
            helloSender.send();
            helloSender.send2(round);
            round++;
        }
    }

    /** Object payload: builds one {@code User} and sends it. */
    @Test
    void sendUser() {
        final User demoUser = new User("小坏孩", "123456");
        objectSender.sendUser(demoUser);
    }

    /** Topic exchange: calls {@code send1()}, prints a separator, then calls {@code send2()}. */
    @Test
    void topicSender() {
        final String separator = "---------------------------------------";
        topicSender.send1();
        System.out.println(separator);
        topicSender.send2();
    }

    /** Fanout exchange: sends one message. */
    @Test
    void fanoutSender() {
        this.fanoutSender.send();
    }
}