javascript
Spring Boot Redis 入门
本文,我們基于 Spring Boot 2.X 版本。
1. 概述
在快速入門 Spring Boot 整合 Redis 之前,我們先來做個簡單的了解。在 Spring 的生態中,我們使用?Spring Data Redis?來實現對 Redis 的數據訪問。
可能這個時候,會有胖友會有疑惑,市面上已經有 Jedis、Redisson、Lettuce 等優秀的 Java Redis 工具庫,為什么還要有 Spring Data Redis 呢?學不動了,頭都要禿了!不要慌,我們先來看一張圖:
- 對于下層,Spring Data Redis 提供了統一的操作模板(后文中,我們會看到是 RedisTemplate 類),封裝了 Jedis、Lettuce 的 API 操作,訪問 Redis 數據。所以,實際上,Spring Data Redis 內置真正訪問的實際是 Jedis、Lettuce 等 API 操作。
- 對于上層,開發者學習如何使用 Spring Data Redis 即可,而無需關心 Jedis、Lettuce 的 API 操作。甚至,未來如果我們想將 Redis 訪問從 Jedis 遷移成 Lettuce 來,無需做任何的變動。? 相信很多胖友,在選擇 Java Redis 工具庫,也是有過煩惱的。
- 目前,Spring Data Redis 暫時只支持 Jedis、Lettuce 的內部封裝,而 Redisson 是由?redisson-spring-data?來提供。
OK ,嗶嗶結束,我們先來快速上手下 Spring Data Redis 的使用。
2. 快速入門
示例代碼對應倉庫:spring-data-redis-with-jedis?。
感興趣的胖友可以看看?https://mvnrepository.com/artifact/redis.clients/jedis?地址,會發現 2016 年到 2018 年的 Jedis 更新頻率。所幸,2018 年底又突然復活了。
同時,艿艿目前使用的?SkyWalking?中間件,暫時只支持 Jedis 的自動化的追蹤,那么更加考慮使用 Jedis 啦。
這里在分享一個?Jedis 和 Lettuce?的對比討論。
2.1 引入依賴
在?pom.xml?文件中,引入相關依賴。
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent><dependencies><!-- 實現對 Spring Data Redis 的自動化配置 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><!-- 去掉對 Lettuce 的依賴,因為 Spring Boot 優先使用 Lettuce 作為 Redis 客戶端 --><exclusion><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId></exclusion></exclusions></dependency><!-- 引入 Jedis 的依賴,這樣 Spring Boot 實現對 Jedis 的自動化配置 --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><!-- 方便等會寫單元測試 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- 等會示例會使用 fastjson 作為 JSON 序列化的工具 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.61</version></dependency><!-- Spring Data Redis 默認使用 Jackson 作為 JSON 序列化的工具 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency></dependencies>具體每個依賴的作用,胖友自己認真看下艿艿添加的所有注釋噢。
2.2 配置文件
在?application.yml?中,添加 Redis 配置,如下:
spring:# 對應 RedisProperties 類redis:host: 127.0.0.1port: 6379password: # Redis 服務器密碼,默認為空。生產中,一定要設置 Redis 密碼!database: 0 # Redis 數據庫號,默認為 0 。timeout: 0 # Redis 連接超時時間,單位:毫秒。# 對應 RedisProperties.Jedis 內部類jedis:pool:max-active: 8 # 連接池最大連接數,默認為 8 。使用負數表示沒有限制。max-idle: 8 # 默認連接數最小空閑的連接數,默認為 8 。使用負數表示沒有限制。min-idle: 0 # 默認連接池最小空閑的連接數,默認為 0 。允許設置 0 和 正數。max-wait: -1 # 連接池最大阻塞等待時間,單位:毫秒。默認為 -1 ,表示不限制。具體每個參數的作用,胖友自己認真看下艿艿添加的所有注釋噢。
2.3 簡單測試
創建?Test01?測試類,我們來測試一下簡單的 SET 指令。代碼如下:
@RunWith(SpringRunner.class) @SpringBootTest public class Test01 {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Testpublic void testStringSetKey() {stringRedisTemplate.opsForValue().set("yunai", "shuai");} }通過 StringRedisTemplate 類,我們進行了一次 Redis SET 指令的執行。關于 StringRedisTemplate 是什么,我們先賣個關子,在?「2.4 RedisTemplate」?中來介紹。
我們先來執行下?#testStringSetKey()?方法這個測試方法。執行完成后,我們在控制臺查詢,看看是否真的執行成功了。
$ redis-cli get yunai "shuai"- 請大聲的告訴我,Redis 是怎么夸獎?"yunai"?的,哈哈哈哈。
2.4 RedisTemplate
org.springframework.data.redis.core.RedisTemplate<K, V>?類,從類名上,我們就明明白白知道,提供 Redis 操作模板 API 。核心屬性如下:
// RedisTemplate.java // 艿艿省略了一些不重要的屬性。// <1> 序列化相關屬性 @SuppressWarnings("rawtypes") private @Nullable RedisSerializer keySerializer = null; @SuppressWarnings("rawtypes") private @Nullable RedisSerializer valueSerializer = null; @SuppressWarnings("rawtypes") private @Nullable RedisSerializer hashKeySerializer = null; @SuppressWarnings("rawtypes") private @Nullable RedisSerializer hashValueSerializer = null; private RedisSerializer<String> stringSerializer = RedisSerializer.string();// <2> Lua 腳本執行器 private @Nullable ScriptExecutor<K> scriptExecutor;// <3> 常見數據結構操作類 // cache singleton objects (where possible) private @Nullable ValueOperations<K, V> valueOps; private @Nullable ListOperations<K, V> listOps; private @Nullable SetOperations<K, V> setOps; private @Nullable ZSetOperations<K, V> zSetOps; private @Nullable GeoOperations<K, V> geoOps; private @Nullable HyperLogLogOperations<K, V> hllOps;- <1>?處,看到了四個序列化相關的屬性,用于 KEY 和 VALUE 的序列化。
- 例如說,我們在使用 POJO 對象存儲到 Redis 中,一般情況下,會使用 JSON 方式序列化成字符串,存儲到 Redis 中。詳細的,我們在?「3. 序列化」?小節中來說明。
- 在上文中,我們看到了?org.springframework.data.redis.core.StringRedisTemplate?類,它繼承 RedisTemplate 類,使用?org.springframework.data.redis.serializer.StringRedisSerializer?字符串序列化方式。直接點開 StringRedisSerializer 源碼,看下它的構造方法,瞬間明明白白。
- <2>?處,Lua 腳本執行器,提供?Redis scripting?API 操作。
- <3>?處,Redis 常見數據結構操作類。
- ValueOperations?類,提供?Redis String?API 操作。
- ListOperations?類,提供?Redis List?API 操作。
- SetOperations?類,提供?Redis Set?API 操作。
- ZSetOperations?類,提供?Redis ZSet(Sorted Set)?API 操作。
- GeoOperations?類,提供?Redis Geo?API 操作。
- HyperLogLogOperations?類,提供?Redis HyperLogLog?API 操作。
那么 Pub/Sub、Transaction、Pipeline、Keys、Cluster、Connection 等相關的 API 操作呢?它在 RedisTemplate 自身提供,因為它們不屬于具體每一種數據結構,所以沒有封裝在對應的 Operations 類中。哈哈哈,胖友打開?RedisTemplate?類,去瞅瞅,妥妥的明白。
3. 序列化
艿艿:為了盡量把序列化說的清楚一些,所以本小節內容會略長。
因為有些地方,直接擼源碼,比嚇嗶嗶一段話更易懂,所以會有一些源碼,保持淡定。
3.1 RedisSerializer
org.springframework.data.redis.serializer.RedisSerializer?接口,Redis 序列化接口,用于 Redis KEY 和 VALUE 的序列化。簡化代碼如下:
// RedisSerializer.java public interface RedisSerializer<T> {@Nullablebyte[] serialize(@Nullable T t) throws SerializationException;@NullableT deserialize(@Nullable byte[] bytes) throws SerializationException;}- 定義了對象?<T>?和二進制數組的轉換。
- 啊,可能有胖友會有疑惑了:我們在?redis-cli?終端,看到的不都是字符串么,怎么這里是序列化成二進制數組呢?實際上,Redis Client 傳遞給 Redis Server 是傳遞的 KEY 和 VALUE 都是二進制值數組。好奇的胖友,可以打開 Jedis?Connection#sendCommand(final Command cmd, final byte[]... args)?方法,傳入的參數就是二進制數組,而?cmd?命令也會被序列化成二進制數組。
RedisSerializer 的實現類,如下圖:
主要分成四類:
- JDK 序列化方式
- String 序列化方式
- JSON 序列化方式
- XML 序列化方式
3.1.1 JDK 序列化方式
org.springframework.data.redis.serializer.JdkSerializationRedisSerializer?,默認情況下,RedisTemplate 使用該數據列化方式。具體的,可以看看?RedisTemplate#afterPropertiesSet()?方法,在 RedisTemplate 未設置序列化的情況下,使用 JdkSerializationRedisSerializer 作為序列化實現。在 Spring Boot 自動化配置 RedisTemplate Bean 對象時,就未設置。
絕大多數情況下,可能 99.9999% ,我們不會使用 JdkSerializationRedisSerializer 進行序列化。為什么呢?我們來看一個示例,代碼如下:
// Test01.java @RunWith(SpringRunner.class) @SpringBootTest public class Test01 {@Autowiredprivate RedisTemplate redisTemplate;@Testpublic void testStringSetKey02() {redisTemplate.opsForValue().set("yunai", "shuai");}}我們先來執行下?#testStringSetKey02()?方法這個測試方法。注意,此處我們使用的是 RedisTemplate 而不是 StringRedisTemplate 。執行完成后,我們在控制臺查詢,看看是否真的執行成功了。
# 在 `redis-cli` 終端中127.0.0.1:6379> scan 0 1) "0" 2) 1) "\xac\xed\x00\x05t\x00\x05yunai"127.0.0.1:6379> get "\xac\xed\x00\x05t\x00\x05yunai" "\xac\xed\x00\x05t\x00\x05shuai"-
具體為什么是這樣一串奇怪的 16 進制,胖友可以看看?ObjectOutputStream#writeString(String str, boolean unshared)?的代碼,實際就是標志位 + 字符串長度 + 字符串內容。
對于 KEY 被序列化成這樣,我們線上通過 KEY 去查詢對應的 VALUE 勢必會非常不方便,所以 KEY 肯定是不能被這樣序列化的。
對于 VALUE 被序列化成這樣,除了閱讀可能困難一點,不支持跨語言外,實際上也沒啥問題。不過,實際線上場景,還是使用 JSON 序列化居多。
3.1.2 String 序列化方式
①?org.springframework.data.redis.serializer.StringRedisSerializer?,字符串和二進制數組的直接轉換。代碼如下:
// StringRedisSerializer.javaprivate final Charset charset;@Override public String deserialize(@Nullable byte[] bytes) {return (bytes == null ? null : new String(bytes, charset)); }@Override public byte[] serialize(@Nullable String string) {return (string == null ? null : string.getBytes(charset)); }- 是不是很直接簡單。
絕大多數情況下,我們 KEY 和 VALUE 都會使用這種序列化方案。而 VALUE 的序列化和反序列化,自己在邏輯調用 JSON 方法去序列化。為什么呢?繼續往下看。
②?org.springframework.data.redis.serializer.GenericToStringSerializer<T>?,使用 Spring?ConversionService?實現?<T>?對象和 String 的轉換,從而 String 和二進制數組的轉換。
例如說,序列化的過程,首先?<T>?對象通過 ConversionService 轉換成 String ,然后 String 再序列化成二進制數組。反序列化的過程,胖友自己結合源碼思考下 ? 。
當然,GenericToStringSerializer 貌似基本不會去使用,所以不用去了解也問題不大,哈哈哈。
3.1.3 JSON 序列化方式
①?org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer?,使用 Jackson 實現 JSON 的序列化方式,并且從 Generic 單詞可以看出,是支持所有類。怎么體現呢?參見構造方法的代碼:
// GenericJackson2JsonRedisSerializer.javapublic GenericJackson2JsonRedisSerializer(@Nullable String classPropertyTypeName) {this(new ObjectMapper());// simply setting {@code mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)} does not help here since we need// the type hint embedded for deserialization using the default typing feature.mapper.registerModule(new SimpleModule().addSerializer(new NullValueSerializer(classPropertyTypeName)));// <1> if (StringUtils.hasText(classPropertyTypeName)) {mapper.enableDefaultTypingAsProperty(DefaultTyping.NON_FINAL, classPropertyTypeName);// <2> } else {mapper.enableDefaultTyping(DefaultTyping.NON_FINAL, As.PROPERTY);} }- <1>?處,如果傳入了?classPropertyTypeName?屬性,就是使用使用傳入對象的?classPropertyTypeName?屬性對應的值,作為默認類型(Default Typing)。
- <2>?處,如果未傳入?classPropertyTypeName?屬性,則使用傳入對象的類全名,作為默認類型(Default Typing)。
那么,胖友可能會問題,什么是**默認類型(Default Typing)**呢?我們來思考下,在將一個對象序列化成一個字符串,怎么保證字符串反序列化成對象的類型呢?Jackson 通過 Default Typing ,會在字符串多冗余一個類型,這樣反序列化就知道具體的類型了。來舉個例子,使用我們等會示例會用到的?UserCacheObject?類。
-
標準序列化的結果,如下:
{"id": 1,"name": "芋道源碼","gender": 1 }?
-
使用 Jackson Default Typing 機制序列化的結果,如下:
{"@class": "cn.iocoder.springboot.labs.lab10.springdatarediswithjedis.cacheobject.UserCacheObject","id": 1,"name": "芋道源碼","gender": 1}- 看?@class?屬性,反序列化的對象的類型不就有了么?
下面我們來看一個 GenericJackson2JsonRedisSerializer 的示例。在看之前,胖友先跳到?「3.2 配置序列化方式」?小節,來看看如何配置 GenericJackson2JsonRedisSerializer 作為 VALUE 的序列化方式。然后,馬上調回到此處。
示例代碼如下:
// Test01.java@Autowired private RedisTemplate redisTemplate;@Test public void testStringSetKeyUserCache() {UserCacheObject object = new UserCacheObject().setId(1).setName("芋道源碼").setGender(1); // 男String key = String.format("user:%d", object.getId());redisTemplate.opsForValue().set(key, object); }@Test public void testStringGetKeyUserCache() {String key = String.format("user:%d", 1);Object value = redisTemplate.opsForValue().get(key);System.out.println(value); }胖友分別執行?#testStringSetKeyUserCache()?和?#testStringGetKeyUserCache()?方法,然后對著 Redis 的結果看看,比較簡單,就不多嗶嗶了。
我們在回過頭來看看?@class?屬性,它看似完美解決了反序列化后的對象類型,但是帶來 JSON 字符串占用變大,所以實際項目中,我們也并不會采用 Jackson2JsonRedisSerializer 類。
②?org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer<T>?,使用 Jackson 實現 JSON 的序列化方式,并且顯示指定?<T>?類型。代碼如下:
// Jackson2JsonRedisSerializer.java public class Jackson2JsonRedisSerializer<T> implements RedisSerializer<T> {// ... 省略不重要的代碼public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;/*** 指定類型,和 <T> 要一致。*/private final JavaType javaType;private ObjectMapper objectMapper = new ObjectMapper();}因為 Jackson2JsonRedisSerializer 序列化類里已經聲明了類型,所以序列化的 JSON 字符串,無需在存儲一個?@class?屬性,用于存儲類型。
但是,我們摳腳一想,如果使用 Jackson2JsonRedisSerializer 作為序列化實現類,那么如果我們類型比較多,豈不是每個類型都要定義一個 RedisTemplate Bean 了?!所以實際場景下,我們也并不會使用 Jackson2JsonRedisSerializer 類。?
注意,GenericFastJsonRedisSerializer 不是 Spring Data Redis 內置實現,而是由?FastJSON 自己實現。
3.1.4 XML 序列化方式
org.springframework.data.redis.serializer.OxmSerializer?,使用 Spring?OXM?實現將對象和 String 的轉換,從而 String 和二進制數組的轉換。
因為 XML 序列化方式,暫時沒有這么干過,我自己也沒有,所以就直接忽略它吧。?
3.2 配置序列化方式
創建 RedisConfiguration 配置類,代碼如下:
@Configuration public class RedisConfiguration {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {// 創建 RedisTemplate 對象RedisTemplate<String, Object> template = new RedisTemplate<>();// 設置 RedisConnection 工廠。? 它就是實現多種 Java Redis 客戶端接入的秘密工廠。感興趣的胖友,可以自己去擼下。template.setConnectionFactory(factory);// 使用 String 序列化方式,序列化 KEY 。template.setKeySerializer(RedisSerializer.string());// 使用 JSON 序列化方式(庫是 Jackson ),序列化 VALUE 。template.setValueSerializer(RedisSerializer.json());return template;}}-
RedisSerializer#string()?靜態方法,返回的就是使用 UTF-8 編碼的 StringRedisSerializer 對象。代碼如下:
// RedisSerializer.java static RedisSerializer<String> string() {return StringRedisSerializer.UTF_8; }// StringRedisSerializer.java public static final StringRedisSerializer ISO_8859_1 = new StringRedisSerializer(StandardCharsets.ISO_8859_1); -
RedisSerializer#json()?靜態方法,返回 GenericJackson2JsonRedisSerializer 對象。代碼如下:
// RedisSerializer.javastatic RedisSerializer<Object> json() {return new GenericJackson2JsonRedisSerializer(); }
3.3 自定義 RedisSerializer 實現類
我們直接以 GenericFastJsonRedisSerializer 舉例子,直接莽源碼。代碼如下:
// GenericFastJsonRedisSerializer.javapublic class GenericFastJsonRedisSerializer implements RedisSerializer<Object> {private final static ParserConfig defaultRedisConfig = new ParserConfig();static { defaultRedisConfig.setAutoTypeSupport(true);}public byte[] serialize(Object object) throws SerializationException {// 空,直接返回空數組if (object == null) {return new byte[0];}try {// 使用 JSON 進行序列化成二進制數組,同時通過 SerializerFeature.WriteClassName 參數,聲明寫入類全名。return JSON.toJSONBytes(object, SerializerFeature.WriteClassName);} catch (Exception ex) {throw new SerializationException("Could not serialize: " + ex.getMessage(), ex);}}public Object deserialize(byte[] bytes) throws SerializationException {// 如果為空,則返回空對象if (bytes == null || bytes.length == 0) {return null;}try {// 使用 JSON 解析成對象。return JSON.parseObject(new String(bytes, IOUtils.UTF8), Object.class, defaultRedisConfig);} catch (Exception ex) {throw new SerializationException("Could not deserialize: " + ex.getMessage(), ex);}} }完成自定義 RedisSerializer 配置類后,我們就可以參照?「3.2 配置序列化方式」?小節,將 VALUE 序列化的修改成我們的,哈哈哈。
4. 項目實踐
本小節,我們來分享我們在生產中的一些實踐。關于這塊,希望大家可以一起討論,能夠讓我們的代碼更加優雅干凈。
4.1 Cache Object
在我們使用數據庫時,我們會創建?dataobject?包,存放 DO(Data Object)數據庫實體對象。
那么同理,我們緩存對象,怎么進行對應呢?對于復雜的緩存對象,我們創建了?cacheobject?包,和?dataobject?包同層。如:
service # 業務邏輯層 dao # 數據庫訪問層 dataobject # DO cacheobject # 緩存對象并且所有的 Cache Object 對象使用 CacheObject 結尾,例如說 UserCacheObject、ProductCacheObject 。
4.2 數據訪問層
在我們訪問數據庫時,我們會創建?dao?包,存放每個 DO 對應的 Dao 類。那么對于每一個 CacheObject 類,我們也會創建一個其對應的 Dao 類。例如說,UserCacheObject 對應 UserCacheObjectDao 類。示例代碼如下:
@Repository public class UserCacheDao {private static final String KEY_PATTERN = "user:%d"; // user:用戶編號 <1>@Resource(name = "redisTemplate")@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")private ValueOperations<String, String> operations; // <2>private static String buildKey(Integer id) { // <3>return String.format(KEY_PATTERN, id);}public UserCacheObject get(Integer id) {String key = buildKey(id);String value = operations.get(key);return JSONUtil.parseObject(value, UserCacheObject.class);}public void set(Integer id, UserCacheObject object) {String key = buildKey(id);String value = JSONUtil.toJSONString(object);operations.set(key, value);}}- <1>?處,通過靜態變量,聲明 KEY 的前綴,并且使用冒號作為間隔
- <3>?處,聲明?KEY_PATTERN?對應的 KEY 拼接方法,避免散落在每個方法中。
- <2>?處,通過?@Resource?注入指定名字的 RedisTemplate 對應的 Operations 對象,這樣明確每個 KEY 的類型。
- 剩余的,就是每個方法封裝對應的操作。
可能會有胖友問,為什么不支持將 RedisTemplate 直接在 Service 業務層調用呢?如果這樣,我們業務代碼里,就容易混雜著很多 Redis 訪問代碼的細節,導致很臟亂。我們試著把 RedisTemplate 想象成 Spring JDBCTemplate ,我們一定會聲明對應的 Dao 類,訪問數據庫。所以,同理咯。
那么還有一個問題,UserCacheDao 放在哪個包下?目前的想法是,將?dao?包下拆成?mysql、redis?包。這樣,MySQL 相關的 Dao 放在?mysql?包下,Redis 相關的 Dao 放在?redis?。
4.3 序列化
在?「3. 序列化」?小節中,我們仔細翻看了每個序列化方式,暫時沒有一個能夠完美的契合我們的需求,所以我們直接使用最簡單的?StringRedisSerializer?作為序列化實現類。而真正的序列化,我們在各個 Dao 類里,自己手動來調用。
例如說,在 UserCacheDao 示例中,已經看到了這么做了。這里還有一個細化點,雖然我們是自己手動序列化,可以自己簡單封裝一個?JSONUtil?類,未來如果我們想換 JSON 庫,就比較方便了。其實,這個和 Spring Data Redis 所做的封裝是一個思路。
5. 示例補充
像 String、List、Set、ZSet、Geo、HyperLogLog 等等數據結構的操作,胖友自己去用用對應的 Operations 操作類的 API 方法,就非常容易懂了,我們更多的,補充 Pipeline、Transaction、Pub/Sub、Script 等等功能的示例。
5.1 Pipeline
如果胖友沒有了解過 Redis 的 Pipeline 機制,可以看看?《Redis 文檔 —— Pipeline》?文章,批量操作,提升性能必備神器。
在 RedisTemplate 類中,提供了 2 組四個方法,用于執行 Redis Pipeline 操作。代碼如下:
// <1> 基于 Session 執行 Pipeline @Override public List<Object> executePipelined(SessionCallback<?> session) {return executePipelined(session, valueSerializer); } @Override public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {// ... 省略代碼 }// <2> 直接執行 Pipeline @Override public List<Object> executePipelined(RedisCallback<?> action) {return executePipelined(action, valueSerializer); } @Override public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {// ... 省略代碼 }- 兩組方法的差異,在于是否是 Session 中執行。那么 Session 是什么呢?賣個關子,在?「5.3 Session」?中來詳細解析。本小節,我們只講 Pipeline + RedisCallback 的組合的方法。
- 每組方法里,差別在于是否傳入 RedisSerializer 參數。如果不傳,則使用 RedisTemplate 自己的序列化相關的屬性。
5.1.1 源碼解讀
在看具體的?#executePipelined(RedisCallback<?> action, ...)?方法的示例之前,我們先來看一波源碼,這樣我們才能更好的理解具體的使用方法。代碼如下:
// RedisTemplate.java @Override public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {// <1> 執行 Redis 方法return execute((RedisCallback<List<Object>>) connection -> {// <2> 打開 pipeline connection.openPipeline();boolean pipelinedClosed = false; // 標記 pipeline 是否關閉try {// <3> 執行Object result = action.doInRedis(connection);// <4> 不要返回結果if (result != null) {throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the pipeline");}// <5> 提交 pipeline 執行List<Object> closePipeline = connection.closePipeline();pipelinedClosed = true;// <6> 反序列化結果,并返回return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);} finally {if (!pipelinedClosed) {connection.closePipeline();}}}); }- <1>?處,調用?#execute(RedisCallback<T> action)?方法,執行 Redis 方法。注意,此處傳入的?action?參數,不是我們傳入的 RedisCallback 參數。我們的會在該?action?中被執行。
- <2>?處,調用?RedisConnection#openPipeline()?方法,自動打開 Pipeline 模式。這樣,我們就不需要手動去打開了。
- <3>?處,調用我們傳入的實現的?RedisCallback#doInRedis(RedisConnection connection)?方法,執行在 Pipeline 中,想要執行的 Redis 操作。
- <4>?處,不要返回結果。因為 RedisCallback 是統一定義的接口,所以可以返回一個結果。但是在 Pipeline 中,未提交執行時,顯然是沒有結果,返回也沒有意思。簡單來說,就是我們在實現?RedisCallback#doInRedis(RedisConnection connection)?方法時,返回?null?即可。
- <5>?處,調用?RedisConnection#closePipeline()?方法,自動提交 Pipeline 執行,并返回執行結果。
- <6>?處,反序列化結果,并返回 Pipeline 結果。
至此,Spring Data Redis 對 Pipeline 的封裝,我們已經做了一個簡單的了解,實際就是經典的“模板方法”設計模式化的應用。下面,在讓我們來看看?org.springframework.data.redis.core.RedisCallback<T>?接口,Redis 回調接口。代碼如下:
// RedisCallback.java public interface RedisCallback<T> {/*** Gets called by {@link RedisTemplate} with an active Redis connection. Does not need to care about activating or* closing the connection or handling exceptions.** @param connection active Redis connection* @return a result object or {@code null} if none* @throws DataAccessException*/@NullableT doInRedis(RedisConnection connection) throws DataAccessException; }-
雖然接口名是以 Callback 結尾,但是通過?#doInRedis(RedisConnection connection)?方法可以很容易知道,實際可以理解是 Redis Action ,想要執行的 Redis 操作。
-
有一點要注意,傳入的?connection?參數是 RedisConnection 對象,它提供的?'low level'?更底層的 Redis API 操作。例如說:
// RedisStringCommands.java // RedisConnection 實現 RedisStringCommands 接口byte[] get(byte[] key);Boolean set(byte[] key, byte[] value);- 傳入和返回的是二進制數組,實際就是 RedisTemplate 已經序列化的入參和會被反序列化的出參。
5.1.2 具體示例
示例代碼對應測試類:PipelineTest?。
創建?PipelineTest?單元測試類,編寫代碼如下:
// PipelineTest.java@RunWith(SpringRunner.class) @SpringBootTest public class PipelineTest {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Testpublic void test01() {List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {// set 寫入for (int i = 0; i < 3; i++) {connection.set(String.format("yunai:%d", i).getBytes(), "shuai".getBytes());}// getfor (int i = 0; i < 3; i++) {connection.get(String.format("yunai:%d", i).getBytes());}// 返回 null 即可return null;}});// 打印結果System.out.println(results);} }執行?#test01()?方法,結果如下:
[true, true, true, shuai, shuai, shuai]- 因為我們使用 StringRedisTemplate 自己的序列化相關屬性,所以 Redis GET 命令返回的二進制,被反序列化成了字符串。
5.2 Transaction
基情提示:實際項目實戰中,Redis Transaction 事務基本不用,至少問了一些胖友,包括自己,都沒有再用。所以呢,本小節可以選擇性看看。或者,就不看,哈哈哈哈。
在看 Redis Transaction 事務之前,我們先回想下 Spring 是如何管理數據庫 Transaction?的。在應用程序中處理一個請求時,如果我們的方法開啟Trasaction 功能,Spring 會把數據庫的 Connection 連接和當前線程進行綁定,從而實現 Connection 打開一個 Transaction 后,所有當前線程的數據庫操作都在該 Connection 上執行,達到所有操作在這個 Transaction 中,最終提交或回滾。
在 Spring Data Redis 中,實現 Redis Transaction 也是這個思路。通過 SessionCallback 操作 Redis 時,會從當前線程獲得 Redis Connection ,如果獲取不到,則會去“創建”一個 Redis Connection 并綁定到當前線程中。這樣,我們在該 Redis Connection 開啟 Redis Transaction 后,在該線程的所有操作,都可以在這個 Transaction 中,最后交由 Spring 事務管理器統一提供或回滾 Transaction 。
如果想要使用 Redis Transaction 功能,需要創建 RedisTemplate Bean 時,設置其?enableTransactionSupport?屬性為?true?,默認為?false?不開啟。示例如下:
@Configuration public class RedisConfiguration {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {// 創建 RedisTemplate 對象RedisTemplate<String, Object> template = new RedisTemplate<>();// 【重要】設置開啟事務支持template.setEnableTransactionSupport(true);// 設置 RedisConnection 工廠。? 它就是實現多種 Java Redis 客戶端接入的秘密工廠。感興趣的胖友,可以自己去擼下。template.setConnectionFactory(factory);// 使用 String 序列化方式,序列化 KEY 。template.setKeySerializer(RedisSerializer.string());// 使用 JSON 序列化方式(庫是 Jackson ),序列化 VALUE 。template.setValueSerializer(RedisSerializer.json());return template;}}5.2.1 源碼解析
概念和原理層面的東西,一旦復雜,就會特別抽象,那么還是老規矩,讓我們一起擼下源碼,讓原理具象化。很多時候,這就是為什么我們要去擼源碼的意義。
我們先來看看,配置下?enableTransactionSupport?屬性,Redis 在執行命令,是如何獲得 Connection 連接的。代碼如下:
// RedisTemplate.javapublic <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");Assert.notNull(action, "Callback object must not be null");RedisConnectionFactory factory = getRequiredConnectionFactory();RedisConnection conn = null;try {// <1.1>if (enableTransactionSupport) {// only bind resources in case of potential transaction synchronizationconn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);} else {// <1.2>conn = RedisConnectionUtils.getConnection(factory);}// ... 省略中間,執行 Redis 命令的代碼。} finally {// <2>RedisConnectionUtils.releaseConnection(conn, factory);} }- 考慮到盡量讓內容簡單一些,我們不會對每一行代碼做特別的深究,主要是保證胖友對 Spring Data Redis 對 Transaction 的封裝,有個總體了解。
- <1.2>?處,當我們未開啟?enableTransactionSupport?事務時,調用?RedisConnectionUtils#getConnection(factory)?方法,獲得 Redis Connection 。如果獲取不到,則進行創建。
- <1.1>?處,當我們有開啟?enableTransactionSupport?事務時,調用?RedisConnectionUtils#bindConnection(RedisConnectionFactory factory, boolean enableTranactionSupport)?方法,在?RedisConnectionUtils#getConnection(factory)?的基礎上,如果是創建的 Redis Connection ,會綁定到當前線程中。因為 Transaction 是需要在 Connection 打開,然后后續的 Redis 的操作,都需要在其上。并且,還有一個非常重要的操作,打開 Redis Transaction ,會在該方法中,通過調用?RedisConnectionUtils#potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connHolder, final RedisConnectionFactory factory)?。
- <2>?處,調用?RedisConnectionUtils#releaseConnection(RedisConnection conn, RedisConnectionFactory factory)?方法,釋放 Redis Connection 。當然,這是有一個前提,整個 Transaction 已經完成。如果未完成,實際 Redis Connection 不會釋放。
那么,此時會有胖友有疑問,Redis Transaction 的提交和回滾在哪呢?答案在 RedisConnectionUtils 的內部類 RedisTransactionSynchronizer 中。代碼如下:
// RedisConnectionUtils.javaprivate static class RedisTransactionSynchronizer extends TransactionSynchronizationAdapter {private final RedisConnectionHolder connHolder;private final RedisConnection connection;private final RedisConnectionFactory factory;@Overridepublic void afterCompletion(int status) {try {switch (status) {// 提交 case TransactionSynchronization.STATUS_COMMITTED:connection.exec();break;// 回滾case TransactionSynchronization.STATUS_ROLLED_BACK:case TransactionSynchronization.STATUS_UNKNOWN:default:connection.discard();}} finally {connHolder.setTransactionSyncronisationActive(false);connection.close();TransactionSynchronizationManager.unbindResource(factory);}} }- 根據事務結果的狀態,進行 Redis Transaction 提交或回滾。? 如果想進一步的深入,胖友就需要去了解 Spring Transaction 的源碼。
5.2.2 具體示例
示例代碼對應測試類:TransactionTest?。
創建 TransactionTest 單元測試類,編寫代碼如下:
// TransactionTest.java@RunWith(SpringRunner.class) @SpringBootTest public class TransactionTest {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Test // @Transactionalpublic void test01() {// 這里是偷懶,沒在 RedisConfiguration 配置類中,設置 stringRedisTemplate 開啟事務。stringRedisTemplate.setEnableTransactionSupport(true);// 執行想要的操作stringRedisTemplate.opsForValue().set("yunai:1", "shuai");stringRedisTemplate.opsForValue().set("yudaoyuanma:1", "dai");} }目前這僅僅是一個示例。因為 Redis Transaction?實際創建事務的前提,是當前已經存在 Spring Transaction 。具體可以看看傳送門處的判斷的代碼。? 略感神奇,不曉得為什么是這樣的設定。
5.2.3 補充資料
如果覺得還是無法理解的胖友,可以在看看如下幾篇文章:
- 《Spring Data Redis(Redis Transactions)》
- 《Redis 之坑:spring-data-redis 中的 Redis 事務》
- 《Spring Data Redis 事務專題》
5.2.4 閑話兩句
實際場景下,如果胖友有 Redis 事務的訴求,建議把事務的、和非事務的 RedisTemplate 拆成兩個連接池,相互獨立。主要原因有兩個:
- 1)Spring Data Redis 的事務設計,是將其融入到 Spring 整個 Transaction 當中。一般來說,Spring Transaction 中,肯定會存在數據庫的 Transaction 。考慮到數據庫操作相比 Redis 來說,肯定是慢得多,那么就會導致 Redis 的 Connection 一直被當前 Transaction 占用著。
- 2)How can i eliminate getting junk value through redis get command?
5.3 Session
首先,我們需要澄清下,Session 不是 Redis 的功能,而是 Spring Data Redis 封裝的一個功能。一次 Session ,代表通過同一個 Redis Connection 執行一系列的 Redis 操作。
在?「5.2.1 源碼解析」?中,我們可以發現,如果我們在一個 Redis Transaction 中的時候,所有 Redis 操作都使用同一個 Redis Connection ,因為我們會將獲得到的 Connection 綁定到當前線程中。
但是,如果我們不在一個 Redis Transaction 中的時候,我們每一次使用 Redis Operations 執行 Redis 操作的時候,每一次都會獲取一次 Redis Connection 的獲取。實際項目中,我們必然會使用 Redis Connection 連接池,那么在獲取的時候,會存在一定的競爭,會有資源上的消耗。那么,如果我們希望已知我們要執行一個系列的 Redis 操作,能不能使用同一個 Redis Connection ,避免重復獲取它呢?答案是有,那就是 Session 。
當我們要執行在同一個 Session 里的操作時,我們通過實現?org.springframework.data.redis.core.SessionCallback<T>?接口,其代碼如下:
// SessionCallback.javapublic interface SessionCallback<T> {@Nullable<K, V> T execute(RedisOperations<K, V> operations) throws DataAccessException; }- 相比 RedisCallback 來說,總體是比較相似的。但是比較友好的是,它的入參?operations?是?org.springframework.data.redis.core.RedisOperations?接口類型,而 RedisTemplate 的各種操作,實際就是在 RedisOperations 接口中定義,由 RedisTemplate 來實現。所以使用上也會更加便利。
- 實際上,我們在實現 RedisCallback 接口,也能實現在同一個 Connection 執行一系列的 Redis 操作,因為 RedisCallback 的入參本身就是一個 Redis Connection 。
5.3.1 源碼解析
在生產中,Transaction 和 Pipeline 會經常一起使用,從而提升性能。所以在?RedisTemplate#executePipelined(SessionCallback<?> session, ...)?方法中,提供了這種的功能。而在這個方法的實現上,本質和?RedisTemplate#executePipelined(RedisCallback<?> action, ...)?方法是基本一致的,差別在于這一行?,替換成了調用?#executeSession(SessionCallback<?> session)?方法。所以,我們來直接來看被調用的這個方法的實現。代碼如下:
// RedisTemplate.java@Override public <T> T execute(SessionCallback<T> session) {Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");Assert.notNull(session, "Callback object must not be null");RedisConnectionFactory factory = getRequiredConnectionFactory();// bind connection// <1> 獲得并綁定 Connection 。RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);try {// <2> 執行定義的一系列 Redis 操作return session.execute(this);} finally {// <3> 釋放并解綁 Connection 。RedisConnectionUtils.unbindConnection(factory);} }- <1>?處,調用?RedisConnectionUtils#bindConnection(RedisConnectionFactory factory, boolean enableTranactionSupport)?方法,實際和我們開啟?enableTranactionSupport?事務時候,獲取 Connection 和處理的方式,是一模一樣的。也就是說:
- 如果當前線程已經有一個綁定的 Connection 則直接使用(例如說,當前正在 Redis Transaction 事務中);
- 如果當前線程未綁定一個 Connection ,則進行創建并綁定到當前線程。甚至,如果此時是配置開啟?enableTranactionSupport?事務的,那么此處就會觸發 Redis Transaction 的開啟。
- <2>?處,調用?SessionCallback#execute(RedisOperations<K, V> operations)?方法,執行我們定義的一系列的 Redis 操作。看看此處傳入的參數是?this?,是不是仿佛更加明白點什么了?
- <3>?處,調用?RedisConnectionUtils#unbindConnection(RedisConnectionFactory factory)?方法,釋放并解綁 Connection 。當前,前提是當前不存在激活的 Redis Transaction ,不然不就提早釋放了嘛。
恩,現在胖友在回過頭,好好在想一想 Pipeline、Transaction、Session 之間的關系,以及組合排列。之后,我們在使用上,會更加得心應手。
5.3.2 具體示例
示例代碼對應測試類:SessionTest?。
創建?SessionTest?單元測試類,編寫代碼如下:
// SessionTest.java@RunWith(SpringRunner.class) @SpringBootTest public class SessionTest {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Testpublic void test01() {String result = stringRedisTemplate.execute(new SessionCallback<String>() {@Overridepublic String execute(RedisOperations operations) throws DataAccessException {for (int i = 0; i < 100; i++) {operations.opsForValue().set(String.format("yunai:%d", i), "shuai02");}return (String) operations.opsForValue().get(String.format("yunai:%d", 0));}});System.out.println("result:" + result);}}執行?#test01()?方法,結果如下:
result:shuai02- 臥槽,一直被 Redis 夸獎,已經超級不好意思了。
5.4 Pub/Sub
Redis 提供了 Pub/Sub 功能,實現簡單的訂閱功能,不了解的胖友,可以看看?「Redis 文檔 —— Pub/Sub」?。
5.4.1 源碼解析
暫時不提供,感興趣的胖友,可以自己看看最核心的?org.springframework.data.redis.listener.RedisMessageListenerContainer?類,Redis 消息監聽器容器,基于 Pub/Sub 的?SUBSCRIBE、PSUBSCRIBE?命令實現,我們只需要添加相應的?org.springframework.data.redis.connection.MessageListener?即可。不算復雜,1000 多行,只要調試下核心的功能即可。
5.4.2 具體示例
示例代碼對應測試類:PubSubTest?。
Spring Data Redis 實現 Pub/Sub 的示例,主要分成兩部分:
- 配置 RedisMessageListenerContainer Bean 對象,并添加我們自己實現的 MessageListener 對象,用于監聽處理相應的消息。
- 使用 RedisTemplate 發布消息。
下面,我們通過四個小步驟,來實現一個簡單的示例。
第一步,了解 Topic
org.springframework.data.redis.listener.Topic?接口,表示 Redis 消息的 Topic 。它有兩個子類實現:
- ChannelTopic :對應?SUBSCRIBE?訂閱命令。
- PatternTopic :對應?PSUBSCRIBE?訂閱命令。
第二步,實現 MessageListener 類
創建?TestChannelTopicMessageListener?類,編寫代碼如下:
public class TestPatternTopicMessageListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {System.out.println("收到 PatternTopic 消息:");System.out.println("線程編號:" + Thread.currentThread().getName());System.out.println("message:" + message);System.out.println("pattern:" + new String(pattern));}}- message?參數,可獲得到具體的消息內容,不過是二進制數組,需要我們自己序列化。具體可以看下?org.springframework.data.redis.connection.DefaultMessage?類。
- pattern?參數,發布的 Topic 的內容。
有一點要注意,默認的 RedisMessageListenerContainer 情況下,MessageListener 是并發消費,在線程池中執行(具體見傳送門代碼)。所以如果想相同 MessageListener?串行消費,可以在方法上加?synchronized?修飾,來實現同步。
第三步,創建 RedisMessageListenerContainer Bean
在 RedisConfiguration 中,配置 RedisMessageListenerContainer Bean 。代碼如下:
// RedisConfiguration.java@Bean public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory factory) {// 創建 RedisMessageListenerContainer 對象RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 設置 RedisConnection 工廠。? 它就是實現多種 Java Redis 客戶端接入的秘密工廠。感興趣的胖友,可以自己去擼下。container.setConnectionFactory(factory);// 添加監聽器container.addMessageListener(new TestChannelTopicMessageListener(), new ChannelTopic("TEST")); // container.addMessageListener(new TestChannelTopicMessageListener(), new ChannelTopic("AOTEMAN")); // container.addMessageListener(new TestPatternTopicMessageListener(), new PatternTopic("TEST"));return container; }要注意,雖然 RedisConnectionFactory 可以多次調用?#addMessageListener(MessageListener listener, Topic topic)?方法,但是一定要都是相同的 Topic 類型。例如說,添加了 ChannelTopic 類型,就不能添加 PatternTopic 類型。為什么呢?因為 RedisMessageListenerContainer 是基于一次?SUBSCRIBE?或?PSUBSCRIBE?命令,所以不支持不同類型的 Topic 。當然,如果是相同類型的 Topic ,多個 MessageListener 是支持的。
那么,可能會有胖友會問,如果我添加了?"Test"?給 MessageListenerA?,"AOTEMAN"?給 MessageListenerB?,兩個 Topic 是怎么分發(Dispatch)的呢?在 RedisMessageListenerContainer 中,有個?DispatchMessageListener?分發器,負責將不同的 Topic 分發到配置的 MessageListener 中。看到此處,有木有想到 Spring MVC 的 DispatcherServlet 分發不同的請求到對應的?@RequestMapping?方法。
第四步,使用 RedisTemplate 發布消息
創建?PubSubTest?測試類,編寫代碼如下:
@RunWith(SpringRunner.class) @SpringBootTest public class PubSubTest {public static final String TOPIC = "TEST";@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Testpublic void test01() throws InterruptedException {for (int i = 0; i < 10; i++) {stringRedisTemplate.convertAndSend(TOPIC, "yunai:" + i);Thread.sleep(1000L);}}}- 通過?RedisTemplate#convertAndSend(String channel, Object message)?方法,PUBLISH 消息。
執行?#test01()?方法,運行結果如下:
收到 ChannelTopic 消息: 線程編號:listenerContainer-2 message:yunai:0 pattern:TEST 收到 ChannelTopic 消息: 線程編號:listenerContainer-3 message:yunai:1 pattern:TEST 收到 ChannelTopic 消息: 線程編號:listenerContainer-4 message:yunai:2 pattern:TEST- 整整齊齊,發送和訂閱都成功了。注意,線程編號。
5.4.3 閑話兩句
Redis 5.0 版本后,正式發布 Stream 功能,相信是有可能可以替代掉 Redis Pub/Sub 功能,提供可靠的消息訂閱功能。
上述的場景,艿艿自己在使用 PUB/SUB 功能的時候,確實被這么坑過。當時我們的管理后臺的權限,是緩存在 Java 進程當中,通過 Redis Pub/Sub 實現緩存的刷新。結果,當時某個 Java 節點網絡出問題,恰好那個時候,有一條刷新權限緩存的消息 PUBLISH 出來,結果沒刷新到。結果呢,運營在訪問某個功能的時候,一會有權限(因為其他 Java 節點緩存刷新了),一會沒有權限。
最近,艿艿又去找了幾個朋友請教了下,問問他們在生產環境下,是否使用 Redis Pub/Sub 功能,他們說使用 Kafka、或者 RocketMQ 的廣播消費功能,更加可靠有保障。
對了,我們有個管理系統里面有 Websocket 需要實時推送管理員消息,因為不知道管理員當前連接的是哪個 Websocket 服務節點,所以我們是通過 Redis Pub/Sub 功能,廣播給所有 Websocket 節點,然后每個 Websocket 節點判斷當前管理員是否連接的是它,如果是,則進行 Websocket 推送。因為之前網絡偶爾出故障,會存在消息丟失,所以近期我們替換成了 RocketMQ 的廣播消費,替代 Redis Pub/Sub 功能。
當然,不能說 Redis Pub/Sub 毫無使用的場景,以下艿艿來列舉幾個:
- 1、在使用 Redis Sentinel 做高可用時,Jedis 通過 Redis Pub/Sub 功能,實現對 Redis 主節點的故障切換,刷新 Jedis 客戶端的主節點的緩存。如果出現 Redis Connection 訂閱的異常斷開,會重新主動去 Redis Sentinel 的最新主節點信息,從而解決 Redis Pub/Sub 可能因為網絡問題,丟失消息。
- 2、Redis Sentinel 節點之間的部分信息同步,通過 Redis Pub/Sub 訂閱發布。
- 3、在我們實現 Redis 分布式鎖時,如果獲取不到鎖,可以通過 Redis 的 Pub/Sub 訂閱鎖釋放消息,從而實現其它獲得不到鎖的線程,快速搶占鎖。當然,Redis Client 釋放鎖時,需要 PUBLISH 一條釋放鎖的消息。在 Redisson 實現分布式鎖的源碼中,我們可以看到。
- 4、Dubbo 使用 Redis 作為注冊中心時,使用 Redis Pub/Sub 實現注冊信息的同步。
也就是說,如果想要有保障的使用 Redis Pub/Sub 功能,需要處理下發起訂閱的 Redis Connection 的異常,例如說網絡異常。然后,重新主動去查詢最新的數據的狀態。?
5.5 Script
Redis 提供 Lua 腳本,滿足我們希望組合排列使用 Redis 的命令,保證串行執行的過程中,不存在并發的問題。同時,通過將多個命令組合在同一個 Lua 腳本中,一次請求,直接處理,也是一個提升性能的手段。不了解的胖友,可以看看?「Redis 文檔 —— Lua 腳本」?。
示例代碼對應測試類:ScriptTest?。
第一步,編寫 Lua 腳本
創建?resources/compareAndSet.lua?腳本,實現 CAS 功能。代碼如下:
if redis.call('GET', KEYS[1]) ~= ARGV[1] thenreturn 0 end redis.call('SET', KEYS[1], ARGV[2]) return 1- 第 1 到 3 行:判斷?KEYS[1]?對應的 VALUE 是否為?ARGV[1]?值。如果不是(Lua 中不等于使用?~=),則直接返回 0 表示失敗。
- 第 4 到 5 行:設置?KEYS[1]?對應的 VALUE 為新值?ARGV[2]?,并返回 1 表示成功。
第二步,調用 Lua 腳本
創建?ScriptTest?測試類,編寫代碼如下:
@RunWith(SpringRunner.class) @SpringBootTest public class ScriptTest {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Testpublic void test01() throws IOException {// <1.1> 讀取 /resources/lua/compareAndSet.lua 腳本 。注意,需要引入下 commons-io 依賴。String scriptContents = IOUtils.toString(getClass().getResourceAsStream("/lua/compareAndSet.lua"), "UTF-8");// <1.2> 創建 RedisScript 對象RedisScript<Long> script = new DefaultRedisScript<>(scriptContents, Long.class);// <2> 執行 LUA 腳本Long result = stringRedisTemplate.execute(script, Collections.singletonList("yunai:1"), "shuai02", "shuai");System.out.println(result);} }- <1.1>?行,讀取?/resources/lua/compareAndSet.lua?腳本。注意,需要引入下?commons-io?依賴。
- <1.2>?行,創建 DefaultRedisScript 對象。第一個參數是腳本內容(?scriptSource?),第二個是腳本執行返回值(?resultType?)。
- <2>?處,調用?RedisTemplate#execute(RedisScript<T> script, List<K> keys, Object... args)?方法,發送 Redis 執行 LUA 腳本。
最后,我們打印下執行結果。胖友可以自己執行下試試。?
6. 嘗試 Redisson
在?redisson-examples?中,Redisson 官方提供了大量的示例。
6.1 快速入門
示例代碼對應倉庫:spring-data-redis-with-redisson?。
6.1.1 引入依賴
在?pom.xml?中,引入相關依賴。
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent><dependencies><!-- 實現對 Redisson 的自動化配置 --> <!-- X --><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.11.3</version></dependency><!-- 方便等會寫單元測試 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- 等會示例會使用 fastjson 作為 JSON 序列化的工具 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.61</version></dependency><!-- Spring Data Redis 默認使用 Jackson 作為 JSON 序列化的工具 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>和?「2.1 引入依賴」?的差異點是,我們需要引入?redisson-spring-boot-starter?依賴,實現 Redisson 的自動化配置。
6.1.2 配置文件
在?application.yml?中,添加 Redis 配置,如下:
spring:# 對應 RedisProperties 類redis:host: 127.0.0.1port: 6379 # password: # Redis 服務器密碼,默認為空。生產中,一定要設置 Redis 密碼!database: 0 # Redis 數據庫號,默認為 0 。timeout: 0 # Redis 連接超時時間,單位:毫秒。# 對應 RedissonProperties 類redisson:config: classpath:redisson.yml # 具體的每個配置項,見 org.redisson.config.Config 類。和?「2.2 配置文件」?的差異點是:
1)去掉 Jedis 相關的配置項
2)增加?redisson.config?配置
在我們使用 Spring Boot 整合 Redisson 時候,通過該配置項,引入一個外部的 Redisson 相關的配置文件。例如說,示例中,我們引入了?classpath:redisson.yaml?配置文件。它可以使用 JSON 或 YAML 格式,進行配置。
FROM?《Spring Boot 2.x 整合 lettuce redis 和 redisson》?文章。
clusterServersConfig:# 連接空閑超時 如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,并從連接池里去掉。時間單位是毫秒。idleConnectionTimeout: 10000pingTimeout: 1000# 連接超時connectTimeout: 10000# 命令等待超時timeout: 3000# 命令失敗重試次數retryAttempts: 3# 命令重試發送時間間隔retryInterval: 1500# 重新連接時間間隔reconnectionTimeout: 3000# failedAttemptsfailedAttempts: 3# 密碼password: null# 單個連接最大訂閱數量subscriptionsPerConnection: 5# 客戶端名稱clientName: null#負載均衡算法類的選擇 默認輪詢調度算法RoundRobinLoadBalancerloadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}slaveSubscriptionConnectionMinimumIdleSize: 1slaveSubscriptionConnectionPoolSize: 50# 從節點最小空閑連接數slaveConnectionMinimumIdleSize: 32# 從節點連接池大小slaveConnectionPoolSize: 64# 主節點最小空閑連接數masterConnectionMinimumIdleSize: 32# 主節點連接池大小masterConnectionPoolSize: 64# 只在從服務節點里讀取readMode: "SLAVE"# 主節點信息nodeAddresses:- "redis://192.168.56.128:7000"- "redis://192.168.56.128:7001"- "redis://192.168.56.128:7002"#集群掃描間隔時間 單位毫秒scanInterval: 1000 threads: 0 nettyThreads: 0 codec: !<org.redisson.codec.JsonJacksonCodec> {}注意
注意
注意
如果?redisson.config?對應的配置文件,沒有配置任何內容,需要在?application.yml?里注釋掉?redisson.config?。像這樣:
spring:# 對應 RedisProperties 類redis:host: 127.0.0.1port: 6379 # password: # Redis 服務器密碼,默認為空。生產中,一定要設置 Redis 密碼!database: 0 # Redis 數據庫號,默認為 0 。timeout: 0 # Redis 連接超時時間,單位:毫秒。# 對應 RedissonProperties 類 # redisson: # config: classpath:redisson.yml # 具體的每個配置項,見 org.redisson.config.Config 類。6.1.3 簡單測試
創建?Test01?測試類,我們來測試一下簡單的 SET 指令。代碼如下:
@RunWith(SpringRunner.class) @SpringBootTest public class Test01 {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Testpublic void testStringSetKey() {stringRedisTemplate.opsForValue().set("yunai", "shuai");} }我們先來執行下?#testStringSetKey()?方法這個測試方法。執行完成后,我們在控制臺查詢,看看是否真的執行成功了。
$ redis-cli get yunai "shuai"- 請大聲的告訴我,Redis 是怎么夸獎?"yunai"?的,哈哈哈哈。
6.1.4 閑聊兩句
因為有 Spring Data Redis 的存在,我們其實已經能感受到,即使我們將 Jedis 替換成了 Redisson ,依然調用的是相同的 Spring Data Redis 提供的 API ,而無需感知到 Redisson 或是 Jedis 的存在。如果哪一天,Spring Boot 2.X 版本默認推薦的 Lettuce 真的成熟了,那么我們也可以無感知的進行替換。
6.2 Redis 分布式鎖
示例代碼對應測試類:LockTest?。
一說到分布式鎖,大家一般會想到的就是基于 Zookeeper 或是 Redis 實現分布式鎖。相對來說,在考慮性能為優先因素,不需要特別絕對可靠性的場景下,我們會優先考慮使用 Redis 實現的分布式鎖。
在 Redisson 中,提供了 8 種分布式鎖的實現,具體胖友可以看看?《Redisson 文檔 —— 分布式鎖和同步器》?。真特碼的強大!大多數開發者可能連 Redis 怎么實現分布式鎖都沒完全搞清楚,Redisson 直接給了 8 種鎖,氣人,簡直了。
本小節,我們來編寫一個簡單使用 Redisson 提供的可重入鎖 RLock 的示例。
創建?LockTest?測試類,編寫代碼如下:
@RunWith(SpringRunner.class) @SpringBootTest public class LockTest {private static final String LOCK_KEY = "anylock";@Autowired // <1>private RedissonClient redissonClient;@Testpublic void test() throws InterruptedException {// <2.1> 啟動一個線程 A ,去占有鎖new Thread(new Runnable() {@Overridepublic void run() {// 加鎖以后 10 秒鐘自動解鎖// 無需調用 unlock 方法手動解鎖final RLock lock = redissonClient.getLock(LOCK_KEY);lock.lock(10, TimeUnit.SECONDS);}}).start();// <2.2> 簡單 sleep 1 秒,保證線程 A 成功持有鎖Thread.sleep(1000L);// <3> 嘗試加鎖,最多等待 100 秒,上鎖以后 10 秒自動解鎖System.out.println(String.format("準備開始獲得鎖時間:%s", new SimpleDateFormat("yyyy-MM-DD HH:mm:ss").format(new Date())));final RLock lock = redissonClient.getLock(LOCK_KEY);boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);if (res) {System.out.println(String.format("實際獲得鎖時間:%s", new SimpleDateFormat("yyyy-MM-DD HH:mm:ss").format(new Date())));} else {System.out.println("加鎖失敗");}}}- 整個測試用例,意圖是:1)啟動一個線程 A ,先去持有鎖 10 秒然后釋放;2)主線程,也去嘗試去持有鎖,因為線程 A 目前正在占用著該鎖,所以需要等待線程 A 釋放到該鎖,才能持有成功。
- <1>?處,注入 RedissonClient 對象。因為我們需要使用 Redisson 獨有的功能,所以需要使用到它。
- <2.1>?處,啟動線程 A ,然后調用?RLock#lock(long leaseTime, TimeUnit unit)?方法,加鎖以后 10 秒鐘自動解鎖,無需調用 unlock 方法手動解鎖。
- <2.2>?處,簡單 sleep 1 秒,保證線程 A 成功持有鎖。
- <3>?處,主線程,調用?RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)?方法,嘗試加鎖,最多等待 100 秒,上鎖以后 10 秒自動解鎖。
執行?#test()?測試用例,結果如下:
準備開始獲得鎖時間:2019-10-274 00:44:08 實際獲得鎖時間:2019-10-274 00:44:17- 9 秒后(因為我們 sleep 了 1 秒),主線程成功獲得到 Redis 分布式鎖,符合預期。
666. 彩蛋
寫了老長一篇,都不曉得有木有會看。斷斷續續寫了小一周,不曉得有木有胖友會看完,甚至看到彩蛋環節,哈哈哈哈。
在高并發場景下,系統會大量依賴緩存和消息隊列,實現所需要的高性能。而緩存,絕大部分的選擇,基本都是 Redis ,這點毋庸置疑。所以,我們是非常有必要深入去學習下 Redis ,友情推薦下付磊大佬的?《Redis 開發與運維》?。
因為寫的還是略有些聰明,所以有錯誤或者表達不清晰的地方,歡迎胖友指出。國慶快樂,繼續學習!
總結
以上是生活随笔為你收集整理的Spring Boot Redis 入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我的代码和注释都写的像坨屎,那又怎么样?
- 下一篇: Java 中如何模拟真正的同时并发请求?