RabbitMQ

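Official site: https://www.rabbitmq.com/

RabbitMQ is the most widely deployed open source message broker.

With tens of thousands of users, RabbitMQ is one of the most popular open source message brokers. From T-Mobile to Runtastic, it is used worldwide by small startups and large enterprises alike.

RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

RabbitMQ runs on many operating systems and cloud environments, and provides a wide range of developer tools for most popular languages.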

Integrating RabbitMQ with Spring Boot

Configuration

Add the dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Connection settings

spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest

One-to-one: one sender, one receiver

Queue configuration

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabvitConfig {

    @Bean
    public Queue queue() {
        return new Queue("hello");
    }

}

Sender

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        String context = "hello---" + LocalDateTime.now();
        System.out.println("context = " + context);
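        // No exchange is named, so the message goes to the default exchange,
        // which routes it to the queue whose name equals the routing key "hello".
        amqpTemplate.convertAndSend("hello", context);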
    }


}

Receiver

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    // Message handler
    @RabbitHandler
    public void process(String message) {
        System.out.println("message = " + message);
    }

}

Test

import cn.sakura521.redis_demo.demo.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private HelloSender helloSender;

    @Test
    void contextLoads() {
        helloSender.send();
    }

}

Test output

context = hello---2020-06-14T15:41:22.320
message = hello---2020-06-14T15:41:22.320

One-to-many: one sender, multiple receivers

Add a queue

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabvitConfig {


    @Bean
    public Queue queue() {
        return new Queue("hello");
    }

    @Bean
    public Queue queue2() {
        return new Queue("hello2");
    }

}

Sender

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send2(int i) {
        String context = i + "";
        System.out.println(context + "----send");
        amqpTemplate.convertAndSend("hello2", context);
    }


}

Receivers

Two receivers listen on the same queue.

Receiver 1

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1 {

    @RabbitHandler
    public void process(String message){
        System.out.println("HelloReceiver1 = " + message);
    }

}

Receiver 2

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2 {

    @RabbitHandler
    public void process(String message){
        System.out.println("HelloReceiver2 = " + message);
    }

}

Test

import cn.sakura521.redis_demo.demo.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private HelloSender helloSender;

    @Test
    void mangReceiver() {
        for (int i = 0; i < 25; i++) {
            helloSender.send2(i);
        }
    }

}

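Test output

The interleaving of the output differs from run to run. Both receivers listen on the same queue, so each message is delivered to only one of them.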

0----send
1----send
2----send
3----send
4----send
5----send
6----send
7----send
8----send
9----send
10----send
11----send
12----send
13----send
14----send
15----send
16----send
17----send
18----send
19----send
20----send
21----send
22----send
23----send
24----send
HelloReceiver1 = 0
HelloReceiver1 = 2
HelloReceiver1 = 4
HelloReceiver1 = 6
HelloReceiver2 = 1
HelloReceiver1 = 8
HelloReceiver2 = 3
HelloReceiver2 = 5
HelloReceiver2 = 7
HelloReceiver1 = 10
HelloReceiver1 = 12
HelloReceiver1 = 14
HelloReceiver1 = 16
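
The even/odd split in the output above is what round-robin dispatch between two consumers of one queue would produce. The following is a minimal plain-Java sketch of that idea, not RabbitMQ code: the class RoundRobinSketch and its dispatch method are hypothetical names, and a real broker's behaviour also depends on prefetch and acknowledgement settings.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {

    // Hands message i to consumer (i mod consumerCount), imitating a broker
    // that alternates between equally ready consumers on a single queue.
    public static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            received.get(i % consumerCount).add(i);
        }
        return received;
    }

    public static void main(String[] args) {
        // 25 messages and 2 consumers, as in the test above
        List<List<Integer>> received = dispatch(25, 2);
        System.out.println("consumer 1: " + received.get(0));
        System.out.println("consumer 2: " + received.get(1));
    }
}
```

With 25 messages and 2 consumers, the first consumer ends up with the 13 even numbers and the second with the 12 odd numbers, which matches the pattern HelloReceiver1 and HelloReceiver2 show in the log.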

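Many-to-many: multiple senders, multiple receivers

Modify the previous test case so that each iteration calls both send() and send2().

Test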

import cn.sakura521.redis_demo.demo.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private HelloSender helloSender;

    @Test
    void mangReceiver() {
        for (int i = 0; i < 25; i++) {
            helloSender.send();
            helloSender.send2(i);
        }
    }

}

Test output

context = hello---2020-06-14T17:16:44.640
0----send
context = hello---2020-06-14T17:16:44.647
1----send
context = hello---2020-06-14T17:16:44.649
2----send
context = hello---2020-06-14T17:16:44.649
3----send
context = hello---2020-06-14T17:16:44.649
4----send
context = hello---2020-06-14T17:16:44.649
5----send
context = hello---2020-06-14T17:16:44.650
6----send
context = hello---2020-06-14T17:16:44.656
7----send
context = hello---2020-06-14T17:16:44.657
8----send
context = hello---2020-06-14T17:16:44.657
9----send
context = hello---2020-06-14T17:16:44.658
10----send
context = hello---2020-06-14T17:16:44.658
11----send
context = hello---2020-06-14T17:16:44.659
12----send
context = hello---2020-06-14T17:16:44.660
13----send
context = hello---2020-06-14T17:16:44.660
14----send
context = hello---2020-06-14T17:16:44.660
15----send
context = hello---2020-06-14T17:16:44.661
16----send
context = hello---2020-06-14T17:16:44.663
17----send
context = hello---2020-06-14T17:16:44.663
18----send
context = hello---2020-06-14T17:16:44.668
19----send
context = hello---2020-06-14T17:16:44.669
20----send
context = hello---2020-06-14T17:16:44.670
21----send
context = hello---2020-06-14T17:16:44.671
22----send
context = hello---2020-06-14T17:16:44.671
23----send
context = hello---2020-06-14T17:16:44.672
24----send
HelloReceiver2 = 1
HelloReceiver = hello---2020-06-14T17:16:44.640
HelloReceiver1 = 0
HelloReceiver2 = 3
HelloReceiver = hello---2020-06-14T17:16:44.647
HelloReceiver2 = 5
HelloReceiver1 = 2
HelloReceiver = hello---2020-06-14T17:16:44.649
HelloReceiver2 = 7
HelloReceiver1 = 4
HelloReceiver2 = 9
HelloReceiver = hello---2020-06-14T17:16:44.649
HelloReceiver1 = 6
HelloReceiver2 = 11
HelloReceiver1 = 8
HelloReceiver2 = 13
HelloReceiver1 = 10
HelloReceiver = hello---2020-06-14T17:16:44.649
HelloReceiver = hello---2020-06-14T17:16:44.649
HelloReceiver2 = 15
HelloReceiver = hello---2020-06-14T17:16:44.650
HelloReceiver = hello---2020-06-14T17:16:44.656
HelloReceiver1 = 12
HelloReceiver = hello---2020-06-14T17:16:44.657
HelloReceiver1 = 14
HelloReceiver = hello---2020-06-14T17:16:44.657
HelloReceiver = hello---2020-06-14T17:16:44.658
HelloReceiver = hello---2020-06-14T17:16:44.658
HelloReceiver = hello---2020-06-14T17:16:44.659
HelloReceiver = hello---2020-06-14T17:16:44.660
HelloReceiver = hello---2020-06-14T17:16:44.660
HelloReceiver = hello---2020-06-14T17:16:44.660
HelloReceiver = hello---2020-06-14T17:16:44.661
HelloReceiver = hello---2020-06-14T17:16:44.663
HelloReceiver1 = 16
HelloReceiver2 = 17
HelloReceiver1 = 18
HelloReceiver1 = 20
HelloReceiver1 = 22
HelloReceiver2 = 19
HelloReceiver = hello---2020-06-14T17:16:44.663
HelloReceiver1 = 24
HelloReceiver2 = 21
HelloReceiver = hello---2020-06-14T17:16:44.668
HelloReceiver2 = 23
HelloReceiver = hello---2020-06-14T17:16:44.669
HelloReceiver = hello---2020-06-14T17:16:44.670
HelloReceiver = hello---2020-06-14T17:16:44.671
HelloReceiver = hello---2020-06-14T17:16:44.671
HelloReceiver = hello---2020-06-14T17:16:44.672

Sending objects

Create the object

First create a User class that implements the Serializable interface.

import java.io.Serializable;

public class User implements Serializable {

    private String username;
    private String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
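
User has to implement Serializable because, assuming Spring AMQP's default message converter is in use, a non-String payload is turned into the message body with standard Java serialization and restored the same way on the receiving side. The sketch below shows only that round trip in plain Java; SerializationSketch and Account are hypothetical stand-ins and nothing here talks to a broker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for the User class above
    public static class Account implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String username;

        public Account(String username) {
            this.username = username;
        }
    }

    // What a sender would do: turn the object into a byte[] message body
    public static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return buffer.toByteArray();
    }

    // What a receiver would do: rebuild the object from the message body
    public static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes(new Account("demo"));
        Account copy = (Account) fromBytes(body);
        System.out.println(copy.username + " restored from " + body.length + " bytes");
    }
}
```

One consequence of this design is that sender and receiver both need the same class on their classpath; a JSON-based converter is a common alternative when that coupling is unwanted.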

Create the queue

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabvitConfig {

    @Bean
    public Queue queue3() {
        return new Queue("object_queue");
    }
}

Sender

import cn.sakura521.redis_demo.pojo.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ObjectSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendUser(User user) {
        System.out.println("sendUser = " + user);
        amqpTemplate.convertAndSend("object_queue", user);
    }

}

Receiver

import cn.sakura521.redis_demo.pojo.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver {

    @RabbitHandler
    public void objectReceiver(User user) {
        System.out.println("objectReceiver = " + user);
    }

}

Test

import cn.sakura521.redis_demo.demo.HelloSender;
import cn.sakura521.redis_demo.demo.ObjectSender;
import cn.sakura521.redis_demo.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private ObjectSender objectSender;

    @Test
    void sendUser() {
        User user = new User("小坏孩", "123456");
        objectSender.sendUser(user);
    }

}

Test output

sendUser = User{username='小坏孩', password='123456'}

objectReceiver = User{username='小坏孩', password='123456'}

Topic Exchange

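The topic exchange is the most flexible exchange type in RabbitMQ. Queues are bound to it with routing_key patterns, and a message is delivered to every queue whose pattern matches the message's routing_key. In a pattern, * matches exactly one dot-separated word and # matches zero or more words.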
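
To make the matching rule concrete, here is a small plain-Java sketch of topic pattern matching, where * stands for exactly one dot-separated word and # for zero or more words. It is an illustration written for this article, not the broker's implementation; TopicMatchSketch and matches are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

public class TopicMatchSketch {

    // Returns true if routingKey matches the binding pattern.
    // '*' stands for exactly one word, '#' for zero or more words.
    public static boolean matches(String pattern, String routingKey) {
        List<String> p = Arrays.asList(pattern.split("\\.", -1));
        List<String> k = Arrays.asList(routingKey.split("\\.", -1));
        return matches(p, 0, k, 0);
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // Pattern used up: a match only if the key is used up too
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more of the remaining key words
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;
        }
        if (word.equals("*") || word.equals(k.get(j))) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] bindings = {"topic.message", "topic.#"};
        String[] keys = {"topic.message", "topic.messages"};
        for (String key : keys) {
            for (String binding : bindings) {
                System.out.println(key + " vs " + binding + " -> " + matches(binding, key));
            }
        }
    }
}
```

For the two bindings used in this section, the key topic.message matches both topic.message and topic.#, while topic.messages matches only topic.#.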

Topic rule configuration

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicRabbitConfig {

    final static String message = "topic.message";
    final static String messages = "topic.messages";


    @Bean
    public Queue queueMsg() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMsgs() {
        return new Queue(TopicRabbitConfig.messages);
    }

    // Declare a TopicExchange named topicExchange
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    // Bind each queue to the exchange with a routing_key pattern
    @Bean
    public Binding bindingExchangeMessage(Queue queueMsg, TopicExchange exchange) {
        return BindingBuilder.bind(queueMsg).to(exchange).with("topic.message");
    }

    @Bean
    public Binding bindingExchangeMessages(Queue queueMsgs, TopicExchange exchange) {
        return BindingBuilder.bind(queueMsgs).to(exchange).with("topic.#");
    }

}

Sender

Both send methods publish to topicExchange, each with a different routing_key.

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send1() {
        String context = "消息1";
        System.out.println("Sender1 = " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.message", context);
    }

    public void send2() {
        String context = "消息2";
        System.out.println("Sender2 = " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }

}

Receivers

Receiver 1

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

    @RabbitHandler
    public void process(String message){
        System.out.println("topic.message = " + message);
    }

}

Receiver 2

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    @RabbitHandler
    public void process(String message){
        System.out.println("topic.messages = " + message);
    }

}

Test

package cn.sakura521.redis_demo;

import cn.sakura521.redis_demo.demo.HelloSender;
import cn.sakura521.redis_demo.demo.ObjectSender;
import cn.sakura521.redis_demo.demo.TopicSender;
import cn.sakura521.redis_demo.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private TopicSender topicSender;

    @Test
    void topicSender() {
        topicSender.send1();
        System.out.println("---------------------------------------");
        topicSender.send2();
    }

}

Test output

Sender1 = 消息1
---------------------------------------
Sender2 = 消息2

topic.messages = 消息1
topic.message = 消息1
topic.messages = 消息2

Fanout Exchange

Fanout is the familiar broadcast (publish/subscribe) pattern: a message sent to a fanout exchange is delivered to every queue bound to that exchange.
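
The broadcast behaviour can be pictured with a small in-memory model. This is a plain-Java sketch under the assumption that an exchange is just a collection of bound queues; FanoutSketch, bind, publish and messagesIn are hypothetical names and no broker is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutSketch {

    // Queue name -> messages delivered to it (a stand-in for real queues)
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Binds a queue to this exchange; a fanout binding needs no routing key
    public void bind(String queueName) {
        boundQueues.put(queueName, new ArrayList<>());
    }

    // Copies the message into every bound queue; the routing key is
    // accepted but never consulted
    public void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        String[] queues = {"fanout.A", "fanout.B", "fanout.C"};
        for (String queue : queues) {
            exchange.bind(queue);
        }
        exchange.publish("ignored.key", "hi,fanout.msg");
        for (String queue : queues) {
            System.out.println(queue + " -> " + exchange.messagesIn(queue));
        }
    }
}
```

Each of the three queues ends up holding its own copy of the message, whatever routing key was passed to publish.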

Configuration

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanQutRabbitMq {

    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }


    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }


    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    // Bind queues A, B and C to the fanout exchange. A fanout exchange ignores whatever routing_key the sender supplies.
    @Bean
    public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }


}

Sender

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutSender {


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send() {
        String context = "hi,fanout.msg";
        System.out.println("Sender = " + context);
        // Queues A, B and C are bound to the fanout exchange; the routing_key is ignored, so an empty string is passed.
        amqpTemplate.convertAndSend("fanoutExchange", "", context);
    }

}

Receivers

Receiver A

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver form fanout.A: " + message);
    }

}

Receiver B

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver form fanout.B: " + message);
    }

}

Receiver C

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver form fanout.C: " + message);
    }

}

Test

package cn.sakura521.redis_demo;

import cn.sakura521.redis_demo.demo.FanoutSender;
import cn.sakura521.redis_demo.demo.HelloSender;
import cn.sakura521.redis_demo.demo.ObjectSender;
import cn.sakura521.redis_demo.demo.TopicSender;
import cn.sakura521.redis_demo.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private FanoutSender fanoutSender;

    @Test
    void fanoutSender(){
        fanoutSender.send();
    }

}

Test output

Sender = hi,fanout.msg

Receiver form fanout.B: hi,fanout.msg
Receiver form fanout.C: hi,fanout.msg
Receiver form fanout.A: hi,fanout.msg

Complete test class

import cn.sakura521.redis_demo.demo.FanoutSender;
import cn.sakura521.redis_demo.demo.HelloSender;
import cn.sakura521.redis_demo.demo.ObjectSender;
import cn.sakura521.redis_demo.demo.TopicSender;
import cn.sakura521.redis_demo.pojo.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RedisDemoApplicationTests {

    @Autowired
    private HelloSender helloSender;

    @Autowired
    private ObjectSender objectSender;

    @Autowired
    private TopicSender topicSender;

    @Autowired
    private FanoutSender fanoutSender;

    @Test
    void contextLoads() {
        helloSender.send();
    }

    @Test
    void mangReceiver() {
        for (int i = 0; i < 25; i++) {
            helloSender.send();
            helloSender.send2(i);
        }
    }

    @Test
    void sendUser() {
        User user = new User("小坏孩", "123456");
        objectSender.sendUser(user);
    }

    @Test
    void topicSender() {
        topicSender.send1();
        System.out.println("---------------------------------------");
        topicSender.send2();
    }

    @Test
    void fanoutSender(){
        fanoutSender.send();
    }

}
