前言

JAVA原生代码调用

引入Maven依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
* SyncProducer 生产者
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化【生产者组】
DefaultMQProducer producer = new DefaultMQProducer("MyGroup1");
// 服务器地址
producer.setNamesrvAddr("服务器IP地址:9876");
// 启动实例
producer.start();
for (int i = 0; i < 1; i++) {
// 创建消息实例,并指定主题、标记和消息主体
Message msg = new Message("MyTopic1" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
// 调用发送消息以将消息传递到代理之一
SendResult sendResult = producer.send(msg);
System.out.println("生产者:" + sendResult);
}
// 关闭生产者实例
producer.shutdown();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

/**
* Consumer 消费者
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化【消费者组】
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup1");
// 服务器地址
consumer.setNamesrvAddr("服务器IP地址:9876");
// 订阅主题
consumer.subscribe("MyTopic1", "*");
// 注册回调,在从代理获取的消息到达时执行
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("消费者:【" + Thread.currentThread().getName() + "】" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.println(" ------------ 开始消费 ------------ ");
}
}

SpringBoot调用

引入Maven一俩

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.2</version>
</dependency>

修改YML配置文件

1
2
3
4
5
6
7
8
9
10
11
rocketmq: 
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: your-producer-group
retry-times-when-send-failed: 3
max-message-size: 1024
consumer:
group: your-consumer-group
enable-auto-commit: true
max-reconsume-times: 3
名称 含义
name-server RocketMQ 的 NameServer 地址,格式为 IP 地址:端口号,多个地址之间用分号或逗号分隔
producer.group 生产者(Producer)的组名,用于标识一组生产者
producer.retry-times-when-send-failed 生产者发送消息失败时的重试次数
producer.max-message-size 生产者发送消息的最大大小,单位为字节
consumer.group 消费者(Consumer)的组名,用于标识一组消费
consumer.enable-auto-commit 消费者是否启用自动提交偏移量
consumer.max-reconsume-times 消息消费失败时的最大重试次数

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "your-topic", consumerGroup = "your-group")
public class Consumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
System.out.println("接收消息:" + message);
// todo 处理消息的业务逻辑
}
}