大话ion系列(二)
點擊上方“LiveVideoStack”關注我們
作者 | 王朋闖
本文為王朋闖老師創作的系列ion文章,LiveVideoStack已獲得授權發布,未來將持續更新。
大話ion系列(一)
三、演進與模塊說明
1.演進
ion-sfu最早從ion抽出來,經過長時間的演變和社區的打磨,功能和復雜度都在增加,主要過程有:
多PC模式:
每個人推/拉每路流都是單獨一個PC(PeerConnection),同一房間內PC個數是N^2,比如3個人是9個PC,4個人是16個。
實現簡單,但隨著人數增多,PC個數驟增,會造成資源浪費。
單PC模式:
每個人推/拉多路流都是同一個PC,同一房間內PC個數是N,比如3個人是3個PC,4個人是4個PC。
上下行流都用同一個PC重協商,瀏覽器兼容性差,造成SFU狀態競爭沖突問題等。
雙PC模式:
每個人推流用同一個PC,拉流用另一個PC。同一房間內PC個數是N*2。存在問題:端口占用是N*2,后來支持單端口解決了這個問題
抽象接口:
很多人將ion-sfu作為包導入使用,為了方便業務定制,抽象出了很多接口。
比如一開始只有一個SFU結構體,后來抽象出SessionProvider接口組,然后SFU結構體實現這個接口,上層業務就可以繼承派生SFU出自己的子類,方便做定制業務,然后再作為SessionProvider傳入NewPeer。之后GetSession獲取的就是派生的子類。同理也可以發現很多其他接口和對應的結構體,同樣都可以基于結構體派生出自己的定制類。
性能優化:
在ion-sdk-go完成后,基于它實現了ion-sfu-load-tool,壓測出很多channel,goroutine濫用導致性能下降,之后ion-sfu使用回調和queue來減少了濫用,性能得到很大提升。
功能強化:
Simulcast,大小流支持
Qos優化,TWCC/REMB+PLI/FIR+Nack+SR/RR等
發音檢測、定制dc等
2.?模塊介紹
SFU
一個SFU里應包含多個會話(房間)。
// SFU represents an sfu instance type SFU struct {//業務可以派生SFU來構建自己的類sync.RWMutexwebrtc WebRTCTransportConfig//配置turn *turn.Server//內置turnsessions map[string]Session//session管理datachannels []*Datachannel//dc管理withStats bool//是否開啟狀態監控 }Session
一個會話管理多個peer。
// Session represents a set of peers. Transports inside a SessionLocal// are automatically subscribed to each other.type Session interface {//Session是一組接口,具體的實現在SessionLocal里ID() stringPublish(router Router, r Receiver)//把Receiver的流發布到router中,給Session中的每個Peer增加一個Subscribe(peer Peer)//把peer的Subscriber訂閱到房間中其他peerAddPeer(peer Peer)//房間增加一個peerGetPeer(peerID string) Peer//獲取peerRemovePeer(peer Peer)//刪除peerAddRelayPeer(peerID string, signalData []byte) ([]byte, error)//增加級聯peerAudioObserver() *AudioObserver//獲取聲音檢測AddDatachannel(owner string, dc *webrtc.DataChannel)//增加dcGetDCMiddlewares() []*Datachannel//獲取dc中間件GetFanOutDataChannelLabels() []string//獲取扇出dcGetDataChannels(peerID, label string) (dcs []*webrtc.DataChannel)//獲取某個peer的全部dcFanOutMessage(origin, label string, msg webrtc.DataChannelMessage)//扇出消息Peers() []Peer//獲取全部peerRelayPeers() []*RelayPeer//獲取全部級聯peer}type SessionLocal struct {//SessionLocal是真實的管理類,當然上層業務可以派生它實現業務定制id stringmu sync.RWMutexconfig WebRTCTransportConfigpeers map[string]Peer//管理peerrelayPeers map[string]*RelayPeer//管理級聯peerclosed atomicBoolaudioObs *AudioObserver//聲音檢測fanOutDCs []string//扇出dc管理datachannels []*Datachannel//dc管理onCloseHandler func()//關閉時回調}Router
路由,表示流從哪個receiver接收,下發到哪個downtrack。
// Router defines a track rtp/rtcp Routertype Router interface {ID() stringAddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote) (Receiver, bool)//增加receiverAddDownTracks(s *Subscriber, r Receiver) error//把Receiver的downtracks加到SubscriberSetRTCPWriter(func([]rtcp.Packet) error)//設置rtcp寫入回調AddDownTrack(s *Subscriber, r Receiver) (*DownTrack, error)//根據Receiver創建downtrack,加到SubscriberStop()//停止}type router struct {//路由sync.RWMutexid stringtwcc *twcc.Responder//twcc響應器stats map[uint32]*stats.Stream//流狀態管理rtcpCh chan []rtcp.Packet//rtcp通道stopCh chan struct{}config RouterConfigsession Session//會話receivers map[string]Receiver//receiver管理,實際上是WebRTCReceiverbufferFactory *buffer.Factory//buffer管理writeRTCP func([]rtcp.Packet) error}Peer
一個peer對應一個客戶端,包含一個Publisher和Subscriber。
// PeerLocal represents a pair peer connectiontype PeerLocal struct {//代表SFU端本地peersync.Mutexid stringclosed atomicBoolsession Session//會話,代表一個房間provider SessionProvider//接口,只有一個GetSession,業務可以派生SFU來構建自己的類,然后傳給這個providerpublisher *Publisher//發布者subscriber *Subscriber//訂閱者OnOffer func(*webrtc.SessionDescription)//服務端發起協商(negotiate)時調用此接口OnIceCandidate func(*webrtc.ICECandidateInit, int)//服務端收集到候選(candidate)時調用OnICEConnectionStateChange func(webrtc.ICEConnectionState)//服務器ICE狀態改變時調用remoteAnswerPending bool//遠程是否answer中negotiationPending bool//是否協商中,為了避免狀態亂掉}type SessionProvider interface {//只有一個GetSession方法,業務可以派生SFU結構體來構建自己的類,SFU也包含這個GetSession方法GetSession(sid string) (Session, WebRTCTransportConfig)}Publisher
Publisher管理webrtc.TrackRemote和webrtc.RTPReceiver。
type Publisher struct {mu sync.RWMutexid stringpc *webrtc.PeerConnection//推流pccfg *WebRTCTransportConfigrouter Router//rtp/rtcp路由session Session//會話tracks []PublisherTrack//推流track,封裝了*webrtc.TrackRemote和*webrtc.RTPReceiverrelayed atomicBoolrelayPeers []*relayPeer//級聯peercandidates []webrtc.ICECandidateInit//trickle-ice候選onICEConnectionStateChangeHandler atomic.Value // func(webrtc.ICEConnectionState)回調onPublisherTrack atomic.Value // func(PublisherTrack)回調closeOnce sync.Once}type PublisherTrack struct {//封裝了TrackRemote和ReceiverTrack *webrtc.TrackRemoteReceiver Receiver// This will be used in the future for tracks that will be relayed as clients or servers// This is for SVC and Simulcast where you will be able to chose if the relayed peer just// want a single track (for recording/ processing) or get all the tracks (for load balancing)clientRelay bool//標記為客戶端或服務端}Subscriber
Subscriber管理下行PC和DownTrack。
type Subscriber struct {sync.RWMutexid stringpc *webrtc.PeerConnection//封裝下行PC,從客戶端接收流me *webrtc.MediaEnginetracks map[string][]*DownTrack//管理downtrackchannels map[string]*webrtc.DataChannel//管理dccandidates []webrtc.ICECandidateInitnegotiate func()//協商回調closeOnce sync.OncenoAutoSubscribe bool//不自動訂閱,router.AddDownTracks時會直接返回,并不添加track}Receiver
Receiver從客戶端接收rtp流,并發送rtcp。
// Receiver defines a interface for a track receiverstype Receiver interface {TrackID() string//track的idStreamID() string//stream的id,webrtc一個stream可以包含多個trackCodec() webrtc.RTPCodecParameters//codec參數,對應sdp里的Kind() webrtc.RTPCodecType//kind表示video、audioSSRC(layer int) uint32//同步信源(SSRC)標識符SetTrackMeta(trackID, streamID string)//設置trackID,streamIDAddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer, bestQualityFirst bool)//增加上行track、buffer,實際上讀寫包是從buffer里讀的AddDownTrack(track *DownTrack, bestQualityFirst bool)//增加下行trackSwitchDownTrack(track *DownTrack, layer int) error//切換downtrack的simulcast的空間層GetBitrate() [3]uint64//獲取每個layer的碼流GetMaxTemporalLayer() [3]int32//獲取每個空間層最大的時域層RetransmitPackets(track *DownTrack, packets []packetMeta) error//收到nack時向downtrack重傳包DeleteDownTrack(layer int, id string)//根據空間層和id刪除downtrackOnCloseHandler(fn func())//關閉回調SendRTCP(p []rtcp.Packet)//發送rtcp給SetRTCPCh(ch chan []rtcp.Packet)GetSenderReportTime(layer int) (rtpTS uint32, ntpTS uint64)}// WebRTCReceiver receives a video tracktype WebRTCReceiver struct {//WebRTCReceiver實現了Receiver的接口,是實際工作的接收器sync.MutexcloseOnce sync.OncepeerID string//哪個peertrackID string//哪個trackstreamID string//哪個streamkind webrtc.RTPCodecType//視頻or音頻closed atomicBoolbandwidth uint64//帶寬lastPli int64//上次pli的時間stream string//沒用receiver *webrtc.RTPReceiver//封裝實際的RTPReceivercodec webrtc.RTPCodecParameters//codec參數rtcpCh chan []rtcp.Packet//rtcp通道buffers [3]*buffer.Buffer//三個空間層各有一個接收包的bufferupTracks [3]*webrtc.TrackRemote//三個空間層各有一個實際的TrackRemotestats [3]*stats.Stream//三個空間層各有一個狀態統計available [3]atomicBool//三個空間層各自是否可用downTracks [3]atomic.Value //三個空間層各自有一組[]*DownTrackpending [3]atomicBool//三個空間層各自是否pendingpendingTracks [3][]*DownTrack//存三個空間層各自pending的tracknackWorker *workerpool.WorkerPoolisSimulcast bool//是否開啟大小流onCloseHandler func()//關閉回調函數}其實,真正收流并轉發的是WebRTCReceiver,參考WebRTCReceiver.writeRTP接口。
四、Join流程
1.?簡介
這里以JsonRPC為例,前邊提到過,所有信令入口是Handle函數。
推薦使用vscode等IDE分析,使用command+左鍵點擊,跳轉很方便,command+shift+F可以搜索關鍵字。
本文分析會省略Trickle-ICE和datachannel處理部分代碼,只分析核心流程,這樣更容易理解。
首先,來看一下ion-sdk-js的join流程:
const offer = await this.transports[Role.pub].pc.createOffer();//sdk端的pub是用來推流await this.transports[Role.pub].pc.setLocalDescription(offer);const answer = await this.signal.join(sid, uid, offer);//這里發送信令join到SFUawait this.transports[Role.pub].pc.setRemoteDescription(answer);this.transports[Role.pub].candidates.forEach((c) => this.transports![Role.pub].pc.addIceCandidate(c));this.transports[Role.pub].pc.onnegotiationneeded = this.onNegotiationNeeded.bind(this);然后,用一張簡圖來表示PeerLocal和Publisher、Subscriber之間的關系。
這里Publisher是對應sdk的pub,用來收pub的流。
2. Join信令邏輯
PeerLocal是信令連接成功后,NewPeer創建的,可以理解為SFU端的Peer。
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {。。。switch req.Method {case "join":var join Joinerr := json.Unmarshal(*req.Params, &join)if err != nil {p.Logger.Error(err, "connect: error parsing offer")replyError(err)break}//設置OnOffer回調,SFU需要重協商時,會調用這個函數p.OnOffer = func(offer *webrtc.SessionDescription) {if err := conn.Notify(ctx, "offer", offer); err != nil {p.Logger.Error(err, "error sending offer")}}//設置OnIceCandidate,SFU收到新候選時,會回調這個函數p.OnIceCandidate = func(candidate *webrtc.ICECandidateInit, target int) {if err := conn.Notify(ctx, "trickle", Trickle{Candidate: *candidate,Target: target,}); err != nil {p.Logger.Error(err, "error sending ice candidate")}}//調用PeerLocal.Joinerr = p.Join(join.SID, join.UID, join.Config)if err != nil {replyError(err)break}//調用PeerLocal.Answer生成answeranswer, err := p.Answer(join.Offer)if err != nil {replyError(err)break}//返回answer_ = conn.Reply(ctx, req.ID, answer)接下來分析一下Peer.Join和Peer.Answer。
3. PeerLocal.Join分析
最終進入的是PeerLocal.Join。
func (p *PeerLocal) Join(sid, uid string, config ...JoinConfig) error {...s, cfg := p.provider.GetSession(sid)//這里實際調的是SFU.GetSession,獲取或創建一個sessionp.session = sif !conf.NoSubscribe {p.subscriber, err = NewSubscriber(uid, cfg)//創建subscriber,來管理一組downtrackif err != nil {return fmt.Errorf("error creating transport: %v", err)}p.subscriber.noAutoSubscribe = conf.NoAutoSubscribe//設置subscriber的重協商回調函數,最終調用subscriber.Negotiate()就會觸發此函數//調用宗旨:subcriber的一組downtrack有變化,比如增加、刪除,此時需要subscriber重協商,//重協商的流程在前邊的文章有講過p.subscriber.OnNegotiationNeeded(func() {p.Lock()defer p.Unlock()if p.remoteAnswerPending {p.negotiationPending = true//是否為協商中return}Logger.V(1).Info("Negotiation needed", "peer_id", p.id)offer, err := p.subscriber.CreateOffer()//協商第一步驟CreateOffer+SetLocalDescriptionif err != nil {Logger.Error(err, "CreateOffer error")return}p.remoteAnswerPending = trueif p.OnOffer != nil && !p.closed.get() {Logger.V(0).Info("Send offer", "peer_id", p.id)p.OnOffer(&offer)//回調信令發送offer}})。。。}if !conf.NoPublish {//創建Publisher,創建上行pc以及設置pc.OnTrack,這是客戶端上行流的核心驅動函數//熟悉webrtc的都知道OnTrack是收到了遠端track流才會觸發,比如音頻、視頻p.publisher, err = NewPublisher(uid, p.session, &cfg)if err != nil {return fmt.Errorf("error creating transport: %v", err)}。。。p.session.AddPeer(p)//加入session管理Logger.V(0).Info("PeerLocal join SessionLocal", "peer_id", p.id, "session_id", sid)if !conf.NoSubscribe {p.session.Subscribe(p)//使p訂閱session中的其他peer}return nil}接下來分析一下session.Subscribe的核心代碼。
// 使peer訂閱房間內所有其他peer的流func (s *SessionLocal) Subscribe(peer Peer) {。。。// 循環遍歷房間內的peers,對每個peer的Publisher下的router進行操作:根據router的所有receiver,創建downtrack,并增加到peer.Subscriber中for _, p := range peers {err := p.Publisher().GetRouter().AddDownTracks(peer.Subscriber(), nil)if err != nil {Logger.Error(err, "Subscribing to Router err")continue}}。。。peer.Subscriber().negotiate()//訂閱好了,重協商}這里提一下關鍵函數調用鏈:
router.AddDownTracks—》router.AddDownTrack—》Subscriber.AddDownTrack/Receiver.AddDownTrack
func (r *router) AddDownTrack(sub *Subscriber, recv Receiver) (*DownTrack, error) {for _, dt := range sub.GetDownTracks(recv.StreamID()) {//避免重復添加if dt.ID() == recv.TrackID() {return dt, nil}}codec := recv.Codec()if err := sub.me.RegisterCodec(codec, recv.Kind()); err != nil {return nil, err}//創建downtrack,downtrack用來給客戶端下發流downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{MimeType: codec.MimeType,ClockRate: codec.ClockRate,Channels: codec.Channels,SDPFmtpLine: codec.SDPFmtpLine,RTCPFeedback: []webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},}, recv, r.bufferFactory, sub.id, r.config.MaxPacketTrack)if err != nil {return nil, err}//把downtrack增加到pc中if downTrack.transceiver, err = sub.pc.AddTransceiverFromTrack(downTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly,}); err != nil {return nil, err}// 設置關閉回調,關閉時pc自動刪除trackdownTrack.OnCloseHandler(func() {if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {if err == webrtc.ErrConnectionClosed {return}Logger.Error(err, "Error closing down track")} else {//如果刪除成功,再從sub中刪除,然后重協商sub.RemoveDownTrack(recv.StreamID(), downTrack)sub.negotiate()}}})//設置OnBind回調,DownTrack.Bind()里會調用這個;PC協商完成時,DownTrack.Bind()會觸發downTrack.OnBind(func() {go sub.sendStreamDownTracksReports(recv.StreamID())})//增加downTrack到sub中,sub只是用來管理downtracks和生成SenderReport等sub.AddDownTrack(recv.StreamID(), downTrack)//增加downTrack到WebRTCReceiver中,實際收發包是WebRTCReceiver來控制,在writeRTP中recv.AddDownTrack(downTrack, r.config.Simulcast.BestQualityFirst)return downTrack, nil }4. PeerLocal.Answer分析
func (p *PeerLocal) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 。。。answer, err := p.publisher.Answer(sdp)if err != nil {return nil, fmt.Errorf("error creating answer: %v", err)}Logger.V(0).Info("PeerLocal send answer", "peer_id", p.id)return &answer, nil }5.?總結
總結一下:
連接成功,創建PeerLocal,Join信令進入Handle函數
PeerLocal.Join
獲取或創建一個session
給PeerLocal創建Subscriber和Publisher,并設置他們的一些回調函數
使PeerLocal訂閱session中的其他peer,因為此時其他peer可能正在推流:
1)router.AddDownTracks—》router.AddDownTrack—》Subscriber.AddDownTrack/Receiver.AddDownTrack
2)PeerLocal.Subscriber().negotiate();訂閱好了,協商,這里是第二次重協商,完成后客戶端可以收到其他人的流了
PeerLocal.Answer生成answer
信令發送answer
客戶端收到answer后,完成第一次協商,此時pub的dc通道打通了(在前邊的文章有一張圖,作為參考)
作者簡介:
王朋闖:前百度RTN資深工程師,前金山云RTC技術專家,前VIPKID流媒體架構師,ION開源項目發起人。
特別說明:
本文發布于知乎,已獲得作者授權轉載。
掃描圖中二維碼或點擊閱讀原文
了解大會更多信息
喜歡我們的內容就點個“在看”吧!
總結
以上是生活随笔為你收集整理的大话ion系列(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 音视频技术开发周刊 | 217
- 下一篇: 当35岁遇到裁员