时序数据库技术体系 – InfluxDB TSM存储引擎之数据读取
任何一個數據庫系統內核關注的重點無非:數據在內存中如何存儲、在文件中如何存儲、索引結構如何存儲、數據寫入流程以及數據讀取流程。關于InfluxDB存儲內核,筆者在之前的文章中已經比較全面的介紹了數據的文件存儲格式、倒排索引存儲實現以及數據寫入流程,本篇文章重點介紹InfluxDB中時序數據的讀取流程。
InfluxDB支持類SQL查詢,稱為InfluxQL。InfluxQL支持基本的DDL操作和DML操作語句,詳見InfluxQL_Spec,比如Select語句:
?
select_stmt = "SELECT" fields from_clause [ into_clause ] [ where_clause ] [ group_by_clause ] [ order_by_clause ] [ limit_clause ] [ offset_clause ] [ slimit_clause ] [ soffset_clause ] .?
使用InfluxQL可以非常方便、人性化地對InfluxDB中的時序數據進行多維聚合分析。那InfluxDB內部是如何處理Query請求的呢?接下來筆者結合源碼對InfluxDB的查詢流程做一個剖析。另外,如果看官對源碼這部分感興趣,推薦先閱讀官方文檔對應部分:https://docs.influxdata.com/influxdb/v1.0/query_language/spec/#query-engine-internals
本文篇幅相對較長。為了方便閱讀,本文分為上下兩部分,上半部分會從原理層面介紹InfluxDB的數據讀取流程,下半部分會舉一個例子模擬整個數據讀取的過程。
上半部分:InfluxDB數據讀取流程原理
LSM(TSM)引擎對于讀流程的處理通常來說都比較復雜,建議保持足夠的耐心和專注力。理論部分會分兩個小模塊進行介紹,第一個模塊會從宏觀框架層面簡單梳理整個讀取流程,第二個模塊會從微觀細節層面分析TSM存儲引擎(TSDB)內部詳細的執行邏輯。
InfluxDB讀取流程框架
筆者對照源碼對整個流程做了一個簡單的梳理(下圖讀者可能看不清楚,文末附有該圖的高清版):
整個讀取流程從宏觀上分為四個部分:
1.?Query:InfluxQL允許用戶使用類SQL語句執行查詢分析聚合,InfluxQL語法詳見:https://docs.influxdata.com/influxdb/v1.0/query_language/spec/
?
2.?QueryParser:InfluxQL進入系統之后,系統首先會對InfluxQL執行切詞并解析為抽象語法樹(AST),抽象樹中標示出了數據源、查詢條件、查詢列以及聚合函數等等,分別對應上圖中Source、Condition以及Aggration。InfluxQL沒有使用通用的第三方AST解析庫,自己實現了一套解析庫,對細節感興趣的可以參考:https://github.com/influxdata/influxql。接著InfluxDB會將抽象樹轉化為一個Query實體對象,供后續查詢中使用。
?
3.?BuildIterators:InfluxQL語句轉換為Query實體對象之后,就進入讀取流程中最重要最核心的一個環節?–?構建Iterator體系。構建Iterator體系是一個非常復雜的邏輯過程,其中細節非常繁復,筆者盡可能化繁為簡,將其中的主線抽出來。為了方便理解,筆者將Iterator體系分為三個子體系:頂層Iterator子體系、中間層Iterator子體系以及底層Iterator子體系。
(1)頂層Iterator子體系
InfluxDB會為InfluxQL中所有查詢field構造一個FieldIterator,FieldIterator表示每個查詢列都會創建一個Iterator(稱為ExprIterator),這是因為InfluxDB是列式存儲系統,所有的列都是獨立存儲的,因此基于列分別構建Iterator方便執行查詢聚合操作。比如sum(click),sum(impressions)和sum(revenue)三個查詢列就分別對應一個ExprIterator。
ExprIterator根據查詢列值是否需要聚合可以分為VarRefIterator和CallIterator,前者表示列值可以直接查詢返回,不需要聚合;后者表示查詢列需要執行某些聚合操作。示例中查詢sum(click)就是典型的CallIterator,CallIterator實際實現分為兩步,首先通過VarRefIterator把對應的列值查詢到,再通過對應的Reduce函數執行相應聚合。比如sum(click)這個CallIterator就需要雇傭一個VarRefIterator把滿足條件的click列值拿上來,再執行Reduce函數sum執行聚合操作。
(2)中間層Iterator子體系
InfluxDB中一個查詢列的值可能分布在不同的Shard上,需要根據TimeRange決定給定時間段在哪些shard上,并為每個Shard構建一個Iterator,雇傭這個邏輯Iterator負責查詢這個shard上對應列的列值。目前單機版所有shard都在同一個InfluxDB實例上,如果實現分布式管理,需要在這一層做處理。
(3)底層Iterator子體系
底層Iterator子體系負責單個shard(engine)上滿足條件的某一列值的查找或者單機聚合,是Iterator體系中實際干活的Iterator。比如滿足where?advertiser?=?“baidu.com”?這個條件就需要先在倒排索引中根據advertiser?=?“baidu.com”查到包含該tag的所有series,再為每個series構建一個TagsetIterator去查找對應的列值,TagsetIterator會將查找指針置于最小的列值處。
縱觀整個Iterator體系的構建,整體邏輯還是很清晰的。總結起來就是,查詢按照查詢列構建最頂層FieldIterator,每個FieldIterator會根據TimeRange雇傭多個ShardIterator去處理單個Shard上面對應列值的查找,對查找到的值要么直接返回要么執行Reduce函數進行聚合操作。每個Shard內部首先會根據查詢條件利用倒排索引定位到所有滿足條件的series,再為每個series構建一個TagsetIterator用來查找具體的列值數據。因此,TagsetIterator是整個體系中唯一干活的Iterator,所有其他上層Iterator都是邏輯Iterator。
另一個非常重要的點是,同一個Shard內的所有TagsetIterator在構建完成會合并成一個ShardIterator,這個合并過程是對這些TagsetIterator進行排序的過程,排序規則是按照series由小到大排序或者由大到小排序(由用戶SQL對查詢結果是由小到大排序還是由大到小排序決定)。同理,一個列值對應的多個ShardIterator構建完成之后會合并成一個FieldIterator,合并過程亦是一個排序過程,不過排序是針對所有Shard中的TagsetIterator進行的,排序規則是先比較series,再比較時間。可見,一個FieldIterator最終是由一系列排序過的TagsetIterator構成的。
?
4.?Emitter.Emit:Iterator體系構建完成之后就完成了查詢聚合前的準備工作,接下來就開始干活了。干活邏輯簡單來講是遍歷所有FieldIterator,對每個FieldIterator執行一次Next函數,就會返回每個查詢列的結果值,組裝到一起就是一行數據。FieldIterator執行Next()函數會傳遞到最底層的TagsetIterator,TagsetIterator執行Next函數實際返回真實的時序數據。
TSDB存儲引擎執行邏輯
TSDB存儲引擎(實際上就是一個Shard)根據用戶的查詢請求執行原始數據的查詢就是上文中提到的底層Iterator子體系的構建。查詢過程分為兩個部分:倒排索引查詢過濾以及TSM數據層查詢,前者通過Query中的where條件結合倒排索引過濾掉不滿足條件的SeriesKey;后者根據留下的SeriesKey以及where條件中時間段信息(TimeRange)在TSMFile中以及內存中查出最終滿足條件的數值列。TSDB存儲引擎會將查詢到的所有滿足條件的原始數值列返回給上層,上層根據聚合函數對原始數據進行聚合并將聚合結果返回給用戶。整個過程如下圖所示:
?
上圖需要從底部向上瀏覽,整個流程可以整理為如下:
1.?根據where?condition以及所有倒排索引文件查處所有滿足條件的SeriesKey
2.?將滿足條件的SeriesKey根據GroupBy維度列進行分組,不同分組后續的所有操作都可以獨立并發執行,因此可以多線程處理
3.?針對某個分組的SeriesKey集合以及待查詢列,根據指定查詢時間段(TimeRange)在所有TSMFile中根據B+樹索引構建查詢iterator
4.?將滿足條件的原始數據返回給上層進行聚合運算,并將聚合運算的結果返回給用戶
實際執行的過程可能比較抽象,為了更好的理解,筆者在下半部分舉了一個示例。沒有理解上面的邏輯沒關系,可以先看下面的示例,看完之后再看上面的理論邏輯相信會更加容易理解。
下半部分:InfluxDB查詢流程示例
文章上半部分從理論層面對InfluxDB查詢流程進行了介紹。為了方便理解TSDB存儲引擎處理查詢流程的邏輯,筆者通過如下一個真實示例將其中的核心步驟進行說明。下表為原始時序數據表,表中有3個維度列:publisher、advertiser以及gender,3個數值列:impression、click以及revenue:
| timestamp | publisher | advertiser | gender | impression | click | revenue |
| 2017-11-01T00:00:00 | ultrarimfast.com | baidu.com | male | 1800 | 23 | 11.24 |
| 2017-12-01T00:00:00 | bieberfever.com | google.com | male | 2074 | 72 | 31.22 |
| 2018-01-04T00:00:00 | ultrarimfast.com | baidu.com | false | 1079 | 54 | 9.72 |
| 2018-01-08T00:00:01 | ultrarimfast.com | google.com | male | 1912 | 11 | 3.74 |
| 2018-01-21T00:00:01 | bieberfever.com | baidu.com | male | 897 | 17 | 5.48 |
| 2018-01-26T00:00:01 | ultrarimfast.com | baidu.com | male | 1120 | 73 | 6.48 |
現在用戶想查詢2018年1月份發布在baidu.com平臺上的不同廣告商的曝光量、點擊量以及總收入,SQL如下所示:
select sum(click),sum(impression),sum(revenue) from table group by publisher where advertiser = "baidu.com" and timestamp > "2018-01-01" and timestamp < "2018-02-01"步驟一:倒排索引過濾+groupby分組
原始查詢語句:select?….??from?ad_datasource?where?advertiser?=?“baidu.com”?……?。倒排索引即根據條件advertiser=”baidu.com”在所有Index?File中遍歷查詢包含該tag的所有SeriesKey,具體原理(詳見《時序數據庫技術體系?–?InfluxDB?多維查詢之倒排索引》)如下:
1.?根據Index?File中Measurement?Block根據”ad_datasource”進行過濾,可以直接定位到給定source對應的所有TagKey所在的文件offset|size。
2.?加載出對應TagKey區域的Hash?Index,使用給定TagKey(”advertiser”)進行hash可以直接定位到該TagKey對應的TagValue的文件offset|size。
3.?加載出TagKey對應TagValue區域的Hash?Index,使用過濾條件TagValue(”baidu.com”)進行hash可以直接定位到該TagValue對應的所有SeriesID。
4.?SeriesID就是對應SeriesKey在索引文件中的offset,直接根據SeriesID可以加載出對應的SeriesKey。
滿足條件的所有SeriesKey如下表所示,共有3個:
| publisher | advertiser | gender |
| ultrarimfast.com | baidu.com | male |
| ultrarimfast.com | baidu.com | false |
| bieberfever.com | baidu.com | male |
根據倒排索引查詢得到所有的SeriesKey之后,這里有一個非常重要的步驟:根據groupby條件對SeriesKey進行分組,分組算法為hash。示例查詢中聚合條件為group?by?publisher,因此需要將上面得到的3個SeriesKey按照publisher的不同分成如下兩組:
| publisher | advertiser | gender |
| bieberfever.com | baidu.com | male |
| publisher | advertiser | gender |
| ultrarimfast.com | baidu.com | male |
| ultrarimfast.com | baidu.com | female |
在倒排索引之后執行分組意義非常重大,分組后不同group的SeriesKey是可以并行獨立執行查詢并最終執行聚合的,因此后續的所有操作都可以使用多個線程并發執行,極大提升整個查詢性能。
步驟二:TSM文件數據檢索
到這一步,我們已經按照groupby得到分組后的SeriesKey集合。接下來需要根據SeriesKey以及TimeRange在TSM數據文件中查找滿足條件的待查詢列。在TSM數據文件中根據SeriesKey以及TimeRange查詢field的具體過程(詳見:《時序數據庫技術體系?–?InfluxDB?TSM存儲引擎之TSMFile》)如下:
上圖中中間部分為索引層,TSM在啟動之后就會將TSM文件的索引部分加載到內存,數據部分因為太大并不會直接加載到內存。用戶查詢可以分為三步:
1.?首先根據Key(SeriesKey+fieldKey)找到對應的SeriesIndex?Block,因為Key是有序的,所以可以使用二分查找來具體實現
2.?找到SeriesIndex?Block之后再根據查找的時間范圍,使用[MinTime,?MaxTime]索引定位到可能的Series?Data?Block列表
3.?將滿足條件的Series?Data?Block加載到內存中解壓進一步使用二分查找算法查找即可找到
在TSM中查詢滿足TimeRange條件的SeriesKey對應的待查詢列值,因為InfluxDB會根據不同的查詢列設置獨立的FieldIterator,因此查詢列有多少就有多少個FieldIterator,如下所示:
步驟三:原始數據聚合
查詢到滿足條件的所有原始數據之后,InfluxDB會根據查詢聚合函數對原始數據進行聚合,如下圖所示:
| publisher | sum(impression) | sum(click) | sum(revenue) |
| bieberfever.com | 897 | 17 | 5.48 |
| ultrarimfast.com | 1079?+?1120 | 54?+?73 | 9.72?+?6.48 |
文章總結
本文主要結合InfluxDB源碼對查詢聚合請求在服務器端的處理框架進行了系統理論介紹,同時深入介紹了InfluxDB?Shard?Engine是如何利用倒排索引、時序數據存儲文件(TSMFile)處理用戶的查詢請求。最后,舉了一個示例對Shard?Engine的執行流程進行了形象化說明。整個讀取的示意圖附件:
?
總結
以上是生活随笔為你收集整理的时序数据库技术体系 – InfluxDB TSM存储引擎之数据读取的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Centos7.x 安装 CDH 6.x
- 下一篇: boost::shared_mutex