场景用户购票在服务端校验验证码拿到锁选座购票那么现在拿锁和选座购票中插入一个异步线程告诉用户你有资格购票或者已经下单成功不然一直在等待给一个快速响应前端轮询购票结果用户不用担心中间的选座购票轮询带来的压力对于后端相对较小。下单和查询分为两个模块。现在把异步操作分为出票模块从服务端发送消息给MQ查看是否有出票的请求MQ有一个生产者和服务者。现在我们使用MQ的异步存储消息即使中间出错也会保存出错前的消息。生产者和消费者和主题消息就是跟主题做关联购票也可以是一个主题。普通消息发送顺序消息发送延迟消息发送批量消息发送事务消息发送解决分布式事务。新版本默认自动开启接收消息消费者消费概念Push消费Pull消费消息可以被多个消费者处理也可以被单个处理。启动NameServer和Broker修改runbroker.cmd的配置修改为一个512m另一个1g-Xms启动时就占用的内存-Xmx最大可用内存。另外把堆外内存改为1g本地生产15g太大了。在conf目录下的broker.conf文件下添加存放rocketmq数据的地方。修改完之后先启动NameServerv看到这个显示这个就成功了然后再启动runbroker使用broker.conf启动启动成功发送消息测试使用RocketMQ Assistant的GUI客户端并连接可以看到主题中有了1000条消息有一部分在发送MQ之前一部分在发送之后。首先导入依赖dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId version2.2.3/version /dependencySpringBoot3并不认识这个文件一般消费者个生产者不在同一台机器。实例在火车购票系统中为了防止一个时间段的大量请求压跨系统就要引入RocketMQ的消息队列先拿号在买票而不是同时一下子处理所有人的购票请求。购买车票成功后就会显示主题的状态。定义处理主题为确认订单的消费者Service RocketMQMessageListener(consumerGroup default, topic CONFIRM_ORDER) //消息主题 public class ConfirmOrderConsumer implements RocketMQListenerMessageExt { private static final Logger LOG LoggerFactory.getLogger(ConfirmOrderConsumer.class); Resource private ConfirmOrderService confirmOrderService; Override public void onMessage(MessageExt messageExt) { byte[] body messageExt.getBody(); LOG.info(ROCKETMQ收到消息{}, new String(body)); ConfirmOrderDoReq req JSON.parseObject(new String(body), ConfirmOrderDoReq.class); MDC.put(LOG_ID, req.getLogId()); confirmOrderService.doConfirm(req); } }在确认订单之前的controller中注入MQ的templeteService public class BeforeConfirmOrderServiceImpl implements BeforeConfirmOrderService { private static final Logger LOG LoggerFactory.getLogger(BeforeConfirmOrderServiceImpl.class); Resource private ConfirmOrderMapper confirmOrderMapper; Resource private SkTokenService skTokenService; Resource ConfirmOrderService confirmOrderService; //SpringBoot3注入不成功 Resource public RocketMQTemplate rocketMQTemplate; SentinelResource(value beforeDoConfirm, blockHandler beforeDoConfirmBlock) Override public Long beforeDoConfirm(ConfirmOrderDoReq req) { req.setMemberId(LoginMemberContext.getId()); Long id null; // 根据前端传值加入排队人数 for (int i 0; i req.getLineNumber() 1; i) { req.setMemberId(LoginMemberContext.getId()); // 校验令牌余量 boolean validSkToken skTokenService.validSkToken(req.getDate(), req.getTrainCode(), LoginMemberContext.getId()); if (validSkToken) { LOG.info(令牌校验通过); } else { LOG.info(令牌校验不通过); throw new BusinessException(BusinessExceptionEnum.CONFIRM_ORDER_SK_TOKEN_FAIL); } //获取车次锁 //RedisKeyPreEnum.CONFIRM_ORDER - DateUtil.formatDate(req.getDate())-req. Date date req.getDate(); String trainCode req.getTrainCode(); String start req.getStart(); String end req.getEnd(); ListConfirmOrderTicketReq tickets req.getTickets(); // 保存确认订单表状态初始 DateTime now DateTime.now(); ConfirmOrder confirmOrder new ConfirmOrder(); confirmOrder.setId(SnowUtil.getSnowflakeNextId()); confirmOrder.setCreateTime(now); confirmOrder.setUpdateTime(now); confirmOrder.setMemberId(req.getMemberId()); confirmOrder.setDate(date); confirmOrder.setTrainCode(trainCode); confirmOrder.setStart(start); confirmOrder.setEnd(end); confirmOrder.setDailyTrainTicketId(req.getDailyTrainTicketId()); confirmOrder.setStatus(ConfirmOrderStatusEnum.INIT.getCode()); confirmOrder.setTickets(JSON.toJSONString(tickets)); confirmOrderMapper.insert(confirmOrder); ConfirmOrderDoReq confirmOrderDoReq new ConfirmOrderDoReq(); confirmOrderDoReq.setDate(req.getDate()); confirmOrderDoReq.setTrainCode(req.getTrainCode()); confirmOrderDoReq.setLogId(MDC.get(LOG_ID)); // 发送MQ排队购票 String reqJson JSON.toJSONString(confirmOrderDoReq); LOG.info(排队购票发送mq开始消息{}, reqJson); //发送的主题,购票的请求转换为string rocketMQTemplate.convertAndSend(RocketMQTopicEnum.CONFIRM_ORDER.getCode(), reqJson); LOG.info(排队购票发送mq结束); confirmOrderService.doConfirm(confirmOrderDoReq); id confirmOrder.getId(); } //返回最后一个id return id; } Override public void beforeDoConfirmBlock(ConfirmOrderDoReq req, BlockException e) { LOG.info(购票请求被限流{}, req); throw new BusinessException(BusinessExceptionEnum.CONFIRM_ORDER_FLOW_EXCEPTION); } }关键再发送MQ之前订单表先保存下来。即使MQ丢失订单表的数据任然存在。优先保证数据的准确性。获取所得操作必须在正常的购票逻辑中。打印日志那票但是拿不到锁拿令牌就有资格购票所以有了排队机制分布式锁是为了防止超卖。MQ就告诉出票模块有订单产生出票模块只关心出票不用关心给谁按照订单出票。后端经常处理大批量数据所以分页会减缓压力分页分页的出票。订单轮询查询用来给正在排队的订单提供一个结果让用户知道前面还有多少个订单。
Day5-微服务-RocketMQ具体项目的应用场景
场景用户购票在服务端校验验证码拿到锁选座购票那么现在拿锁和选座购票中插入一个异步线程告诉用户你有资格购票或者已经下单成功不然一直在等待给一个快速响应前端轮询购票结果用户不用担心中间的选座购票轮询带来的压力对于后端相对较小。下单和查询分为两个模块。现在把异步操作分为出票模块从服务端发送消息给MQ查看是否有出票的请求MQ有一个生产者和服务者。现在我们使用MQ的异步存储消息即使中间出错也会保存出错前的消息。生产者和消费者和主题消息就是跟主题做关联购票也可以是一个主题。普通消息发送顺序消息发送延迟消息发送批量消息发送事务消息发送解决分布式事务。新版本默认自动开启接收消息消费者消费概念Push消费Pull消费消息可以被多个消费者处理也可以被单个处理。启动NameServer和Broker修改runbroker.cmd的配置修改为一个512m另一个1g-Xms启动时就占用的内存-Xmx最大可用内存。另外把堆外内存改为1g本地生产15g太大了。在conf目录下的broker.conf文件下添加存放rocketmq数据的地方。修改完之后先启动NameServerv看到这个显示这个就成功了然后再启动runbroker使用broker.conf启动启动成功发送消息测试使用RocketMQ Assistant的GUI客户端并连接可以看到主题中有了1000条消息有一部分在发送MQ之前一部分在发送之后。首先导入依赖dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId version2.2.3/version /dependencySpringBoot3并不认识这个文件一般消费者个生产者不在同一台机器。实例在火车购票系统中为了防止一个时间段的大量请求压跨系统就要引入RocketMQ的消息队列先拿号在买票而不是同时一下子处理所有人的购票请求。购买车票成功后就会显示主题的状态。定义处理主题为确认订单的消费者Service RocketMQMessageListener(consumerGroup default, topic CONFIRM_ORDER) //消息主题 public class ConfirmOrderConsumer implements RocketMQListenerMessageExt { private static final Logger LOG LoggerFactory.getLogger(ConfirmOrderConsumer.class); Resource private ConfirmOrderService confirmOrderService; Override public void onMessage(MessageExt messageExt) { byte[] body messageExt.getBody(); LOG.info(ROCKETMQ收到消息{}, new String(body)); ConfirmOrderDoReq req JSON.parseObject(new String(body), ConfirmOrderDoReq.class); MDC.put(LOG_ID, req.getLogId()); confirmOrderService.doConfirm(req); } }在确认订单之前的controller中注入MQ的templeteService public class BeforeConfirmOrderServiceImpl implements BeforeConfirmOrderService { private static final Logger LOG LoggerFactory.getLogger(BeforeConfirmOrderServiceImpl.class); Resource private ConfirmOrderMapper confirmOrderMapper; Resource private SkTokenService skTokenService; Resource ConfirmOrderService confirmOrderService; //SpringBoot3注入不成功 Resource public RocketMQTemplate rocketMQTemplate; SentinelResource(value beforeDoConfirm, blockHandler beforeDoConfirmBlock) Override public Long beforeDoConfirm(ConfirmOrderDoReq req) { req.setMemberId(LoginMemberContext.getId()); Long id null; // 根据前端传值加入排队人数 for (int i 0; i req.getLineNumber() 1; i) { req.setMemberId(LoginMemberContext.getId()); // 校验令牌余量 boolean validSkToken skTokenService.validSkToken(req.getDate(), req.getTrainCode(), LoginMemberContext.getId()); if (validSkToken) { LOG.info(令牌校验通过); } else { LOG.info(令牌校验不通过); throw new BusinessException(BusinessExceptionEnum.CONFIRM_ORDER_SK_TOKEN_FAIL); } //获取车次锁 //RedisKeyPreEnum.CONFIRM_ORDER - DateUtil.formatDate(req.getDate())-req. Date date req.getDate(); String trainCode req.getTrainCode(); String start req.getStart(); String end req.getEnd(); ListConfirmOrderTicketReq tickets req.getTickets(); // 保存确认订单表状态初始 DateTime now DateTime.now(); ConfirmOrder confirmOrder new ConfirmOrder(); confirmOrder.setId(SnowUtil.getSnowflakeNextId()); confirmOrder.setCreateTime(now); confirmOrder.setUpdateTime(now); confirmOrder.setMemberId(req.getMemberId()); confirmOrder.setDate(date); confirmOrder.setTrainCode(trainCode); confirmOrder.setStart(start); confirmOrder.setEnd(end); confirmOrder.setDailyTrainTicketId(req.getDailyTrainTicketId()); confirmOrder.setStatus(ConfirmOrderStatusEnum.INIT.getCode()); confirmOrder.setTickets(JSON.toJSONString(tickets)); confirmOrderMapper.insert(confirmOrder); ConfirmOrderDoReq confirmOrderDoReq new ConfirmOrderDoReq(); confirmOrderDoReq.setDate(req.getDate()); confirmOrderDoReq.setTrainCode(req.getTrainCode()); confirmOrderDoReq.setLogId(MDC.get(LOG_ID)); // 发送MQ排队购票 String reqJson JSON.toJSONString(confirmOrderDoReq); LOG.info(排队购票发送mq开始消息{}, reqJson); //发送的主题,购票的请求转换为string rocketMQTemplate.convertAndSend(RocketMQTopicEnum.CONFIRM_ORDER.getCode(), reqJson); LOG.info(排队购票发送mq结束); confirmOrderService.doConfirm(confirmOrderDoReq); id confirmOrder.getId(); } //返回最后一个id return id; } Override public void beforeDoConfirmBlock(ConfirmOrderDoReq req, BlockException e) { LOG.info(购票请求被限流{}, req); throw new BusinessException(BusinessExceptionEnum.CONFIRM_ORDER_FLOW_EXCEPTION); } }关键再发送MQ之前订单表先保存下来。即使MQ丢失订单表的数据任然存在。优先保证数据的准确性。获取所得操作必须在正常的购票逻辑中。打印日志那票但是拿不到锁拿令牌就有资格购票所以有了排队机制分布式锁是为了防止超卖。MQ就告诉出票模块有订单产生出票模块只关心出票不用关心给谁按照订单出票。后端经常处理大批量数据所以分页会减缓压力分页分页的出票。订单轮询查询用来给正在排队的订单提供一个结果让用户知道前面还有多少个订单。