博主简介CSDN博客专家「历代文学网」PC端可以访问https://lidaiwenxue.com/#/?__c1000移动端可关注公众号 “心海云图” 微信小程序搜索“历代文学”总架构师首席架构师也是联合创始人16年工作经验精通Java编程高并发设计分布式系统架构设计Springboot和微服务熟悉LinuxESXI虚拟化以及云原生Docker和K8s热衷于探索科技的边界并将理论知识转化为实际应用。保持对新技术的好奇心乐于分享所学希望通过我的实践经历和见解启发他人的创新思维。在这里我希望能与志同道合的朋友交流探讨共同进步一起在技术的世界里不断学习成长。商务合作请搜索或扫码关注微信公众号 “心海云图”RocketMQ 批量消费 自定义 JSONB 编解码抽象基类一行代码开启高性能监听器摘要在 Spring Boot 集成 RocketMQ 时如何轻量、优雅地实现批量消费同时保持消息编解码器的全局一致性本文将展示一个基于rocketmq-spring-boot-starter的抽象消费者基类它通过反射自动获取容器配置、复用自定义Fastjson2 JSONB转换器并同时支持并发与顺序两种消费模式下的批量消息处理。子类只需实现单条消息处理接口即可一行注解获得批量消费能力。1. 背景在使用rocketmq-spring-boot-starter开发消息消费者时我们常遇到以下痛点批量消费支持不够直观注解RocketMQMessageListener无批量大小配置项官方文档要求通过RocketMQPushConsumerLifecycleListener回调解锁。消息转换器重复配置生产者侧已配置了 Fastjson2 的JSONB序列化消费者又需手动反序列化容易造成不一致。顺序消费与并发消费代码相似批量监听器的写法在两种模式下高度重复。若能设计一个通用抽象类自动注入公共MessageConverter、开启批量拉取、支持顺序/并发双模并让业务开发者只关心单条消息的处理逻辑将大幅提升开发效率与可维护性。2. 设计目标一行继承子类继承AbstractRocketMQConsumerT实现onMessage(T message)即刻获得批量消费能力。复用全局编解码器使用项目启动时注册的RocketMQMessageConverter内含 Fastjson2JSONB转换器保证消息格式一致。安全的容器定位通过监听器自身引用来查找所属的DefaultRocketMQListenerContainer避免硬编码 topic/group。动态解析消息类型从容器中通过反射获取泛型真实类型支持ParameterizedType适配不同消息体。覆盖批量大小提供getBatchMaxSize()可被子类定制。3. 全局自定义 JSONB 转换器回顾在RocketMQConfig中我们已用子类方式注入了轻量的Fastjson2 JSONB转换器ConfigurationpublicclassRocketMQConfig{BeanpublicRocketMQMessageConverterrocketMQMessageConverter(){returnnewFastJson2JSONBRocketMQMessageConverter();}}FastJson2JSONBRocketMQMessageConverter继承了官方RocketMQMessageConverter重写getMessageConverter()返回包含FastJson2JSONBMessageConverter的组合转换器。所有RocketMQTemplate的发送和原生消费者的反序列化都会走这一套序列化逻辑。4. 抽象基类核心源码packagecom.sinhy.rocketmq.model;importjava.lang.reflect.Field;importjava.lang.reflect.ParameterizedType;importjava.lang.reflect.Type;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.*;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.spring.annotation.ConsumeMode;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;importorg.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.messaging.Message;importorg.springframework.messaging.converter.MessageConverter;importorg.springframework.messaging.support.MessageBuilder;/** * RocketMQ 消费者抽象基类自动启用批量消费并复用全局 MessageConverter。 * 子类只需实现 {link #onMessage(Object)} 即可处理单条消息基类会自动将一批消息 * 反序列化后调用 {link #onMessage(List)}默认逐条调用子类实现。 * * param T 消息体类型非集合如 User、Order * author lilinhai * since 2026-05-31 */publicabstractclassAbstractRocketMQConsumerTimplementsRocketMQListenerT,RocketMQPushConsumerLifecycleListener,ApplicationContextAware{protectedfinalLoggerloggerLoggerFactory.getLogger(getClass());privateApplicationContextapplicationContext;OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext){this.applicationContextapplicationContext;}/** * 单条消息处理方法子类必须实现 */OverridepublicabstractvoidonMessage(Tmessage);/** * 批量消息处理方法默认逐条调用 {link #onMessage(Object)}。 * 子类可以覆写以实现批量处理优化如批量入库。 */publicvoidonMessage(ListTmessages){for(Tmsg:messages){onMessage(msg);}}/** * 每次批量拉取的最大消息数默认 32子类可覆盖 */protectedintgetBatchMaxSize(){return32;}OverridepublicvoidprepareStart(DefaultMQPushConsumerconsumer){// 1. 定位自身的容器DefaultRocketMQListenerContainercontainerfindSelfContainer();// 2. 获取消息转换器包含自定义 JSONB 转换器MessageConverterconvertercontainer.getMessageConverter();// 3. 动态解析消息泛型类型ClassTtargetTyperesolveMessageType(container);// 4. 配置批量拉取大小intbatchSizegetBatchMaxSize();consumer.setConsumeMessageBatchMaxSize(batchSize);consumer.setPullBatchSize(batchSize);// 5. 根据消费模式注册对应的批量监听器if(container.getConsumeMode()ConsumeMode.CONCURRENTLY){consumer.setMessageListener((MessageListenerConcurrently)(msgs,context)-{returnconsumeMessagesConcurrently(msgs,context,converter,targetType,container);});}else{consumer.setMessageListener((MessageListenerOrderly)(msgs,context)-{returnconsumeMessagesOrderly(msgs,context,converter,targetType,container);});}}/* ------ 批量消费实现 ------ */privateConsumeConcurrentlyStatusconsumeMessagesConcurrently(ListMessageExtmsgs,ConsumeConcurrentlyContextcontext,MessageConverterconverter,ClassTtargetType,DefaultRocketMQListenerContainercontainer){ListTmessagesnewArrayList(msgs.size());try{for(MessageExtmsg:msgs){messages.add(convertMessage(msg,converter,targetType));}onMessage(messages);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch(Exceptione){logger.error(批量并发消费失败topic{}, msgIds{},container.getTopic(),msgs.stream().map(MessageExt::getMsgId).toArray(),e);context.setDelayLevelWhenNextConsume(container.getDelayLevelWhenNextConsume());returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}privateConsumeOrderlyStatusconsumeMessagesOrderly(ListMessageExtmsgs,ConsumeOrderlyContextcontext,MessageConverterconverter,ClassTtargetType,DefaultRocketMQListenerContainercontainer){ListTmessagesnewArrayList(msgs.size());try{for(MessageExtmsg:msgs){messages.add(convertMessage(msg,converter,targetType));}onMessage(messages);returnConsumeOrderlyStatus.SUCCESS;}catch(Exceptione){logger.error(批量顺序消费失败topic{}, msgIds{},container.getTopic(),msgs.stream().map(MessageExt::getMsgId).toArray(),e);context.setSuspendCurrentQueueTimeMillis(container.getSuspendCurrentQueueTimeMillis());returnConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}SuppressWarnings(unchecked)privateTconvertMessage(MessageExtmsg,MessageConverterconverter,ClassTtargetType)throwsException{Message?springMsgMessageBuilder.withPayload(msg.getBody()).build();return(T)converter.fromMessage(springMsg,targetType);}/* ------ 泛型类型安全解析 ------ */SuppressWarnings(unchecked)privateClassTresolveMessageType(DefaultRocketMQListenerContainercontainer){TypetypegetMessageTypeField(container);if(typeinstanceofClass){return(ClassT)type;}elseif(typeinstanceofParameterizedType){Type[]args((ParameterizedType)type).getActualTypeArguments();if(args.length0args[0]instanceofClass){return(ClassT)args[0];}thrownewIllegalStateException(无法解析 ParameterizedType: type);}thrownewIllegalStateException(不支持的消息类型: type);}privateTypegetMessageTypeField(DefaultRocketMQListenerContainercontainer){try{FieldfieldDefaultRocketMQListenerContainer.class.getDeclaredField(messageType);field.setAccessible(true);return(Type)field.get(container);}catch(Exceptione){thrownewRuntimeException(无法从容器中获取 messageType,e);}}/* ------ 容器查找通过 listener 引用反向匹配 ------ */privateDefaultRocketMQListenerContainerfindSelfContainer(){MapString,DefaultRocketMQListenerContainercontainersapplicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class);for(DefaultRocketMQListenerContainercontainer:containers.values()){if(container.getRocketMQListener()this){returncontainer;}}thrownewIllegalStateException(未找到与 getClass().getSimpleName() 对应的容器);}}5. 关键技术点剖析5.1 自动复用全局 MessageConverter通过container.getMessageConverter()拿到我们注册的CompositeMessageConverter内部包含ByteArray、String以及自定义的FastJson2JSONBMessageConverter。批量监听器中对每条消息调用converter.fromMessage(springMsg, targetType)即可使用同一套序列化规则完全无需硬编码JSONB.parseObject。5.2 容器定位listener this反向匹配DefaultRocketMQListenerContainer在初始化时通过setRocketMQListener()保存了监听器实例的引用。我们在prepareStart中遍历所有容器通过比对引用即可精确找到自己的容器避免依赖动态生成 bean 名称或 topic/group 重复导致的多容器冲突。5.3 安全解析泛型类型messageType框架源码中messageType是根据RocketMQListener的泛型接口解析得到的Type对象。对于RocketMQListenerUser它是User.class对于RocketMQListenerListUser它是一个ParameterizedType。本抽象类通过反射读取该字段并提取实际元素类型确保转换时targetType准确无误。5.4 批量拉取的必要设置即使替换了MessageListenerConcurrently或MessageListenerOrderly若未设置consumer.setConsumeMessageBatchMaxSize(batchSize)Broker 默认每次仍只投递 1 条消息。我们在prepareStart中强制将批量最大值和拉取大小都设为getBatchMaxSize()默认 32保证真正批量回调。5.5 并发与顺序模式的差异处理并发模式失败时设置delayLevelWhenNextConsume进行延迟重试。顺序模式失败时设置suspendCurrentQueueTimeMillis暂停当前队列避免乱序。两种模式共用convertMessage和onMessage(List)仅返回状态不同代码高度复用。5.6 顺序消费的多线程机制扩展解读顺序消费并非单线程而是队列级别的单线程串行。消费者通过synchronized (messageQueue)锁定队列同一队列在同一时刻仅由一个线程处理但不同队列可以分配不同线程并行。我们的批量监听器在一次回调中获得同一队列的一系列连续消息因此能天然保证顺序而整体消费又是多线程并发的。6. 快速使用示例只需创建一个继承自AbstractRocketMQConsumer的子类并添加Service和RocketMQMessageListener注解ServiceRocketMQMessageListener(topicorder-topic,consumerGrouporder-group,consumeModeConsumeMode.CONCURRENTLY// 或 ORDERLY)publicclassOrderConsumerextendsAbstractRocketMQConsumerOrder{OverridepublicvoidonMessage(Orderorder){// 处理单条订单orderService.process(order);}// 可按需覆盖批量大小OverrideprotectedintgetBatchMaxSize(){return64;}// 亦可覆盖 onMessage(ListOrder) 实现批量入库OverridepublicvoidonMessage(ListOrderorders){orderService.batchProcess(orders);}}发送端正常使用RocketMQTemplate即可消息体自动 JSONB 序列化。rocketMQTemplate.convertAndSend(order-topic,order);启动应用后消费者将一次拉取最多 64 条 Order 消息反序列化为对象列表后传递给onMessage(ListOrder)输出示例批量消息条数64 - Order{id1, amount99.9} ... 批量消息条数64 ...7. 总结本文实现了一个轻量级、高内聚的 RocketMQ 消费者抽象基类具备以下优势零重复配置全局 JSONB 转换器一处定义处处生效。开箱即用的批量消费子类无需关心底层 API只需继承即可。安全的类型解析利用反射加泛型解析杜绝强转异常。兼容并发与顺序模式两套逻辑高度复用维护成本低。这种方式非常适合大型微服务项目中统一消息消费规范降低接入门槛。完整代码可直接复制到项目中使用建议配合 RocketMQ 2.2 版本。
RocketMQ 批量消费 + 自定义 JSONB 编解码抽象基类,一行代码开启高性能监听器
博主简介CSDN博客专家「历代文学网」PC端可以访问https://lidaiwenxue.com/#/?__c1000移动端可关注公众号 “心海云图” 微信小程序搜索“历代文学”总架构师首席架构师也是联合创始人16年工作经验精通Java编程高并发设计分布式系统架构设计Springboot和微服务熟悉LinuxESXI虚拟化以及云原生Docker和K8s热衷于探索科技的边界并将理论知识转化为实际应用。保持对新技术的好奇心乐于分享所学希望通过我的实践经历和见解启发他人的创新思维。在这里我希望能与志同道合的朋友交流探讨共同进步一起在技术的世界里不断学习成长。商务合作请搜索或扫码关注微信公众号 “心海云图”RocketMQ 批量消费 自定义 JSONB 编解码抽象基类一行代码开启高性能监听器摘要在 Spring Boot 集成 RocketMQ 时如何轻量、优雅地实现批量消费同时保持消息编解码器的全局一致性本文将展示一个基于rocketmq-spring-boot-starter的抽象消费者基类它通过反射自动获取容器配置、复用自定义Fastjson2 JSONB转换器并同时支持并发与顺序两种消费模式下的批量消息处理。子类只需实现单条消息处理接口即可一行注解获得批量消费能力。1. 背景在使用rocketmq-spring-boot-starter开发消息消费者时我们常遇到以下痛点批量消费支持不够直观注解RocketMQMessageListener无批量大小配置项官方文档要求通过RocketMQPushConsumerLifecycleListener回调解锁。消息转换器重复配置生产者侧已配置了 Fastjson2 的JSONB序列化消费者又需手动反序列化容易造成不一致。顺序消费与并发消费代码相似批量监听器的写法在两种模式下高度重复。若能设计一个通用抽象类自动注入公共MessageConverter、开启批量拉取、支持顺序/并发双模并让业务开发者只关心单条消息的处理逻辑将大幅提升开发效率与可维护性。2. 设计目标一行继承子类继承AbstractRocketMQConsumerT实现onMessage(T message)即刻获得批量消费能力。复用全局编解码器使用项目启动时注册的RocketMQMessageConverter内含 Fastjson2JSONB转换器保证消息格式一致。安全的容器定位通过监听器自身引用来查找所属的DefaultRocketMQListenerContainer避免硬编码 topic/group。动态解析消息类型从容器中通过反射获取泛型真实类型支持ParameterizedType适配不同消息体。覆盖批量大小提供getBatchMaxSize()可被子类定制。3. 全局自定义 JSONB 转换器回顾在RocketMQConfig中我们已用子类方式注入了轻量的Fastjson2 JSONB转换器ConfigurationpublicclassRocketMQConfig{BeanpublicRocketMQMessageConverterrocketMQMessageConverter(){returnnewFastJson2JSONBRocketMQMessageConverter();}}FastJson2JSONBRocketMQMessageConverter继承了官方RocketMQMessageConverter重写getMessageConverter()返回包含FastJson2JSONBMessageConverter的组合转换器。所有RocketMQTemplate的发送和原生消费者的反序列化都会走这一套序列化逻辑。4. 抽象基类核心源码packagecom.sinhy.rocketmq.model;importjava.lang.reflect.Field;importjava.lang.reflect.ParameterizedType;importjava.lang.reflect.Type;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.*;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.spring.annotation.ConsumeMode;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;importorg.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.messaging.Message;importorg.springframework.messaging.converter.MessageConverter;importorg.springframework.messaging.support.MessageBuilder;/** * RocketMQ 消费者抽象基类自动启用批量消费并复用全局 MessageConverter。 * 子类只需实现 {link #onMessage(Object)} 即可处理单条消息基类会自动将一批消息 * 反序列化后调用 {link #onMessage(List)}默认逐条调用子类实现。 * * param T 消息体类型非集合如 User、Order * author lilinhai * since 2026-05-31 */publicabstractclassAbstractRocketMQConsumerTimplementsRocketMQListenerT,RocketMQPushConsumerLifecycleListener,ApplicationContextAware{protectedfinalLoggerloggerLoggerFactory.getLogger(getClass());privateApplicationContextapplicationContext;OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext){this.applicationContextapplicationContext;}/** * 单条消息处理方法子类必须实现 */OverridepublicabstractvoidonMessage(Tmessage);/** * 批量消息处理方法默认逐条调用 {link #onMessage(Object)}。 * 子类可以覆写以实现批量处理优化如批量入库。 */publicvoidonMessage(ListTmessages){for(Tmsg:messages){onMessage(msg);}}/** * 每次批量拉取的最大消息数默认 32子类可覆盖 */protectedintgetBatchMaxSize(){return32;}OverridepublicvoidprepareStart(DefaultMQPushConsumerconsumer){// 1. 定位自身的容器DefaultRocketMQListenerContainercontainerfindSelfContainer();// 2. 获取消息转换器包含自定义 JSONB 转换器MessageConverterconvertercontainer.getMessageConverter();// 3. 动态解析消息泛型类型ClassTtargetTyperesolveMessageType(container);// 4. 配置批量拉取大小intbatchSizegetBatchMaxSize();consumer.setConsumeMessageBatchMaxSize(batchSize);consumer.setPullBatchSize(batchSize);// 5. 根据消费模式注册对应的批量监听器if(container.getConsumeMode()ConsumeMode.CONCURRENTLY){consumer.setMessageListener((MessageListenerConcurrently)(msgs,context)-{returnconsumeMessagesConcurrently(msgs,context,converter,targetType,container);});}else{consumer.setMessageListener((MessageListenerOrderly)(msgs,context)-{returnconsumeMessagesOrderly(msgs,context,converter,targetType,container);});}}/* ------ 批量消费实现 ------ */privateConsumeConcurrentlyStatusconsumeMessagesConcurrently(ListMessageExtmsgs,ConsumeConcurrentlyContextcontext,MessageConverterconverter,ClassTtargetType,DefaultRocketMQListenerContainercontainer){ListTmessagesnewArrayList(msgs.size());try{for(MessageExtmsg:msgs){messages.add(convertMessage(msg,converter,targetType));}onMessage(messages);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch(Exceptione){logger.error(批量并发消费失败topic{}, msgIds{},container.getTopic(),msgs.stream().map(MessageExt::getMsgId).toArray(),e);context.setDelayLevelWhenNextConsume(container.getDelayLevelWhenNextConsume());returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}privateConsumeOrderlyStatusconsumeMessagesOrderly(ListMessageExtmsgs,ConsumeOrderlyContextcontext,MessageConverterconverter,ClassTtargetType,DefaultRocketMQListenerContainercontainer){ListTmessagesnewArrayList(msgs.size());try{for(MessageExtmsg:msgs){messages.add(convertMessage(msg,converter,targetType));}onMessage(messages);returnConsumeOrderlyStatus.SUCCESS;}catch(Exceptione){logger.error(批量顺序消费失败topic{}, msgIds{},container.getTopic(),msgs.stream().map(MessageExt::getMsgId).toArray(),e);context.setSuspendCurrentQueueTimeMillis(container.getSuspendCurrentQueueTimeMillis());returnConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}SuppressWarnings(unchecked)privateTconvertMessage(MessageExtmsg,MessageConverterconverter,ClassTtargetType)throwsException{Message?springMsgMessageBuilder.withPayload(msg.getBody()).build();return(T)converter.fromMessage(springMsg,targetType);}/* ------ 泛型类型安全解析 ------ */SuppressWarnings(unchecked)privateClassTresolveMessageType(DefaultRocketMQListenerContainercontainer){TypetypegetMessageTypeField(container);if(typeinstanceofClass){return(ClassT)type;}elseif(typeinstanceofParameterizedType){Type[]args((ParameterizedType)type).getActualTypeArguments();if(args.length0args[0]instanceofClass){return(ClassT)args[0];}thrownewIllegalStateException(无法解析 ParameterizedType: type);}thrownewIllegalStateException(不支持的消息类型: type);}privateTypegetMessageTypeField(DefaultRocketMQListenerContainercontainer){try{FieldfieldDefaultRocketMQListenerContainer.class.getDeclaredField(messageType);field.setAccessible(true);return(Type)field.get(container);}catch(Exceptione){thrownewRuntimeException(无法从容器中获取 messageType,e);}}/* ------ 容器查找通过 listener 引用反向匹配 ------ */privateDefaultRocketMQListenerContainerfindSelfContainer(){MapString,DefaultRocketMQListenerContainercontainersapplicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class);for(DefaultRocketMQListenerContainercontainer:containers.values()){if(container.getRocketMQListener()this){returncontainer;}}thrownewIllegalStateException(未找到与 getClass().getSimpleName() 对应的容器);}}5. 关键技术点剖析5.1 自动复用全局 MessageConverter通过container.getMessageConverter()拿到我们注册的CompositeMessageConverter内部包含ByteArray、String以及自定义的FastJson2JSONBMessageConverter。批量监听器中对每条消息调用converter.fromMessage(springMsg, targetType)即可使用同一套序列化规则完全无需硬编码JSONB.parseObject。5.2 容器定位listener this反向匹配DefaultRocketMQListenerContainer在初始化时通过setRocketMQListener()保存了监听器实例的引用。我们在prepareStart中遍历所有容器通过比对引用即可精确找到自己的容器避免依赖动态生成 bean 名称或 topic/group 重复导致的多容器冲突。5.3 安全解析泛型类型messageType框架源码中messageType是根据RocketMQListener的泛型接口解析得到的Type对象。对于RocketMQListenerUser它是User.class对于RocketMQListenerListUser它是一个ParameterizedType。本抽象类通过反射读取该字段并提取实际元素类型确保转换时targetType准确无误。5.4 批量拉取的必要设置即使替换了MessageListenerConcurrently或MessageListenerOrderly若未设置consumer.setConsumeMessageBatchMaxSize(batchSize)Broker 默认每次仍只投递 1 条消息。我们在prepareStart中强制将批量最大值和拉取大小都设为getBatchMaxSize()默认 32保证真正批量回调。5.5 并发与顺序模式的差异处理并发模式失败时设置delayLevelWhenNextConsume进行延迟重试。顺序模式失败时设置suspendCurrentQueueTimeMillis暂停当前队列避免乱序。两种模式共用convertMessage和onMessage(List)仅返回状态不同代码高度复用。5.6 顺序消费的多线程机制扩展解读顺序消费并非单线程而是队列级别的单线程串行。消费者通过synchronized (messageQueue)锁定队列同一队列在同一时刻仅由一个线程处理但不同队列可以分配不同线程并行。我们的批量监听器在一次回调中获得同一队列的一系列连续消息因此能天然保证顺序而整体消费又是多线程并发的。6. 快速使用示例只需创建一个继承自AbstractRocketMQConsumer的子类并添加Service和RocketMQMessageListener注解ServiceRocketMQMessageListener(topicorder-topic,consumerGrouporder-group,consumeModeConsumeMode.CONCURRENTLY// 或 ORDERLY)publicclassOrderConsumerextendsAbstractRocketMQConsumerOrder{OverridepublicvoidonMessage(Orderorder){// 处理单条订单orderService.process(order);}// 可按需覆盖批量大小OverrideprotectedintgetBatchMaxSize(){return64;}// 亦可覆盖 onMessage(ListOrder) 实现批量入库OverridepublicvoidonMessage(ListOrderorders){orderService.batchProcess(orders);}}发送端正常使用RocketMQTemplate即可消息体自动 JSONB 序列化。rocketMQTemplate.convertAndSend(order-topic,order);启动应用后消费者将一次拉取最多 64 条 Order 消息反序列化为对象列表后传递给onMessage(ListOrder)输出示例批量消息条数64 - Order{id1, amount99.9} ... 批量消息条数64 ...7. 总结本文实现了一个轻量级、高内聚的 RocketMQ 消费者抽象基类具备以下优势零重复配置全局 JSONB 转换器一处定义处处生效。开箱即用的批量消费子类无需关心底层 API只需继承即可。安全的类型解析利用反射加泛型解析杜绝强转异常。兼容并发与顺序模式两套逻辑高度复用维护成本低。这种方式非常适合大型微服务项目中统一消息消费规范降低接入门槛。完整代码可直接复制到项目中使用建议配合 RocketMQ 2.2 版本。