【黑马点评学习笔记 | 实战篇 】| 6-Redis消息队列

【黑马点评学习笔记 | 实战篇 】| 6-Redis消息队列 Bug如山勤为径代码似海苦作舟。友友们好这里是苦瓜大王。今天学习的是黑马点评项目实战篇——Redis消息队列部分的学习之前我们对秒杀用Redisson进行了分布式锁的优化又进行了异步秒杀优化但是当时我们用的是JVM的阻塞队列今天将使用Redis消息队列对异步秒杀进一步优化。笔记如下后续会一直更新黑马点评学习过程中的笔记、问题等请多多支持哦之前学习了基于JVM的阻塞队列来完成异步秒杀但是仍有缺陷内存限制问题在高并发情况下大量订单需要创建时可能导致超出JVM阻塞队列的上限数据安全问题JVM内存没有持久化安全机制每当服务重启或者出现宕机的情况时阻塞队列中的所有订单任务都会丢失这一篇就是在解决这两个问题最佳方案是使用消息队列好处解除耦合提高效率消息队列如何解决上面两个问题消息队列是在JVM以外的独立服务不受JVM内存的限制消息队列会对存储的数据进行持久化服务重启和宕机之后数据也不受影响并且会在消息投递给消费者之后做确认如果没有得到确认依然会存在消息队列里下一次会继续投递让消费者处理直到得到确认确保消息至少被消费一次常见的消息队列kafkaMQ等但是这些都有一定成本小型企业成本越低越好可以利用redis实现消息队列专用的消息队列还是建议学一下文章目录一、消息队列1.基于List实现消息队列2.基于PubSub的消息队列3.基于Stream的消息队列二、Stream的消费者组模式1. 创建消费者组2. 从消费者组读取消息3.确认消息4.查看pending-list5.Java代码的基本思路6.总结三、秒杀优化——基于Stream消息队列实现异步秒杀1. 创建Stream消息队列2. 改造Lua脚本3. 改造业务逻辑一、消息队列Redis实现消息队列1.基于List实现消息队列用BRPOP和BLPOP来实现可以阻塞#监听li这个队列指定监听的阻塞时间为20秒BRPOP l120#往l1存入1、2LPUSH l112#会弹出1#再次获取BRPOP l120#会弹出2#再次获取BRPOP l120#阻塞2.基于PubSub的消息队列天生就是阻塞式的对可靠性要求较高的话不建议使用PubSub3.基于Stream的消息队列是一种全新的数据类型是支持数据的持久化的发送消息读取消息#往s1中存入消息XADD s1 * name jack1773903918442-0#查看消息数量XLEN s11#从第一条消息开始读取s1的1条消息XREAD count1streams s10s11773903918442-0 name jack#读取s1的最新消息XREAD count1streams s1 $#返回nil是因为都读过了没有最新消息了nil#读最新消息0代表永久阻塞什么时候有新消息就什么时候结束xread count1block0streams s1 $可以反复读读完了之后并不会删除消息阻塞式循环读取最新消息可能会出现漏读消息的情况处理消息的过程中如果来了很多条消息只能看到最新那一条下面将讲解更好的消息读取方案防止漏读二、Stream的消费者组模式命令1. 创建消费者组2. 从消费者组读取消息ID配置一般0是异常时候配置消费了但是还没确认是正常情况总是从下一个未消费的消息开始3.确认消息4.查看pending-list#先插入1条数据XADD s1 * name jack#创建消费者组XGROUP CREATE s1 s1_group0#从消费者组中阻塞20秒读取下一个未被消费的信息读出name jackXREADGROUP GROUP s1_group a1 COUNT1BLOCK20000STREAMS s1XREADGROUP GROUP s1_group a1 COUNT1BLOCK20000STREAMS s1#此时再插入1条数据就会读取出clour bluexadd s1 * clour blue#从消费者组中阻塞20秒读取下一个未被消费的信息XREADGROUP GROUP s1_group a1 COUNT1BLOCK20000STREAMS s1#此时再插入1条数据就会读取出age 21xadd s1 * age21#确认前两条消费过的2条消息XACK s1 s1_group1773906757092-01773906757094-0#此时再插入1条数据xadd s1 * sexman#从消费者组中阻塞20秒读取下一个未被消费的消息会读出sex manXREADGROUP GROUP s1_group a1 COUNT1BLOCK20000STREAMS s1#确认消息XACK s1 s1_group 【id】#查看pending—listXPENDING s1 s1_group - 10#从消费者组中阻塞20秒读取下一个消费了但是还没确认的消息会读出age 21XREADGROUP GROUP s1_group a1 COUNT1BLOCK20000STREAMS s1#确认消息XACK s1 s1_group 【id】5.Java代码的基本思路6.总结解决了内存限制不受JVM限制、数据安全持久化机制、消息漏读确认机制确保消息至少消费一次问题stream可以满足中小型企业的需求但是如果公司业务比较庞大对消息队列的要求更加严格建议使用更加专业的消息队列如RabbitMQ等因为stream的持久化依赖于redis的持久化并不是万无一失的还是有消息丢失的风险并且消息确认机制只支持消费者确认而不支持生产者的确认机制生产者在发消息的时候丢失了就无法处理另外还有事务机制、多消费者下的事务有序性等要解决三、秒杀优化——基于Stream消息队列实现异步秒杀1. 创建Stream消息队列#直接创建消费组顺便把队列创建了XGROUP CREATE stream.orders g10MKSTREAM OK2. 改造Lua脚本修改脚本认定有抢购资格之后向stream.orders添加消息-- 1.参数列表-- 1.1.优惠券idlocalvoucherIdARGV[1]-- 1.2.用户idlocaluserIdARGV[2]-- 1.3.订单idlocalorderIdARGV[3]-- 2.数据key-- 2.1.库存keylocalstockKeyseckill:stock:..voucherId-- 2.2.订单keylocalorderKeyseckill:order:..voucherId-- 3.脚本业务-- 3.1.判断库存是否充足 get stockKeyif(tonumber(redis.call(get, stockKey)) 0) then-- 3.2.库存不足返回1return1end-- 3.2.判断用户是否下单 SISMEMBER orderKey userIdif(redis.call(sismember, orderKey, userId) 1) then-- 3.3.存在说明是重复下单返回2return2end-- 3.4.扣库存 incrby stockKey -1redis.call(incrby, stockKey, -1)-- 3.5.下单保存用户sadd orderKey userIdredis.call(sadd,orderKey,userId)-- 3.6.发送消息到队列中 XADD stream.orders * k1 v1 k2 v2 ...redis.call(xadd,stream.orders,*,userId,userId,voucherId,voucherId,id,orderId)return03. 改造业务逻辑修改VoucherOrderServiceImpl中的seckillVoucher方法发消息publicResultseckillVoucher(LongvoucherId){//1.执行Lua脚本//优惠券idLonguserIdUserHolder.getUser().getId();//订单idlongorderIdredisIdWorker.nextId(order);LongresultstringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),//脚本的key参数为空所以传一个空集合voucherId.toString(),//以字符串形式传具体看execute的参数列表userId.toString(),String.valueOf(orderId));//2.判断结果是否为零if(result.intValue()!0)//2.1不为零没有购买资格returnResult.fail(result.intValue()1?库存不足:不能重复下单);//3.获取代理对象因为子线程无法获取父线程的代理//获取代理对象初始化proxy(IVoucherOrderService)AopContext.currentProxy();// 4. 返回订单idreturnResult.ok(orderId);}获取消息完成下单VoucherOrderHandler线程代码privateclassVoucherOrderHandlerimplementsRunnable{Overridepublicvoidrun(){while(true){try{// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 ListMapRecordString,Object,ObjectliststringRedisTemplate.opsForStream().read(Consumer.from(g1,c1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(stream.orders,ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if(listnull||list.isEmpty()){// 如果为null说明没有消息继续下一次循环continue;}// 解析数据MapRecordString,Object,Objectrecordlist.get(0);MapObject,Objectvaluerecord.getValue();VoucherOrdervoucherOrderBeanUtil.fillBeanWithMap(value,newVoucherOrder(),true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge(s1,g1,record.getId());}catch(Exceptione){log.error(处理订单异常,e);//处理异常消息handlePendingList();}}}privatevoidhandlePendingList(){while(true){try{// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0ListMapRecordString,Object,ObjectliststringRedisTemplate.opsForStream().read(Consumer.from(g1,c1),StreamReadOptions.empty().count(1),StreamOffset.create(stream.orders,ReadOffset.from(0)));// 2.判断订单信息是否为空if(listnull||list.isEmpty()){// 如果为null说明pending-list中没有异常消息结束循环break;}// 解析数据MapRecordString,Object,Objectrecordlist.get(0);MapObject,Objectvaluerecord.getValue();VoucherOrdervoucherOrderBeanUtil.fillBeanWithMap(value,newVoucherOrder(),true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge(s1,g1,record.getId());}catch(Exceptione){log.error(处理pendding-list订单异常,e);//担心处理太频繁try{Thread.sleep(20);}catch(Exceptione){e.printStackTrace();}}}}}优化后VoucherOrderServiceImpl完整代码Slf4jServicepublicclassVoucherOrderServiceImplextendsServiceImplVoucherOrderMapper,VoucherOrderimplementsIVoucherOrderService{/** * 秒杀优惠券 * param voucherId 优惠券id * return 订单id */ResourceprivateISeckillVoucherServiceseckillVoucherService;ResourceprivateRedisIdWorkerredisIdWorker;ResourceprivateStringRedisTemplatestringRedisTemplate;//注入redissonResourceprivateRedissonClientredissonClient;privatestaticfinalDefaultRedisScriptLongSECKILL_SCRIPT;//在静态代码块里初始化//只会在类加载的时候执行一次所以不会浪费资源static{SECKILL_SCRIPTnewDefaultRedisScript();//为了避免硬编码指定lua脚本的位置SECKILL_SCRIPT.setLocation(newClassPathResource(seckill.lua));//设置返回值类型为LongSECKILL_SCRIPT.setResultType(Long.class);}/*创建阻塞队列 private BlockingQueueVoucherOrder orderTasks new ArrayBlockingQueue(1024*1024);*///创建线程池privatestaticfinalExecutorServiceSECKILL_ORDER_EXECUTORExecutors.newSingleThreadExecutor();//当前类初始化完毕之后就来执行PostConstructprivatevoidinit(){SECKILL_ORDER_EXECUTOR.submit(newVoucherOrderHandler());}//创建一个runnableprivateclassVoucherOrderHandlerimplementsRunnable{Overridepublicvoidrun(){while(true){try{// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 ListMapRecordString,Object,ObjectliststringRedisTemplate.opsForStream().read(Consumer.from(g1,c1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(stream.orders,ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if(listnull||list.isEmpty()){// 如果为null说明没有消息继续下一次循环continue;}// 解析数据MapRecordString,Object,Objectrecordlist.get(0);MapObject,Objectvaluerecord.getValue();VoucherOrdervoucherOrderBeanUtil.fillBeanWithMap(value,newVoucherOrder(),true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge(s1, g1, record.getId());}catch(Exceptione){log.error(处理订单异常,e);//处理异常消息handlePendingList();}}}privatevoidhandlePendingList(){while(true){try{// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0ListMapRecordString,Object,ObjectliststringRedisTemplate.opsForStream().read(Consumer.from(g1,c1),StreamReadOptions.empty().count(1),StreamOffset.create(stream.orders,ReadOffset.from(0)));// 2.判断订单信息是否为空if(listnull||list.isEmpty()){// 如果为null说明pending-list中没有异常消息结束循环break;}// 解析数据MapRecordString,Object,Objectrecordlist.get(0);MapObject,Objectvaluerecord.getValue();VoucherOrdervoucherOrderBeanUtil.fillBeanWithMap(value,newVoucherOrder(),true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge(s1, g1, record.getId());}catch(Exceptione){log.error(处理pendding-list订单异常,e);//担心处理太频繁try{Thread.sleep(20);}catch(Exceptionb){b.printStackTrace();}}}}}privatevoidhandleVoucheOrder(VoucherOrdervoucherOrder){LonguserIdvoucherOrder.getId();//锁定的范围是用户idRLocklockredissonClient.getLock(lock:order:userId);//获取锁//这一段只是兜底其实不做也没问题booleanisLocklock.tryLock();//判断是否获取成功if(!isLock){//获取锁失败,输出错误log.info(不允许重复下单);}try{//拿到那个现成的代理对象proxy.createVoucherOrder(voucherOrder);}finally{lock.unlock();}}//成员变量方便子线程获取privateIVoucherOrderServiceproxy;publicResultseckillVoucher(LongvoucherId){//1.执行Lua脚本//优惠券idLonguserIdUserHolder.getUser().getId();//订单idlongorderIdredisIdWorker.nextId(order);LongresultstringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),//脚本的key参数为空所以传一个空集合voucherId.toString(),//以字符串形式传具体看execute的参数列表userId.toString(),String.valueOf(orderId));//2.判断结果是否为零if(result.intValue()!0)//2.1不为零没有购买资格returnResult.fail(result.intValue()1?库存不足:不能重复下单);//3.获取代理对象因为子线程无法获取父线程的代理//获取代理对象初始化proxy(IVoucherOrderService)AopContext.currentProxy();// 4. 返回订单idreturnResult.ok(orderId);}TransactionalpublicvoidcreateVoucherOrder(VoucherOrdervoucherOrder){//异步的所以不能通过ThreadLocal来获取了LonguserIdvoucherOrder.getId();//4.一人一单//4.1 查询订单intcountquery().eq(user_id,userId).eq(voucher_id,voucherOrder.getVoucherId()).count();//4.2 判断是否存在if(count0)log.info(您已经购买过一次了);//5. 扣减库存booleansuccessseckillVoucherService.update().setSql(stock stock -1).eq(voucher_id,voucherOrder.getVoucherId()).gt(stock,0).update();//where id ? and stock 0if(!success)//扣减失败log.info(库存不足);//6.订单写入数据库save(voucherOrder);}}压测查看在高并发情况下的功能、性能如何保证了库存不会超卖、订单不会超出、一人一单解决了集群下线程安全问题性能不错以上就是黑马点评实战篇——Redis消息队列部分的学习笔记仅供参考多多支持