ASIO协程彻底转变你的思维
生活随笔
收集整理的這篇文章主要介紹了
ASIO协程彻底转变你的思维
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
avbot 發(fā)布了許久了, 最近突然有個(gè)用戶跑來說,希望能增加個(gè)調(diào)用 “外部腳本” 的功能,方便擴(kuò)展。
我一向?qū)υO(shè)計(jì)一個(gè) plugin 機(jī)制極力的避免,不喜歡動態(tài)載入的模塊擴(kuò)展程序本身的功能。何況 avbot 是 c++開發(fā)的,調(diào)用腳本并不是容易的事情。(好吧,真實(shí)的原因是我被 mingw (VC 不支持 utf8源碼,我已經(jīng)拋棄了) 折騰怕了,不想再搞個(gè) python 。windows實(shí)在是恐怖的平臺,寫點(diǎn)程序麻煩的要死,編譯麻煩的要死。可是 avbot 又必須跨平臺,結(jié)果是我一天寫好的東西要在 windows (虛擬機(jī)) 里折騰好幾天,累死人 )
于是我決定提供一個(gè)??JSON 接口,內(nèi)置一個(gè)簡單的 HTTP Server, 用腳本(python應(yīng)該 HTTP JSON 模塊有的是,對吧)連接到 avbot ,然后 avbot 將發(fā)生的每條消息以 json 的形式返回給 外部腳本。
另外,默認(rèn)使用 HTTP 的connection: keep-alive 模式,所以保持一個(gè)長連接即可。
那么,avbot 需要支持 不確定數(shù)目的消息接收方了。
對于鏈接到 avbot 的客戶端而言, avbot 并不保留之前的所有消息,而是從連接上的那一刻開始,后續(xù)的消息才能通知到。
一個(gè)很明顯的思路就是,將鏈接上的客戶端做成一個(gè)鏈表/列隊(duì), avbot 收到消息后,遍歷這個(gè)列隊(duì)執(zhí)行消息發(fā)送。
這個(gè)思路很簡單,可是如果要求 : 必須單線程異步呢?
avbot 是一個(gè)純粹的單線程程序,絕對不允許多線程化。所有的邏輯必須使用異步處理。
那么,這個(gè)問題就復(fù)雜化了, “avbot 收到消息后,遍歷這個(gè)列隊(duì)執(zhí)行消息發(fā)送” 這個(gè)做法,不可避免的帶來了阻塞。好吧,異步遍歷吧。
要是異步遍歷還沒遍歷完,又來一個(gè)消息呢? 考慮這個(gè)問題,你會發(fā)瘋的。因?yàn)楫惒?#xff0c;太多的細(xì)節(jié)需要考慮了。真的。
好吧,又有個(gè)好主意了,為每個(gè)客戶端建立一個(gè)列隊(duì),每次遍歷就是把要發(fā)送的消息掛入列隊(duì)即可。這樣也不需要異步遍歷了,同步就可以。解決了異步遍歷的時(shí)候又來一個(gè)消息導(dǎo)致的痛苦的調(diào)度。
然后細(xì)分,考慮每個(gè)客戶端,就是等待 “發(fā)送列隊(duì)” 不為空!等等,一直這么等待也不行,如果客戶斷開了鏈接呢? 所以要 “同時(shí)等待發(fā)送列隊(duì)不為空&&客戶正常在線,并且已經(jīng)發(fā)送了 HTTP 請求頭部”
好繞口,不過也只能如此了。
avbot 因?yàn)槟J(rèn)使用了 keep-alive , 所以發(fā)送是一個(gè)死循環(huán),知道客戶端主動斷開鏈接或者網(wǎng)絡(luò)發(fā)生錯(cuò)誤。如果 客戶端死了,那么,發(fā)送列隊(duì)興許會出現(xiàn) 爆隊(duì) 的情況。所以要限制發(fā)送列隊(duì)的大小。不是滿了就不發(fā)送,而是滿了后就把早的消息踢掉,也就是讓 客戶端發(fā)生“暫時(shí)性卡死”后,還能繼續(xù)處理最后的幾條信息。
誒,復(fù)雜的邏輯終于理清了,代碼呢?!
啊累?
靠,這么復(fù)雜的 邏輯,得寫一長段代碼,調(diào)試幾百年了吧?
錯(cuò),我只花了幾個(gè)小時(shí), 不到 100 行的代碼就輕松實(shí)現(xiàn)了全部要求。
!!!!!!!!!!!!!!!!!!! WHAT !!!!!!!!!!!!!!!!!!!
這種功能不可能不用個(gè)千把行代碼的吧?!
如果使用以前的老辦法,確實(shí)如此。
可是,自從發(fā)現(xiàn)了 ASIO 后,我被 ASIO 爸爸發(fā)明的協(xié)程深深的 震驚了!
利用 ASIO 爸爸提出的協(xié)程思想,我只用了不到 100行代碼就全部完成了以上復(fù)雜的邏輯,而且,全部都是異步的哦~ 。
好,廢話不多,先貼代碼。然后解釋。// avbot_rpc_server 由 acceptor_server 這個(gè)輔助類調(diào)用
// 為其構(gòu)造函數(shù)傳入一個(gè) m_socket, 是 shared_ptr 的.
class avbot_rpc_server
{
public:
? ? ? ? typedef boost::signals2::signal<
? ? ? ? ? ? ? ? void( std::string protocol, std::string room, std::string who,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? std::string message, sender_flags )
? ? ? ? > on_message_signal_type;
? ? ? ? static on_message_signal_type on_message;
? ? ? ? typedef boost::asio::ip::tcp Protocol;
? ? ? ? typedef boost::asio::basic_stream_socket<Protocol> socket_type;
? ? ? ? typedef void result_type;
? ? ? ? avbot_rpc_server(boost::shared_ptr<socket_type> _socket)
? ? ? ?? ?: m_socket(_socket)
? ? ? ?? ?, m_request(new boost::asio::streambuf)
? ? ? ?? ?, m_responses(new boost::circular_buffer_space_optimized<boost::shared_ptr<boost::asio::streambuf> >(20) )
? ? ? ? {
? ? ? ? ? ? ? ? m_socket->get_io_service().post(
? ? ? ? ? ? ? ? ? ? ? ? boost::asio::detail::bind_handler(*this, boost::coro::coroutine(), boost::system::error_code(), 0)
? ? ? ? ? ? ? ? );
? ? ? ? }
? ? ? ? // 數(shù)據(jù)操作跑這里,嘻嘻.
? ? ? ? void operator()(boost::coro::coroutine coro, boost::system::error_code ec, std::size_t bytestransfered)
? ? ? ? {
? ? ? ? ? ? ? ? boost::shared_ptr<boost::asio::streambuf>? ? ? ? sendbuf;
? ? ? ? ? ? ? ? if (ec){
? ? ? ? ? ? ? ? ? ? ? ? m_socket->close(ec);
? ? ? ? ? ? ? ? ? ? ? ? // 看來不是 HTTP 客戶端,誒,滾蛋啊!
? ? ? ? ? ? ? ? ? ? ? ? // 沉默,直接關(guān)閉鏈接. 取消信號注冊.
? ? ? ? ? ? ? ???? ? ? ? if (m_connect && m_connect->connected())
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? m_connect->disconnect();
? ? ? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? CORO_REENTER(&coro)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? do{
? ? ? ? ? ? ? ? ? ? ? ? // 發(fā)起 HTTP 處理操作.
? ? ? ? ? ? ? ? ? ? ? ? _yield boost::asio::async_read_until(*m_socket, *m_request, "\r\n\r\n", boost::bind(*this, coro, _1, _2));
? ? ? ? ? ? ? ? ? ? ? ? m_request->consume(bytestransfered);
? ? ? ? ? ? ? ? ? ? ? ? // 解析 HTTP
? ? ? ? ? ? ? ? ? ? ? ? // 等待消息.
? ? ? ? ? ? ? ? ? ? ? ? if (m_responses->empty())
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!m_connect){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 將自己注冊到 avbot 的 signal 去
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 等 有消息的時(shí)候,on_message 被調(diào)用,也就是下面的 operator() 被調(diào)用.
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? _yield m_connect = boost::make_shared<boost::signals2::connection>
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (on_message.connect(boost::bind(*this, coro, _1, _2, _3, _4, _5)));
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 就這么退出了,但是消息來的時(shí)候,om_message 被調(diào)用,然后下面的那個(gè)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // operator() 就被調(diào)用了,那個(gè) operator() 接著就會重新回調(diào)本 operator()
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 結(jié)果就是隨著 coroutine 的作用,代碼進(jìn)入這一行,然后退出??if 判定
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 然后進(jìn)入發(fā)送過程.
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }else{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 如果已經(jīng)注冊,直接返回。時(shí)候如果消息來了,on_message 被調(diào)用,也就
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 是下面的 operator() 被調(diào)用. 結(jié)果就是隨著 coroutine 的作用,代碼
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 進(jìn)入上面那行,然后退出??if 判定。然后進(jìn)入發(fā)送過程.
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // signals2 回調(diào)的時(shí)候會進(jìn)入到這一行.
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? // 進(jìn)入發(fā)送過程
? ? ? ? ? ? ? ? ? ? ? ? sendbuf = m_responses->front();
? ? ? ? ? ? ? ? ? ? ? ? _yield boost::asio::async_write(*m_socket, *sendbuf, boost::bind(*this, coro, _1, _2) );
? ? ? ? ? ? ? ? ? ? ? ? m_responses->pop_front();
? ? ? ? ? ? ? ? ? ? ? ? // 寫好了,重新開始我們的處理吧!
? ? ? ? ? ? ? ? }while(1);
? ? ? ? ? ? ? ? }
? ? ? ? }
? ? ? ? // signal 的回調(diào)到了這里, 這里我們要區(qū)分對方是不是用了 keep-alive 呢.
? ? ? ? void operator()(boost::coro::coroutine coro, std::string protocol, std::string room, std::string who, std::string message, sender_flags)
? ? ? ? {
? ? ? ? ? ? ? ? pt::ptree jsonmessage;
? ? ? ? ? ? ? ? boost::shared_ptr<boost::asio::streambuf> buf(new boost::asio::streambuf);
? ? ? ? ? ? ? ? std::ostream? ? ? ? stream(buf.get());
? ? ? ? ? ? ? ? std::stringstream? ? ? ? teststream;
? ? ? ? ? ? ? ? jsonmessage.put("protocol", protocol);
? ? ? ? ? ? ? ? jsonmessage.put("root", room);
? ? ? ? ? ? ? ? jsonmessage.put("who", who);
? ? ? ? ? ? ? ? jsonmessage.put("msg", message);
? ? ? ? ? ? ? ? js::write_json(teststream,??jsonmessage);
? ? ? ? ? ? ? ? // 直接寫入 json 格式的消息吧!
? ? ? ? ? ? ? ? stream <<??"HTTP/1.1 200 OK\r\n" <<??"Content-type: application/json\r\n";
? ? ? ? ? ? ? ? stream <<??"connection: keep-alive\r\n" <<??"Content-length: ";
? ? ? ? ? ? ? ? stream << teststream.str().length() <<??"\r\n\r\n";
? ? ? ? ? ? ? ? js::write_json(stream, jsonmessage);
? ? ? ? ? ? ? ? // 檢查 發(fā)送緩沖區(qū).
? ? ? ? ? ? ? ? if (m_responses->empty()){
? ? ? ? ? ? ? ? ? ? ? ? // 打通仁督脈.
? ? ? ? ? ? ? ? ? ? ? ? m_socket->get_io_service().post(boost::asio::detail::bind_handler(*this, coro, boost::system::error_code(), 0));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // 寫入 m_responses
? ? ? ? ? ? ? ? m_responses->push_back(buf);
? ? ? ? }
private:
? ? ? ? boost::shared_ptr<socket_type> m_socket;
? ? ? ? boost::shared_ptr<boost::signals2::connection> m_connect;
? ? ? ? boost::shared_ptr<boost::asio::streambuf>? ? ? ? m_request;
? ? ? ? boost::shared_ptr<boost::circular_buffer_space_optimized<boost::shared_ptr<boost::asio::streambuf> > >? ? ? ? m_responses;
};
} 復(fù)制代碼
首先這個(gè) avbot_rpc_server 由一個(gè) acceptor_service 輔助類調(diào)用。 acceptor_service 是一個(gè)模板,大家可以去 acceptor_server.hpp 膜拜。
acceptor_service 以 Protocol 和一個(gè) 處理類 為模板。在 main.cpp里,我以 asio::ip::tcp 作為 Protocl 的參數(shù) avbot_rpc_server為 ProtocolProcesser的參數(shù) 調(diào)用acceptor_service。acceptor_service 進(jìn)入一個(gè)死循環(huán)(協(xié)程的)不停的 accept , 然后將 accept 到的 socket 交給 ProtocolProcesser,也就是 avbot_rpc_server 。
avbot_rpc_server 處理一下客戶的請求頭,然后把自己注冊到 on_message 信號處理。
然后,然后就沒然后了。
on_message 在 avbot 接收到消息的時(shí)候發(fā)出。結(jié)果就是 avbot_rpc_server 的 第二個(gè) operator() 被調(diào)用。然后就繼續(xù)發(fā)送了。
當(dāng)然,并不是每一個(gè) on_message 都會導(dǎo)致 avbot_rpc_server 的 第二個(gè) operator() 被調(diào)用的,必須是列隊(duì)為空的時(shí)候。不為空的時(shí)候就不需要調(diào)用。發(fā)送循環(huán)會繼續(xù)循環(huán)的,避免競爭出現(xiàn)
我一向?qū)υO(shè)計(jì)一個(gè) plugin 機(jī)制極力的避免,不喜歡動態(tài)載入的模塊擴(kuò)展程序本身的功能。何況 avbot 是 c++開發(fā)的,調(diào)用腳本并不是容易的事情。(好吧,真實(shí)的原因是我被 mingw (VC 不支持 utf8源碼,我已經(jīng)拋棄了) 折騰怕了,不想再搞個(gè) python 。windows實(shí)在是恐怖的平臺,寫點(diǎn)程序麻煩的要死,編譯麻煩的要死。可是 avbot 又必須跨平臺,結(jié)果是我一天寫好的東西要在 windows (虛擬機(jī)) 里折騰好幾天,累死人 )
于是我決定提供一個(gè)??JSON 接口,內(nèi)置一個(gè)簡單的 HTTP Server, 用腳本(python應(yīng)該 HTTP JSON 模塊有的是,對吧)連接到 avbot ,然后 avbot 將發(fā)生的每條消息以 json 的形式返回給 外部腳本。
另外,默認(rèn)使用 HTTP 的connection: keep-alive 模式,所以保持一個(gè)長連接即可。
那么,avbot 需要支持 不確定數(shù)目的消息接收方了。
對于鏈接到 avbot 的客戶端而言, avbot 并不保留之前的所有消息,而是從連接上的那一刻開始,后續(xù)的消息才能通知到。
一個(gè)很明顯的思路就是,將鏈接上的客戶端做成一個(gè)鏈表/列隊(duì), avbot 收到消息后,遍歷這個(gè)列隊(duì)執(zhí)行消息發(fā)送。
這個(gè)思路很簡單,可是如果要求 : 必須單線程異步呢?
avbot 是一個(gè)純粹的單線程程序,絕對不允許多線程化。所有的邏輯必須使用異步處理。
那么,這個(gè)問題就復(fù)雜化了, “avbot 收到消息后,遍歷這個(gè)列隊(duì)執(zhí)行消息發(fā)送” 這個(gè)做法,不可避免的帶來了阻塞。好吧,異步遍歷吧。
要是異步遍歷還沒遍歷完,又來一個(gè)消息呢? 考慮這個(gè)問題,你會發(fā)瘋的。因?yàn)楫惒?#xff0c;太多的細(xì)節(jié)需要考慮了。真的。
好吧,又有個(gè)好主意了,為每個(gè)客戶端建立一個(gè)列隊(duì),每次遍歷就是把要發(fā)送的消息掛入列隊(duì)即可。這樣也不需要異步遍歷了,同步就可以。解決了異步遍歷的時(shí)候又來一個(gè)消息導(dǎo)致的痛苦的調(diào)度。
然后細(xì)分,考慮每個(gè)客戶端,就是等待 “發(fā)送列隊(duì)” 不為空!等等,一直這么等待也不行,如果客戶斷開了鏈接呢? 所以要 “同時(shí)等待發(fā)送列隊(duì)不為空&&客戶正常在線,并且已經(jīng)發(fā)送了 HTTP 請求頭部”
好繞口,不過也只能如此了。
avbot 因?yàn)槟J(rèn)使用了 keep-alive , 所以發(fā)送是一個(gè)死循環(huán),知道客戶端主動斷開鏈接或者網(wǎng)絡(luò)發(fā)生錯(cuò)誤。如果 客戶端死了,那么,發(fā)送列隊(duì)興許會出現(xiàn) 爆隊(duì) 的情況。所以要限制發(fā)送列隊(duì)的大小。不是滿了就不發(fā)送,而是滿了后就把早的消息踢掉,也就是讓 客戶端發(fā)生“暫時(shí)性卡死”后,還能繼續(xù)處理最后的幾條信息。
誒,復(fù)雜的邏輯終于理清了,代碼呢?!
啊累?
靠,這么復(fù)雜的 邏輯,得寫一長段代碼,調(diào)試幾百年了吧?
錯(cuò),我只花了幾個(gè)小時(shí), 不到 100 行的代碼就輕松實(shí)現(xiàn)了全部要求。
!!!!!!!!!!!!!!!!!!! WHAT !!!!!!!!!!!!!!!!!!!
這種功能不可能不用個(gè)千把行代碼的吧?!
如果使用以前的老辦法,確實(shí)如此。
可是,自從發(fā)現(xiàn)了 ASIO 后,我被 ASIO 爸爸發(fā)明的協(xié)程深深的 震驚了!
利用 ASIO 爸爸提出的協(xié)程思想,我只用了不到 100行代碼就全部完成了以上復(fù)雜的邏輯,而且,全部都是異步的哦~ 。
好,廢話不多,先貼代碼。然后解釋。
acceptor_service 以 Protocol 和一個(gè) 處理類 為模板。在 main.cpp里,我以 asio::ip::tcp 作為 Protocl 的參數(shù) avbot_rpc_server為 ProtocolProcesser的參數(shù) 調(diào)用acceptor_service。acceptor_service 進(jìn)入一個(gè)死循環(huán)(協(xié)程的)不停的 accept , 然后將 accept 到的 socket 交給 ProtocolProcesser,也就是 avbot_rpc_server 。
avbot_rpc_server 處理一下客戶的請求頭,然后把自己注冊到 on_message 信號處理。
然后,然后就沒然后了。
on_message 在 avbot 接收到消息的時(shí)候發(fā)出。結(jié)果就是 avbot_rpc_server 的 第二個(gè) operator() 被調(diào)用。然后就繼續(xù)發(fā)送了。
當(dāng)然,并不是每一個(gè) on_message 都會導(dǎo)致 avbot_rpc_server 的 第二個(gè) operator() 被調(diào)用的,必須是列隊(duì)為空的時(shí)候。不為空的時(shí)候就不需要調(diào)用。發(fā)送循環(huán)會繼續(xù)循環(huán)的,避免競爭出現(xiàn)
總結(jié)
以上是生活随笔為你收集整理的ASIO协程彻底转变你的思维的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 协程,又称微线程和纤程
- 下一篇: c面试题总结(含答案)