influxdb数据过期_Influxdb Cluster下的数据写入
Cluster下的數據寫入
數據寫入的實現主要分析cluster/points_writer.go中的WritePoints函數的實現//?WritePoints?writes?across?multiple?local?and?remote?data?nodes?according?the?consistency?level.func?(w?*PointsWriter)?WritePoints(p?*WritePointsRequest)?error?{
w.statMap.Add(statWriteReq,?1)
w.statMap.Add(statPointWriteReq,?int64(len(p.Points)))????//2.1?先獲取RetentionPolicy
if?p.RetentionPolicy?==?""?{
db,?err?:=?w.MetaClient.Database(p.Database)????????if?err?!=?nil?{????????????return?err
}?else?if?db?==?nil?{????????????return?influxdb.ErrDatabaseNotFound(p.Database)
}
p.RetentionPolicy?=?db.DefaultRetentionPolicy
}????//?2.2?生成?shardMap
shardMappings,?err?:=?w.MapShards(p)????if?err?!=?nil?{????????return?err
}????//?Write?each?shard?in?it's?own?goroutine?and?return?as?soon
//?as?one?fails.
ch?:=?make(chan?error,?len(shardMappings.Points))????for?shardID,?points?:=?range?shardMappings.Points?{
//?2.3?寫入數據到Shard
go?func(shard?*meta.ShardInfo,?database,?retentionPolicy?string,?points?[]models.Point)?{
ch?
}(shardMappings.Shards[shardID],?p.Database,?p.RetentionPolicy,?points)
}????//?Send?points?to?subscriptions?if?possible.
ok?:=?false
//?We?need?to?lock?just?in?case?the?channel?is?about?to?be?nil'ed
w.mu.RLock()
select?{????case?w.subPoints?
ok?=?true
default:
}
w.mu.RUnlock()????if?ok?{
w.statMap.Add(statSubWriteOK,?1)
}?else?{
w.statMap.Add(statSubWriteDrop,?1)
}????//?2.4?等待寫入完成
for?range?shardMappings.Points?{
select?{????????case?
}
}
}????return?nil}上面的函數實現主要分如下幾個步驟
2.1 獲取對應的RetentionPolicy
2.2 生成ShardMap, 將各個point對應到相應ShardGroup中的Shard中, 這步很關鍵
2.3 按ShardId不同,開啟新的goroutine, 將points寫入相應的Shard,可能涉及將數據寫入到其它的DataNode上;
2.4 等待寫入完成或退出
ShardMap的生成
先講一下ShardGroup的概念:
1.1 寫入Influxdb的每一條數據都帶有相應的time時間,每一個ShardGroup都有自己的start和end時間,這個時間跨度是由用戶寫入時選取的RetentionPolicy中的ShardGroupDuration決定,這樣每條寫入的數據就必然僅屬于一個確定的ShardGroup;
主要實現在cluster/points_writer.go中的MapShards中func?(w?*PointsWriter)?MapShards(wp?*WritePointsRequest)?(*ShardMapping,?error)?{????//?holds?the?start?time?ranges?for?required?shard?groups
timeRanges?:=?map[time.Time]*meta.ShardGroupInfo{}
rp,?err?:=?w.MetaClient.RetentionPolicy(wp.Database,?wp.RetentionPolicy)????if?err?!=?nil?{????????return?nil,?err
}????if?rp?==?nil?{????????return?nil,?influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
}????for?_,?p?:=?range?wp.Points?{
timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]?=?nil
}????//?holds?all?the?shard?groups?and?shards?that?are?required?for?writes
for?t?:=?range?timeRanges?{
sg,?err?:=?w.MetaClient.CreateShardGroup(wp.Database,?wp.RetentionPolicy,?t)????????if?err?!=?nil?{????????????return?nil,?err
}
timeRanges[t]?=?sg
}
mapping?:=?NewShardMapping()????for?_,?p?:=?range?wp.Points?{
sg?:=?timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
sh?:=?sg.ShardFor(p.HashID())
mapping.MapPoint(&sh,?p)
}????return?mapping,?nil}我們來拆解下上面函數的實現
3.1 掃描所有的points, 按時間確定我們需要多個ShardGroupfor?_,?p?:=?range?wp.Points?{
timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]?=?nil
}
3.2 調用w.MetaClient.CreateShardGroup, 如果ShardGroup存在則直接返回ShardGroup信息,如果不存在則創建,創建過程涉及到將CreateShardGroup的請求發送給MetadataServer并等待本地更新到新的MetaData數據;
sg,?err?:=?w.MetaClient.CreateShardGroup(wp.Database,?wp.RetentionPolicy,?t)
3.3 分析ShardGroup的分配規則, 在services/meta/data.go中的CreateShardGroupfunc?(data?*Data)?CreateShardGroup(database,?policy?string,?timestamp?time.Time)?error?{
...????//?Require?at?least?one?replica?but?no?more?replicas?than?nodes.
//?確認復本數,不能大于DataNode節點總數
replicaN?:=?rpi.ReplicaN????if?replicaN?==?0?{
replicaN?=?1
}?else?if?replicaN?>?len(data.DataNodes)?{
replicaN?=?len(data.DataNodes)
}????//?Determine?shard?count?by?node?count?divided?by?replication?factor.
//?This?will?ensure?nodes?will?get?distributed?across?nodes?evenly?and
//?replicated?the?correct?number?of?times.
//?根據復本數確定Shard數量
shardN?:=?len(data.DataNodes)?/?replicaN????//?Create?the?shard?group.
//?創建ShardGroup
data.MaxShardGroupID++
sgi?:=?ShardGroupInfo{}
sgi.ID?=?data.MaxShardGroupID
sgi.StartTime?=?timestamp.Truncate(rpi.ShardGroupDuration).UTC()
sgi.EndTime?=?sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()????//?Create?shards?on?the?group.
sgi.Shards?=?make([]ShardInfo,?shardN)????for?i?:=?range?sgi.Shards?{
data.MaxShardID++
sgi.Shards[i]?=?ShardInfo{ID:?data.MaxShardID}
}????//?Assign?data?nodes?to?shards?via?round?robin.
//?Start?from?a?repeatably?"random"?place?in?the?node?list.
//?ShardInfo中的Owners記錄了當前Shard所有復本所在DataNode的信息
//?分Shard的所有復本分配DataNode
//?使用data.Index作為基數確定開始的DataNode,然后使用?round?robin策略分配
//?data.Index:每次meta信息有更新,Index就會更新,?可以理解為meta信息的版本號
nodeIndex?:=?int(data.Index?%?uint64(len(data.DataNodes)))????for?i?:=?range?sgi.Shards?{
si?:=?&sgi.Shards[i]????????for?j?:=?0;?j?
nodeID?:=?data.DataNodes[nodeIndex%len(data.DataNodes)].ID
si.Owners?=?append(si.Owners,?ShardOwner{NodeID:?nodeID})
nodeIndex++
}
}????//?Retention?policy?has?a?new?shard?group,?so?update?the?policy.?Shard
//?Groups?must?be?stored?in?sorted?order,?as?other?parts?of?the?system
//?assume?this?to?be?the?case.
rpi.ShardGroups?=?append(rpi.ShardGroups,?sgi)
sort.Sort(ShardGroupInfos(rpi.ShardGroups))????return?nil
}
3.4 按每一個具體的point對應到ShardGroup中的一個Shard: 按point的HashID來對Shard總數取模,HashID是measurement + tag set的Hash值
for?_,?p?:=?range?wp.Points?{
sg?:=?timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
sh?:=?sg.ShardFor(p.HashID())
mapping.MapPoint(&sh,?p)
}
....
func?(sgi?*ShardGroupInfo)?ShardFor(hash?uint64)?ShardInfo?{????return?sgi.Shards[hash%uint64(len(sgi.Shards))]
}
數據按一致性要求寫入過程簡述
1.1 根據一致性要求確認需要成功寫入幾份switch?consistency?{????//?對于ConsistencyLevelAny,?ConsistencyLevelOne只需要寫入一份即滿足一致性要求,返回客戶端
case?ConsistencyLevelAny,?ConsistencyLevelOne:
required?=?1
case?ConsistencyLevelQuorum:
required?=?required/2?+?1
}
1.2 根據Shard.Owners對應的DataNode, 向其中的每個DataNode寫入數據,如果是本機,直接調用w.TSDBStore.WriteToShard寫入;如果非本機,調用err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points);
1.3 寫入遠端失敗時,數據寫入HintedHandoff本地磁盤隊列多次重試寫到遠端,直到數據過期被清理;對于一致性要求是ConsistencyLevelAny, 寫入本地HintedHandoff成功,就算是寫入成功;w.statMap.Add(statWritePointReqHH,?int64(len(points)))
hherr?:=?w.HintedHandoff.WriteShard(shardID,?owner.NodeID,?points)????????????????if?hherr?!=?nil?{
ch?
}????????????????if?hherr?==?nil?&&?consistency?==?ConsistencyLevelAny?{
ch?
}
1.4 等待寫入超時或完成for?range?shard.Owners?{
select?{????????case?
w.statMap.Add(statWriteTimeout,?1)????????????//?return?timeout?error?to?caller
return?ErrTimeout????????case?result?:=?
if?result.Err?!=?nil?{????????????????if?writeError?==?nil?{
writeError?=?result.Err
}????????????????continue
}
wrote++????????????//?寫入已達到一致性要求,就立即返回
if?wrote?>=?required?{
w.statMap.Add(statWriteOK,?1)????????????????return?nil
}
}
}
HintedHandoff服務定義在services/hh/service.go中
寫入HintedHandoff中的數據,按NodeID的不同寫入不同的目錄,每個目錄下又分多個文件,每個文件作為一個segment, 命名規則就是依次遞增的id, id由小到大的順序對應寫入時間從舊到新的順序;
hitnedhandoff.png
HintedHandoff服務會針對每一個遠端DataNode創建NodeProcessor, 每個NodeProcessor負責自己對應DataNode的寫入, 運行在一個單獨的goroutine中
在每個goroutine中,做兩件事:一是定時清理過期的數據,如果被清理掉的數據還沒有成功寫入到遠端,則會丟失;二是從文件讀取數據寫入到遠端;
func?(n?*NodeProcessor)?run()?{
defer?n.wg.Done()
...????for?{
select?{????????case?
case?
n.Logger.Printf("failed?to?purge?for?node?%d:?%s",?n.nodeID,?err.Error())
}????????case?
limiter?:=?NewRateLimiter(n.RetryRateLimit)????????????for?{
c,?err?:=?n.SendWrite()????????????????if?err?!=?nil?{????????????????????if?err?==?io.EOF?{????????????????????????//?No?more?data,?return?to?configured?interval
currInterval?=?time.Duration(n.RetryInterval)
}?else?{
currInterval?=?currInterval?*?2
if?currInterval?>?time.Duration(n.RetryMaxInterval)?{
currInterval?=?time.Duration(n.RetryMaxInterval)
}
}????????????????????break
}????????????????//?Success!?Ensure?backoff?is?cancelled.
currInterval?=?time.Duration(n.RetryInterval)????????????????//?Update?how?many?bytes?we've?sent
limiter.Update(c)????????????????//?Block?to?maintain?the?throughput?rate
time.Sleep(limiter.Delay())
}
}
}
}數據的本地存儲和讀取
5.1 定義在services/hh/queue.go,所有的segment file在內存中組織成一個隊列,讀從head指向的segment讀取,寫入到tail指向的segment, 每個segment文件的最后8字節記錄當前segment文件已經讀到什麼位置
5.2 清理,當這個segment文件內容都發送完后,當前文件會被刪除;周期性清理每次只會檢查當前head指向的segment是否需要清理掉
作者:掃帚的影子
鏈接:https://www.jianshu.com/p/6a94486b2daa
總結
以上是生活随笔為你收集整理的influxdb数据过期_Influxdb Cluster下的数据写入的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 快手关注请求功能在哪里
- 下一篇: 《流浪地球2》发行通知公开:片长173分