Seastar Tutorial 简明教程
這是對于Seastar官方tutorial的部分翻譯,同時也結合了自己的思考。
Introduction
Seastar是一個用于在現(xiàn)代多核機器上編寫高性能的復雜的服務端應用的C++庫。
Seastar是一個完整的異步編程框架,使用futures和continuations兩個概念來抽象對于I/O事件的處理。
因為現(xiàn)代處理器核心共享數(shù)據(jù)會帶來很多懲罰,所以Seastor想要避免多個核心共享內存,而用消息傳遞機制取代之。
Since modern multi-core and multi-socket machines have steep penalties for sharing data between cores (atomic instructions, cache line bouncing and memory fences), Seastar programs use the share-nothing programming model, i.e., the available memory is divided between the cores, each core works on data in its own part of memory, and communication between cores happens via explicit message passing (which itself happens using the SMP’s shared memory hardware, of course).
Asynchronous programming
最開始的服務器都是同步模型,為每一條連接分配一個進程(線程),然后在線程內以同步方式編寫代碼,但是很快遇到了C10K問題。
隨后異步模型——事件驅動模型開始得到流行,one thread per CPU,每個線程運行一個loop,不斷地使用poll(epoll)獲取讀寫事件并處理。
但是事件驅動模型有兩個問題:
另外,在追求極致性能的時候,編程框架無法避免的要考慮兩個問題:
Seastar就是一個為了解決上述問題的異步服務器應用編程框架,它是完全單線程的(對每個核心來說是單線程),可以很容易地擴展到多核并且共享內存帶來的開銷被最小化了,并且基于C++14。
Seastar
Seastar使用一些概念來實現(xiàn)極致性能:
- Cooperative micro-task scheduler: 每個核心上運行一個協(xié)作式任務調度器,每個任務都非常輕量——只處理I/O操作的結果或者提交I/O操作。
- Share-nothing SMP architecture: 每個核心都獨立于其他核心,內存、數(shù)據(jù)結構、CPU時間都不共享,核心之間的通信通過消息傳遞完成。一個Seastar核心被稱做一個shard。
- Future based APIs: 這允許你提交一個I/O操作并且鏈式添加在I/O操作完成后要執(zhí)行的任務。很容易去并行執(zhí)行多個I/O操作。
- Share-nothing TCP stack: Seastar也提供一個內置的高性能TCP/IP協(xié)議棧,它構建在任務調度器和無共享架構的基礎之上,提供零拷貝功能。
- DMA-based storage APIs: Seastar提供零拷貝存儲的API,允許你使用DMA傳輸數(shù)據(jù)到存儲設備上或者反過來。
Threads and memory
Seastar threads
Seastar程序在每個CPU上運行一個線程,每個線程運行自己的事件循環(huán),它也被稱作engine。Seastar程序默認會使用所有可用的核心,在每個核心上都開啟一個線程。可以打印seastar::smp::count來查看被啟動的線程數(shù)。
每個engine都被綁定在一個不同的硬件線程上(為了CPU親和性),且app的初始化函數(shù)只在一個線程上運行。
可以傳遞命令行參數(shù)-cn來制定要運行的線程數(shù),但是這個數(shù)字不能超過實際的硬件線程數(shù)。
Seastar memory
Seastar程序會把內存分片,每個線程預分配一大塊內存,然后只使用自己的那部分內存進行分配工作。
默認情況下,Seastar程序預分配所有的機器內存(除了操作系統(tǒng)保留的部分),可以使用--reserve-memory選項指定要為操作系統(tǒng)保留的內存,或者使用-m選項指定Seastar程序要使用的內存,當然所用內存不能超過物理內存大小。
Introducing futures and continuations
future和continuation的存在使得編寫大型的、復雜的異步程序更容易,編寫出來的代碼通??勺x性好、更容易理解。
future表示一個計算的結果但是可能尚未就緒,一個future通常被一個異步函數(shù)返回,這個函數(shù)會確保future最終就緒。
continuation是一個當future就緒時被運行的回調,使用then()方法可以把一個continuation附加在一個future上。
下面的程序將在輸出Sleeping…一秒后輸出Done.并退出:
using namespace seastar; int main(int argc, char **argv) {app_template app;app.run(argc, argv, [] {std::cout << "Sleeping..." << std::flush;using namespace std::chrono_literals;return seastar::sleep(1s).then([]{std::cout << "Done.\n";});}); }下面是一個并行執(zhí)行任務的例子:
seastar::future<> f() {std::cout << "Sleeping... " << std::flush;using namespace std::chrono_literals;seastar::sleep(200ms).then([] { std::cout << "200ms " << std::flush; });seastar::sleep(100ms).then([] { std::cout << "100ms " << std::flush; });return seastar::sleep(1s).then([] { std::cout << "Done.\n"; }); }一個future<T>類型的then中的函數(shù)體將接受一個T類型的參數(shù):
seastar::future<int> slow() {using namespace std::chrono_literals;return seastar::sleep(100ms).then([] { return 3; }); }seastar::future<> f() {return slow().then([] (int val) {std::cout << "Got " << val << "\n";}); }Ready futures
當調用then時,對應future的值可能已經(jīng)就緒了,那么then()將不會把continuation鏈式追加到future對象中,而是直接執(zhí)行。
但是這種優(yōu)化也有限制,then()的實現(xiàn)會統(tǒng)計這種立即執(zhí)行的continuation的次數(shù),如果達到上限(目前是256次)將會把控制返回給event loop。否則有可能造成event loop乃至其他future的continuations的饑餓現(xiàn)象。
make_ready_future<>將返回一個已就緒的future對象。
Coroutines
需要C++20支持。
Continuations
Capturing state in continuations
C++11中的lambda對象可以作為then()參數(shù),在future就緒后被調用,而lambda是可以捕獲參數(shù)的,所以如果continuation不能立即執(zhí)行,那么lambda所捕獲的參數(shù)要被保存,這需要拷貝數(shù)據(jù)并會產(chǎn)生運行時開銷,但是這是不可避免的(總比讓當前線程阻塞然后讓參數(shù)保存在棧中,并且切換到別的線程要更快)。
按值捕獲參數(shù)沒有問題,但是如果按引用捕獲就可能引起引用失效等嚴重bug,比如下面這個反例:
seastar::future<int> incr(int i) {using namespace std::chrono_literals;// Oops, the "&" below is wrong:return seastar::sleep(10ms).then([&i] { return i + 1; }); }一個解決方法是使用do_with()習語,它可以確保一個對象的生命期長于對應continuation的生命期,這使得按引用捕獲成為可能,并且非常方便。
Seastar應用中常常使用按移動捕獲,即廣義捕獲,下面是一個例子:
int do_something(std::unique_ptr<T> obj) {// do some computation based on the contents of obj, let's say the result is 17return 17;// at this point, obj goes out of scope so the compiler delete()s it. } seastar::future<int> slow_do_something(std::unique_ptr<T> obj) {using namespace std::chrono_literals;return seastar::sleep(10ms).then([obj = std::move(obj)] () mutable {return do_something(std::move(obj));}); }注意要使用兩次std::move并且聲明為mutable(否則無法移動第二次——只讀對象不可移動)。
Evaluation order considerations(C++14 only)
在C++17之前,針對以下調用方法:
return do_something(obj).then([obj = std::move(obj)] () mutable {return do_something_else(std::move(obj)); });有可能先計算obj = std::move(obj),再調用do_something(obj),就導致了use–after-move問題。
解決方法就是把do_something和then分開寫,不要寫在一條語句中。
Handling exceptions
continuation執(zhí)行中拋出的異常會被系統(tǒng)捕獲并存儲到對應的future對象中,此時future對象變?yōu)榫途w態(tài)(但它不持有一個值,而是持有一個異常)。
對一個持有異常的future調用的then()方法將會被忽略,異常會傳遞給后面的continuation,就像普通函數(shù)中的異常拋出事件一樣:
// 普通函數(shù)發(fā)生異常后的執(zhí)行流 line1(); line2(); // throws! line3(); // skipped// 有異常的future的執(zhí)行流 retur line1().then([] {return line2(); // throws! }).then([] {return line3(); // skipped });通常then()提供的終止執(zhí)行鏈是合理的,但還有其他選擇:
-  .then_wrapped(): 會把future<T>對象直接傳給continuation(而不是像then()一樣傳遞T對象),所以后面的continuation可以自己判斷是否存在異常。 
-  .finally():無論future是否攜帶異常,finally()中的continuation都將被執(zhí)行,就像java中的finally塊一樣。 
-  .handle_exception():僅當future攜帶異常時才會執(zhí)行,其中的處理函數(shù)應該接受異常類型的參數(shù)。 一個用法是fut.discard_result().handle_exception(...),當future就緒時忽略成功值(if exists)而只處理異常(if exists)。 
Exceptions vs. exceptional futures
TODO: https://github.com/scylladb/seastar/blob/master/doc/tutorial.md#exceptions-vs-exceptional-futures
Lifetime management
一個異步函數(shù)往往在調用返回后過一段時間才執(zhí)行一個操作,如果它要操作已經(jīng)存在的對象或者臨時對象,那么我們就要關心這些對象的生命期:確保在異步函數(shù)完成前這些對象不會被銷毀,并確保在這些對象不會被使用時銷毀它們。Seastar提供了多種機制,可以安全有效地保證對象正確地“存活”。
Passing ownership to continuation
最直接的方法就是讓continuation擁有對象的所有權。
可以通過捕獲對象副本或移動捕獲對象從而將對象的所有權轉移到continuation。
但有些時候移動對象不受歡迎,比如一些代碼可能保持著對該對象的引用,那么在移動后他們將持有失效的引用;另外對一些復雜對象來說,移動的代價可能非常大,這時往往需要使用unique_ptr<T>。
Keeping ownership at the caller
傳遞所有權這一方法確實有效,但是有時會變得非常復雜,比如要在continuation之間頻繁移動一個對象:
seastar::future<> slow_op(T o) {return seastar::sleep(10ms).then([o = std::move(o)] {...return std::move(o);}).then([](T o) {...}); }一種更簡單的方法是讓調用者保存對象,而只傳遞對象的引用:
seastar::future<> slow_op(T& o) { // <-- pass by reference return seastar::sleep(10ms).then([&o] {// <-- capture by reference // first continuation, doing something with o ... }).then([&o]) { // <-- another capture by reference // second continuation, doing something with o ... });}不過這樣就相當于把生命期管理的責任轉移給了調用者,而調用者怎么知道這個對象應該生存多久呢?
其實也很簡單,就是對象最晚生存到對應的future就緒時,所以有了以下慣例:
Whenever an asynchronous function takes a parameter by reference, the caller must ensure that the referred object lives until the future returned by the function is resolved.
不幸的是,這條慣例并沒法用C++語法約束,而且很多non-Seastar程序員也不遵循此慣例。
Seastar提供了一個方便的習語,do_with():
seastar::future<> f() { return seastar::do_with(T1(), T2(), [] (auto& obj1, auto& obj2) { return slow_op(obj1, obj2); }}do_with會把給定的對象保存在堆上,對應的lambda中應該以引用形式使用這個對象,do_with保證在返回的future就緒前銷毀此對象。
do_with有一些注意事項:
- 一般傳入右值對象作為參數(shù),比如臨時對象或者被std::move()的對象。
- lambda應接受auto&形式的參數(shù),以避免忘記按引用使用對象。
- lambda函數(shù)體中仍要把該對象以引用形式傳遞。
- 在令返回的future成為就緒態(tài)后,不該再使用這些對象。
下面是一個反例:
seastar::future<> slow_op(T obj); // WRONG: should be T&, not Tseastar::future<> f() { return seastar::do_with(T(), [] (auto& obj) { return slow_op(obj); }}一個慣例是返回的future就緒后,對應的異步函數(shù)不應該在駐留在后臺了:
In general, it is rarely a good idea for an asynchronous function to resolve while leaving behind background operations - even if those operations do not use the do_with()ed objects. Background operations that we do not wait for may cause us to run out of memory (if we don’t limit their number) and make it difficult to shut down the application cleanly.
Sharing ownership(reference counting)
引用計數(shù)法可以實現(xiàn)即通過“拷貝”方式來傳遞對象,又不需要付出巨大拷貝開銷。
一個實例是seastar::file,它持有一個打開的文件對象,一個file對象可以被拷貝,所有的副本都指向同一個打開的文件,拷貝時引用計數(shù)加1,file對象銷毀時引用計數(shù)減1,當引用計數(shù)減到0時實際的文件將被關閉。
下面是一個例子:
seastar::future<uint64_t> slow_size(file f) { return seastar::sleep(10ms).then([f] { return f.size(); });}這個例子中返回了一個最后的f對象的相關future,按理來說slow_size函數(shù)返回后file對象會被銷毀,返回的future<uint64_t>對象將失效,但是既然file對象沒有調用close(),說明在別處它還有一份副本存在(比如slow_size的調用方),所以f仍有效。
引用計數(shù)會有一些運行時開銷,但是要知道Seastar對象僅被一個CPU核心使用,所以引用計數(shù)的增減不是原子操作而是普通的整數(shù)增減,故開銷會很小。
C++11提供了shared_ptr<T>作為引用計數(shù)的實現(xiàn),但是處于多線程環(huán)境下運行的考慮,它使用較慢的原子操作來加減計數(shù)值,故Seastar提供了對應的單線程實現(xiàn)——seastar::shared_ptr<T>,其不使用原子操作。
Seastar還提供了seastar::lw_shared_ptr<T>類,它不支持多態(tài)類型所以開銷更小,優(yōu)先推薦使用此類型,當T為多態(tài)類型時切換為seastar::shared_ptr<T>類型,永遠不應該在Seastar應用中使用std::shared_ptr<T>類型。
Saving objects on the stack
把對象保存在棧中是一個方便的做法,Seastar提供了seastar::thread來完成此工作:
seastar::future<> slow_incr(int i) { return seastar::async([i] { seastar::sleep(10ms).get(); // We get here after the 10ms of wait, i is still available. return i + 1; });}詳細介紹參見[seastar::thread]一節(jié)。
Advanced futures
Futures and interruption
一個運行的future無法被中斷。
Futures are single use
對一個future<int>變量調用get()或then()后,它將會失效,我們需要把值存儲到其他地方。
Loops
Seastar提供幾種原語,用于以與future/promise模型完美結合的方式表達循環(huán)語義,一個重要特點是Seastor循環(huán)原語的每輪迭代后都有一個搶占點,允許循環(huán)中執(zhí)行其他任務。
repeat
循環(huán)執(zhí)行函數(shù),該函數(shù)應返回future<stop_iteration>(stop_iteration::no)或future<stop_iteration>(stop_iteration::yes),repeat據(jù)此來決定重新執(zhí)行該函數(shù)還是結束循環(huán)。repeat會返回future<>,此future在循環(huán)結束或者循環(huán)體執(zhí)行出錯時就緒。
seastar::future<int> recompute_number(int number);seastar::future<> push_until_100(seastar::lw_shared_ptr<std::vector<int>> queue, int element) { return seastar::repeat([queue, element] { if (queue->size() == 100) { return make_ready_future<stop_iteration>(stop_iteration::yes); } return recompute_number(element).then([queue] (int new_element) { queue->push_back(new_element); return stop_iteration::no; }); });}do_until
循環(huán)執(zhí)行函數(shù),但它通過一個條件來檢查是否要停止循環(huán)。
seastar::future<int> recompute_number(int number);seastar::future<> push_until_100(seastar::lw_shared_ptr<std::vector<int>> queue, int element) { return seastar::do_until([queue] { return queue->size() == 100; }, [queue, element] { return recompute_number(element).then([queue] (int new_element) { queue->push_back(new_element); }); });}do_for_each
接受一個范圍(或者一對迭代器)和一個函數(shù)體,一個接一個地將范圍中的元素作為參數(shù)調用此函數(shù)體。通常函數(shù)體應該返回一個future<>。
seastar::future<>append(seastar::lw_shared_ptr<std::vector<int>> queue1, seastar::lw_shared_ptr<std::vector<int>> queue2) { return seastar::do_for_each(queue2->begin(), queue2->end(), [queue1](int element) { std::cout << "in loop - " << element << std::endl; queue1->push_back(element); return make_ready_future<>(); });}parallel_for_each
是do_for_each的高并發(fā)版變體,所有的迭代同時排隊,所以函數(shù)體的調用順序不能保證。
例如對于以下場景,parallel_for_each將在1s左右完成所有迭代,而do_for_each要花費10s:
seastar::future<>append(seastar::lw_shared_ptr<std::vector<int>> queue1, seastar::lw_shared_ptr<std::vector<int>> queue2) { return seastar::max_concurrent_for_each(queue2->begin(), queue2->end(), 2,[queue1](int element) { std::cout << "in loop - " << element << std::endl; queue1->push_back(element); return sleep(1s); });}max_concurrent_for_each
是parallel_for_each的限制版,使用時可以傳入一個max_concurrent表示最多排隊的迭代數(shù)。
when_all: waiting for multiple futures
when_all()返回一個future<tuple<future<t1>, future<t2>...>>,該future會在給定的futures全部就緒后就緒。
when_all()只接收右值參數(shù)(臨時future或者被std::move的future),
一個future就緒時可能是得到了結果也可能是出現(xiàn)了異常,無論哪種情況都不影響when_all()的工作流程,只需要在獲取future的結果的時候判斷一下是否有異常發(fā)生:
future<> f() {using namespace std::chrono_literals;future<> slow_success = sleep(1s);future<> slow_exception = sleep(2s).then([] { throw 1; });future<int> slow_two = sleep(2s).then([] { return 2; });return when_all(std::move(slow_success), std::move(slow_exception), std::move(slow_two)).then([] (auto tup) {std::cout << std::get<0>(tup).available() << "\n";std::cout << std::get<1>(tup).failed() << "\n";// 如果不顯式ignore失敗的future,那么將會有"Exceptional future ignored"錯誤信息被輸出.std::get<1>(tup).ignore_ready_future();std::cout << std::get<2>(tup).get0() << "\n";}); }上面的例子要自己分類處理異常還是比較繁瑣,所以Seastar提供了when_all_succeed()函數(shù),當所有futures都就緒時,when_all_succeed將結果傳遞給continuation(<tuple<t1, t2...>>形式)。如果有一個或多個futures失效,when_all_succeed返回某一個失效的future(包含了某一個失效的futures的異常),這時就可以使用handle_exception()continuation來方便地處理這些異常,示例代碼如下:
using namespace seastar; future<> f() {using namespace std::chrono_literals;return when_all_succeed(make_ready_future<int>(2),make_exception_future<double>("oops")).then([] (int i, double d) {std::cout << i << " " << d << "\n";}).handle_exception([] (std::exception_ptr e) {std::cout << "exception: " << e << "\n";}); }但是Tutorial給的這個代碼是有問題的,when_all_succeed成功時不是返回int, double這些零散的結果,而是返回一個tuple<int, double>,所以以上代碼會編譯失敗,正確的寫法是:
future<> f() {using namespace std::chrono_literals;return when_all_succeed(make_ready_future<int>(2),make_exception_future<double>("oops")).then([] (std::tuple<int, double> t) {std::cout << std::get<0>(t) << " " << std::get<1>(t) << "\n";}).handle_exception([] (std::exception_ptr e) {std::cout << "exception: " << e << "\n";}); }Semaphores
Pipes
Seastar提供pipe<T>來實現(xiàn)類似UNIX管道的功能,一個pipe<T>有一個固定大小的緩沖區(qū),通過read()和write()方法可以獲取到pipe_reader<T>和pipe_writer<T>以供管道兩端讀寫,任何一端都可以關閉pipe,此時另一端將得到通知。
pipe,pipe_reader,pipe_writer對象都只能移動而不能被拷貝,為了實現(xiàn)拷貝語義可以使用shared_ptr<T>包裝它們。
自己寫的一個生產(chǎn)者消費者例子:
seastar::future<> f() {return seastar::async([] {seastar::pipe<int> p(1);when_all(repeat([&p] {return p.reader.read().then([](std::optional<int> op) {if (op) {std::cout << "read " << op.value() << std::endl;return stop_iteration::no;} else {std::cout << "pipe closed." << std::endl;return stop_iteration::yes;}});}),keep_doing([&p] {return p.writer.write(1).then([] {std::cout << "write succeed." << std::endl;});})).get();}).discard_result(); }Shutting down a service with a gate
考慮一個長操作slow(),任何時刻都有可能啟動這種操作,甚至可能目前有一些slow()操作在并行運行,你想要關閉這個服務,同時想確保在關閉服務前等待所有正在進行的slow()操作全部完成,并且期間你不希望新的slow()操作被啟動。
這就是seastar::gate的用途,一個gate對象g會維護一個內部計數(shù)器,我們可以在進入一個操作時調用g.enter()并在退出一個操作時調用g.leave()。g.close()會關閉該gate(并返回一個future),這意味著新的g.enter()將不被允許,但是g.leave()仍可進行。這就實現(xiàn)了調用close()后只能結束已有的操作而不能開啟新的操作;當所有正在進行的操作完成時(即內部計數(shù)器降為0時)close()返回的future處于就緒態(tài)。
為了應對一個slow()長操作很久才停止的情況,slow()可以主動在內部調用g.check()來檢查g.close()是否已經(jīng)被調用,如果gate已被關閉那么g.check()將拋出一個異常,從而停止當前的slow()操作。
Introducing Seastar’s network stack
Seastar的網(wǎng)絡棧是分片的,每個shard(thread)都負責一部分連接,一旦一個連接建立,那么它將只會被一個線程處理。
如我們所見,main()只會在第一個線程上運行一次我們的主體函數(shù)一次,如果需要在指定核心上運行一個函數(shù),那么需要使用smp::submit_to函數(shù):
seastar::future<> service_loop();seastar::future<> f() { return seastar::parallel_for_each(boost::irange<unsigned>(0, seastar::smp::count), [] (unsigned c) { return seastar::smp::submit_to(c, service_loop); });}下面是一個簡單的tcp服務器:
seastar::future<> service_loop() { return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234})), // 創(chuàng)建了server_socket對象即listener [] (auto& listener) { return seastar::keep_doing([&listener] () { return listener.accept().then( // accept()返回future<accept_result>對象 [] (seastar::accept_result res) { std::cout << "Accepted connection from " << res.remote_address << "\n"; }); }); });}如果要避免重啟server時出現(xiàn)Address already in use問題,可以打開SO_REUSEADDR選項:
seastar::listen_options lo;lo.reuse_address = true;return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo)),// ...下面是一個高級一些的tcp server:
seastar::future<> handle_connection(seastar::connected_socket s, seastar::socket_address a) { auto out = s.output(); auto in = s.input(); return do_with(std::move(s), std::move(out), std::move(in), [] (auto& s, auto& out, auto& in) { return seastar::repeat([&out, &in] { return in.read().then([&out] (auto buf) { if (buf) { return out.write(std::move(buf)).then([&out] { return out.flush(); }).then([] { return seastar::stop_iteration::no; }); } else { return seastar::make_ready_future<seastar::stop_iteration>( seastar::stop_iteration::yes); } }); }).then([&out] { return out.close(); }); });}seastar::future<> service_loop_3() { seastar::listen_options lo; lo.reuse_address = true; return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo), [] (auto& listener) { // keep_doing等待future就緒后開始下一輪迭代,故如果下面return handle_connection()那么將在一條連接關閉后才能開始下一次accept() // 這里直接調用handle_connection(),故可以并行接受多條連接而不需要等待前一條連接完成。 return seastar::keep_doing([&listener] () { return listener.accept().then( [] (seastar::accept_result res) { // Note we ignore, not return, the future returned by // handle_connection(), so we do not wait for one // connection to be handled before accepting the next one. (void)handle_connection(std::move(res.connection), std::move(res.remote_address)).handle_exception( [] (std::exception_ptr ep) { fmt::print(stderr, "Could not handle connection: {}\n", ep); }); }); }); });}s.output()會返回一個output_stream<char>對象,對該對象調用write()方法會得到一個在數(shù)據(jù)轉移到TCP緩沖區(qū)后就緒的future。
s.input()會返回一個input_stream<char>對象,對該對象調用read()方法會得到一個temporary_buffer<char>對象,該對象的行為類似unique_ptr<char>,但是有更靈活的內存管理方法(例如share);當一條連接結束時,read()方法返回一個空的temporary_buffer<char>對象。
代碼中通過忽略handle_connection返回的future實現(xiàn)了并發(fā)accept + handle連接,但要記得用handle_exception處理異常(例如向已關閉的客戶端寫數(shù)據(jù))。
Sharded services
我們看到seastar::smp::submit_to能夠讓一個任務在多個核心上運行,但如果我們還想要有一個對象能夠保存任務的狀態(tài),另外核心之間應該能夠交互,并且能夠停止運行在不同核心上的任務。
Seastar提供了seastar::sharded<>模板來創(chuàng)建這種shareded service,例如:
class my_service {public: std::string _str; my_service(const std::string& str) : _str(str) { } seastar::future<> run() { std::cerr << "running on " << seastar::engine().cpu_id() << ", _str = " << _str << "\n"; return seastar::make_ready_future<>(); } seastar::future<> stop() { return seastar::make_ready_future<>(); }};seastar::sharded<my_service> s;seastar::future<> f() { return s.start(std::string("hello")).then([] { return s.invoke_on_all([] (my_service& local_service) { return local_service.run(); }); }).then([] { return s.stop(); });}用戶需要自己定義一個類來保存狀態(tài),sharded<T>對象的start方法會把指定參數(shù)傳遞給該用戶自定義類的構造函數(shù)。
要運行任務,則需要使用s.invoke_on_all方法;要停止所有服務,則需要使用s.stop方法。
要實現(xiàn)sharded service之間的交互,可以使用s.invoke_on方法實現(xiàn)在指定shard上運行任務:
seastar::sharded<my_service> s;...return s.invoke_on(0, [](my_service& local_service) { std::cerr << "invoked on " << seastar::engine().cpu_id() << ", _str = " << local_service._str << "\n";})Command line options
Debugging a Seastar program
Promise objects
一個異步函數(shù)也被稱為一個promise,它會返回一個future并且負責(在事件完成后)令future處于就緒態(tài)。
編寫promises的基本構件塊是promise<T>對象,它有future<T> get_future()方法可以返回一個future,有set_value(T)方法可以resolve這個future(令future就緒)。典型的異步函數(shù)實現(xiàn)會創(chuàng)建一個promise<T>對象,然后返回它的future,并且最終調用set_value(T)方法。
其實這和C++11中的future和promise的語義相同,實現(xiàn)了一種異步通知手段。
Memory allocation in Seastar
Per-thread memory allocation
Seastar要求應用程序是分片的,也就是說不同線程應該操作不同內存中的對象。Seastar靜態(tài)地為每個線程分配大致相等的內存。
如果機器使用了NUMA,那么Seastar也會考慮到某些內存離某些核更近這一事實來劃分內存。
為了實現(xiàn)在每個線程上獨自分配內存,Seastar重新定義了包括malloc(),free()在內的許多分配函數(shù),也重新定義了C++中的operator new,operator delete及它們的變體。
雖然可以,但是Sestar不建議一個線程去訪問其他線程的內存,這會帶來巨大的同步開銷。我們應該使用消息傳遞機制(例如submit_to())來實現(xiàn)線程間的交互。
Foreign pointers
對象應該在創(chuàng)建它的線程中被銷毀,Seastar也支持在其他線程中銷毀,這是為了支持一些不受Seastar控制的庫代碼(例如std::exception_ptr),但是這種跨線程的對象銷毀過程會是低效的。
另外有的時候我們想要將一個對象的所有權轉移給另一個線程,那么我們可以使用seastar::foreign_ptr<P>(P是一個指針或者智能指針)包裝對象,然后使用submit_to方法傳遞給其他的線程。
經(jīng)典用法是seastar::foreign_ptr<std::unique_ptr<T>>,對象將始終被一個線程獨占,當接受該對象的線程銷毀這個foreign_ptr時,它將返回到原來的線程去銷毀std::unique_ptr<T>對象,T對象的析構函數(shù)也將在原始線程中被調用(故析構函數(shù)中可以訪問原始線程的數(shù)據(jù)結構)。
盡管foreign_ptr把對象傳遞給了另一個線程,但在接收線程中只能調用簡單的方法(例如只讀方法 ),對于那些需要訪問原始線程數(shù)據(jù)的方法,接收線程無法直接調用,而應該使用如下模式讓原始線程來調用這些方法:
// fp is some foreign_ptr<>return smp::submit_to(fp.get_owner_shard(), [p=fp.get()] { return p->some_method(); });有時候我們希望不移動所有權,而是讓其他線程共享使用一個對象(回到了共享內存架構),那么我們可以使用 seastar::foreign_ptr<stastar::lw_shared_ptr<T>>,用戶需要自己處理可能發(fā)生的數(shù)據(jù)競爭(可能還要使用submit_to)。
Seastar::thread
雖然futures和continuations模型很強大且有效率,但是編碼實在不如同步方法簡單,所以Seastar提供了seastar::thread來讓你可以編寫同步代碼。
一個seastar::thread可以提供一個執(zhí)行上下文,你可以在其中阻塞式編碼:調用異步函數(shù)然后等待(而不是接著調用then())。
要注意seastar::thread并不是操作系統(tǒng)線程,它仍然使用continuations,這是由Seastar的(每個核心上的)單線程調度的。工作原理如下:
- 每個seastar::thread會分配一個128KB的棧,然后由所屬線程運行thread中的任務(在thread上下文環(huán)境中),當任務中調用一個future的get()(或wait()等)方法時,如果future未就緒,那么當前線程將會保存上下文到seastar::thread的棧中并為future安排一個continuation(內容是使用seastar::thread的上下文棧繼續(xù)執(zhí)行后面的代碼)。
- seastar::thread是協(xié)作式的,只有seastar::future::get()阻塞或者seastar::thread::yield()被調用時才會讓出線程,如果調用了阻塞的系統(tǒng)調用(比如::sleep()),那么線程仍將阻塞而無法實現(xiàn)異步。
猜測seastar::thread是通過在future::get()檢查當前執(zhí)行環(huán)境然后直接返回還是添加continuation來實現(xiàn)同步變異步的。
另外,事實上不能在普通線程中直接調用future::get來等待事件完成(除非future已就緒)否則會assert失敗。而在seastar::thread中調用future::get永遠不會報錯。
這是因為get()中調用的wait()函數(shù)只能在seastar::thread()中調用:
/// Wait for the future to be available (in a seastar::thread)// When called from a seastar::thread, this function blocks the/// thread until the future is availble. Other threads and/// continuations continue to execute; only the thread is blocked.void wait() noexcept { if (_state.available()) { return; } do_wait();}void internal::future_base::do_wait() noexcept { auto thread = thread_impl::get(); assert(thread); // 如果不是在 seastar::thread 中將assert失敗! thread_wake_task wake_task{thread}; wake_task.make_backtrace(); _promise->_task = &wake_task; thread_impl::switch_out(thread);}Starting and ending a seastar::thread
創(chuàng)建一個seastar::thread 對象后,可以使用join()方法來等待它結束,一個例子是:
seastar::future<> f() { seastar::thread th([] { std::cout << "Hi.\n"; for (int i = 1; i < 4; i++) { seastar::sleep(std::chrono::seconds(1)).get(); std::cout << i << "\n"; } }); return do_with(std::move(th), [] (auto& th) { return th.join(); });}seastar::async簡化了這件事,它返回一個當seastar::thread結束時就緒的future:
seastar::future<> f() { return seastar::async([] { std::cout << "Hi.\n"; for (int i = 1; i < 4; i++) { seastar::sleep(std::chrono::seconds(1)).get(); std::cout << i << "\n"; } });}seastar::async中的lambda也可以返回一個值,這個值將被future<T>包裝:
seastar::future<seastar::sstring> read_file(sstring file_name) { return seastar::async([file_name] () { // lambda executed in a thread file f = seastar::open_file_dma(file_name).get0(); // get0() call "blocks" auto buf = f.dma_read(0, 512).get0(); // "block" again return seastar::sstring(buf.get(), buf.size()); });};Isolation of application components
總結
以上是生活随笔為你收集整理的Seastar Tutorial 简明教程的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 光彩夺目的30款太阳光线照射Ps笔刷
- 下一篇: hdu4833 Best Financi
