Day10-Python3基础-协程、异步IO、redis缓存、rabbitMQ队列
內容目錄:
? 網絡并發編程的2個套路, 多進程,多線程
?
協程
?
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程。
?
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
?
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當于進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
?
協程的好處:
- 無需線程上下文切換的開銷
- 無需原子操作鎖定及同步的開銷方便切換控制流,簡化編程模型
- "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。
- 原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
- 高并發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用于高并發處理。
缺點:
- 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
- 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
使用yield實現協程操作例子:
import time import queue def consumer(name):print("--->starting eating baozi...")while True:new_baozi = yieldprint("[%s] is eating baozi %s" % (name,new_baozi))#time.sleep(1)def producer():r = con.__next__()r = con2.__next__()n = 0while n < 5:n +=1con.send(n)con2.send(n)print("\033[32;1m[producer]\033[0m is making baozi %s" %n )if __name__ == '__main__':con = consumer("c1")con2 = consumer("c2")p = producer() View Code?
?
協程標準定義,即符合什么條件就能稱之為協程:
?
Greenlet
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator
import time ''' greenlet是封裝好的協程,手動切換 協程遇到IO操作就切換! '''from greenlet import greenlet def test1():print(12)gr2.switch()print(34)gr2.switch()def test2():print(56)gr1.switch()print(78)gr1 = greenlet(test1)#啟動一個協程 gr2 = greenlet(test2)gr1.switch()?
?
?
?比generator還簡單了呢,但沒有解決一個問題,就是遇到IO操作,自動切換?
Gevent
Gevent 是一個第三方庫,可以輕松通過gevent實現并發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。
Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
?
#Author:Yun import gevent def foo():print('Runing in foo')gevent.sleep(2)print('haha')def bar():print('Running in bar')gevent.sleep(1)print('hehe')def func3():print('runing func3')gevent.sleep(0)print('fun3 ...again')gevent.joinall([gevent.spawn(foo),gevent.spawn(bar),gevent.spawn(func3),] ) View Code?
?
?輸出:
Runing in foo
Running in bar
runing func3
fun3 ...again
hehe
haha
同步與異步的性能區別 :
?
import geventdef task(pid):"""Some non-deterministic task"""gevent.sleep(0.5)print('Task %s done' % pid)def synchronous():for i in range(1, 10):task(i)def asynchronous():threads = [gevent.spawn(task, i) for i in range(10)]gevent.joinall(threads)print('Synchronous:')#同步 synchronous()print('Asynchronous:')#異步 asynchronous( View Code?
?
?上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。
初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall?函數,
后者阻塞當前流程,并執行所有給定的greenlet。執行流程只會在 所有greenlet執行完后才會繼續向下走。
?
遇到IO阻塞時會自動切換任務:
?
import gevent,time from urllib import request from gevent import monkeygevent.monkey.patch_all()#把當前程序的所有得IO操作給我單獨的做上標記def f(url):print('GET:%s' % url)resp = request.urlopen(url)data = resp.read()print('%d bytes received from %s'%(len(data),url))async_time_start = time.time() gevent.joinall([gevent.spawn(f,'https://www.python.org/'),gevent.spawn(f,'https://www.yahoo.com/'),gevent.spawn(f,'https://github.com/')])print("異步cost",time.time()-async_time_start) View Code?
?
通過gevent實現單線程下的多socket并發:
?
?
#Author:Yun #通過gevent實現單線程下的多socket并發 import sys import socket import time import geventfrom gevent import socket, monkeymonkey.patch_all()def server(port):s = socket.socket()s.bind(('0.0.0.0', port))s.listen(500)while True:conn, addr = s.accept()gevent.spawn(handle_request, conn)#生成一個協程def handle_request(conn):try:while True:data = conn.recv(1024)print("recv:", data)conn.send(data)if not data:conn.shutdown(socket.SHUT_WR)except Exception as ex:print(ex)finally:conn.close()if __name__ == '__main__':server(8989) server?
?
?
?
#Author:Yun import socketHOST = 'localhost' # The remote host PORT = 8989 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True:msg = bytes(input(">>:"), encoding="utf8")s.sendall(msg)data = s.recv(1024)# print(data)print('Received', repr(data)) s.close() Client?
?
并發100個sock連接
#Author:Yun #通過gevent實現單線程下的多socket并發 import sys import socket import time import geventfrom gevent import socket, monkeymonkey.patch_all()def server(port):s = socket.socket()s.bind(('0.0.0.0', port))s.listen(500)while True:conn, addr = s.accept()gevent.spawn(handle_request, conn)#生成一個協程def handle_request(conn):try:while True:data = conn.recv(1024)print("recv:", data)conn.send(data)if not data:conn.shutdown(socket.SHUT_WR)except Exception as ex:print(ex)finally:conn.close()if __name__ == '__main__':server(8989) Client2.py?
?
論事件驅動與異步IO
通常,我們寫服務器處理模型的程序時,有以下幾種模型: (1)每收到一個請求,創建一個新的進程,來處理該請求; (2)每收到一個請求,創建一個新的線程,來處理該請求; (3)每收到一個請求,放入一個事件列表,讓主進程通過非阻塞I/O方式來處理請求 上面的幾種方式,各有千秋, 第(1)種方法,由于創建新的進程的開銷比較大,所以,會導致服務器性能比較差,但實現比較簡單。 第(2)種方式,由于要涉及到線程的同步,有可能會面臨死鎖等問題。 第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都復雜。 綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網絡服務器采用的方式看圖說話事件驅動模型
在UI編程中,常常要對鼠標點擊進行相應,首先如何獲得鼠標點擊呢?
方式一:創建一個線程,該線程一直循環檢測是否有鼠標點擊,那么這個方式有以下幾個缺點:
1. CPU資源浪費,可能鼠標點擊的頻率非常小,但是掃描線程還是會一直循環檢測,這會造成很多的CPU資源浪費;如果掃描鼠標點擊的接口是阻塞的呢?
2. 如果是堵塞的,又會出現下面這樣的問題,如果我們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,由于掃描鼠標時被堵塞了,那么可能永遠不會去掃描鍵盤;
3. 如果一個循環需要掃描的設備非常多,這又會引來響應時間的問題;
所以,該方式是非常不好的。
方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如很多UI平臺都會提供onClick()事件,這個事件就代表鼠標按下事件。事件驅動模型大體思路如下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增加一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不同的事件,調用不同的函數,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的處理函數指針,這樣,每個消息都有獨立的處理函數;
?
?
?
事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。
另外兩種常見的編程范式是(單線程)同步以及多線程編程。
讓我們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展示了隨著時間的推移,這三種模式下程序所做的工作。
?
這個程序有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。
?
?
?
?
在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之后它們才能依次執行。
這種明確的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間并沒有互相依賴的關系,但仍然需要互相等待的話這就使得程序不必要的降低了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操作系統來管理,在多處理器系統上可以并行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,因為這類程序不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其他昂貴的操作時,注冊一個回調到事件循環中,然后當I/O操作完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡可能的得以執行而不需要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行為,因為程序員不需要關心線程安全問題。
當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:
當應用程序需要在任務間共享可變的數據時,這也是一個不錯的選擇,因為這里不需要采用同步處理。
網絡應用程序通常都有上述這些特點,這使得它們能夠很好的契合事件驅動編程模型。
?
Select\Poll\Epoll異步IO
首先列一下,sellect、poll、epoll三者的區別?
select?
select最早于1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。
select目前幾乎在所有的平臺上支持,其良好跨平臺支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。
select的一個缺點在于單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨著文件描述符數量的增大,其復制的開銷也線性增長。同時,由于網絡響應時間的延遲使得大量TCP連接處于非活躍狀態,但調用select()會對所有socket進行一次線性掃描,
所以這也浪費了一定的開銷。
poll?
poll在1986年誕生于System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。
poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制于用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨著文件描述符數量的增加而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。
epoll?
直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。
epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。
epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。
另一個本質的改進在于epoll采用基于事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基于某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
?
Python select?
Python的select()方法直接調用操作系統的IO接口,它監控sockets,open files, and pipes(所有帶fileno()方法的文件句柄)同時變成readable 和writeable, 或者通信錯誤,select()使得同時監控多個連接變的簡單,
并且這比寫一個長循環來等待和監控多客戶端連接要高效,因為select直接通過操作系統提供的C的網絡接口進行操作,而不是通過Python的解釋器。
?注意:使用帶有select()的Python文件對象適用于Unix,但在Windows下不受支持。
接下來通過echo server例子要以了解select 是如何通過單進程實現同時處理多個非阻塞的socket連接的
?
import select import socket import sys import Queue# Create a TCP/IP socket server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(0)# Bind the socket to the port server_address = ('localhost', 10000) print(sys.stderr, 'starting up on %s port %s' % server_address) server.bind(server_address)# Listen for incoming connections server.listen(5)?
?
?select()方法接收并監控3個通信列表, 第一個是所有的輸入的data,就是指外部發過來的數據,第2個是監控和接收所有要發出去的data(outgoing data),
第3個監控錯誤信息,接下來我們需要創建2個列表來包含輸入和輸出信息來傳給select().
# Sockets from which we expect to read inputs = [ server ] # Sockets to which we expect to write outputs = [ ]所有客戶端的進來的連接和數據將會被server的主循環程序放在上面的list中處理,我們現在的server端需要等待連接可寫(writable)之后才能過來,
然后接收數據并返回(因此不是在接收到數據之后就立刻返回),因為每個連接要把輸入或輸出的數據先緩存到queue里,然后再由select取出來再發出去。
?服務器主循環將連接添加到這些列表中或從這些列表中刪除。 由于此版本的服務器將在發送任何數據之前等待套接字變為可寫(而不是立即發送回復),
因此每個輸出連接都需要一個隊列作為通過它發送數據的緩沖區。
# Outgoing message queues (socket:Queue) message_queues = {}下面是此程序的主循環,調用select()時會阻塞和等待直到新的連接和數據進來
while inputs:# Wait for at least one of the sockets to be ready for processingprint(sys.stderr, '\nwaiting for the next event')readable, writable, exceptional = select.select(inputs, outputs, inputs)
?
當你把inputs,outputs,exceptional(這里跟inputs共用)傳給select()后,它返回3個新的list,
我們上面將他們分別賦值為readable,writable,exceptional,
所有在readable list中的socket連接代表有數據可接收(recv),所有在writable list中的
存放著你可以對其進行發送(send)操作的socket連接,當連接通信出現error時會把error寫到exceptional列表中。
Readable list 中的socket 可以有3種可能狀態,第一種是如果這個socket是main "server" socket,它負責監聽客戶端的連接,
如果這個main server socket出現在readable里,那代表這是server端已經ready來接收一個新的連接進來了,
為了讓這個main server能同時處理多個連接,在下面的代碼里,我們把這個main server的socket設置為非阻塞模式。
for s in readable:if s is server:# A "readable" server socket is ready to accept a connectionconnection, client_address = s.accept()print(sys.stderr, 'new connection from', client_address)connection.setblocking(0)#非阻塞模式,為了讓這個main server能同時處理多個連接inputs.append(connection)# Give the connection a queue for data we want to sendmessage_queues[connection] = Queue.Queue()
?
第二種情況是這個socket是已經建立了的連接,它把數據發了過來,這個時候你就可以通過recv()來接收它發過來的數據,然后把接收到的數據放到queue里,這樣你就可以把接收到的數據再傳回給客戶端了
else:data = s.recv(1024)if data:# A readable client socket has dataprint(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()))message_queues[s].put(data)# Add output channel for responseif s not in outputs:outputs.append(s)
?
第三種情況就是這個客戶端已經斷開了,所以你再通過recv()接收到的數據就為空了,所以這個時候你就可以把這個跟客戶端的連接關閉了。else:# Interpret empty result as closed connectionprint(sys.stderr, 'closing', client_address, 'after reading no data')# Stop listening for input on the connectionif s in outputs:outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,所以這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉inputs.remove(s) #inputs中也刪除掉s.close() #把這個連接關閉掉# Remove message queuedel message_queues[s]
?
對于writable list中的socket,也有幾種狀態,如果這個客戶端連接在跟它對應的queue里有數據,就把這個數據取出來再發回給這個客戶端,否則就把這個連接從output list中移除,
這樣下一次循環select()調用時檢測到outputs list中沒有這個連接,那就會認為這個連接還處于非活動狀態
for s in writable:try:next_msg = message_queues[s].get_nowait()except Queue.Empty:# No messages waiting so stop checking for writability.print(sys.stderr, 'output queue for', s.getpeername(), 'is empty')outputs.remove(s)else:print(sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername()))s.send(next_msg)
最后,如果在跟某個socket連接通信過程中出了錯誤,就把這個連接對象在inputs\outputs\message_queue中都刪除,再把連接關閉掉
for s in exceptional:print(sys.stderr, 'handling exceptional condition for', s.getpeername())# Stop listening for input on the connection inputs.remove(s)if s in outputs:outputs.remove(s)s.close()# Remove message queuedel message_queues[s]?
最后服務器端的完整代碼如下: 
?
?
import select import socket import sys import queue# Create a TCP/IP socket server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False)# Bind the socket to the port server_address = ('localhost', 10000) print(sys.stderr, 'starting up on %s port %s' % server_address) server.bind(server_address)# Listen for incoming connections server.listen(5)# Sockets from which we expect to read inputs = [ server ]# Sockets to which we expect to write outputs = [ ]message_queues = {} while inputs:# Wait for at least one of the sockets to be ready for processingprint( '\nwaiting for the next event')readable, writable, exceptional = select.select(inputs, outputs, inputs)# Handle inputsfor s in readable:if s is server:# A "readable" server socket is ready to accept a connectionconnection, client_address = s.accept()print('new connection from', client_address)connection.setblocking(False)inputs.append(connection)# Give the connection a queue for data we want to sendmessage_queues[connection] = queue.Queue()else:data = s.recv(1024)if data:# A readable client socket has dataprint(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )message_queues[s].put(data)# Add output channel for responseif s not in outputs:outputs.append(s)else:# Interpret empty result as closed connectionprint('closing', client_address, 'after reading no data')# Stop listening for input on the connectionif s in outputs:outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,所以這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉inputs.remove(s) #inputs中也刪除掉s.close() #把這個連接關閉掉# Remove message queuedel message_queues[s]# Handle outputsfor s in writable:try:next_msg = message_queues[s].get_nowait()except queue.Empty:# No messages waiting so stop checking for writability.print('output queue for', s.getpeername(), 'is empty')outputs.remove(s)else:print( 'sending "%s" to %s' % (next_msg, s.getpeername()))s.send(next_msg)# Handle "exceptional conditions"for s in exceptional:print('handling exceptional condition for', s.getpeername() )# Stop listening for input on the connection inputs.remove(s)if s in outputs:outputs.remove(s)s.close()# Remove message queuedel message_queues[s] Server?
import socket import sysmessages = [ 'This is the message. ','It will be sent ','in parts.',] server_address = ('localhost', 9898)# Create a TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),socket.socket(socket.AF_INET, socket.SOCK_STREAM),]# Connect the socket to the port where the server is listening print >>sys.stderr, 'connecting to %s port %s' % server_address for s in socks:s.connect(server_address)for message in messages:# Send messages on both socketsfor s in socks:print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)s.send(message)# Read responses on both socketsfor s in socks:data = s.recv(1024)print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)if not data:print >>sys.stderr, 'closing socket', s.getsockname()s.close() Client?
?RabbitMQ隊列
?為什么會有RabbitMQ隊列?
?
?
?
?安裝python rabbitMQ module
pip install pika or easy_install pika or 源碼https://pypi.python.org/pypi/pika?
?
實現最簡單的隊列通信
?
?
?
?send端:
?
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()#聲明一個管道#聲明queue channel.queue_declare(queue='hello')#RabbitMQ消息永遠不能直接發送到隊列,它總是需要通過交換. channel.basic_publish(exchange='',routing_key='hello',body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()?
?
?
?
receive端:
?
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()#聲明一個管道'''您可能會問為什么我們再次聲明隊列 - 我們已經在之前的代碼中聲明了它。
如果我們確定隊列已經存在,我們可以避免這種情況。 例如,如果是receive端的程序
先運行,程序就會報錯,因為找不到queue。 但我們還不確定首先運行哪個程序。在這種情況下,在兩個程序中都聲明隊列,
就可以避免receive端程序先運行而出錯的情況。
''' channel.queue_declare(queue='hello')#聲明queuedef callback(ch, method, properties, body):#callback回調函數
#ch是channel的內存對象print(" [x] Received %r" % body)
# 處理完消息手動去跟服務器端確認,如果不確認服務器端是不會刪除消息
ch.basic_ack(delivery_tag=method.delivery_tag)
#處理完消息手動去跟服務器端確認,服務器端就會刪除消息channel.basic_consume(callback,#消費消息queue='hello',no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
?
?Work Queues
?
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差不多。
消息提供者代碼:
?
import pika import timeimport sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()#聲明管道# 聲明queue channel.queue_declare(queue='task_queue')# RabbitMQ消息永遠不能直接發送到隊列,它總是需要通過交換. message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties(delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
?
?
?
?
?消費者代碼:
import pika, timeconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()def callback(ch, method, properties, body):print(" [x] Received %r" % body)time.sleep(20)print(" [x] Done")print("method.delivery_tag",method.delivery_tag)# 處理完消息手動去跟服務器端確認,如果不確認服務器端是不會刪除消息
ch.basic_ack(delivery_tag=method.delivery_tag)
#處理完消息手動去跟服務器端確認,服務器端就會刪除消息channel.basic_consume(callback,queue='task_queue',no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
?
?
?
?執行任務可能需要幾秒鐘。您可能想知道如果其中一個消費者開始執行長任務并且僅在部分完成時死亡會發生什么。使用我們當前的代碼,一旦RabbitMQ向客戶發送消息,它立即將其從內存中刪除。
在這種情況下,如果你殺死一個工人,我們將丟失它剛剛處理的消息。我們還將丟失分發給這個特定工作者但尚未處理的所有消息。
但我們不想失去任何任務。如果工人死亡,我們希望將任務交付給另一名工人。
為了確保消息永不丟失,RabbitMQ支持消息確認。從消費者發回ack(nowledgement)以告知RabbitMQ已收到,處理了特定消息,并且RabbitMQ可以自由刪除它。
如果消費者死亡(其通道關閉,連接關閉或TCP連接丟失)而不發送確認,RabbitMQ將理解消息未完全處理并將重新排隊。如果其他消費者同時在線,則會迅速將其重新發送給其他消費者。
這樣你就可以確保沒有消息丟失,即使工人偶爾會死亡。沒有任何消息超時;當消費者死亡時,RabbitMQ將重新發送消息。即使處理消息需要非常長的時間,也沒關系。
默認情況下,消息確認已打開。在前面的示例中,我們通過no_ack = True標志明確地將它們關閉。一旦我們完成任務,就應該刪除此標志并從工作人員發送適當的確認。
?
def callback(ch, method, properties, body):print " [x] Received %r" % (body,)time.sleep( body.count('.') )print " [x] Done"ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback,queue='hello')?
?
使用此代碼,我們可以確定即使您在處理消息時使用CTRL + C殺死一名工作人員,也不會丟失任何內容。 工人死后不久,所有未經確認的消息將被重新傳遞
?
?消息持久化
?我們已經學會了如何確保即使消費者死亡,任務也不會丟失(默認情況下,如果要禁用使用no_ack = True)。 但是如果RabbitMQ服務器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非你告訴它不要。 確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久。
首先,我們需要確保RabbitMQ永遠不會丟失我們的隊列。 為此,我們需要聲明它是持久的:
?
?
?雖然此命令本身是正確的,但它在我們的設置中不起作用。 那是因為我們已經定義了一個名為hello的隊列,這個隊列不耐用。 RabbitMQ不允許您使用不同的參數重新定義現有隊列,
并將向嘗試執行此操作的任何程序返回錯誤。 但是有一個快速的解決方法 - 讓我們聲明一個具有不同名稱的隊列,例如:task_queue:
channel.queue_declare(queue='task_queue', durable=True)?
?
此queue_declare更改需要應用于生產者和消費者代碼。
此時我們確信即使RabbitMQ重新啟動,task_queue隊列也不會丟失。 現在我們需要將消息標記為持久性 - 通過提供值為2的delivery_mode屬性。
?
消息公平分發
?如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,
同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
?
?????????????????????????????????????????????????????????????????????????????????????????????
?
?
channel.basic_qos(prefetch_count=1)?
?
?帶消息持久化+公平分發的完整代碼:
生產者端:
?
#Author:Yun import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()#聲明一個管道# 聲明queue channel.queue_declare(queue='hello2',durable=True)#durable=True只是把隊列持久化# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='',routing_key='hello2',body='Hello World!',properties=pika.BasicProperties(delivery_mode= 2,#使消息持久化 )) print(" [x] Sent 'Hello World!'") connection.close() View Code?
?
?
?
?消費消息端:
?
#Author:Yun import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()#聲明一個管道'''您可能會問為什么我們再次聲明隊列 - 我們已經在之前的代碼中聲明了它。 如果我們確定隊列已經存在,我們可以避免這種情況。 例如,如果是consumer.py程序 先運行程序會因為找不到queue而報錯。 但我們還不確定首先運行哪個程序。在這種情況下,在兩個程序中都聲明隊列, 就可以避免consumer.py先運行而出錯的情況。 ''' channel.queue_declare(queue='hello2',durable=True)def callback(ch, method, properties, body):#callback回調函數#ch是channel的內存對象print(" [x] Received %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)#告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。 channel.basic_consume(#消費消息 callback,queue='hello2',no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() View Code?
?
?
?
?
Publish\Subscribe(消息發布\訂閱)
之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了。
交換是一件非常簡單的事情。 一方面,它接收來自生產者的消息,另一方面將它們推送到隊列。 交易所必須確切知道如何處理收到的消息。 它應該附加到特定隊列嗎?
它應該附加到許多隊列嗎? 或者它應該被丟棄。 其規則由交換類型定義。
?
?
Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息
fanout:?所有bind到此exchange的queue都可以接收消息
direct:?通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
?
表達式符號說明:#代表一個或多個字符,*代表任何字符
? ? ? 例:#.a會匹配a.a,aa.a,aaa.a等
? ? ? ? ? *.a會匹配a.a,b.a,c.a等
? ? ?注:使用RoutingKey為#,Exchange Type為topic的時候相當于使用fanout 
headers: 通過headers 來決定把消息發給哪些queue
?
?
?消息publisher:
?
import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs',routing_key='',body=message) print(" [x] Sent %r" % message) connection.close() View Code?
?
?
?
?消息subscriber:
?
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')result = channel.queue_declare(exclusive=True)#exclusive唯一的,不指定queue名字,rabbit會隨機分配一個名字,exclusive=True #會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = result.method.queuechannel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() View Code?
?
?
?
?有選擇的接收消息(exchange type=direct)
?
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。
?
?
?
?消息publisher:
import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() View Code?
?
??消息subscriber:
?
import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')result = channel.queue_declare(exclusive=True) queue_name = result.method.queueseverities = sys.argv[1:] if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() View Code?
?
?
?
?測試程序步驟:
1、先通過命令行,啟動sub端的程序(聲明接收消息的關鍵字),如,python? consumer.py? info
?[*] Waiting for logs. To exit press CTRL+C
2、再通過命令行,啟動pub端程序(聲明發送消息的關鍵字),如,python? producer.py??? info? hehe
?[x] Sent 'info':'hehe'
3、結果:
H:\Python3_study\jichu\day10\RabbitMQ\direct_broadcast>python? consumer.py? info
?[*] Waiting for logs. To exit press CTRL+C
?[x] 'info':b'hehe'
更細致的消息過濾
雖然使用直接交換改進了我們的系統,但它仍然有局限性 - 它不能基于多個標準進行路由。
在我們的日志系統中,我們可能不僅要根據嚴重性訂閱日志,還要根據發出日志的源來訂閱日志。 您可能從syslog unix工具中了解這個概念,
該工具根據嚴重性(info / warn / crit ...)和facility(auth / cron / kern ...)來路由日志。
這會給我們帶來很大的靈活性 - 我們可能想聽聽來自'cron'的關鍵錯誤以及來自'kern'的所有日志。
?
?
publisher:
?
import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()?
?
?
?
subscriber:
?
#Author:Yun import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')result = channel.queue_declare(exclusive=True) queue_name = result.method.queuebinding_keys = sys.argv[1:] if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()?
?
?
?
?
要從發布端接收所有運行的日志:
要從發布端接收所有以kern.開頭的日志:
python receive_logs_topic.py "kern.*"要從發布端接收所有以.critical結尾的日志:
python receive_logs_topic.py "*.critical"您可以創建多個綁定:
python receive_logs_topic.py "kern.*" "*.critical"并使用路由鍵“kern.critical”類型發出日志:
python emit_log_topic.py "kern.critical" "A critical kernel error"?
Remote procedure call (RPC)??
為什么需要RPC(remote? procedure? call)?遠程過程調用
當你有這樣一個需求,從客戶端發一條命令讓遠程服務器執行完后,返回一個數據。
前面的rabbitMQ 操作消息傳遞只是單向的,無法滿足這種需求。因此RPC就橫空出世。
什么是RPC?
從客戶端發一條命令讓遠程服務器執行完后,返回一個數據這樣的模式稱為RPC。如,典型的RPC--簡單網絡管理協議(SNMP)
?
?為了說明如何使用RPC服務,我們將創建一個簡單的客戶端類。 它將公開一個名為call的方法,
該方法發送一個RPC請求并阻塞,直到收到答案為止:
?
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % result)?
? 官方版:
?
? 個人版:
?
?RPC server:
?
#Author:Yun #'remote procedure call' import pika import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):#斐波那契數列 1 1 2 3 5 8 13 21if n == 0:return 0elif n == 1:return 1else:return fib(n - 1) + fib(n - 2)def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)#獲取結果 ch.basic_publish(exchange='',routing_key=props.reply_to,#拿到客戶端隨機生成的queueproperties=pika.BasicProperties(correlation_id= \props.correlation_id),#拿到客戶端的correlation_id,并把它返回給客戶端body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)#確保消息被消費,代表任務完成 channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests") channel.start_consuming() View Code?
?
?
RPC client:
?
#Author:Yun import pika import uuid import timeclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#連接遠程 self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)# exclusive唯一的,不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除self.callback_queue = result.method.queue#生成隨機queue為callback_queue#聲明接收消息self.channel.basic_consume(self.on_response,#只要收到消息就調用on_response方法no_ack=True,queue=self.callback_queue#聲明從哪個queue接收消息 )def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:#此處判斷服務器發回來的corrid是不是和本機相等,如果相等就說明服務器返回的是客戶端發送指令所產生的結果self.response = body'''if self.corr_id == props.correlation_id:self.response = body為什么要有這一步,因為客戶端可以給服務器連續發兩個指令,而返回順序不一定是先發的先返回;為了確保服務器返回的是客戶端發送指令所產生的結果,所以必須有此步驟。'''def call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())#發消息self.channel.basic_publish(exchange='',routing_key='rpc_queue',#聲明消息發到rpc_queueproperties=pika.BasicProperties(reply_to=self.callback_queue,#讓服務器執行完命令,把結果返回到callback_queuecorrelation_id=self.corr_id,#給服務器發送本機生成的correlation_id ),body=str(n))#消息內容#當收到信息后,會觸發on_response方法,這個方法使self.response不再為None#所以收到信息后,就會跳出循環while self.response is None:self.connection.process_data_events()#非阻塞版的start_consumiting()每過一段時間就檢查一次#有消息就收消息沒消息不阻塞就繼續往下走print('no msg...')#沒有消息就會打印'no msg...'time.sleep(0.5)return int(self.response)fibonacci_rpc = FibonacciRpcClient()#實例化print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(8)#調用類中的call()方法 print(" [.] Got %r" % response) View Code?
?
?
?
Memcached & Redis使用
memcached?
http://www.cnblogs.com/wupeiqi/articles/5132791.html?
?
redis 使用
http://www.cnblogs.com/alex3714/articles/6217453.html
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
轉載于:https://www.cnblogs.com/yunwangjun-python-520/p/9983730.html
總結
以上是生活随笔為你收集整理的Day10-Python3基础-协程、异步IO、redis缓存、rabbitMQ队列的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: JavaScript-4.2函数,变量作
- 下一篇: cf366C Dima and Sala
