高并发-【抢红包案例】之四:使用Redis+Lua脚本实现抢红包并异步持久化到数据库
文章目錄
- 導讀
- 概述
- 實現步驟
- 注解方式配置 Redis
- lua腳本和異步持久化功能的開發
- Service層添加Redis搶紅包的邏輯
- Controller層新增路由方法
- 構造模擬數據,測試
- 代碼
- 總結
導讀
高并發-【搶紅包案例】之一:SSM環境搭建及復現紅包超發問題
高并發-【搶紅包案例】之二:使用悲觀鎖方式修復紅包超發的bug
高并發-【搶紅包案例】之三:使用樂觀鎖方式修復紅包超發的bug
概述
上面三篇博文是使用的MySql數據庫來作為數據的載體數據最終會將數據保存到磁盤中,而Redis使用的是內存,內存的速度比磁盤速度肯定要快很多.
對于使用 Redis實現搶紅包,首先需要知道的是Redis的功能不如數據庫強大,事務也不是很完整.因此要保證數據的正確性數據的正確性可以通過嚴格的驗證得以保證。
而 Redis的 Lua 語言是原子性的,且功能更為強大,所以優先選擇使用Lua語言來實現搶紅包。
但是無論如何對于數據而言,在 Redis 當中存儲,始終都不是長久之計 , 因為 Redis并非一個長久儲存數據的地方,更多的時候只是為了提供更為快速的緩存,所以當紅包金額為 0 或者紅包超時的時候(超時操作可以使用定時機制實,這里暫不討論), 會將紅包數據保存到數據庫中, 這樣才能夠保證數據的安全性和嚴格性。
所以本篇博文我們將使用Redis + lua腳本來實現搶紅包的功能。
實現步驟
注解方式配置 Redis
首先在類 RootConfig 上創建一個 RedisTemplate 對象,并將其裝載到 Spring IoC 容器中。
/*** 創建一個 RedisTemplate 對象*/@Bean(name = "redisTemplate")public RedisTemplate initRedisTemplate() {JedisPoolConfig poolConfig = new JedisPoolConfig();// 最大空閑數poolConfig.setMaxIdle(50);// 最大連接數poolConfig.setMaxTotal(100);// 最大等待毫秒數poolConfig.setMaxWaitMillis(20000);// 創建Jedis鏈接工廠JedisConnectionFactory connectionFactory = new JedisConnectionFactory(poolConfig);connectionFactory.setHostName("192.168.31.66");connectionFactory.setPort(6379);// 調用后初始化方法,沒有它將拋出異常connectionFactory.afterPropertiesSet();// 自定Redis序列化器RedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();RedisSerializer stringRedisSerializer = new StringRedisSerializer();// 定義RedisTemplate,并設置連接工廠RedisTemplate redisTemplate = new RedisTemplate();redisTemplate.setConnectionFactory(connectionFactory);// 設置序列化器redisTemplate.setDefaultSerializer(stringRedisSerializer);redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setHashValueSerializer(stringRedisSerializer);return redisTemplate;}這樣 RedisTemplate 就可以在 Spring 上下文中使用了。
注意, JedisConnectionFactory對象在最后的時候需要自行調用 afterPropertiesSet 方法,它實現了 lnitializingBean 接 口。 如果將其配置在 Spring IoC 容器中, Spring 會自動調用它,但是這里我們是自行創建的, 因此需要自行調用,否則在運用的時候會拋出異常。
lua腳本和異步持久化功能的開發
Redis 并不是一個嚴格的事務,而且事務的功能也是有限的 。 加上 Redis 本身的命令也比較有限,功能性不強,為了增強功能性,還可以使用 Lua 語言。
Redis 中的 Lua 語言是一種原子性的操作,可以保證數據的一致性 。
依據這個原理可以避免超發現象,完成搶紅包的功能,而且對于性能而言, Redis 會比數據庫快得多。
第一次運行 Lua 腳本的時候,先在 Redis 中編譯和緩存腳本,這樣就可以得到一個 SHA1字符串,之后通過 SHAl 字符串和參數就能調用 Lua 腳本了
--緩存搶紅包列表信息列表 key local listKey = 'red_packet_list_'..KEYS[1] --當前被搶紅包 key local redPacket = 'red_packet_'..KEYS[1] --獲取當前紅包庫存 local stock = tonumber(redis.call('hget', redPacket, 'stock')) --沒有庫存,返回為 0 if stock <= 0 then return 0 end --庫存減 1 stock = stock-1 --保存當前庫存 redis.call('hset', redPacket, 'stock', tostring(stock)) --往鏈表中加入當前紅包信息 redis.call('rpush', listKey, ARGV[1]) --如果是最后一個紅包,則返回 2 ,表示搶紅包已經結束,需要將列表中的數據保存到數據庫中 if stock == 0 then return 2 end --如果并非最后一個紅包,則返回 l ,表示搶紅包成功 return 1
流程:
- 判斷是否存在可搶的庫存,如果己經沒有可搶奪 的紅包,則返回為 0,結束流程
- 有可搶奪的紅包,對于紅包的庫存減1 ,然后重新設置庫存
- 將搶紅包數據保存到 Redis 的鏈表當中,鏈表的 key 為 red_packet_list_ {id}
- 如果當前庫存為 0 ,那么返回 2,這說明可以觸發數據庫對 Redis 鏈表數據的保存,鏈表的 key 為 red_packet_ list_ {id},它將保存搶紅包的用戶名和搶的時間
- 如果當前庫存不為 0 ,那么將返回 1,這說明搶紅包信息保存成功。
當返回為 2 的時候,說明紅包己經沒有庫存,會觸發數據庫對鏈表數據的保存, 這是一個大數據量的保存。為了不影響最后一次搶紅包的響應,在實際的操作中往往會考慮使用 JMS 消息發送到別的服務器進行操作,我們這里選擇一種簡單的方式來實現,去創建一條新的線程去運行保存 Redis 鏈表數據到數據庫。
那就在Service層寫一個持久到數據庫的服務類吧
接口
package com.artisan.redpacket.service;public interface RedisRedPacketService {/*** 保存redis搶紅包列表* @param redPacketId --搶紅包編號* @param unitAmount -- 紅包金額*/public void saveUserRedPacketByRedis(Long redPacketId, Double unitAmount); }實現類
package com.artisan.redpacket.service.impl;import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List;import javax.sql.DataSource;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundListOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service;import com.artisan.redpacket.pojo.UserRedPacket; import com.artisan.redpacket.service.RedisRedPacketService;@Service public class RedisRedPacketServiceImpl implements RedisRedPacketService {private static final String PREFIX = "red_packet_list_";// 每次取出1000條,避免一次取出消耗太多內存private static final int TIME_SIZE = 1000;@Autowiredprivate RedisTemplate redisTemplate; // RedisTemplate@Autowiredprivate DataSource dataSource; // 數據源@Override// 開啟新線程運行@Asyncpublic void saveUserRedPacketByRedis(Long redPacketId, Double unitAmount) {System.err.println("開始保存數據");Long start = System.currentTimeMillis();// 獲取列表操作對象BoundListOperations ops = redisTemplate.boundListOps(PREFIX + redPacketId);Long size = ops.size();Long times = size % TIME_SIZE == 0 ? size / TIME_SIZE : size / TIME_SIZE + 1;int count = 0;List<UserRedPacket> userRedPacketList = new ArrayList<UserRedPacket>(TIME_SIZE);for (int i = 0; i < times; i++) {// 獲取至多TIME_SIZE個搶紅包信息List userIdList = null;if (i == 0) {userIdList = ops.range(i * TIME_SIZE, (i + 1) * TIME_SIZE);} else {userIdList = ops.range(i * TIME_SIZE + 1, (i + 1) * TIME_SIZE);}userRedPacketList.clear();// 保存紅包信息for (int j = 0; j < userIdList.size(); j++) {String args = userIdList.get(j).toString();String[] arr = args.split("-");String userIdStr = arr[0];String timeStr = arr[1];Long userId = Long.parseLong(userIdStr);Long time = Long.parseLong(timeStr);// 生成搶紅包信息UserRedPacket userRedPacket = new UserRedPacket();userRedPacket.setRedPacketId(redPacketId);userRedPacket.setUserId(userId);userRedPacket.setAmount(unitAmount);userRedPacket.setGrabTime(new Timestamp(time));userRedPacket.setNote("搶紅包 " + redPacketId);userRedPacketList.add(userRedPacket);}// 插入搶紅包信息count += executeBatch(userRedPacketList);}// 刪除Redis列表redisTemplate.delete(PREFIX + redPacketId);Long end = System.currentTimeMillis();System.err.println("保存數據結束,耗時" + (end - start) + "毫秒,共" + count + "條記錄被保存。");}/*** 使用JDBC批量處理Redis緩存數據.* * @param userRedPacketList* -- 搶紅包列表* @return 搶紅包插入數量.*/private int executeBatch(List<UserRedPacket> userRedPacketList) {Connection conn = null;Statement stmt = null;int[] count = null;try {conn = dataSource.getConnection();conn.setAutoCommit(false);stmt = conn.createStatement();for (UserRedPacket userRedPacket : userRedPacketList) {String sql1 = "update T_RED_PACKET set stock = stock-1 where id=" + userRedPacket.getRedPacketId();DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String sql2 = "insert into T_USER_RED_PACKET(red_packet_id, user_id, " + "amount, grab_time, note)"+ " values (" + userRedPacket.getRedPacketId() + ", " + userRedPacket.getUserId() + ", "+ userRedPacket.getAmount() + "," + "'" + df.format(userRedPacket.getGrabTime()) + "'," + "'"+ userRedPacket.getNote() + "')";stmt.addBatch(sql1);stmt.addBatch(sql2);}// 執行批量count = stmt.executeBatch();// 提交事務conn.commit();} catch (SQLException e) {/********* 錯誤處理邏輯 ********/throw new RuntimeException("搶紅包批量執行程序錯誤");} finally {try {if (conn != null && !conn.isClosed()) {conn.close();}} catch (SQLException e) {e.printStackTrace();}}// 返回插入搶紅包數據記錄return count.length / 2;} }注解@Async 表示讓 Spring 自動創建另外一條線程去運行它,這樣它便不在搶最后一個紅包的線程之內。因為這個方法是一個較長時間的方法,如果在同一個線程內,那么對于最后搶紅包的用戶需要等待的時間太長,用戶體驗不好
這里是每次取出 1 000 個搶紅包的信息,之所以這樣做是為了避免取出 的數據過大 , 導致JVM 消耗過多的內存影響系統性能。
對于大批量的數據操作,這是我們在實際操作中要注意的,最后還會刪除 Redis保存的鏈表信息,這樣就幫助 Redis 釋放內存了
對于數據庫的保存 ,這里采用了 JDBC的批量處理,每 1000 條批量保存一次,使用批量有助于性能的提高。
注解@Async 的前提是提供一個任務池給 Spring 環境,這個時候要在原有的基礎上改寫配置類 WebConfig
@EnableAsync public class WebConfig extends AsyncConfigurerSupport { ............@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(5);taskExecutor.setMaxPoolSize(10);taskExecutor.setQueueCapacity(200);taskExecutor.initialize();return taskExecutor;} }使用@EnableAsync 表明支持異步調用,而我們實現了接口 AsyncConfigurerSupport 的getAsyncExecutor 方法,它是獲取一個任務池,當在 Spring 環境中遇到注解@Async就會啟動這個任務池的一條線程去運行對應的方法,這樣便能執行異步了。
Service層添加Redis搶紅包的邏輯
UserRedPacketService接口新增接口方法grapRedPacketByRedis
/*** 通過Redis實現搶紅包* * @param redPacketId* --紅包編號* @param userId* -- 用戶編號* @return 0-沒有庫存,失敗 1--成功,且不是最后一個紅包 2--成功,且是最后一個紅包*/public Long grapRedPacketByRedis(Long redPacketId, Long userId);實現類
@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate RedisRedPacketService redisRedPacketService;// Lua腳本String script = "local listKey = 'red_packet_list_'..KEYS[1] \n" + "local redPacket = 'red_packet_'..KEYS[1] \n"+ "local stock = tonumber(redis.call('hget', redPacket, 'stock')) \n" + "if stock <= 0 then return 0 end \n"+ "stock = stock -1 \n" + "redis.call('hset', redPacket, 'stock', tostring(stock)) \n"+ "redis.call('rpush', listKey, ARGV[1]) \n" + "if stock == 0 then return 2 end \n" + "return 1 \n";// 在緩存LUA腳本后,使用該變量保存Redis返回的32位的SHA1編碼,使用它去執行緩存的LUA腳本[加入這句話]String sha1 = null;@Overridepublic Long grapRedPacketByRedis(Long redPacketId, Long userId) {// 當前搶紅包用戶和日期信息String args = userId + "-" + System.currentTimeMillis();Long result = null;// 獲取底層Redis操作對象Jedis jedis = (Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection();try {// 如果腳本沒有加載過,那么進行加載,這樣就會返回一個sha1編碼if (sha1 == null) {sha1 = jedis.scriptLoad(script);}// 執行腳本,返回結果Object res = jedis.evalsha(sha1, 1, redPacketId + "", args);result = (Long) res;// 返回2時為最后一個紅包,此時將搶紅包信息通過異步保存到數據庫中if (result == 2) {// 獲取單個小紅包金額String unitAmountStr = jedis.hget("red_packet_" + redPacketId, "unit_amount");// 觸發保存數據庫操作Double unitAmount = Double.parseDouble(unitAmountStr);redisRedPacketService.saveUserRedPacketByRedis(redPacketId, unitAmount);}} finally {// 確保jedis順利關閉if (jedis != null && jedis.isConnected()) {jedis.close();}}return result;}這里使用了保存腳本返回 的 SHAl 字符串 ,所以只會發送一次腳本到 Redis 服務器,之后只傳輸 SHAl 字符串和參數到 Redis 就能執行腳本 了, 當腳本返回為 2 的時候, 表示此時所有的紅包都已經被搶光了 ,那么就會觸發 redisRedPacketService 的 saveUserRedPacketByRedis 方法。由于在 saveU serRedPacketByRedis 加入注解@Async , 所以 Spring 會創建一條新的線程去運行它 , 這樣就不會影響最后搶一個紅包用戶 的響應時間了 。
Controller層新增路由方法
@RequestMapping(value = "/grapRedPacketByRedis")@ResponseBodypublic Map<String, Object> grapRedPacketByRedis(Long redPacketId, Long userId) {Map<String, Object> resultMap = new HashMap<String, Object>();Long result = userRedPacketService.grapRedPacketByRedis(redPacketId, userId);boolean flag = result > 0;resultMap.put("result", flag);resultMap.put("message", flag ? "搶紅包成功" : "搶紅包失敗");return resultMap;}構造模擬數據,測試
先在 Redis 上添加紅包信息
127.0.0.1:6379> HMSET red_packet_1 stock 20000 unit_amount 10 OK初始化了一個編號為1 的大紅包,其中庫存為 2 萬個,每個 10 元. 需要保證數據庫的紅包表內也有對應的記錄才可以。
復制個grapByRedis.jsp,測試吧
<%@ page language="java" contentType="text/html; charset=UTF-8"pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>參數</title> <!-- 加載Query文件--> <script type="text/javascript"src="https://code.jquery.com/jquery-3.2.0.js"></script> <script type="text/javascript">$(document).ready(function () {//模擬30000個異步請求,進行并發var max = 30000;for (var i = 1; i <= max; i++) {$.post({//請求搶id為1的紅包//根據自己請求修改對應的url和大紅包編號url: "./userRedPacket/grapRedPacketByRedis.do?redPacketId=1&userId=1",//成功后的方法success: function (result) {console.log("OK")}});}});</script> </head> <body> </body> </html>啟動應用,訪問 http://localhost:8080/ssm_redpacket/grapByRedis.jsp
結合前幾篇的數據統計,使用Redis的方式數據一致性也得到了保證且性能遠遠高于樂觀鎖和悲觀鎖的方式。
代碼
https://github.com/yangshangwei/ssm_redpacket
總結
總結
以上是生活随笔為你收集整理的高并发-【抢红包案例】之四:使用Redis+Lua脚本实现抢红包并异步持久化到数据库的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 高并发-【抢红包案例】之三:使用乐观锁方
- 下一篇: Spring Boot2.x-03Spr