redis stream持久化_Beetlex.Redis之Stream功能详解
原標題:Beetlex.Redis之Stream功能詳解
有一段時間沒有寫文章,techempower的測試規則評分竟然發生了變化,只能忘著補充一下占比權重最多的數據更新示例了和深入設計一下組件模塊化加載的設計。但在不久前有用戶問了一下組件是否支持redis的Stream功能,看了一樣相關資料后把功能實現之;接下來就介紹一下如何用Beetlex.Redis來調用redis的Stream功能。
什么是Stream
是Redis5.0的Stream是一個新的強大的支持多播的可持久化的消息隊列,它提供了消息添加,多組和多消費者一致性讀取和ack確認等功能;更詳細的介紹就不多說了可以通過網絡找到更多詳細描述。
創建Stream
組件通過RedisDB對象的GetStream訪求來創建一個Stream訪問對象,對象創建后就可以進行一系列的 XACK| XADD| XDEL| XGROUP| XLEN| XRANGE| XREAD| XREADGROUP| XREVRANGE|XTRIM等指令操作。創建代碼如下:
RedisStream < Employee>stream = DB.GetStream < Employee>("employees_stream");
XADD
在介紹這個操作前先說一下Stream里存儲的格式,默認Stream消息是K-V的格式,從基礎指令上可以了解到這種結構
XADDmystream* sensor-id1234 temperature19 .8
但這種格式操作起來并不友好,所以組件除了支持這種K-V的方式外,還支持以對象的方式進行Stream消息處理。接下來看一下插入對象的調用
RedisStream stream = DB.GetStream( "employees_stream");
varid = awaitstream.Add(DataHelper.Defalut.Employees[ 0]);
id = awaitstream.Add(DataHelper.Defalut.Employees[ 1]);
id = awaitstream.Add(DataHelper.Defalut.Employees[ 2]);
varlen = awaitstream.Len;
組件支持直接入插對象,其基礎指令就是
XADDemployees_stream* dateemployeejson
組件直接采用一個K-V的方式來存儲對象,對于原則多個K-V的方式組件同樣也支持,只是在構建Stream指定類型用Dictionary即可;接下其他就不多說了直接上指令用例了。
XLEN
RedisStream stream = DB.GetStream( "employees_stream");
varlen = awaitstream.Len;
XDEL
RedisStream stream = DB.GetStream( "employees_stream");
varitems = awaitstream.Read( null, null, "0-0");
awaitstream.Del(( fromitem initems selectitem.ID).ToArray);
XRANGE
RedisStream stream = DB.GetStream( "employees_stream");
varitems = awaitstream.Range;
items = awaitstream.RangeAll;
XREVRANGE
RedisStream stream = DB.GetStream( "employees_stream");
varitems = awaitstream.RevRange;
items = awaitstream.RevRangeAll;
XREAD
RedisStream stream = DB.GetStream( "employees_stream");
varitems = awaitstream.Read( 0, null, "0-0");
items = awaitstream.Read;
Stream的消費組
前面介紹的指令感覺列表結構都能滿足,其實Stream重要的功能是在組消費這一塊,Redis可以針對Stream創建多個消費組和消費者,而消息會做一致性消費處理。
XGROUP
RedisStream stream = DB.GetStream( "employees_stream");
vargroup= awaitstream.GetGroup( "henry");
XREAD
RedisStream stream = DB.GetStream( "employees_stream");
vargroup= awaitstream.GetGroup( "g1");
varitems = awaitgroup.Read( "henry", "0");
實際XRead提供了是否等待和起始讀已取參數
publicasyncValueTask>> ReadWait( stringconsumer, inttimeout= 0
publicValueTask>> Read( stringconsumer, stringstart = null)
publicasyncValueTask>> Read( stringconsumer, int? block, int? count, stringstart = null)
一般情況下可以通過readwait來不停地消息新的消息
while( true)
{
items = awaitgroup.ReadWait( "henry");
//處理消息
foreach( varitem initems)
{
awaititem.Ack;
}
}
XACK
RedisStream stream = DB.GetStream( "employees_stream");
vargroup= awaitstream.GetGroup( "g1");
varitems = awaitgroup.Read( "henry", "0");
foreach( varitem initems)
awaititem.Ack;
以上是BeetleX.Redis組件提供操作Stream的基礎指令,實際上Stream還有一些和運維相關的指令,只是這些在實際業務上用不上所以就沒有去實現了。 返回搜狐,查看更多
責任編輯:
總結
以上是生活随笔為你收集整理的redis stream持久化_Beetlex.Redis之Stream功能详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cdn贝免费套餐_阿里云香港服务器带宽太
- 下一篇: 搜索重复代码_通过MappedByteB