生活随笔 
收集整理的這篇文章主要介紹了
                                
聊聊Redis消息队列-实现异步秒杀 
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.                        
 
                                
                            一、前言  
消息隊列(Message Queue), 字面意思就是存放消息的隊列,最簡單的消息隊列模型包括3個角色:
 
消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker); 生產者:發送消息到消息隊列; 消費者:從消息隊列獲取消息并處理消息。 list結構:基于List結構模擬消息隊列; PubSub: 基本的點對點消息模型; Stream: 比較完善的消息隊列模型 二、基于List結構模擬消息隊列  
消息隊列(Message Queue),字面意思就是存放消息的隊列。而Redis的list數據結構是一個雙向鏈表,很容易模擬出隊列效果。
 
2.1 基于List的消息隊列有哪些優缺點?  
優點: 利用Redis存儲,不受限于JVM內存上限; 基于Redis的持久化機制,數據安全性有保證; 可以滿足消息有序性;  缺點:   三、基于PubSub的消息隊列  
PubSub(發布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產者向對應channel發送消息后,所有訂閱者都能收到相關消息。
 
SUBSCCRIBE channel [channel]: 訂閱一個或多個頻道; PUBLISH channel msg: 向一個頻道發送消息; PSUBSCRIBE pattern[pattern]: 訂閱與pattern格式匹配的所有頻道 3.1 基于PubSub的消息隊列有哪些優缺點?  
優點:   缺點: 不支持數據持久化; 無法避免消息丟失; 消息堆積有上限,超出時數據丟失  四、基于Stream的消息隊列  
Stream 是 Redis 5.0 引入的一種新數據類型,可以實現一個功能非常完善的消息隊列;
 
發送消息的命令: 讀取消息的方式之一:XREAD XREAD阻塞方式,讀取最新的消息: 4.1 STREAM類型消息隊列的XREAD命令特點:  
消息可回溯; 一個消息可以被多個消費者讀取; 可以阻塞讀取; 有消息漏讀的風險 五、基于Stream的消息隊列-消費者組  
消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:
 
消息分流:隊列中的消息會分流給組內的不同消費者,而不是重復的消費,從而加快消息處理的速度; 消息標示:消費者組會維護一個標識,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標識之后讀取消息。確保每一個消息都會被消費; 消息確認:消費者獲取消費后,消息處于pending狀態,并存入一個pending-list。當處理完成后需要通過XACK來確認消息,標記消息為已處理,才會從pending-list移除。  
通俗的講,就是多個消費者在一個隊列中處于競爭關系,多個消費者來處理隊列消息,加快消息處理的速度。而且消費者組會給消息加上一個標識,記錄最新讀到的消息。如果中途消息處理完未提交,消息還會進入pending狀態。進入pending-list中,不會造成數據的丟失。
 
5.1 STREAM類型消息隊列的XREADGROUP命令特點:  
消息可回溯; 可以多消費者爭搶消息,加快消費速度; 可以阻塞讀取; 沒有消息漏讀的風險; 有消息確認機制,保證消息至少被消費一次 總結
 
 
 
六、案例  
 基于Redis的Stream結構作為消息隊列,實現異步秒殺下單
 
 
需求:
 
 
local  voucherId 
=  ARGV
[ 1 ] 
local  userId 
=  ARGV
[ 2 ] 
local  orderId 
=  ARGV
[ 3 ] 
local  stockKey 
=  "seckill:stock:"  ..  voucherId
local  orderKey 
=  "seckill:order:"  ..  userId
local  stock 
=  redis
. call ( 'get' ,  stockKey
) 
local  stockNumber 
=  tonumber ( stock
) 
if  ( stockNumber 
<=  0 )  then return  1 
end 
if  ( redis
. call ( 'sismember' ,  orderKey
,  userId
)  ==  1 )  then return  2 
end 
redis
. call ( 'incrby' ,  stockKey
,  - 1 ) 
redis
. call ( 'sadd' ,  orderKey
,  userId
) 
redis
. call ( 'xadd' ,  'stream.orders' ,  '*' ,  'userId' ,  userId
,  'voucherId' ,  voucherId
,  'id' ,  orderId
) 
return  0 
 
 
private  IVoucherOrderService  proxy
; private  static  final  DefaultRedisScript < Long > SECKILL_SCRIPT ; static  { SECKILL_SCRIPT  =  new  DefaultRedisScript < > ( ) ; SECKILL_SCRIPT . setLocation ( new  ClassPathResource ( "seckill.lua" ) ) ; SECKILL_SCRIPT . setResultType ( Long . class ) ; } private  final  BlockingQueue < VoucherOrder >  orderTasks 
=  new  ArrayBlockingQueue < > ( 1024  *  1024 ) ; private  static  final  ExecutorService  SECKILL_ORDER_EXECUTOR  =  Executors . newSingleThreadExecutor ( ) ; @PostConstruct private  void  init ( )  { SECKILL_ORDER_EXECUTOR . submit ( new  VoucherOrderHandler ( ) ) ; } private  class  VoucherOrderHandler  implements  Runnable  { String  queueName 
=  "stream.orders" ; @Override public  void  run ( )  { while  ( true )  { try  { List < MapRecord < String ,  Object ,  Object > >  list 
=  stringRedisTemplate
. opsForStream ( ) . read ( Consumer . from ( "g1" ,  "c1" ) , StreamReadOptions . empty ( ) . count ( 1 ) . block ( Duration . ofSeconds ( 2 ) ) , StreamOffset . create ( queueName
,  ReadOffset . lastConsumed ( ) ) ) ; if  ( CollectionUtils . isEmpty ( list
) )  { continue ; } MapRecord < String ,  Object ,  Object >  record 
=  list
. get ( 0 ) ; Map < Object ,  Object >  values 
=  record
. getValue ( ) ; VoucherOrder  voucherOrder 
=  BeanUtil . fillBeanWithMap ( values
,  new  VoucherOrder ( ) ,  true ) ; handleVoucherOrder ( voucherOrder
) ; stringRedisTemplate
. opsForStream ( ) . acknowledge ( queueName
,  "g1" ,  record
. getId ( ) ) ; }  catch  ( Exception  e
)  { log
. error ( "處理訂單異常" ,  e
) ; handlePendingList ( ) ; } } } private  void  handlePendingList ( )  { while  ( true )  { try  { List < MapRecord < String ,  Object ,  Object > >  list 
=  stringRedisTemplate
. opsForStream ( ) . read ( Consumer . from ( "g1" ,  "c1" ) , StreamReadOptions . empty ( ) . count ( 1 ) , StreamOffset . create ( queueName
,  ReadOffset . from ( "0" ) ) ) ; if  ( CollectionUtils . isEmpty ( list
) )  { break ; } MapRecord < String ,  Object ,  Object >  record 
=  list
. get ( 0 ) ; Map < Object ,  Object >  values 
=  record
. getValue ( ) ; VoucherOrder  voucherOrder 
=  BeanUtil . fillBeanWithMap ( values
,  new  VoucherOrder ( ) ,  true ) ; handleVoucherOrder ( voucherOrder
) ; stringRedisTemplate
. opsForStream ( ) . acknowledge ( queueName
,  "g1" ,  record
. getId ( ) ) ; }  catch  ( Exception  e
)  { log
. error ( "處理訂單異常" ,  e
) ; try  { TimeUnit . MILLISECONDS . sleep ( 20 ) ; }  catch  ( InterruptedException  ex
)  { ex
. printStackTrace ( ) ; } } } } } private  void  handleVoucherOrder ( VoucherOrder  voucherOrder
)  { Long  userId 
=  voucherOrder
. getUserId ( ) ; RLock  lock 
=  redissonClient
. getLock ( "lock:order:"  +  userId
) ; boolean  isLock 
=  lock
. tryLock ( ) ; if  ( ! isLock
)  { log
. error ( "不允許重復下單" ) ; return ; } try  { proxy
. createVoucherOrder ( voucherOrder
) ; }  finally  { lock
. unlock ( ) ; } } @Override public  Result  seckillVoucher ( Long  voucherId
)  { Long  userId 
=  UserHolder . getUser ( ) . getId ( ) ; Long  orderId 
=  redisIdWorker
. nextId ( "orderId" ) ; Long  result 
=  stringRedisTemplate
. execute ( SECKILL_SCRIPT , Collections . emptyList ( ) , voucherId
. toString ( ) , userId
. toString ( ) , String . valueOf ( orderId
) ) ; int  r 
=  Objects . requireNonNull ( result
) . intValue ( ) ; if  ( r 
!=  0 )  { return  Result . fail ( r 
==  1  ?  "庫存不足"  :  "不能重復下單" ) ; } VoucherOrder  voucherOrder 
=  VoucherOrder . builder ( ) . id ( orderId
) . userId ( userId
) . voucherId ( voucherId
) . build ( ) ; orderTasks
. add ( voucherOrder
) ; proxy 
=  ( IVoucherOrderService )  AopContext . currentProxy ( ) ; return  Result . ok ( orderId
) ; } 
                            總結 
                            
                                以上是生活随笔 為你收集整理的聊聊Redis消息队列-实现异步秒杀 的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                            
                                如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。