五种常见消费模型
总的来说,不同的消息队列模型适用于不同的场景和需求。简单队列模型适合于点对点通信;工作队列模型适用于任务分配和负载均衡;发布/订阅模型适用于消息广播和解耦;路由模型适用于动态消息路由和选择性投递;主题模型适用于灵活的消息路由和过滤。根据具体的业务需求和系统架构,合理选择适用的消息队列模型可以提高系统的可扩展性、可靠性和性能。
简单队列模型
简单队列模型(Simple Queue Model
)是最基础的RabbitMQ模型。它包括单个生产者和单个消费者。生产者将消息发送到一个队列中,然后消费者从队列中读取消息并处理。这种模式不适用于多个消费者或消息广播,因为一旦消息被一个消费者接收,它就会从队列中删除。
- 单生产者和单消费者之间的点对点通信。
- 系统中只有一个进程或线程可以处理消息。
例如,一个后端服务向另一个后端服务发送消息,或者一个客户端将任务发送给服务器
- 实现简单,易于理解和部署
- 可以提供一些基本的可靠性保证,例如消息确认和持久化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String queueName = "myQueue"; channel.queueDeclare(queueName, false, false, false, null);
String message = "Hello, RabbitMQ!"; channel.basicPublish("", queueName, null, message.getBytes("UTF-8")); System.out.println("发送消息: " + message);
boolean autoAck = true; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息: " + message); } }; channel.basicConsume(queueName, autoAck, consumer);
|
工作队列模型
工作队列模型(Work Queue Model
)允许多个消费者协同地从一个队列中接收、处理和分发消息。在这种模型中,消息被平均分配给不同的消费者。当一个消费者正在处理一个消息时,它不能接收新的消息。这确保了公平的分布和消费,同时在不同的消费者之间进行负载均衡。
需要在多个工人之间分配任务的应用程序,例如异步任务处理或负载均衡。
- 支持多个消费者处理同一个队列中的消息。
- 消费负载均衡,每个消费者最多处理一条消息。
- 通过设置并发数,可以实现更高的消息吞吐量。
- 没有消息路由的动态性。
- 如果有消息时,所有的消费者都会在接收到该消息后进行同样的处理,无法根据具体情况进行消息的划分,而且消息被平均分配,不能根据消息的重要性和紧急性进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String queueName = "myQueue"; channel.queueDeclare(queueName, false, false, false, null);
String message = "Hello, RabbitMQ!"; channel.basicPublish("", queueName, null, message.getBytes("UTF-8")); System.out.println("发送消息: " + message);
channel.basicQos(1);
boolean autoAck = false; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息: " + message); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queueName, autoAck, consumer);
|
发布/订阅模型
发布/订阅模型(Publish/Subscribe Model
)允许一个生产者向多个消费者广播一条消息。在这种模型中,生产者将消息发送到一个交换机中,然后这个交换机将消息路由到所有与之绑定的队列。每个队列对应一个消费者,可以独立地处理这个队列中的消息。
需要将消息通知多个消费者的应用程序,例如事件通知或新闻发布。
- 支持广播式消息发布和订阅。
- 与其他应用程序解耦,生产者和消费者不需要知道对方的存在和细节。
- 不支持消息路由的动态性。
- 没有消息过滤机制,每个订阅者都会收到所有的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "myExchange"; channel.exchangeDeclare(exchangeName, "fanout");
String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchangeName, "");
String message = "Hello, RabbitMQ!"; channel.basicPublish(exchangeName, "", null, message.getBytes("UTF-8")); System.out.println("发送消息: " + message);
boolean autoAck = true; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息: " + message); } }; channel.basicConsume(queueName, autoAck, consumer);
|
路由模型
路由模型(Routing Model
)允许生产者根据路由键将消息发送到指定的队列中。在这种模型中,交换机会将消息路由到与它所绑定的队列匹配的路由键的队列中。消费者可以从这些队列中接收和处理消息。
需要根据某些特定属性或条件将消息路由到相应队列的应用程序,例如日志记录或按优先级处理任务。
- 支持基于路由键的动态消息路由。
- 可以根据消息的类型、内容和优先级选择发送给哪个队列,支持消息的定向投递。
- 需要提前配置好交换机和队列之间的绑定关系。
- 支持的路由逻辑有限,只能通过路由键进行匹配。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "myExchange"; channel.exchangeDeclare(exchangeName, "direct");
String queueName = "myQueue"; String routingKey = "myRoutingKey"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey);
String message = "Important message!"; channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8")); System.out.println("发送消息: " + message);
boolean autoAck = true; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息: " + message); } }; channel.basicConsume(queueName, autoAck, consumer);
|
主题模型
主题模型(Topic Model
)是路由模型的扩展,它可以实现更灵活的消息路由和分发。在这种模型中,生产者可以使用通配符匹配来匹配路由键。交换机会将消息路由到与它所绑定的队列匹配的路由键的队列中。消费者可以从这些队列中接收和处理消息。
需要根据消息内容的模式将消息路由到不同队列的应用程序,例如按标签或关键字分发和处理不同的任务。
- 支持更灵活、更具体的消息路由和过滤。
- 可以使用通配符匹配路由键,实现更复杂的消息匹配和分发。
- 高度配置化和复杂化,需要额外配置主题模式下的应用程序逻辑。
- 在一些场景下,通配符匹配路由键可能会导致性能问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "myExchange"; channel.exchangeDeclare(exchangeName, "topic");
String queueName = "myQueue"; String routingKeyPattern = "com.example.*"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKeyPattern);
String routingKey = "com.example.news"; String message = "Important news!"; channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8")); System.out.println("发送消息: " + message);
boolean autoAck = true; Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息: " + message); } }; channel.basicConsume(queueName, autoAck, consumer);
|