本地docker 启动。MQ简单逻辑后续其实可以做在FaaS中,支持弹性扩缩容。
* enum org.apache.rocketmq.common.consumer.ConsumeFromWhere
CONSUME_FROM_LAST_OFFSET: 从最后的偏移量开始消费
CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费
CONSUME_FROM_TIMESTAMP:从某个时间开始消费
* 控制台 -> topic -> 重置消费位点 -> 设置 订阅组 和 时间点 -> 提交即可
- 批量消息
- 过滤消息
中台的业务消息不好限速,会收到别的业务方消息,消费限速高了容易调下游超过重试次数丢数据,限速低了容易引起消息堆积,可以通过Tag or 属性来只消费自己场景的消息,消息消费均速化。
- 事务消息
三个服务: 商品服务、订单服务、用户积分服务。
用户创单后,就算商品库存没有正常扣减、用户积分没有对应增加 也不管了,保证用户下单时候的核心事务执行成功就行,后续再进行商品库存扣减和用户积分增加修复, 最大限度地保证下单成功。
用于: 核心事务和非核心事务解耦(不能说用户积分服务异常就导致用户无法创单)
- [cutoff] 日志格式
- OMS(OpenMessaging)
- 消息轨迹
new DefaultMQProducer("ProducerGroupName",true);
new DefaultMQPushConsumer("CID_JODIE_1",true);
// 指一条消息从生产者发送到消息队列RocketMQ版服务端,再到消费者消费,整个过程中的各个相关节点的时间、状态等数据汇聚而成的完整链路信息。
// 类似traceId排查问题用的
- 优化消费速度
* 提高消费并行度
* 加机器
* 多线程 consumeThreadMin、consumeThreadMax
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic_name");
consumer.setConsumeThreadMax(100); // 默认是20
consumer.setConsumeThreadMax(50); // 默认是20
* 批量方式消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic_name");
consumer.setConsumeMessageBatchMaxSize(10); // 默认是1
* 跳过非重要消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
long offset = messages.get(0).getQueueOffset();
String maxOffset = messages.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// 消息堆积情况的特殊处理...
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 正常消费过程...
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
* 优化每条消息消费过程
- 轻消息队列
- 消息幂等
- 一个生产者对应多个topic,一个消费者对应多个topic
- 消费限速:
- Sentinel 为 Apache RocketMQ 保驾护航
- 消费者参数控制消费速度 (但是只能控制单实例的):
- 总的消费最大QPS = 实例数量 * setPullBatchSize * setConsumeThreadMax / setPullInterval * 1000
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_msg"); // consumerGroup:batch_msg consumer.setConsumeMessageBatchMaxSize(1); // 调用MessageListener处理的地方一次传入List<MessageExt>的数量 consumer.setPullBatchSize(1); // 一次从Broker的一个Message Queue获取消息的数量(默认32个) consumer.setConsumeThreadMax(1); // consumer最大线程数 consumer.setConsumeThreadMin(1); // consumer最小线程数 consumer.setPullInterval(1000); // // 每次拉取的间隔,单位为毫秒
Feature:
- 告警监控
- producer/consumer 支持重试
- 多次重试失败支持存储到DB
- 支持切面对业务代码无侵入添加逻辑
- 自动分区sharding队列
TODO: 待完善
这里以RMQ为例,应当也无缝切换支持kafka。