大话ion系列(五)
點擊上方“LiveVideoStack”關注我們
作者 | 王朋闖
本文為王朋闖老師創作的系列ion文章,LiveVideoStack已獲得授權發布,未來將持續更新。
大話ion系列(一)
大話ion系列(二)
大話ion系列(三)
大話ion系列(四)
八、QOS之Buffer和NACK
1.?buffer簡介
大家都知道webrtc有jitterbuffer,ion-sfu里也有buffer,抗丟包40%的秘訣就在這里。
主要作用有:
緩存rtp包,收到nack后,重傳rtp包
計算rtcp-nack,發送給客戶端
計算rtcp-twcc,發送給客戶端
計算rtcp-rr/sr/pli/等,發送給客戶端
2.buffer結構
看了上邊buffer的功能,基本buffer內的數據結構都是為功能服務的。
//待處理的包 type pendingPackets struct {arrivalTime int64//到達時間packet []byte//包數據 }//擴展的包結構體 type ExtPacket struct {Head bool//是否是第一個包Cycle uint32//SN轉了多少輪Arrival int64//到達時間Packet rtp.Packet//包Payload interface{}//包payloadKeyFrame bool//是否關鍵幀 }// Buffer contains all packets type Buffer struct {sync.Mutexbucket *Bucket//定制的rtp包ringbuffernacker *nackQueue//nack計算隊列videoPool *sync.Pool//視頻包臨時緩存audioPool *sync.Pool//視頻包臨時緩存codecType webrtc.RTPCodecTypeextPackets deque.Deque//擴展包緩存pPackets []pendingPackets//待處理包,用于緩存一些來不及處理的包closeOnce sync.OncemediaSSRC uint32//媒體源clockRate uint32//時鐘頻率maxBitrate uint64//最大碼率lastReport int64//上次報告時間twccExt uint8//rtp擴展頭twcc的id,sdp里有audioExt uint8//rtp擴展頭audiolevel的id,sdp里有bound boolclosed atomicBoolmime string//媒體類型,如video/h264等// 是否開啟remb nack twcc audiolevelremb boolnack booltwcc boolaudioLevel boolminPacketProbe intlastPacketRead intmaxTemporalLayer int32bitrate uint64//存儲碼率bitrateHelper uint64//用來計算碼率lastSRNTPTime uint64//最后一次SR的NTP時間lastSRRTPTime uint32//最后一次SR的RTP時間lastSRRecv int64//1970年1月1日0時0分0秒起到現在的總納秒數baseSN uint16//用來組裝RRcycles uint32//SN回環次數lastRtcpPacketTime int64//上一次rtcplastRtcpSrTime int64// Time the lastRTCP SR was received. Required for DLSR computation.lastTransit uint32//用來計算jittermaxSeqNo uint16//收到最大的SNstats Stats//狀態統計latestTimestamp uint32// latestreceived RTP timestamp on packetlatestTimestampTime int64 // Time of the latest timestamp (innanos since unix epoch)// callbacksonClose func()onAudioLevel func(level uint8)feedbackCB func([]rtcp.Packet)feedbackTWCC func(sn uint16, timeNS int64, marker bool)// loggerlogger logr.Logger }3.buffer創建
Pion/webrtc支持自定義BufferFactory,設置好之后,pion/webrtc的組件會使用自定義buffer。
比如pion/srtp是實際收發srtp和srtcp包的類,它們也會使用自定義buffer。
首先來看一下ion-sfu是在哪里設置自定義buffer的:
func NewWebRTCTransportConfig(c Config)WebRTCTransportConfig {//這個SettingEngine是pion里很重要的設置類,可以控制pion/webrtc很多行為和參數,比如ice-lite等se :=webrtc.SettingEngine{}se.DisableMediaEngineCopy(true)....//這里把自定義的BufferFactory給配置進去了//意思是pion/srtp會使用這個buffer來傳包se.BufferFactory =c.BufferFactory.GetOrNew }srtp和srtcp流向是這樣的:
當包到達pion/srtp時,就會觸發ReadStreamSRTP.init函數和ReadStreamSRTCP.init函數。
ReadStreamSRTP.init調用自定義的BufferFactory.GetOrNew函數,new了一個buffer
ReadStreamSRTCP.init調用自定義的BufferFactory.GetOrNew函數,new了一個rtcpReader
之后收發rtp和rtcp包,就會流經這個buffer和rtcpReader:
https://github.com/pion/srtp/blob/3c34651fa0c6de900bdc91062e7ccb5992409643/stream_srtp.go#L53
func (r *ReadStreamSRTP) init(childstreamSession, ssrc uint32) error {sessionSRTP, ok :=child.(*SessionSRTP) ......ifr.session.bufferFactory != nil {//這里就是調用自定義的BufferFactory.GetOrNew函數了,new了一個bufferr.buffer = r.session.bufferFactory(packetio.RTPBufferPacket,ssrc)} else { .......}return nil } srtp.(*ReadStreamSRTP).init--->session.bufferFactory(其實是buffer.BufferFactory.GetOrNew)--->buffer.NewBuffer srtp.(*ReadStreamSRTCP).init--->session.bufferFactory(其實是buffer.BufferFactory.GetOrNew)--->buffer.NewNewRTCPReader為什么這么搞呢?
仔細想想,如果控制了rtp和rtcp的buffer,是不是計算twcc、nack、stats等就很方便了,在buffer寫入包的同時,就可以通過設置的回調函數搞各種復雜計算。
4.buffer收發包流程
收發rtp包流程圖簡單總結:
srtp.(*ReadStreamSRTP).write--->buffer.(*Buffer).Write--->buffer.(*Buffer).ReadExtended--->DownTrack.WriteRTP--->DownTrack.writeSimpleRTP/writeSimulcastRTP貼一下代碼:
func (r *ReadStreamSRTP) write(buf []byte) (n int, err error) {//這里就把包寫入了自定義buffern, err = r.buffer.Write(buf)if errors.Is(err,packetio.ErrFull) {// Silently dropdata when the buffer is full.return len(buf), nil}return n, err }downtrack收發rtcp包流程圖:
srtp.(*ReadStreamSRTCP).write--->buffer.(*RTCPReader).Write--->DownTrack.Bind里rr.OnPacket
5.bucket存儲rtp
如圖,bucket是一個定制的ringbuffer,是用來存儲rtp包的:
包含一個數組,step是索引,每來一個包,先把包長度存入2字節,再把包存入一個MTU;step遞增,以此類推,達到最大再從0循環(從0到maxSteps)。
buf: [2][MTU][2][MTU][2][MTU][2][MTU]...[MTU]0 1 2 3 maxSteps| | | | ... |step-------------------------------><---------------------------------|rtp包寫入bucket,并被WebRTCReceiver用來查找+重傳包的過程。
buffer.(*Buffer).Write-->buffer.Buffer.calc-->buffer.bucket.AddPacket-->buffer.bucket.GetPacket<---WebRTCReceiver.RetransmitPackets<---DownTrack.handleRTCP看下代碼細節:
6. ?nackQueue計算nack
nack數組,用來存儲nack信息并計算rtcp-nack。
[9316][9317]...[N]|sn代碼細節:
const maxNackTimes = 3 // 每個nack包發送的最大次數,防止客戶端一直重傳加重擁塞 const maxNackCache = 100// 最大緩存個數type nack struct {sn uint32//rtp序列號nacked uint8//發送的次數 }type nackQueue struct {nacks []nack//nack數組kfSN uint32//askKeyframeSN 要求發送PLI }//創建nackQueue funcnewNACKQueue() *nackQueue{return &nackQueue{nacks: make([]nack, 0, maxNackCache+1),} }//刪除 func (n *nackQueue) remove(extSN uint32) {i := sort.Search(len(n.nacks), func(iint) bool { return n.nacks[i].sn >= extSN })if i >= len(n.nacks) ||n.nacks[i].sn != extSN {return}copy(n.nacks[i:],n.nacks[i+1:])n.nacks = n.nacks[:len(n.nacks)-1] }//插入,extSN從大到小,查找效率高 func (n *nackQueue) push(extSN uint32) {//找到<=數組中sn的位置,一般是0i := sort.Search(len(n.nacks), func(iint) bool { return n.nacks[i].sn >= extSN })if i < len(n.nacks) &&n.nacks[i].sn == extSN {return}nck := nack{sn: extSN,nacked: 0,}if i == len(n.nacks) {//如果是0,直接appendn.nacks = append(n.nacks, nck)} else {//否則復制元素,到最前邊n.nacks = append(n.nacks[:i+1], n.nacks[i:]...)n.nacks[i] = nck}//如果nack達到最大,刪除最前一個if len(n.nacks) >=maxNackCache {copy(n.nacks,n.nacks[1:])} }//生成nack func (n *nackQueue) pairs(headSN uint32)([]rtcp.NackPair, bool) {if len(n.nacks) ==0 {return nil, false}i := 0askKF := falsevar np rtcp.NackPairvar nps []rtcp.NackPairfor _, nck := rangen.nacks {if nck.nacked >=maxNackTimes {//如果nack重發>=3次if nck.sn >n.kfSN {//如果sn>上次請求關鍵幀SNn.kfSN = nck.sn//記錄下來askKF = true//返回請求PLI}continue}//跳過比headSN大3的if nck.sn >=headSN-2 {//如:9316>=9320-2 不成立,跳過n.nacks[i] = ncki++continue}//過來的是3個包//這個值是個經驗值://如果太大,返回rtcp-nack包會delay太久,導致客戶端重發包太遲,畫面延遲//如果太小,比如2,則可能亂序的概率會大,因為等的越短,亂序包到來的概率越小n.nacks[i] = nack{sn: nck.sn,nacked: nck.nacked +1,//計數器+1}i++//如果是np.PacketID==0,是第一個包,需要初始化np.PacketIDnp.LostPacketsif np.PacketID ==0 || uint16(nck.sn) > np.PacketID+16 {if np.PacketID !=0 {nps = append(nps, np)}np.PacketID = uint16(nck.sn)np.LostPackets = 0continue}//如果是后續包,計算LostPacketsnp.LostPackets |= 1 <<(uint16(nck.sn) - np.PacketID - 1)}if np.PacketID != 0 {nps = append(nps, np)//追加到后邊}n.nacks = n.nacks[:i]//去掉已經算過的包return nps, askKF }7. 總結
本文介紹了Qos中兩個基礎部分:
使用bucket緩存rtp包,收到nack后,重傳rtp包
使用nackQueue,存儲信息并計算rtcp-nack,發送給客戶端
作者簡介:
王朋闖:前百度RTN資深工程師,前金山云RTC技術專家,前VIPKID流媒體架構師,ION開源項目發起人。
特別說明:
本文發布于知乎,已獲得作者授權轉載。
講師招募
LiveVideoStackCon 2022 音視頻技術大會 上海站,正在面向社會公開招募講師,無論你所處的公司大小,title高低,老鳥還是菜鳥,只要你的內容對技術人有幫助,其他都是次要的。歡迎通過?speaker@livevideostack.com?提交個人資料及議題描述,我們將會在24小時內給予反饋。
喜歡我們的內容就點個“在看”吧!
總結
以上是生活随笔為你收集整理的大话ion系列(五)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网易云信自研大规模传输网核心系统架构剖析
- 下一篇: LiveVideoStack Meet