【Redis学习08】Redis消息队列实现异步秒杀
文章目錄
- 1. 消息隊列
- 1.1 基于List結構模擬消息隊列
- 1.2 基于PubSub的消息隊列
- 1.3 基于Stream的消息隊列
- 2. 基于Stream的消息隊列---消費者組
- 2.1 消費者組介紹
- 2.2 消費者監聽消息基本思路
- 2.3 消費者組總結
- 3. 基于Stream的消息隊列--消費者組實現異步秒殺
- 3.1 需求分析
- 3.2 代碼實現
- 3.2.1 創建Stream類型的消息隊列
- 3.2.2 編寫用戶下單資格的lua腳本
- 3.2.3 實現異步秒殺完整代碼
1. 消息隊列
1.1 基于List結構模擬消息隊列
1.2 基于PubSub的消息隊列
1.3 基于Stream的消息隊列
既然上述三種消息隊列都有其不可避免的缺點,那我們有沒有辦法解決呢?
接下來我們介紹的基于Stream的消息隊列—消費者組就能解決上述三種消息隊列的弊端。
2. 基于Stream的消息隊列—消費者組
2.1 消費者組介紹
2.2 消費者監聽消息基本思路
我們分析一下下面的偽代碼:
首先我們嘗試監聽消息隊列,如果消息隊列沒有消息,則continue結束本次循環,重試獲取信息。如果一直沒有消息,則進入阻塞狀態,直到消息隊列存入消息。
當消息隊列有消息后,就開始處理消息,處理完消息后一定要執行ACK命令確認消息已經執行。
如果處理消息出現異常,則將消息存入pendingList(待處理)隊列,程序執行過程中會嘗試沖待處理隊列中取消息,如果待處理隊列沒有數據,直接退出異常處理的循環,反之就處理異常信息。
2.3 消費者組總結
3. 基于Stream的消息隊列–消費者組實現異步秒殺
3.1 需求分析
3.2 代碼實現
3.2.1 創建Stream類型的消息隊列
通過redis客戶端創建名為stream.orders的消息隊列
3.2.2 編寫用戶下單資格的lua腳本
用戶下單資格的lua腳本我們可以在之前的基礎上進行修改,添加訂單id,最后將用戶id,優惠券id,訂單id添加到消息隊列。
-- 1.參數列表 -- 1.1.優惠券id local voucherId = ARGV[1] -- 1.2.用戶id local userId = ARGV[2] -- 1.3.訂單id local orderId = ARGV[3]-- 2.數據key -- 2.1.庫存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2.訂單key local orderKey = 'seckill:order:' .. voucherId-- 3.腳本業務 -- 3.1.判斷庫存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2.庫存不足,返回1return 1 end -- 3.2.判斷用戶是否下單 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,說明是重復下單,返回2return 2 end -- 3.4.扣庫存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下單(保存用戶)sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6.發送消息到隊列中, XADD stream.orders * k1 v1 k2 v2 ... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 03.2.3 實現異步秒殺完整代碼
@Service @Slf4j public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService seckillVoucherService;@Autowiredprivate RedisIdWorker RedisIdWorker;@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}//定義一個線程池,異步執行下單操作private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//spring提供的PostConstruct注解:類初始化完畢就執行@PostConstructpublic void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderRunnable());}//定義處理秒殺的線程,該線程應該在類初始化就應該開始執行任務————如何做到?//使用spring提供的PostConstruct注解:類初始化完畢就執行private class VoucherOrderRunnable implements Runnable{@Overridepublic void run() {String queueName = "stream.orders";while (true){try {//1.獲取消息隊列中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAM >//從消息隊列中讀消息,g1組,消費者c1,一次讀一個,阻塞時間2秒 ,從stream.orders隊列讀,>未消費的消息List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//2.判斷消息隊列是否為空if(list==null||list.isEmpty()){//如果為空,則說明沒有消息,進行下一次循環continue;}//解析消息//因為我們一次讀一個,所以索引為0,而我們存入消息隊列的是鍵值對,因此解析出來是mapMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//3.創建訂單createVoucherOrder(voucherOrder);//4. 確認消息,XACK stream.orders,g1,idredisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("處理訂單異常",e);handlePendingList();}}}private void handlePendingList() {String queueName = "stream.orders";while (true){try {//1.獲取PendingList中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAM 0//從消息隊列中讀消息,g1組,消費者c1,一次讀一個,阻塞時間2秒 ,List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));//2.判斷消息隊列是否為空if(list==null||list.isEmpty()){//如果為空,則說明PendingList沒有消息,結束循環break;}//解析消息//因為我們一次讀一個,所以索引為0,而我們存入消息隊列的是鍵值對,因此解析出來是mapMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//3.創建訂單createVoucherOrder(voucherOrder);//4. 確認消息,XACK stream.orders,g1,idredisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("處理訂單異常",e);}}}}private void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();RLock lock = redissonClient.getLock("lock:order" + userId);//tryLock的三個參數:最大等待時間,鎖釋放時間,時間單位boolean flag = lock.tryLock();//不設置參數默認不等待,釋放時間三十秒if(!flag){return;}try {//一人一單int count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){return ;}//當更新時查詢的庫存大于0時進行庫存減一boolean success = seckillVoucherService.update().setSql("stock=stock-1").gt("stock", 0).eq("voucher_id", voucherId).update();if (!success) {return;}//6. 創建訂單this.save(voucherOrder);return ;} finally {lock.unlock();}}@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();long voucherOrderId = RedisIdWorker.nextId("order");//使用lua腳本執行原子級別的操作,不會因為線程阻塞導致釋放鎖發生錯誤。Long result = redisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(),String.valueOf(voucherOrderId));//拆箱int res = result.intValue();//1. 判斷庫存是否大于0和用戶是否已經下單if (res != 0) {return Result.fail(res == 1 ? "庫存不足" : "用戶已下單");}return Result.ok(voucherOrderId);} }讀取消息隊列中的消息使用的方法是lastConsumed,也就是從未消費的第一個消息開始消費。
而讀取待處理隊列中的消息是從隊列第一個開始。
用戶下單到提示用戶下單成功只需要經過下面的程序,比之前同步執行下單方法的速度快了不少。我們可以使用Jmeter進行響應時間的測試。
到這里,我們這一部分通過優惠券秒殺介紹了好多內容:
- 全局唯一ID生成器
- 實現優惠券秒殺下單
- 超賣問題如何解決
- 一人一單如何控制
- 從解決一人一單的悲觀鎖到分布式鎖
- 使用阻塞隊列實現異步秒殺
- 使用消息隊列實現異步秒殺
這一部分到這里就算結局了,看到這里的小伙伴不妨好好回顧一下每一部分是如何完成的,我們又是如何進行優化的。
念念不忘,必有回響!!!
總結
以上是生活随笔為你收集整理的【Redis学习08】Redis消息队列实现异步秒杀的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: axure内联框架和动态面板_解读Axu
- 下一篇: 武汉大学计算机专业考研靠什么,过来人分享