前言
JAVA原生代码调用
- 创建了一个
ConnectionFactory
来指定与RabbitMQ服务器
的连接信息
- 通过
factory.newConnection()
获取一个连接对象。
- 在连接上创建一个通道(
channel
)并声明一个队列。
- 向队列中发布消息/接收消息
- 关闭通道和连接(生产者)
引入Maven依赖
1 2 3 4 5 6 7 8 9 10 11 12
| <!-- amqp clien --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency> <!-- commons-io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.11.0</version> </dependency>
|
RabittMQ连接工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import com.rabbitmq.client.*;
public class RabbitMQConnectionUtil {
public static final String RABBITMQ_HOST = "192.168.1.100"; public static final int RABBITMQ_PORT = 5672; public static final String RABBITMQ_USERNAME = "admin"; public static final String RABBITMQ_PASSWORD = "123456"; public static final String RABBITMQ_VIRTUAL_HOST = "/"; public static final int RABBITMQ_TIMEOUT = 30000;
public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(RABBITMQ_HOST); factory.setPort(RABBITMQ_PORT); factory.setUsername(RABBITMQ_USERNAME); factory.setPassword(RABBITMQ_PASSWORD); factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST); factory.setHandshakeTimeout(RABBITMQ_TIMEOUT); Connection connection = factory.newConnection(); return connection; } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import com.rabbitmq.client.*; import org.junit.Test;
public class Producer {
public static final String QUEUE_NAME = "myQueue";
@Test public void producer() { try{ Connection connection = RabbitMQConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String message = "This is a MQ message:" + System.currentTimeMillis(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [消息队列:生产者]<" + QUEUE_NAME + "> 发送 '" + message + "'成功"); channel.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import com.rabbitmq.client.*; import java.io.IOException; import org.junit.Test;
public class Consumer {
public static final String QUEUE_NAME = "myQueue";
@Test public void consume() { try{ Connection connection = RabbitMQConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); DefaultConsumer callback = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(" [消息队列:消费者]<" + QUEUE_NAME + "> 接收 '" + new String(body,"UTF-8") + "'成功"); } }; channel.basicConsume(Producer.QUEUE_NAME, true, callback); System.out.println("-------- [消息队列:消费者]<" + QUEUE_NAME + ">持续监听中 --------"); System.in.read(); }catch (Exception e){ e.printStackTrace(); } } }
|
属性解析
生产者/消费者中的【channel.queueDeclare()】函数
1
| channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
|
属性 | 类型 | 含义 | 补充 |
---|
queue | String | 队列的名称 | 补充说明 |
durable | boolean | 是否持久化 | RabbitMQ重启后队列不会丢失。 RabbitMQ退出时它会将队列信息保存到Erlang自带的Mnesia数据库 中,当RabbitMQ重启之后会读取该数据库 |
exclusive | boolean | 是否排外的 | 是否限制其他通道对该队列的访问
排外队列 :仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 非排外队列 :不同连接(Connection)的管道Channel可以使用该队列 |
autoDelete | boolean | 是否自动删除 | 当最后一个消费者断开连接时,是否删除队列 当所有消费者都与这个队列断开连接时,这个队列会自动删除 但没有消费者连接消费时,该队列是不会自动删除 |
arguments | Map | 可选参数 | 例:x-rnessage-ttl 、x-expires 、x-rnax-length 等 |
关于上面属性有几个注意点
- 【exclusive属性】:排他队列是基于连接(Connection)可见的,同个连接(Connection)的不同管道(Channel)是可以同时访问同一连接创建的排他队列 。其他连接是访问不了的。
- 【exclusive属性】:”
首次
“是指如果某个连接(Connection)已经声明了排他队列,其他连接是不允许建立同名的排他队列的。这个与普通队列不同:即使该队列是持久化的(durable = true),一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。 - 【arguments属性】可以参考下面的Map代码使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-length", 10);
arguments.put("x-max-length-bytes", 1024);
arguments.put("x-message-ttl", 10000);''
arguments.put("x-max-priority", 5);
arguments.put("x-expires", 60000);
arguments.put("x-dead-letter-exchange", "TopExchangeName");
arguments.put("x-dead-letter-routing-key", "ttl.*.value");
|
生产者的【channel.basicPublish()】函数
1
| channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
|
属性 | 类型 | 含义 |
---|
exchange | String | 交换机名称(简单模式下交换机使用默认) |
routingKey | String | 路由名称(简单模式下使用对列名) |
props | BasicProperties | 消息配置信息(可配置消息持久化) |
body | byte[] | 发送消息数据 |
Spring Boot AMQP调用
Spring Boot已经提供了对 AMQP 协议完全支持的 spring-boot-starter-amqp
依赖,引入此依赖即可快速方便的在 SpringBoot 中使用 RabbitMQ。参考:Spring AMQP。
优点
- 该调用方式引入依赖
spring-boot-starter-amqp
,对RabbitMQ的使用进行了进一步的封装
- 通过这种方式使用集成到Spring Boot中的
RabbitMQ
时,我们不再关心Connect
和Channel
的创建,Spring Boot会替我们创建好
- 我们要做的,只是通过
注解
的方式创建Exchange
、Queue
和Bind
对象,并把他们交给Spring IOC
进行管理,然后Spring Boot又会自动生成这些对象对应的交换机
、队列
和绑定
引入Mave依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
修改配置文件
配置类中的sprig.rabbitmq.listener.simple.acknowledge-mode
模式有三种:none、auto、manual
- none:默认值,自动确认消息
- manual:手动确认消息
- auto:根据异常情况确认消息
自动确认消息
是指当消息一旦被消费者接收到则自动确认收到,并将消息从RabbitMQ的消息缓存中移除。
但在实际业务中,有可能消息接收到了,业务却出现异常,那该消息就会丢失。
所以一般采用手动确认方式
,当业务处理成功后,调用channel.basicACK()
;如果出现异常则调用channel.basicNack()
,让其自动重新发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| spring: rabbitmq: host: 192.168.1.100 port: 5672 username: guest password: guest virtual-host: / publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 1 retry: enabled: true
|
配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitmqConfig { public static final String QUEUE_NAME = "myQueue"; public static final String EXCHANGE_NAME = "myExchange"; public static final String ROUTING_KEY = "myRoutingKey";
@Bean(EXCHANGE_NAME) public Exchange EXCHANGE_NAME(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); }
@Bean(QUEUE_NAME) public Queue QUEUE_NAME(){ return new Queue(QUEUE_NAME); }
@Bean public Binding BINDING_ROUTINGKEY(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs(); } }
|
生产者
生产者,需要特定情况下主动调用请求
ps:这里暂时使用 @SpringBootTest
、@RunWith(SpringRunner.class)
和 @Test
注解进行测试代替了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest @RunWith(SpringRunner.class) public class Prodcer {
@Autowired RabbitTemplate rabbitTemplate;
@Test public void sendMessage() { String message = "这是一条MQ消息:" + System.currentTimeMillis(); rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, message); System.out.println("[消息队列:生产者]<" + RabbitmqConfig.QUEUE_NAME + "> 发送'" + message + "'成功"); } }
|
消费者
消费者,使用了@RabbitListener
注解,实现自动监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class Consumer {
@RabbitListener(queues = {RabbitmqConfig.QUEUE_NAME}) public void receive_sms(Object msg, Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ System.out.println("[消息队列:消费者]<" + RabbitmqConfig.QUEUE_NAME + ">接收'" + msg + "'成功"); channel.basicAck(deliveryTag, true); }catch (Exception e){ channel.basicNack(deliveryTag, true, true); System.out.println("[消息队列:消费者]<" + RabbitmqConfig.QUEUE_NAME + ">接收'" + msg + "'失败"); } } }
|
代码执行
先启动SpringBoot,从而使消费者监听启动
然后调用接口,触发生产者
逻辑(此文章是直接进行的单元测试)
Spring Cloud Stream调用
xxx
xxx
xxx