python进程\协程\异步IO
進程
學習python中有什么不懂的地方,小編這里推薦加小編的python學習群:895 817 687有任何不懂的都可以在里面交流,還有很好的視頻教程pdf學習資料,大家一起學習交流!
Python中的多線程無法利用多核優勢 , 所以如果我們想要充分地使用多核CPU的資源 , 那么就只能靠多進程了
multiprocessing模塊中提供了Process , Queue , Pipe , Lock , RLock , Event , Condition等組件 , 與threading模塊有很多相似之處
1.創建進程
from multiprocessing import Process import timedef func(name):time.sleep(2)print('hello',name)if __name__ == '__main__':p= Process(target=func,args=('derek',))p.start()# p.join()print('end...')2.進程間通訊
(1)Queue
不同進程間內存是不共享的,要想實現兩個進程間的數據交換。進程間通信有兩種主要形式 , 隊列和管道
from multiprocessing import Process, Queue #Queue是進程排列def f(test):test.put('22') #通過創建的子進程往隊列添加數據,實線父子進程交互if __name__ == '__main__':q = Queue() #父進程q.put("11")p = Process(target=f, args=(q,)) #子進程p.start()p.join()print("取到:",q.get_nowait())print("取到:",q.get_nowait())#父進程在創建子進程的時候就把q克隆一份給子進程 #通過pickle序列化、反序列化,來達到兩個進程之間的交互結果: 取到: 11 取到: 22(2)Pipe(管道)
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).
from multiprocessing import Process, Pipedef f(conn):conn.send('11')conn.send('22')print("from parent:",conn.recv())print("from parent:", conn.recv())conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe() #生成管道實例,可以互相send()和recv()p = Process(target=f, args=(child_conn,))p.start()print(parent_conn.recv()) # prints "11"print(parent_conn.recv()) # prints "22"parent_conn.send("33") # parent 發消息給 childparent_conn.send("44")p.join()3.Manager
進程之間是相互獨立的 ,Queue和pipe只是實現了數據交互,并沒實現數據共享,Manager可以實現進程間數據共享 。
Manager還支持進程中的很多操作 , 比如Condition , Lock , Namespace , Queue , RLock , Semaphore等
from multiprocessing import Process, Manager import osdef f(d, l):d[os.getpid()] =os.getpid()l.append(os.getpid())print(l)if __name__ == '__main__':with Manager() as manager:d = manager.dict() #{} #生成一個字典,可在多個進程間共享和傳遞l = manager.list(range(5)) #生成一個列表,可在多個進程間共享和傳遞p_list = []for i in range(2):p = Process(target=f, args=(d, l))p.start()p_list.append(p)for res in p_list: #等待結果res.join()print(d)print(l)4.lock
from multiprocessing import Process, Lockdef f(l, i):#l.acquire()print('hello world', i)#l.release()if __name__ == '__main__':lock = Lock()for num in range(100):Process(target=f, args=(lock, num)).start() #要把lock傳到函數的參數l#lock防止在屏幕上打印的時候會亂5.進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有以下幾個主要方法:
- apply:從進程池里取一個進程并執行
- apply_async:apply的異步版本
- terminate:立刻關閉線程池
- join:主進程等待所有子進程執行完畢,必須在close或terminate之后
- close:等待所有進程結束后,才關閉線程池
協程
1.簡介
協程(Coroutine) : 是單線程下的并發 , 又稱微線程 , 纖程 . 協程是一種用戶態的輕量級線程 , 即協程有用戶自己控制調度
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當于進入上一次調用的狀態
使用協程的優缺點
優點 :
- 協程的切換開銷更小 , 屬于程序級別的切換 , 更加輕量級
- 單線程內就可以實現并發的效果 , 最大限度利用CPU
缺點 :
- 協程的本質是單線程下 , 無法利用多核 , 可以是一個程序開啟多個進程 , 每個進程內開啟多個線程 , 每個線程內開啟協程
- 協程指的是單個線程 , 因而一旦協程出現阻塞 將會阻塞整個線程
2.Greenlet
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator
手動切換
from greenlet import greenletdef test1():print(12)gr2.switch() #到這里切換到gr2,執行test2()print(34)gr2.switch() #切換到上次gr2運行的位置def test2():print(56)gr1.switch() #切換到上次gr1運行的位置print(78)gr1 = greenlet(test1) #啟動一個協程gr1 gr2 = greenlet(test2) #啟動一個協程gr2gr1.switch() #開始運行gr13.Gevent
Gevent 是一個第三方庫,可以輕松通過gevent實現并發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。
(1)IO阻塞自動切換
import geventdef foo():print('Running in foo')gevent.sleep(2)print('阻塞時間最長,最后運行')def bar():print('running in bar')gevent.sleep(1)print('foo()還在阻塞,這里第二個運行')def func3():print("running in func3 ")gevent.sleep(0)print("其它兩個還在IO阻塞先運行")#創建協程實例 gevent.joinall([gevent.spawn(foo), #生成,gevent.spawn(bar),gevent.spawn(func3), ])#遇到IO自動切換結果: Running in foo running in bar running in func3 其它兩個還在IO阻塞先運行 foo()還在阻塞,這里第二個運行 阻塞時間最長,最后運行Process finished with exit code 0由于切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成:
(2)爬蟲例子:
from urllib import request import gevent,time from gevent import monkey monkey.patch_all() #作用:把當前程序的所有的io操作給我單獨的做上標記def f(url):print('GET: %s' % url)resp = request.urlopen(url)data = resp.read()print('%d bytes received from %s.' % (len(data), url))#同步需要的時間 urls = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/' ] time_start = time.time() for url in urls:f(url) print("同步cost",time.time() - time_start)#下面是異步花費的時間 async_time_start = time.time() gevent.joinall([gevent.spawn(f, 'https://www.python.org/'),gevent.spawn(f, 'https://www.yahoo.com/'),gevent.spawn(f, 'https://github.com/'), ]) print("異步cost",time.time() - async_time_start)結果: GET: https://www.python.org/ 48954 bytes received from https://www.python.org/. GET: https://www.yahoo.com/ 491871 bytes received from https://www.yahoo.com/. GET: https://github.com/ 51595 bytes received from https://github.com/. 同步cost 4.928282260894775 GET: https://www.python.org/ GET: https://www.yahoo.com/ GET: https://github.com/ 48954 bytes received from https://www.python.org/. 494958 bytes received from https://www.yahoo.com/. 51599 bytes received from https://github.com/. 異步cost 1.4920852184295654IO多路復用
詳解:http://www.cnblogs.com/alex3714/articles/5876749.html
selectors模塊
selectors基于select模塊實現IO多路復用,調用語句selectors.DefaultSelector(),特點是根據平臺自動選擇最佳IO多路復用機制,調用順序:epoll > poll > select
做一個socket servers
server
import selectors import socket sel = selectors.DefaultSelector() # 根據平臺自動選擇最佳IO多路復用機制def accept(sock, mask):conn, addr = sock.accept() # Should be ready# print('accepted', conn, 'from', addr,mask)conn.setblocking(False) #設置為非阻塞IOsel.register(conn, selectors.EVENT_READ, read)#新連接注冊read回調函數#將conn和read函數注冊到一起,當conn有變化時執行read函數def read(conn, mask):data = conn.recv(1024) # Should be readyif data:print('echoing', repr(data), 'to', conn)conn.send(data) # Hope it won't blockelse:print('closing', conn)sel.unregister(conn)conn.close()sock = socket.socket() sock.bind(('localhost', 9999)) sock.listen(100) sock.setblocking(False) #設置為非阻塞IO sel.register(sock, selectors.EVENT_READ, accept)# 將sock和accept函數注冊到一起,當sock有變化時執行accept函數while True:events = sel.select() #默認阻塞,有活動連接就返回活動的連接列表,監聽[(key1,mask1),(key2),(mask2)]for key, mask in events:callback = key.data #accept #1 key.data就是accept # 2 key.data就是readcallback(key.fileobj, mask) #key.fileobj= 文件句柄# 1 key.fileobj就是sock # 2 key.fileobj就是connclient
mutlti conn socket client
import socket import sysmessages = [ b'This is the message. ',b'It will be sent ',b'in parts.',] server_address = ('localhost', 9999)# Create a TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(5)] print(socks) # Connect the socket to the port where the server is listening print('connecting to %s port %s' % server_address) for s in socks:s.connect(server_address)for message in messages:# Send messages on both socketsfor s in socks:print('%s: sending "%s"' % (s.getsockname(), message) )s.send(message)# Read responses on both socketsfor s in socks:data = s.recv(1024)print( '%s: received "%s"' % (s.getsockname(), data) )if not data:print( 'closing socket', s.getsockname() )總結
以上是生活随笔為你收集整理的python进程\协程\异步IO的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python常用模块(二)
- 下一篇: Ubuntu基本命令