PulseAudio 设计和实现浅析
PulseAudio 是一個 POSIX 操作系統的音頻服務器系統,它是我們的音頻應用程序訪問系統音頻設備的代理。它是所有相關的現代 Linux 發行版的組成部分,并被多個供應商用在了各種各樣的移動設備中。它在應用程序和硬件設備間傳遞音頻數據時,可以對音頻數據執行一些高級操作。比如,把音頻數據傳給不同的機器,修改樣本格式或通道數,或者混音多路音頻到一路輸入/輸出,這些用 PulseAudio 實現都很簡單。PulseAudio 主頁。
很多學習了編程語言,操作系統,計算機網絡和標準庫及操作系統 API 的同學都會有一些疑問:自己好像已經學習了不少知識了,但這些知識能用來做些什么,基于這些知識怎么才能實現一個有一定價值的應用,應用程序究竟是如何訪問硬件設備的,如音頻設備、視頻設備等。
通過了解 PulseAudio 系統的設計和實現,可以讓我們看到許許多多 POSIX 系統,特別是 Linux 系統平臺下,開發一個實用的軟件,所需要了解的許許多多跟系統相關的點,如基于 udev 的設備發現和設備狀態變化監聽,各種各樣的進程間通信方式,如基于 IPv4 和 IPv6 的 TCP,Unix 域 socket,D-Bus 等,訪問音頻設備的 alsa 架構等。
這里簡單看一下 PulseAudio 的設計和實現。
PulseAudio 的代碼下載及編譯
編譯出調試版代碼,并在調試器中把代碼運行起來,可以為我們研究一個軟件項目提供極大的便利。我們在 Ubuntu 20.04 上編譯 PulseAudio 并把它跑起來。
我們可以通過 Git 下載代碼,當前 PulseAudio 最新發布的版本為 14.2,我們切換到這個分支:
$ git clone https://github.com/pulseaudio/pulseaudio.git $ git checkout -t origin/stable-14.xPulseAudio 是一個通過 meson + ninja 來構建的工程,更多關于 meson 構建系統的信息,可以參考 meson 主頁。在開始構建 PulseAudio 前,需要先下載 meson:
$ sudo apt install meson還需要下載 PulseAudio 本身及測試用例編譯的依賴庫:
$ sudo apt-get install libltdl-dev libsndfile1-dev cmake libsbc-dev check libbluetooth-dev編譯 PulseAudio:
$ cd pulseaudio $ mkdir build $ cd build/ $ meson .. $ ninja -C . . . . . . . . . . Database: tdbLegacy Database Entry Support: truemodule-stream-restore:Clear old devices: falseRunning from build tree: trueSystem User: pulseSystem Group: pulseAccess Group: pulse-access meson.build:899: WARNING: You do not have speex support enabled. It is strongly recommended that you enable speex support if your platform supports it as it is the primary method used for audio resampling and is thus a critical part of PulseAudio on that platform. Build targets in project: 167Found ninja-1.8.2 at ~/bin/depot_tools/ninjaPulseAudio 默認已經運行在 Ubuntu 20.04 中了。為了能夠運行我們編譯出來的 PulseAudio 服務,需要先暫停系統中已經存在的 pulseaudio 服務:
systemctl --user stop pulseaudio.socket systemctl --user stop pulseaudio.service注意,執行這些命令不需要超級用戶權限 sudo。
調試完成之后,還可以通過如下命令重新啟動系統的 pulseaudio 服務:
systemctl --user start pulseaudio.socket systemctl --user start pulseaudio.service通過如下命令,可以將 pulseaudio 服務運行起來:
~/data/opensource/pulseaudio$ build/src/daemon/pulseaudio -n -F build/src/daemon/default.pa -p $(pwd)/build/src/modules/ W: [pulseaudio] caps.c: Normally all extra capabilities would be dropped now, but that's impossible because PulseAudio was built without capabilities support. N: [pulseaudio] daemon-conf.c: Detected that we are run from the build tree, fixing search path. N: [pulseaudio] alsa-util.c: Disabling timer-based scheduling because running inside a VM. E: [alsa-sink-ES1371/1] alsa-sink.c: ALSA 提醒我們在該設備中寫入新數據,但實際上沒有什么可以寫入的! E: [alsa-sink-ES1371/1] alsa-sink.c: 這很可能是 ALSA 驅動程序 'snd_ens1371' 中的一個 bug。請向 ALSA 開發人員報告這個問題。 E: [alsa-sink-ES1371/1] alsa-sink.c: 我們因 POLLOUT 被設置而喚醒 -- 但結果是 snd_pcm_avail() 返回 0 或者另一個小于最小可用值的數值。 N: [pulseaudio] alsa-util.c: Disabling timer-based scheduling because running inside a VM. E: [pulseaudio] stdin-util.c: Unable to read or parse data from client. E: [pulseaudio] module.c: Failed to load module "module-gsettings" (argument: ""): initialization failed.PulseAudio 的設計和實現
pulse audio 工程有許多測試用例,可以用于幫助我們了解這個工程設計與實現。其中許多測試用例是單元測試,主要用于測試 pulse audio 基礎組件的 API,但也有一些測試用例會跑完整的錄制和播放流程,可以讓我們聽到通過 pulse audio 播放出來的音頻數據,這些測試用例有 lo-latency-test (pulseaudio/src/tests/lo-latency-test.c----pulseaudio/build/src/tests/lo-latency-test) 和 sync-playback (pulseaudio/src/tests/sync-playback.c----pulseaudio/build/src/tests/sync-playback) 等。
這里我們通過觀察 sync-playback 和 pulseaudio 服務的交互來了解 PulseAudio 的實現。
從進程角度來看,pulse audio 是客戶端/服務器架構的。pulse audio 提供一個常駐系統的服務進程,它接收音頻相關的操作請求命令,管理音頻設備,實現與音頻設備間的數據交互等。各種接入 pulse audio 庫,通過 pulse audio API 訪問音頻設備的應用,是 pulse audio 系統服務的客戶端,它們通過 pulse audio 系統服務向音頻播放設備寫入數據將聲音播放出來,通過 pulse audio 系統服務讀取錄制設備以獲得采集的音頻數據。
pulse audio 系統服務和它的客戶端可以通過多種方式實現進程間通信。對于命令的傳遞,可以通過 UNIX 域 socket,IPv4 socket,IP v6 socket,Linux 系統的 D-Bus 等,對于大塊的音頻 PCM 數據,則通過共享內存來傳遞。
這里看一下 pulse audio 數據傳輸的基本過程。
pulseaudio 系統服務在進程啟動時,會執行傳入的配置文件 build/src/daemon/default.pa 中的命令。如配置文件中的 load-module module-native-protocol-unix 行會使得 pulseaudio 系統服務去加載 module-native-protocol-unix 模塊,在這個模塊中會起動監聽路徑為 /run/user/1000/pulse/native 的 Unix 域 socket,詳細的執行路徑如上面的堆棧信息,并設置連接建立等事件發生時的回調 socket_server_on_connection_cb() 等。Unix 域 socket 由模塊 module-native-protocol-unix 管理。
客戶端應用程序,在調用 pa_context_connect() 接口時,會去連接 pulseaudio 系統服務監聽的 Unix 域 socket。
pulseaudio 系統服務在收到客戶端的連接時,socket-server 在其回調中,根據新連接的文件描述符創建 pa_iochannel,并執行回調 socket_server_on_connection_cb(),這個回調通過 pa_native_protocol_connect() 函數為連接創建所需的資源,如 pa_client,pa_native_connection 和流 pa_pstream 等,為流 pa_pstream 設置各種事件的回調,并將命令處理程序關聯到流上:
#0 pa_native_protocol_connect () at ../src/pulsecore/protocol-native.c:5194 #1 socket_server_on_connection_cb ()at ../src/modules/module-protocol-stub.c:202 #2 callback ()at ../src/pulsecore/socket-server.c:143 #3 dispatch_pollfds () at ../src/pulse/mainloop.c:655 #4 pa_mainloop_dispatch () at ../src/pulse/mainloop.c:896 #5 pa_mainloop_iterate () at ../src/pulse/mainloop.c:927 #6 pa_mainloop_run () at ../src/pulse/mainloop.c:942 #7 main () at ../src/daemon/main.c:1170pa_native_protocol_connect() 函數的部分代碼如下:
c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);pa_pstream_set_receive_packet_callback(c->pstream, pstream_packet_callback, c);pa_pstream_set_receive_memblock_callback(c->pstream, pstream_memblock_callback, c);pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);c->pdispatch = pa_pdispatch_new(p->core->mainloop, true, command_table, PA_COMMAND_MAX);c->record_streams = pa_idxset_new(NULL, NULL);c->output_streams = pa_idxset_new(NULL, NULL);客戶端在連接建立完成后,通過新創建的連接發送幾個命令,請求 pulseaudio 系統服務分配資源。這些命令的交互主要包括如下這些:
command_get_info command_get_info command_auth command_register_memfd_shmid command_set_client_name command_get_info command_get_info command_enable_srbchannel command_create_playback_stream command_create_playback_stream command_create_playback_stream command_create_playback_stream command_get_info command_get_info command_get_info command_get_info command_get_info command_get_info command_cork_playback_stream command_get_info command_get_info command_get_info忽略不是很重要的 command_get_info 命令。
(1). 如前面我們提到的,客戶端和 pulseaudio 系統服務通過共享內存相互傳遞大塊的音頻數據。客戶端的共享內存對象在創建 pa_context 對象時打開,如:
#0 sharedmem_create ()at ../src/pulsecore/shm.c:141 #1 pa_shm_create_rw () at ../src/pulsecore/shm.c:241 #2 pa_mempool_new () at ../src/pulsecore/memblock.c:849 #3 pa_context_new_with_proplist () at ../src/pulse/context.c:185 #4 pa_context_new () at ../src/pulse/context.c:103 #5 sync_playback_test () at ../src/tests/sync-playback.c:160 #6 main () at ../src/tests/sync-playback.c:184(2). pulseaudio 系統服務是在客戶端發送了 PA_COMMAND_AUTH 命令,在PA_COMMAND_AUTH 命令處理程序中打開的共享內存對象。客戶端在連接建立成功時發送 PA_COMMAND_AUTH 命令:
#0 pa_pstream_send_packet () at ../src/pulsecore/pstream.c:442 #1 pa_pstream_send_tagstruct_with_ancil_data ()at ../src/pulsecore/pstream-util.c:45 #2 pa_pstream_send_tagstruct_with_creds ()at ../src/pulsecore/pstream-util.c:58 #3 setup_context () at ../src/pulse/context.c:650 #4 on_connection () at ../src/pulse/context.c:926 #5 do_call () at ../src/pulsecore/socket-client.c:159 #6 connect_defer_cb () at ../src/pulsecore/socket-client.c:172 #7 dispatch_defer () at ../src/pulse/mainloop.c:680 #8 pa_mainloop_dispatch () at ../src/pulse/mainloop.c:887 #9 pa_mainloop_iterate () at ../src/pulse/mainloop.c:927 #10 pa_mainloop_run () at ../src/pulse/mainloop.c:942 #11 sync_playback_test () at ../src/tests/sync-playback.c:170 #12 main () at ../src/tests/sync-playback.c:184(3). pulseaudio 系統服務在 PA_COMMAND_AUTH 命令處理程序中,創建 pa_srbchannel 對象,并打開共享內存對象:
#0 sharedmem_create ()at ../src/pulsecore/shm.c:141 #1 pa_shm_create_rw () at ../src/pulsecore/shm.c:241 #2 pa_mempool_new (t) at ../src/pulsecore/memblock.c:849 #3 setup_srbchannel () at ../src/pulsecore/protocol-native.c:2502 #4 command_auth ()at ../src/pulsecore/protocol-native.c:2740 #5 pa_pdispatch_run ()at ../src/pulsecore/pdispatch.c:346 #6 pstream_packet_callback ()at ../src/pulsecore/protocol-native.c:5027 #7 do_read () at ../src/pulsecore/pstream.c:1020 #8 do_pstream_read_write () at ../src/pulsecore/pstream.c:260 #9 io_callback () at ../src/pulsecore/pstream.c:312 #10 callback ()at ../src/pulsecore/iochannel.c:158 #11 dispatch_pollfds () at ../src/pulse/mainloop.c:655 #12 pa_mainloop_dispatch () at ../src/pulse/mainloop.c:896 #13 pa_mainloop_iterate () at ../src/pulse/mainloop.c:927 #14 pa_mainloop_run () at ../src/pulse/mainloop.c:942 #15 main () at ../src/daemon/main.c:1170pulseaudio 系統服務在 PA_COMMAND_AUTH 命令的處理中,創建共享內存對象之后,還會通過 pa_pstream_register_memfd_mempool() 將共享內存對象的文件描述符傳給客戶端,如下面的 setup_srbchannel () 函數定義所示:
static void setup_srbchannel(pa_native_connection *c, pa_mem_type_t shm_type) {pa_srbchannel_template srbt;pa_srbchannel *srb;pa_memchunk mc;pa_tagstruct *t;int fdlist[2];#ifndef HAVE_CREDSpa_log_debug("Disabling srbchannel, reason: No fd passing support");return; #endifif (!c->options->srbchannel) {pa_log_debug("Disabling srbchannel, reason: Must be enabled by module parameter");return;}if (c->version < 30) {pa_log_debug("Disabling srbchannel, reason: Protocol too old");return;}if (!pa_pstream_get_shm(c->pstream)) {pa_log_debug("Disabling srbchannel, reason: No SHM support");return;}if (c->rw_mempool) {pa_log_debug("Ignoring srbchannel setup, reason: received COMMAND_AUTH ""more than once");return;}if (!(c->rw_mempool = pa_mempool_new(shm_type, c->protocol->core->shm_size, true))) {pa_log_warn("Disabling srbchannel, reason: Failed to allocate shared ""writable memory pool.");return;}if (shm_type == PA_MEM_TYPE_SHARED_MEMFD) {const char *reason;if (pa_pstream_register_memfd_mempool(c->pstream, c->rw_mempool, &reason)) {pa_log_warn("Disabling srbchannel, reason: Failed to register memfd mempool: %s", reason);goto fail;}}pa_mempool_set_is_remote_writable(c->rw_mempool, true);srb = pa_srbchannel_new(c->protocol->core->mainloop, c->rw_mempool);if (!srb) {pa_log_debug("Failed to create srbchannel");goto fail;}pa_log_debug("Enabling srbchannel...");pa_srbchannel_export(srb, &srbt);/* Send enable command to client */t = pa_tagstruct_new();pa_tagstruct_putu32(t, PA_COMMAND_ENABLE_SRBCHANNEL);pa_tagstruct_putu32(t, (size_t) srb); /* tag */fdlist[0] = srbt.readfd;fdlist[1] = srbt.writefd;pa_pstream_send_tagstruct_with_fds(c->pstream, t, 2, fdlist, false);/* Send ringbuffer memblock to client */mc.memblock = srbt.memblock;mc.index = 0;mc.length = pa_memblock_get_length(srbt.memblock);pa_pstream_send_memblock(c->pstream, 0, 0, 0, &mc);c->srbpending = srb;return;fail:if (c->rw_mempool) {pa_mempool_unref(c->rw_mempool);c->rw_mempool = NULL;} }setup_srbchannel () 函數還會向客戶端發送 PA_COMMAND_ENABLE_SRBCHANNEL 命令并向共享內存中寫入一段數據。
(4). 隨后,客戶端收到 pulseaudio 系統服務發過來的共享內存對象文件描述符,attach 到這塊共享內存對象上:
#0 shm_attach () at ../src/pulsecore/shm.c:347 #1 pa_shm_attach ()at ../src/pulsecore/shm.c:424 #2 segment_attach ()at ../src/pulsecore/memblock.c:1123 #3 pa_memimport_attach_memfd () at ../src/pulsecore/memblock.c:1213 #4 pa_pstream_attach_memfd_shmid () at ../src/pulsecore/pstream.c:377 #5 pa_pstream_register_memfd_mempool ()at ../src/pulsecore/pstream-util.c:174 #6 setup_complete_callback ()at ../src/pulse/context.c:554 #7 run_action () at ../src/pulsecore/pdispatch.c:288 #8 pa_pdispatch_run ()at ../src/pulsecore/pdispatch.c:341 #9 pstream_packet_callback ()at ../src/pulse/context.c:353 #10 do_read () at ../src/pulsecore/pstream.c:1020 #11 do_pstream_read_write () at ../src/pulsecore/pstream.c:260 #12 io_callback () at ../src/pulsecore/pstream.c:312 #13 callback ()at ../src/pulsecore/iochannel.c:158 #14 dispatch_pollfds () at ../src/pulse/mainloop.c:655 #15 pa_mainloop_dispatch () at ../src/pulse/mainloop.c:896 #16 pa_mainloop_iterate () at ../src/pulse/mainloop.c:927 #17 pa_mainloop_run () at ../src/pulse/mainloop.c:942 #18 sync_playback_test () at ../src/tests/sync-playback.c:170 #19 main () at ../src/tests/sync-playback.c:184(5). pulseaudio 系統服務創建的共享內存對象的文件描述符被傳給客戶端,客戶端除了 attach 到這塊共享內存對象上之外,還會通過發送 PA_COMMAND_REGISTER_MEMFD_SHMID 命令向 pulseaudio 系統服務注冊它自己創建的共享內存對象的文件描述符:
#0 pa_pstream_send_packet () at ../src/pulsecore/pstream.c:442 #1 pa_pstream_send_tagstruct_with_ancil_data ()at ../src/pulsecore/pstream-util.c:45 #2 pa_pstream_send_tagstruct_with_fds ()at ../src/pulsecore/pstream-util.c:81 #3 pa_pstream_register_memfd_mempool ()at ../src/pulsecore/pstream-util.c:186 #4 setup_complete_callback ()at ../src/pulse/context.c:554(6). pulseaudio 系統服務處理客戶端發過來的注冊共享內存對象文件描述符請求:
#0 shm_attach () at ../src/pulsecore/shm.c:347 #1 pa_shm_attach ()at ../src/pulsecore/shm.c:424 #2 segment_attach ()at ../src/pulsecore/memblock.c:1123 #3 pa_memimport_attach_memfd () at ../src/pulsecore/memblock.c:1213 #4 pa_pstream_attach_memfd_shmid () at ../src/pulsecore/pstream.c:377 #5 pa_common_command_register_memfd_shmid ()at ../src/pulsecore/native-common.c:67 #6 command_register_memfd_shmid ()at ../src/pulsecore/protocol-native.c:2750 #7 pa_pdispatch_run ()at ../src/pulsecore/pdispatch.c:346(7). 客戶端發送 PA_COMMAND_ENABLE_SRBCHANNEL 命令:
#0 handle_srbchannel_memblock () at ../src/pulse/context.c:359 #1 pstream_memblock_callback () at ../src/pulse/context.c:413 #2 do_read () at ../src/pulsecore/pstream.c:1066 #3 do_pstream_read_write () at ../src/pulsecore/pstream.c:260 #4 io_callback () at ../src/pulsecore/pstream.c:312pulseaudio 系統服務處理了客戶端注冊的共享內存對象文件描述符之后,向共享內存中寫入數據,客戶端收到數據之后,向 pulseaudio 系統服務發送 PA_COMMAND_ENABLE_SRBCHANNEL 命令。
(8). pulseaudio 系統服務處理 PA_COMMAND_ENABLE_SRBCHANNEL 命令
#0 pa_pstream_set_srbchannel () at ../src/pulsecore/pstream.c:1272 #1 command_enable_srbchannel ()at ../src/pulsecore/protocol-native.c:2559 #2 pa_pdispatch_run ()at ../src/pulsecore/pdispatch.c:346 #3 pstream_packet_callback ()at ../src/pulsecore/protocol-native.c:5027PA_COMMAND_ENABLE_SRBCHANNEL 命令處理函數中,將 srbchannel 與 pa_pstream 關聯起來。
(9). 客戶端發送 PA_COMMAND_CREATE_PLAYBACK_STREAM 命令:
#0 pa_pstream_send_packet () at ../src/pulsecore/pstream.c:442 #1 pa_pstream_send_tagstruct_with_ancil_data ()at ../src/pulsecore/pstream-util.c:45 #2 pa_pstream_send_tagstruct_with_creds () at ../src/pulsecore/pstream-util.c:61 #3 create_stream () at ../src/pulse/stream.c:1382 #4 pa_stream_connect_playback () at ../src/pulse/stream.c:1402 #5 context_state_callback () at ../src/tests/sync-playback.c:129客戶端創建 pa_stream 對象,并在調用 pa_stream_connect_playback () 接口時向pulseaudio 系統服務發送 PA_COMMAND_CREATE_PLAYBACK_STREAM 命令,請求 pulseaudio 系統服務創建播放流。
(10). pulseaudio 系統服務處理 PA_COMMAND_CREATE_PLAYBACK_STREAM 命令
#0 playback_stream_new () at ../src/pulsecore/protocol-native.c:964 #1 command_create_playback_stream ()at ../src/pulsecore/protocol-native.c:2067 #2 pa_pdispatch_run )at ../src/pulsecore/pdispatch.c:346pulseaudio 系統服務處理 PA_COMMAND_CREATE_PLAYBACK_STREAM 命令,創建 playback_stream 對象,這里會給流的 sink_input 設置許多回調,如:
s->sink_input->parent.process_msg = sink_input_process_msg;s->sink_input->pop = sink_input_pop_cb;s->sink_input->process_underrun = sink_input_process_underrun_cb;s->sink_input->process_rewind = sink_input_process_rewind_cb;s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;s->sink_input->update_max_request = sink_input_update_max_request_cb;s->sink_input->kill = sink_input_kill_cb;s->sink_input->moving = sink_input_moving_cb;s->sink_input->suspend = sink_input_suspend_cb;s->sink_input->send_event = sink_input_send_event_cb;s->sink_input->userdata = s;start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;fix_playback_buffer_attr(s);pa_sink_input_get_silence(sink_input, &silence);memblockq_name = pa_sprintf_malloc("native protocol playback stream memblockq [%u]", s->sink_input->index);s->memblockq = pa_memblockq_new(memblockq_name,start_index,s->buffer_attr.maxlength,s->buffer_attr.tlength,&sink_input->sample_spec,s->buffer_attr.prebuf,s->buffer_attr.minreq,0,&silence);pa_xfree(memblockq_name);(11). 客戶端發送 PA_COMMAND_CORK_PLAYBACK_STREAM 命令:
#0 pa_pstream_send_packet () at ../src/pulsecore/pstream.c:442 #1 pa_pstream_send_tagstruct_with_ancil_data ()at ../src/pulsecore/pstream-util.c:45 #2 pa_pstream_send_tagstruct_with_creds () at ../src/pulsecore/pstream-util.c:61 #3 pa_stream_cork () at ../src/pulse/stream.c:2293 #4 stream_state_callback () at ../src/tests/sync-playback.c:95客戶端調用 pa_stream_cork () 接口向 pulseaudio 系統服務發送 PA_COMMAND_CORK_PLAYBACK_STREAM 命令。這個接口用于暫停或恢復播放或錄制流。
(12). pulseaudio 系統服務處理 PA_COMMAND_CORK_PLAYBACK_STREAM 命令
#0 pa_sink_input_cork () at ../src/pulsecore/sink-input.c:1577 #1 command_cork_playback_stream ()at ../src/pulsecore/protocol-native.c:3988 #2 pa_pdispatch_run ()at ../src/pulsecore/pdispatch.c:346 #3 pstream_packet_callback ()at ../src/pulsecore/protocol-native.c:5027pulseaudio 系統服務設置 sink_input 的狀態。
此外,客戶端還向 pulseaudio 系統服務發送了 command_set_client_name 命令和多個 command_get_info 命令 ,這里不再詳述這些命令的發送和接收處理過程。
客戶端和 pulseaudio 系統服務經過上面的這些命令交互,完成整個的協商過程,它們之間可以開始進行音頻數據的互相發送了。
客戶端通過調用 pa_stream_write () 向播放流中寫入數據,將播放數據發送給 pulseaudio 系統服務:
#0 pa_pstream_send_memblock () at ../src/pulsecore/pstream.c:477 #1 pa_stream_write_ext_free () at ../src/pulse/stream.c:1549 #2 pa_stream_write () at ../src/pulse/stream.c:1616 #3 stream_state_callback () at ../src/tests/sync-playback.c:87pa_stream_write_ext_free () 將要發送的數據切成一個個的塊,然后通過 pa_pstream_send_memblock () 發送出去:
while (t_length > 0) {pa_memchunk chunk;chunk.index = 0;if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);chunk.length = t_length;} else {void *d;size_t blk_size_max;/* Break large audio streams into _aligned_ blocks or the* other endpoint will happily discard them upon arrival. */blk_size_max = pa_frame_align(pa_mempool_block_size_max(s->context->mempool), &s->sample_spec);chunk.length = PA_MIN(t_length, blk_size_max);chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);d = pa_memblock_acquire(chunk.memblock);memcpy(d, t_data, chunk.length);pa_memblock_release(chunk.memblock);}pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);t_offset = 0;t_seek = PA_SEEK_RELATIVE;t_data = (const uint8_t*) t_data + chunk.length;t_length -= chunk.length;pa_memblock_unref(chunk.memblock);}pa_pstream_send_memblock () 發送數據是異步的,它將一個個塊的數據再按需切成一個個 item,放進發送隊列里:
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {size_t length, idx;size_t bsm;pa_assert(p);pa_assert(PA_REFCNT_VALUE(p) > 0);pa_assert(channel != (uint32_t) -1);pa_assert(chunk);if (p->dead)return;idx = 0;length = chunk->length;bsm = pa_mempool_block_size_max(p->mempool);while (length > 0) {struct item_info *i;size_t n;if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))i = pa_xnew(struct item_info, 1);i->type = PA_PSTREAM_ITEM_MEMBLOCK;n = PA_MIN(length, bsm);i->chunk.index = chunk->index + idx;i->chunk.length = n;i->chunk.memblock = pa_memblock_ref(chunk->memblock);i->channel = channel;i->offset = offset;i->seek_mode = seek_mode; #ifdef HAVE_CREDSi->with_ancil_data = false; #endifpa_queue_push(p->send_queue, i);idx += n;length -= n;}p->mainloop->defer_enable(p->defer_event, 1); }在主事件循環中,通過 srbchannel 將數據發送出去:
#0 pa_memexport_put () at ../src/pulsecore/memblock.c:1455 #1 prepare_next_write_item () at ../src/pulsecore/pstream.c:664 #2 do_write () at ../src/pulsecore/pstream.c:751 #3 do_pstream_read_write () at ../src/pulsecore/pstream.c:266 #4 srb_callback () at ../src/pulsecore/pstream.c:295 #5 srbchannel_rwloop () at ../src/pulsecore/srbchannel.c:190 #6 semread_cb ()at ../src/pulsecore/srbchannel.c:210 #7 dispatch_pollfds () at ../src/pulse/mainloop.c:655pa_pstream_send_memblock () 函數是如何觸發 srbchannel 的回調執行的呢?這是由于客戶端通過如下過程創建 pa_srbchannel:
#0 pa_srbchannel_new_from_template () at ../src/pulsecore/srbchannel.c:291 #1 handle_srbchannel_memblock () at ../src/pulse/context.c:380 #2 pstream_memblock_callback () at ../src/pulse/context.c:413 #3 do_read () at ../src/pulsecore/pstream.c:1066 #4 do_pstream_read_write () at ../src/pulsecore/pstream.c:260 #5 io_callback () at ../src/pulsecore/pstream.c:312 #6 callback ()at ../src/pulsecore/iochannel.c:158 #7 dispatch_pollfds () at ../src/pulse/mainloop.c:655且會為 srb_channel 設置 defer_event 回調:
#0 pa_srbchannel_set_callback () at ../src/pulsecore/srbchannel.c:340 #1 check_srbpending () at ../src/pulsecore/pstream.c:738 #2 do_write () at ../src/pulsecore/pstream.c:755 #3 do_pstream_read_write () at ../src/pulsecore/pstream.c:266 #4 0x00007ffff7bcd19e in io_callback () at ../src/pulsecore/pstream.c:312pa_srbchannel_set_callback () 函數實現如下:
void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) {if (sr->callback)pa_fdsem_after_poll(sr->sem_read);sr->callback = callback;sr->cb_userdata = userdata;if (sr->callback) {/* If there are events to be read already in the ringbuffer, we will not get any IO event for that,because that's how pa_fdsem works. Therefore check the ringbuffer in a defer event instead. */if (!sr->defer_event)sr->defer_event = sr->mainloop->defer_new(sr->mainloop, defer_cb, sr);sr->mainloop->defer_enable(sr->defer_event, 1);} }pa_srbchannel_new_from_template () 函數的實現如下:
pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t) {int temp;struct srbheader *srh;pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));sr->mainloop = m;sr->memblock = t->memblock;pa_memblock_ref(sr->memblock);srh = pa_memblock_acquire(sr->memblock);sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;sr->rb_read.count = &srh->read_count;sr->rb_write.count = &srh->write_count;sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd);if (!sr->sem_read)goto fail;sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd);if (!sr->sem_write)goto fail;pa_srbchannel_swap(sr);temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;#ifdef DEBUG_SRBCHANNELpa_log("Enabling io event on fd %d", t->readfd); #endifsr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);return sr;fail:pa_srbchannel_free(sr);return NULL; }這里會給 io event (read_event)關聯回調。
在 pa_pstream_send_memblock () 函數中通過 p->mainloop->defer_enable(p->defer_event, 1); 喚醒主循環,這個回調的實際實現函數為 mainloop_defer_enable():
static void mainloop_defer_enable(pa_defer_event *e, int b) {pa_assert(e);pa_assert(!e->dead);if (e->enabled && !b) {pa_assert(e->mainloop->n_enabled_defer_events > 0);e->mainloop->n_enabled_defer_events--;} else if (!e->enabled && b) {e->mainloop->n_enabled_defer_events++;pa_mainloop_wakeup(e->mainloop);}e->enabled = b; }mainloop_defer_enable() 函數更新狀態 n_enabled_defer_events,并喚醒主事件循環。被喚醒的主事件循環中,pa_mainloop_dispatch() 在看到這個狀態時,會將事件派發給各個 defer event:
static unsigned dispatch_defer(pa_mainloop *m) {pa_defer_event *e;unsigned r = 0;if (m->n_enabled_defer_events <= 0)return 0;PA_LLIST_FOREACH(e, m->defer_events) {if (m->quit)break;if (e->dead || !e->enabled)continue;pa_assert(e->callback);e->callback(&m->api, e, e->userdata);r++;}return r; } . . . . . . int pa_mainloop_dispatch(pa_mainloop *m) {unsigned dispatched = 0;pa_assert(m);pa_assert(m->state == STATE_POLLED);if (m->quit)goto quit;if (m->n_enabled_defer_events)dispatched += dispatch_defer(m);else {mainloop_defer_enable() 在出發 defer event 被回調之外,也會觸發 srb_channel 的 read event 被調用。
在 Linux 平臺上,pulseaudio 系統服務通過 ALSA 與系統音頻硬件交互。ALSA 項目的主頁為 ALSA。ALSA 項目有一份文檔 A Tutorial on Using the ALSA Audio API 簡單說明了 ALSA 庫接口的用法。ALSA 庫的接口還可以參考 ALSA Library API 和 ALSA library API reference,特別是 PCM interface 部分。
有兩種方法可以用來傳輸音頻樣本數據,第一種是標準的讀寫接口。第二種是使用直接音頻緩沖區,來與設備通信。標準的讀寫接口包括 snd_pcm_writei()?/ snd_pcm_readi() 和 snd_pcm_writen()?/?snd_pcm_readn()。直接讀寫傳輸借助于 mmap 的區域來傳輸數據,這些接口包括 snd_pcm_mmap_begin() / snd_pcm_mmap_commit()。pulse audio 中用這兩種方式都支持,一般用的是直接讀寫傳輸。
alsa-sink 內部有一個線程,在需要的時候,會去取播放的數據并寫入設備(pulseaudio/src/modules/alsa/alsa-sink.c):
static int mmap_write(struct userdata *u, pa_usec_t *sleep_usec, bool polled, bool on_timeout) {bool work_done = false; . . . . . .for (;;) {pa_memchunk chunk;void *p;int err;const snd_pcm_channel_area_t *areas;snd_pcm_uframes_t offset, frames;snd_pcm_sframes_t sframes;size_t written;frames = (snd_pcm_uframes_t) (n_bytes / u->frame_size); /* pa_log_debug("%lu frames to write", (unsigned long) frames); */if (PA_UNLIKELY((err = pa_alsa_safe_mmap_begin(u->pcm_handle, &areas, &offset, &frames, u->hwbuf_size, &u->sink->sample_spec)) < 0)) {if (!after_avail && err == -EAGAIN)break;if ((r = try_recover(u, "snd_pcm_mmap_begin", err)) == 0)continue;if (r == 1)break;return r;}/* Make sure that if these memblocks need to be copied they will fit into one slot */frames = PA_MIN(frames, u->frames_per_block);if (!after_avail && frames == 0)break;pa_assert(frames > 0);after_avail = false;/* Check these are multiples of 8 bit */pa_assert((areas[0].first & 7) == 0);pa_assert((areas[0].step & 7) == 0);/* We assume a single interleaved memory buffer */pa_assert((areas[0].first >> 3) == 0);pa_assert((areas[0].step >> 3) == u->frame_size);p = (uint8_t*) areas[0].addr + (offset * u->frame_size);written = frames * u->frame_size;chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, written, true);chunk.length = pa_memblock_get_length(chunk.memblock);chunk.index = 0;pa_sink_render_into_full(u->sink, &chunk);pa_memblock_unref_fixed(chunk.memblock);if (PA_UNLIKELY((sframes = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) {if ((int) sframes == -EAGAIN)break;if ((r = try_recover(u, "snd_pcm_mmap_commit", (int) sframes)) == 0)continue;if (r == 1)break;return r;}如上所示,alsa-sink 模塊通過 snd_pcm_mmap_commit() 接口將接收的音頻數據送給音頻設備。
alsa-sink 線程取數據的過程如下:
#0 sink_input_pop_cb () at ../src/pulsecore/protocol-native.c:1504 #1 pa_sink_input_peek ()at ../src/pulsecore/sink-input.c:931 #2 fill_mix_info () at ../src/pulsecore/sink.c:1100 #3 pa_sink_render_into () at ../src/pulsecore/sink.c:1340 #4 pa_sink_render_into_full () at ../src/pulsecore/sink.c:1424 #5 mmap_write () at ../src/modules/alsa/alsa-sink.c:737 #6 thread_func () at ../src/modules/alsa/alsa-sink.c:1921 #7 internal_thread_func () at ../src/pulsecore/thread-posix.c:81 #8 start_thread ( at pthread_create.c:477pulseaudio 的 ALSA 模塊在 sink_input_pop_cb() 函數中請求音頻數據:
/* Called from thread context */ static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {playback_stream *s;pa_sink_input_assert_ref(i);s = PLAYBACK_STREAM(i->userdata);playback_stream_assert_ref(s);pa_assert(chunk);#ifdef PROTOCOL_NATIVE_DEBUGpa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); #endifif (!handle_input_underrun(s, false))s->is_underrun = false;/* This call will not fail with prebuf=0, hence we check forunderrun explicitly in handle_input_underrun */if (pa_memblockq_peek(s->memblockq, chunk) < 0)return -1;chunk->length = PA_MIN(nbytes, chunk->length);if (i->thread_info.underrun_for > 0)pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);pa_memblockq_drop(s->memblockq, chunk->length);playback_stream_request_bytes(s);return 0; }sink_input_pop_cb() 函數先處理 underrun ,即緩沖區中數據不足的情況:
#0 playback_stream_request_bytes () at ../src/pulsecore/protocol-native.c:1116 #1 handle_input_underrun () at ../src/pulsecore/protocol-native.c:1488 #2 sink_input_pop_cb () at ../src/pulsecore/protocol-native.c:1516 #3 pa_sink_input_peek ()at ../src/pulsecore/sink-input.c:931此時會通過 playback_stream_request_bytes () 給客戶端發消息讓它發數據過來。隨后,從內存塊緩存隊列中取一部分數據出來:
#0 pa_memblockq_peek () at ../src/pulsecore/memblockq.c:474 #1 sink_input_pop_cb () at ../src/pulsecore/protocol-native.c:1521 #2 pa_sink_input_peek ()at ../src/pulsecore/sink-input.c:931最后sink_input_pop_cb() 函數還是會通過 playback_stream_request_bytes () 給客戶端發消息請求數據。playback_stream_request_bytes () 函數定義如下:
static void playback_stream_request_bytes(playback_stream *s) {size_t m;playback_stream_assert_ref(s);m = pa_memblockq_pop_missing(s->memblockq);if (m <= 0)return;#ifdef PROTOCOL_NATIVE_DEBUGpa_log("request_bytes(%lu)", (unsigned long) m); #endifif (pa_atomic_add(&s->missing, (int) m) <= 0)pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); }這最終會導致 pulseaudio 服務向客戶端發送一個請求數據的命令(pulseaudio/src/pulsecore/protocol-native.c)
static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {playback_stream *s = PLAYBACK_STREAM(o);playback_stream_assert_ref(s);if (!s->connection)return -1;switch (code) {case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {pa_tagstruct *t;int l = 0;for (;;) {if ((l = pa_atomic_load(&s->missing)) <= 0)return 0;if (pa_atomic_cmpxchg(&s->missing, l, 0))break;}t = pa_tagstruct_new();pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */pa_tagstruct_putu32(t, s->index);pa_tagstruct_putu32(t, (uint32_t) l);pa_pstream_send_tagstruct(s->connection->pstream, t);播放數據這塊的處理是典型的生產者-消費者模型。前面這里看到的都是消費者的處理,不過那生產者是怎么把數據放進隊列里的呢?
sink_input_process_msg() 函數在處理 SINK_INPUT_MESSAGE_SEEK 消息和 SINK_INPUT_MESSAGE_POST_DATA 消息時將讀取的數據放進隊列里,從 backtrace 可以看到,放數據的動作是在 IO 線程,也就是 alsa-sink 線程,里完成的:
#0 sink_input_process_msg () at ../src/pulsecore/protocol-native.c:1318 #1 pa_asyncmsgq_dispatch ()at ../src/pulsecore/asyncmsgq.c:323 #2 asyncmsgq_read_work () at ../src/pulsecore/rtpoll.c:566 #3 pa_rtpoll_run () at ../src/pulsecore/rtpoll.c:238 #4 thread_func () at ../src/modules/alsa/alsa-sink.c:2003SINK_INPUT_MESSAGE_POST_DATA 和 SINK_INPUT_MESSAGE_POST_DATA 消息,讀取到客戶端發送過來的數據時發送,pstream.c 下的 do_read() 通過 pa_memimport_get () 獲得客戶端發送過來的內存塊:
#0 pa_memimport_get () at ../src/pulsecore/memblock.c:1231 #1 do_read () at ../src/pulsecore/pstream.c:1042 #2 do_pstream_read_write () at ../src/pulsecore/pstream.c:253 #3 srb_callback () at ../src/pulsecore/pstream.c:295 #4 srbchannel_rwloop () at ../src/pulsecore/srbchannel.c:190 #5 semread_cb ()at ../src/pulsecore/srbchannel.c:210在 pstream_memblock_callback() 函數中,發送 SINK_INPUT_MESSAGE_POST_DATA 和 SINK_INPUT_MESSAGE_POST_DATA 消息出去:
#0 pstream_memblock_callback () at ../src/pulsecore/protocol-native.c:5033 #1 do_read () at ../src/pulsecore/pstream.c:1066 #2 do_pstream_read_write () at ../src/pulsecore/pstream.c:253從 PulseAudio Git repo master 分支的 commit 歷史可以看到,第一筆 commit 是在 2004 年提交的,從第一筆 commit 提交到現在已經有近 20 年了,PulseAudio 是一個經過了非常多年發展,非常有歷史的一個項目,想必 PulseAudio 的很多代碼也是經過了相當長的歷史演化變成今天這個樣子的。一天兩天似乎是很難將這個項目完全吃透的。這里對于 PulseAudio 的說明還很粗淺。
PulseAudio 項目不僅僅是要做一個應用和音頻設備之間的數據通道,它更想成為一個音頻框架。這里的介紹只是了解 PulseAudio 設計和實現的一個角度。要想對 PulseAudio 有更深入的了解,對一些技術基礎的了解不可或缺:
- 進程間通信的庫接口的詳細用法,這包括 D-Bus,Unix 域 socket API,IPv4/IPv6 tcp socket 的 API,共享內存接口如 memfd_create() / shm_open() 等。
- udev 庫接口的詳細用法及其相關機制
- ALSA 庫接口的詳細用法
- timerfd 和 eventfd 的用法
還有很多 PulseAudio 相關的內容這里還沒有涉及:
- PulseAudio 的線程模型及線程基礎設施,如 pa_mainloop、pa_mainloop_api 和 pa_threaded_mainloop 等。
- PulseAudio 創建的一些概念和抽象的語義,如 pa_context,pa_stream,pa_iochannel,pa_pstream,pa_srbchannel,pa_sink,pa_sink_input,pa_source,pa_source_output,pa_module 和 pa_card 等等等
- pulseaudio 系統服務管理音頻設備文件
- pulseaudio 系統服務的模塊化架構設計
- 音頻 pipeline 的搭建
- 客戶端和 pulseaudio 系統服務間消息和數據交換的詳細設計
- 等等等。
參考文檔:
如何暫時禁用PulseAudio?
Instructions for building and installing the current development version
README.md of pulseaudio
總結
以上是生活随笔為你收集整理的PulseAudio 设计和实现浅析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于 FFmpeg 的播放器 demo
- 下一篇: WebRTC 的版本号与代码分支