控制 Redis stream 的消息数量
控制 Redis stream 的消息數(shù)量
Intro
Redis Stream 是 Redis 5.0 引入的一個(gè)新的類(lèi)型,之前我們介紹過(guò)使用 Redis Stream 來(lái)實(shí)現(xiàn)消息隊(duì)列,可以參考之前的文章 使用 Redis Stream 實(shí)現(xiàn)消息隊(duì)列,而 Stream 的消息會(huì)持久化地內(nèi)存中,如果我們不控制消息數(shù)量的話,可能會(huì)出現(xiàn)大量的消息存在內(nèi)存里導(dǎo)致過(guò)大的內(nèi)存占用,Redis Stream 5.0 開(kāi)始支持根據(jù) Max Length 來(lái)控制 Stream 的長(zhǎng)度(消息數(shù)量),從 6.2 開(kāi)始支持根據(jù)消息 Id 來(lái)控制 Stream 的長(zhǎng)度,默認(rèn)地消息 Id 是一個(gè)時(shí)間戳,所以使用默認(rèn)地 Id 也可以理解為按時(shí)間來(lái)控制 Stream 長(zhǎng)度,下面我們來(lái)看使用示例吧
Redis 語(yǔ)法
控制 Stream 消息長(zhǎng)度有兩個(gè) Redis 命令,一個(gè)是?XTRIM?只做?Trim?操作,把不滿足要求的消息去除,另外一個(gè)是?XADD?在添加 Stream 消息的同時(shí)做?Trim?操作,簡(jiǎn)化還要多一步?Trim?的操作,將添加消息和控制消息長(zhǎng)度可以合并為一個(gè)操作
XTRIM?語(yǔ)法:
XTRIM?key?MAXLEN|MINID?[=|~]?threshold?[LIMIT?count]使用示例:
XTRIM?mystream?MAXLEN?1000 XTRIM?mystream?MINID?649085820XTRIM?mystream?MAXLEN?~?1000(Nearly?trim,不準(zhǔn)確,可能有些消息該刪掉的會(huì)保留下來(lái),但是執(zhí)行效率會(huì)比?`=`(Exactly?Trim)?高一些) XTRIM?mystream?MINID?=?649085820Trimming the stream can be done using one of these strategies:
MAXLEN: Evicts entries as long as the stream's length exceeds the specified?threshold, where?threshold?is a positive integer.
MINID: Evicts entries with IDs lower than?threshold, where?threshold?is a stream ID.
XADD?語(yǔ)法:
XADD?key?[NOMKSTREAM]?[MAXLEN|MINID?[=|~]?threshold?[LIMIT?count]]?*|ID?field?value?[field?value?...]使用示例:
redis>?XADD?mystream?*?name?Sara?surname?OConnor "1631546114612-0" redis>?XADD?mystream?*?field1?value1?field2?value2?field3?value3 "1631546114612-1"redis>?XADD?mystream?MAXLEN?~?1000?field1?value1 redis>?XADD?mystream?MINID?~?1631546460687?field1?value1XADD?允許用戶(hù)在向 Stream 里添加消息的時(shí)候控制消息的長(zhǎng)度返回值是消息ID,默認(rèn)是一個(gè)時(shí)間戳,語(yǔ)法如上,可以 Trim 也可以 不Trim,可以根據(jù)需要選擇
Prepare
先來(lái)準(zhǔn)備一些幫助類(lèi)和公共方法,下面的示例是基于 StackExchange.Redis 來(lái)實(shí)現(xiàn)的
RedisHelper,獲取 Redis 連接
internal?static?class?RedisHelper {private?static?readonly?IConnectionMultiplexer?ConnectionMultiplexer?=?StackExchange.Redis.ConnectionMultiplexer.Connect("127.0.0.1:6379");public?static?IDatabase?GetRedisDb(int?dbIndex?=?0){return?ConnectionMultiplexer.GetDatabase(dbIndex);} }AddStreamMessage,向指定 stream 中添加若干條消息
private?static?async?Task?AddStreamMessage(string?key,?int?msgCount,?Action?action=null) {var?redis?=?RedisHelper.GetRedisDb();for?(var?i?=?0;?i?<?msgCount;?i++){await?redis.StreamAddAsync(key,?"messages",?$"val-{i}");action?.Invoke();} }Max-Length
根據(jù) MaxLength 來(lái)控制 Stream 長(zhǎng)度示例
var?streamKey?=?$"stream-{nameof(MaxLengthTrim)}"; await?AddStreamMessage(streamKey,?10); var?redis?=?RedisHelper.GetRedisDb(); Console.WriteLine(await?redis.StreamLengthAsync(streamKey));//?trim?directly await?redis.StreamTrimAsync(streamKey,?5); Console.WriteLine(await?redis.StreamLengthAsync(streamKey));//?add?with?trim await?redis.StreamAddAsync(streamKey,?StreamMessageField,?"Test",?maxLength:?3); Console.WriteLine(await?redis.StreamLengthAsync(streamKey));await?redis.KeyDeleteAsync(streamKey);輸出結(jié)果如下:
Min-ID
根據(jù) Min-ID 來(lái)控制 Stream 消息長(zhǎng)度,是 Redis 6.2 新引入的功能,目前 StackExchange.Redis 還沒(méi)有專(zhuān)門(mén)的 API 來(lái)支持這個(gè)功能,不過(guò)我們可以通過(guò) Execute 來(lái)執(zhí)行 Redis 命令,通常這些 Redis 客戶(hù)端庫(kù)都會(huì)支持直接調(diào)用 Redis 命令,根據(jù) MinID 控制 Stream 長(zhǎng)度示例如下:
private?const?string?StreamAddCommandName?=?"XADD"; private?const?string?StreamTrimCommandName?=?"XTRIM";private?const?string?StreamAddAutoMsgId?=?"*";private?const?string?StreamTrimByMinIdName?=?"MINID";private?const?string?StreamTrimOperator?=?"=";private?const?string?StreamMessageField?=?"message";private?static?async?Task?MinMsgIdTrim() {var?streamKey?=?$"stream-{nameof(MaxLengthTrim)}";await?AddStreamMessage(streamKey,?10,?()?=>?Thread.Sleep(1000));var?redis?=?RedisHelper.GetRedisDb();var?minId?=?DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(5)).ToUnixTimeMilliseconds();Console.WriteLine(await?redis.StreamLengthAsync(streamKey));//?https://redis.io/commands/xtrim//?trim?directlyawait?redis.ExecuteAsync(StreamTrimCommandName,?streamKey,StreamTrimByMinIdName,StreamTrimOperator,?//?optionalminId);Console.WriteLine(await?redis.StreamLengthAsync(streamKey));minId?=?DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(2)).ToUnixTimeMilliseconds();//?https://redis.io/commands/xadd//?add?with?trimvar?result?=?redis.Execute(StreamAddCommandName,?streamKey,?StreamTrimByMinIdName,?StreamTrimOperator,?//?optionalminId,StreamAddAutoMsgId,?StreamMessageField,?"Test");Console.WriteLine(await?redis.StreamLengthAsync(streamKey));await?redis.KeyDeleteAsync(streamKey); }上述代碼輸出結(jié)果如下:
More
本文主要介紹了控制 Redis Stream 的消息長(zhǎng)度,除了介紹 Redis 本身的命令之外,也是介紹一下如何使用 StackExchange.Redis 實(shí)現(xiàn)調(diào)用沒(méi)有 API 支持的 Redis 命令,Redis 6.2 之后支持了很多新的特性,但是很多庫(kù)都還太支持,了解如何原生調(diào)用 Redis 命令有些時(shí)候會(huì)很有幫助
References
https://redis.io/commands/xadd
https://redis.io/commands/xtrim
https://github.com/WeihanLi/SamplesInPractice/blob/master/RedisSample/StreamTrimSample.cs
總結(jié)
以上是生活随笔為你收集整理的控制 Redis stream 的消息数量的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: docker-compose 一键部署分
- 下一篇: 更了吗?Windows 11 22000