前言

JAVA原生代码调用

  1. 创建了一个ConnectionFactory来指定与RabbitMQ服务器的连接信息
  2. 通过factory.newConnection()获取一个连接对象。
  3. 在连接上创建一个通道(channel)并声明一个队列。
  4. 向队列中发布消息/接收消息
  5. 关闭通道和连接(生产者)

引入Maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
<!-- amqp clien -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<!-- commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>

RabittMQ连接工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import com.rabbitmq.client.*;
/**
* RabbitMQ连接工具类
*/
public class RabbitMQConnectionUtil {

public static final String RABBITMQ_HOST = "192.168.1.100"; // MQ服务端的IP
public static final int RABBITMQ_PORT = 5672; // 端口号(默认5672,页面访问的端口15672,不要搞混淆)
public static final String RABBITMQ_USERNAME = "admin"; // 用户名
public static final String RABBITMQ_PASSWORD = "123456"; // 密码
public static final String RABBITMQ_VIRTUAL_HOST = "/"; // vhost
public static final int RABBITMQ_TIMEOUT = 30000; // 超时时间

/**
* 构建RabbitMQ的连接对象
* @return
*/
public static Connection getConnection() throws Exception {
//1. 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置RabbitMQ的连接信息
factory.setHost(RABBITMQ_HOST);
factory.setPort(RABBITMQ_PORT);
factory.setUsername(RABBITMQ_USERNAME);
factory.setPassword(RABBITMQ_PASSWORD);
factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
factory.setHandshakeTimeout(RABBITMQ_TIMEOUT);
//3. 返回连接对象
Connection connection = factory.newConnection();
return connection;
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import com.rabbitmq.client.*;
import org.junit.Test;

/**
* 生产者
*/
public class Producer {
/**
* 消息队列名,生产者与消费者队列名需要一致
*/
public static final String QUEUE_NAME = "myQueue";

@Test
public void producer() {
try{
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2. 构建Channel
Channel channel = connection.createChannel();
//3. 构建队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//4. 发布消息
String message = "This is a MQ message:" + System.currentTimeMillis();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [消息队列:生产者]<" + QUEUE_NAME + "> 发送 '" + message + "'成功");
// 关闭通道和连接
channel.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.*;
import java.io.IOException;
import org.junit.Test;

/**
* 消费者
*/
public class Consumer {

/**
* 消息队列名,生产者与消费者队列名需要一致
*/
public static final String QUEUE_NAME = "myQueue";

@Test
public void consume() {
try{
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2. 构建Channel
Channel channel = connection.createChannel();
//3. 构建队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// todo 业务逻辑区
// System.out.println("消费者接收消息:" + new String(body,"UTF-8"));
System.out.println(" [消息队列:消费者]<" + QUEUE_NAME + "> 接收 '" + new String(body,"UTF-8") + "'成功");
}
};
channel.basicConsume(Producer.QUEUE_NAME, true, callback);
System.out.println("-------- [消息队列:消费者]<" + QUEUE_NAME + ">持续监听中 --------");
System.in.read();
}catch (Exception e){
e.printStackTrace();
}
}
}

属性解析

生产者/消费者中的【channel.queueDeclare()】函数
1
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
属性类型含义补充
queueString队列的名称补充说明
durableboolean是否持久化RabbitMQ重启后队列不会丢失。
RabbitMQ退出时它会将队列信息保存到Erlang自带的Mnesia数据库中,当RabbitMQ重启之后会读取该数据库
exclusiveboolean是否排外的是否限制其他通道对该队列的访问
排外队列:仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除
非排外队列:不同连接(Connection)的管道Channel可以使用该队列
autoDeleteboolean是否自动删除当最后一个消费者断开连接时,是否删除队列
当所有消费者都与这个队列断开连接时,这个队列会自动删除
但没有消费者连接消费时,该队列是不会自动删除
argumentsMap可选参数例:x-rnessage-ttlx-expiresx-rnax-length

关于上面属性有几个注意点

  1. 【exclusive属性】:排他队列是基于连接(Connection)可见的,同个连接(Connection)的不同管道(Channel)是可以同时访问同一连接创建的排他队列 。其他连接是访问不了的。
  2. 【exclusive属性】:”首次“是指如果某个连接(Connection)已经声明了排他队列,其他连接是不允许建立同名的排他队列的。这个与普通队列不同:即使该队列是持久化的(durable = true),一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  3. 【arguments属性】可以参考下面的Map代码使用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    Map<String, Object> arguments = new HashMap<>();
    // 消息条数限制,该参数是非负整数值。限制加入queue中消息的条数。先进先出原则,超过10条后面的消息会顶替前面的消息。
    arguments.put("x-max-length", 10);
    // 消息容量限制,该参数是非负整数值。该参数和x-max-length目的一样限制队列的容量,但是这个是靠队列大小(bytes)来达到限制。
    arguments.put("x-max-length-bytes", 1024);
    /**
    * 消息存活时间,该参数是非负整数值.创建queue时设置该参数可指定消息在该queue中待多久,
    * 可根据x-dead-letter-routing-key和x-dead-letter-exchange生成可延迟的死信队列。
    */
    arguments.put("x-message-ttl", 10000);''
    /**
    * 消息优先级,创建queue时arguments可以使用x-max-priority参数声明优先级队列 。该参数应该是一个整数,表示队列应该支持的最大优先级。
    * ​​建议使用1到10之间。目前使用更多的优先级将消耗更多的资源(Erlang进程)。
    * 设置该参数同时设置死信队列时或造成已过期的低优先级消息会在未过期的高优先级消息后面执行。
    * 该参数会造成额外的CPU消耗。
    */
    arguments.put("x-max-priority", 5);
    /**
    * 存活时间,创建queue时参数arguments设置了x-expires参数,该queue会在x-expires到期后queue消息,
    * 亲身测试直接消失(哪怕里面有未消费的消息)。
    */
    arguments.put("x-expires", 60000);
    /**
    * 创建queue时参数arguments设置了x-dead-letter-routing-key和x-dead-letter-exchange,
    * 会在x-message-ttl时间到期后把消息放到x-dead-letter-routing-key和x-dead-letter-exchange指定的队列中达到延迟队列的目的。
    */
    arguments.put("x-dead-letter-exchange", "TopExchangeName");
    // 这里的routing-key也可以是队列名称,当消息过期后会转发到这个exchange对应的routing-key,达到延时队列效果
    arguments.put("x-dead-letter-routing-key", "ttl.*.value");
生产者的【channel.basicPublish()】函数
1
channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
属性类型含义
exchangeString交换机名称(简单模式下交换机使用默认)
routingKeyString路由名称(简单模式下使用对列名)
propsBasicProperties消息配置信息(可配置消息持久化)
bodybyte[]发送消息数据

Spring Boot AMQP调用

Spring Boot已经提供了对 AMQP 协议完全支持的 spring-boot-starter-amqp 依赖,引入此依赖即可快速方便的在 SpringBoot 中使用 RabbitMQ。参考:Spring AMQP

优点

  • 该调用方式引入依赖spring-boot-starter-amqp,对RabbitMQ的使用进行了进一步的封装
  • 通过这种方式使用集成到Spring Boot中的RabbitMQ时,我们不再关心ConnectChannel的创建,Spring Boot会替我们创建好
  • 我们要做的,只是通过注解的方式创建ExchangeQueueBind对象,并把他们交给Spring IOC进行管理,然后Spring Boot又会自动生成这些对象对应的交换机队列绑定

引入Mave依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

修改配置文件

配置类中的sprig.rabbitmq.listener.simple.acknowledge-mode模式有三种:none、auto、manual

  • none:默认值,自动确认消息
  • manual:手动确认消息
  • auto:根据异常情况确认消息

自动确认消息是指当消息一旦被消费者接收到则自动确认收到,并将消息从RabbitMQ的消息缓存中移除。
但在实际业务中,有可能消息接收到了,业务却出现异常,那该消息就会丢失。
所以一般采用手动确认方式,当业务处理成功后,调用channel.basicACK();如果出现异常则调用channel.basicNack(),让其自动重新发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring:
rabbitmq:
host: 192.168.1.100
port: 5672
username: guest
password: guest
virtual-host: /
# 支持发布确认
publisher-confirms: true
# 支持发布返回
publisher-returns: true
listener:
simple:
# 采用手动应答
acknowledge-mode: manual
# 当前监听容器数
concurrency: 1
# 最大数
max-concurrency: 1
# 是否支持重试
retry:
enabled: true

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Rabbitmq配置类
*
* @ClassName RabbitmqConfig
* @Description TODO
*/
@Configuration
public class RabbitmqConfig {
// 定义 队列名、路由名、交换机名
public static final String QUEUE_NAME = "myQueue";
public static final String EXCHANGE_NAME = "myExchange";
public static final String ROUTING_KEY = "myRoutingKey";

/**
* 声明交换机
* @return
*/
@Bean(EXCHANGE_NAME)
public Exchange EXCHANGE_NAME(){
// durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}

/**
* 声明QUEUE_NAME队列
*/
@Bean(QUEUE_NAME)
public Queue QUEUE_NAME(){
return new Queue(QUEUE_NAME);
}

/**
* 队列绑定交换机
* ROUTINGKEY_SMS 队列绑定交换机,指定routingKey
* @param queue 队列
* @param exchange 交换机
* @return
*/
@Bean
public Binding BINDING_ROUTINGKEY(@Qualifier(QUEUE_NAME) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
}
}

生产者

生产者,需要特定情况下主动调用请求
ps:这里暂时使用 @SpringBootTest@RunWith(SpringRunner.class)@Test 注解进行测试代替了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.SpringRunner;

/**
* 生产者
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class Prodcer {

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessage() {
// 使用rabbitTemplate发送消息
String message = "这是一条MQ消息:" + System.currentTimeMillis();
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, message);
System.out.println("[消息队列:生产者]<" + RabbitmqConfig.QUEUE_NAME + "> 发送'" + message + "'成功");
}
}

消费者

消费者,使用了@RabbitListener注解,实现自动监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 消费者
*/
@Component
public class Consumer {

/**
* 监听 短信队列sms
* @param msg
* @param message
* @param channel
*/
@RabbitListener(queues = {RabbitmqConfig.QUEUE_NAME})
public void receive_sms(Object msg, Message message, Channel channel) throws IOException {
// 消息的唯一标识id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
//todo 业务逻辑区
System.out.println("[消息队列:消费者]<" + RabbitmqConfig.QUEUE_NAME + ">接收'" + msg + "'成功");
// channel.basicAck(唯一标识id, 是否批量签收)
channel.basicAck(deliveryTag, true);
}catch (Exception e){
// channel.basicNack(唯一标识id, 是否批量签收, 是否重回队列);
channel.basicNack(deliveryTag, true, true);
System.out.println("[消息队列:消费者]<" + RabbitmqConfig.QUEUE_NAME + ">接收'" + msg + "'失败");
}
}
}

代码执行

先启动SpringBoot,从而使消费者监听启动
然后调用接口,触发生产者逻辑(此文章是直接进行的单元测试)

Spring Cloud Stream调用

xxx

xxx

xxx