RocketMQ无法避免消息重复,能确保所有消息至少传递一次。在大多数情况下,消息不会重复。
问题产生原因:
解决方案:首先生产者给消息携带唯一标记(自定义key等业务控制,消息的msgId一定是全局唯一标识符,但实际使用中可能会出现相同消息有两个不同的msgId)。然后通过消费者控制消息的幂等性(多次操作产生的影响均和第一次影响相同),可通过MySQL自定义去重表或Redis实现。
模拟生产者发送重复消息:
@SpringBootTest
public class KeyTest {
@Test
void testProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
producer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
producer.start();
// 自定义key
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());
Message repeatMessage = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());
producer.send(message);
producer.send(repeatMessage);
System.out.println("发送成功!");
producer.shutdown();
}
}
通过MySQL自定义去重表解决,设置表中key为唯一索引,只有当key不存在时才能插入成功,失败报错则消息已消费返回消费成功状态码,成功则消息未消费执行相应业务逻辑。
@Test
void testConsumerByMySQL() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (megs, context) -> {
MessageExt messageExt = megs.get(0);
String keys = messageExt.getKeys();
String desc = new String(messageExt.getBody());
try {
// 插入数据库,key设置了唯一索引,插入成功则处理该消息,报错则表示已经处理过该消息
Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
PreparedStatement preparedStatement = connection.prepareStatement("insert into `dedupe_table` (`key`, `desc`) values (?, ?)");
preparedStatement.setString(1, keys);
preparedStatement.setString(2, desc);
preparedStatement.executeUpdate();
} catch (SQLException e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
// 唯一索引冲突异常,说明消息已经处理过
System.out.println("该消息已处理!");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
e.printStackTrace();
}
// 消息未处理,进行相应业务操作
System.out.println(keys + desc);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.in.read(); // 使用系统输入阻塞挂起JVM
}
通过Redis解决,Redis中使用setnx命令插入String类型数据,只有当key不存在时才能插入成功,失败则消息已消费返回消费成功状态码,成功则消息未消费执行相应业务逻辑。
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Test
void testConsumerByRedis() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setNamesrvAddr(MQConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (megs, context) -> {
MessageExt messageExt = megs.get(0);
String keys = messageExt.getKeys();
String desc = new String(messageExt.getBody());
// Redis中String类型使用setnx命令插入,只有当key不存在时才能插入成功。
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("rocketmq:repeat:" + keys, desc);
if (!flag) {
// key在Redis中已存在,说明消息已经处理过
System.out.println("该消息已处理!");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 消息未处理,进行相应业务操作
System.out.println(keys + desc);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.in.read(); // 使用系统输入阻塞挂起JVM
}