同步异步 阻塞 非阻塞 异步调用 线程队列 协程
生活随笔
收集整理的這篇文章主要介紹了
同步异步 阻塞 非阻塞 异步调用 线程队列 协程
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
阻塞 非阻塞
阻塞:程序遇到了IO操作 導致代碼無法繼續執行 交出了COU執行權
非阻塞:沒有IO操作 或者即使遇到IO操作 也不阻塞代碼執行
阻塞 就緒 運行
指的是應用程序所處的狀態
寫程序時 盡量減少IO操作
同步 異步
同步:發起一個任務后,必須原地等待任務執行結束 拿到一個明確的結果
異步:發起一個任務后,不需要等待,代碼繼續往下執行
指的是任務的發起方式
異步任務的效率高于同步
場景:當你的任務是不需要立即獲取結果的 并且還有其他任務需要處理 那就發起異步任務
如何發起異步任務:多進程 多線程 # from multiprocessing import Process # from threading import Thread # # import time # # def task(): # time.sleep(2) # print("run....") # # # if __name__ == '__main__': # p = Thread(target=task) # p.start() # print("over") # from concurrent.futures import ProcessPoolExecutor # # # import time,random # # # def task(num): # time.sleep(random.randint(1,3)) # print(num**2) # return num**2 # # if __name__ == '__main__': # # pool=ProcessPoolExecutor() # # fs=[] # for i in range(6): # f=pool.submit(task,i) # fs.append(f) # # # 方式1 # # 按照順序來獲取結果將導致一些任務可能已經完成了但是沒有及時獲取到結果 # # 例如第二個任務先執行完畢 但是必須要等到第一個執行結束才能拿到結果 # # for f in fs: # # print(f.result()) # # # shutdown 關閉進程池 會阻塞知道所有進程任務全部執行完畢 # # pool.shutdown(wait=True) # for f in fs: # print(f.result()) # #關閉之后不能提交新任務 # # pool.submit(task,10) # # print('over') View Code
同步和阻塞的區別:
阻塞一定意味著CPU切走了
而同步有可能是因為計算任務比較耗時
獲取異步任務結果的方式
爬蟲:
1.獲取到HTML文檔
2.從文檔中取出需要的數據 # from concurrent.futures import ThreadPoolExecutor # import requests # import threading # # # #生產 # def get_data(url): # print("%s正在處理%s" % (threading.current_thread().name, url)) # resp = requests.get(url) # print("%s獲取完成" % url) # # 方式3 直接調用處理函數 生產和消費者 強行耦合在一起 # # parser_data(resp.text,url) # # return resp.text,url # # #消費 # def parser_data(f): # res = f.result() # print("解析長度為:%s,地址:%s" % (len(res[0]),res[1])) # print("當前線程:%s" % threading.current_thread().name) # # # # 要爬取的地址列表 # urls = ["https://www.baidu.com/","https://www.bilibili.com","https://www.csdn.net"] # # pool = ThreadPoolExecutor() # # # # fs = [] # for url in urls: # f = pool.submit(get_data,url) #提交任務 # f.add_done_callback(parser_data) # 綁定一個回調函數# 方式1 把并發變成了串行 # data = f.result() # 獲取結果 # parser_data(data) # 解析結果 # fs.append(f)# 方式2 必須等待全部完成 不能及時處理 # pool.shutdown() # for f in fs: # data = f.result() # parser_data(data[0],data[1]) 爬蟲
異步回調
回調:就是回調函數
給異步任務綁定一個函數 當任務完成時會自動調用該函數
具體使用:
當你往poll中添加了一個異步任務,會返回一個表示結果的對象
有一個對象綁定方法 add_done_callback(綁定一個回調函數 會在任務結束后自動調入) 需要一個函數作為參數
注意: 回調函數 必須有且只有一個參數 就是對象本身
通過對象.result來獲取結果
回調函數交給子線程執行 誰有空誰處理
優點:不用原地等待 任務結果可以立即獲取到 # from threading import Thread # from multiprocessing import Process # # def task(callback): # print("run......") # callback() # # # def callback(): # print("這是回調函數.....") # # # if __name__ == '__main__': # t = Process(target=task,args=(callback,)) # t.start() # # from concurrent.futures import ProcessPoolExecutor # import os # # # def task(): # print("task....run") # # def call_back(obj): # print(os.getpid()) # print(obj.result()) # # if __name__ == '__main__': # print("主:",os.getpid()) # pool = ProcessPoolExecutor() # f = pool.submit(task) # f.add_done_callback(call_back) 如何為線程進程綁定回調函數
線程隊列: import queue queue該模塊下提供了一些常用的數據容器 但是他們僅僅是容器 每一個數據共享這個提點 from queue import Queue,LifoQueue,PriorityQueue # q = Queue() # q.put(1) # q.put(2) # print(q.get()) # print(q.get()) # print(q.get(timeout=2))
Lifoqueue() 后進先出 跟堆棧相似 # q = LifoQueue() # q.put(1) # # q.put(2) # # print(q.get())
優先級隊列
需要傳入一個元組類型 第一個是優先級 第二個是值 # q = PriorityQueue() # # q.put((-111111,"abc")) # # q.put((2,"abc")) # q.put(("a","hello world1")) # q.put(("A","hello world2")) # # q.put(("c","hello world")) # print(q.get())
事件是一個通知信息 表示什么時間發生了什么事情
用于線程間通訊
線程間 本來就是數據共享的 也就是說 即使沒有事件這個東西 也是沒有問題的
線程之間,執行流程是完全獨立的 一些時候可能需要知道另一個線程發生了什么然后采取一些行動
事件其實就是幫你維護了一個bool值
在bool為True之前 wait函數將一直阻塞 這樣一來就避免了不斷地詢問對方的狀態
假設有兩條線程 一個用于開啟服務器 一個用于連接服務器
連接服務器一定要保證 服務器已經開啟成功了 服務器啟動需要花費一些時間
from threading import Thread,Event import time,random # 不使用事件 # 默認未啟動 # is_boot=False # # def boot_server(): # global is_boot # print('啟動服務器') # time.sleep(random.randint(1,3)) # print('服務器啟動成功') # is_boot=True # # # def connect_server(): # while True: # if is_boot: # print('連接成功!') # break # else: # print('連接失敗') # time.sleep(1) # # t1 = Thread(target=boot_server) # t1.start() # # t2 = Thread(target=connect_server) # t2.start()# 使用事件 # 一個事件 # boot = Event() # # def boot_server(): # print("正在啟動服務器.......") # time.sleep(random.randint(2,5)) # print("服務器啟動成功.......") # boot.set() # # # def connect_server(): # print("開始嘗試連接.....") # boot.wait() # 是一個阻塞函數 會一直等到set()函數被調用 # print("連接服務器成功!") # # t1 = Thread(target=boot_server) # t1.start() # # t2 = Thread(target=connect_server) # t2.start() 事件
協程就是要用單線程實現并發
GIL導致多個線程不能并行 效率低
Cpython中多個線程不能并行
既然多個線程也不乏提高執行效率 還有沒有必要開線程
例子:只有一個崗位 但是有十個任務 請十個人也提高不了效率 反而增加了系統開銷
現在只有一個線程 那要如何并發的處理十個任務
一個線程如何能并發?
多道技術
切換+保存狀態
首先任務其實就是一堆代碼 一堆代碼可以組成一個函數
如何能使得可以再多個函數之間進行切換
協程指的就是一個線程完成并發執行
用于提高效率 在檢測到IO操作時 切換到其他的非IO操作 這樣一來 在操作系統眼里程序依舊沒有阻塞
力求盡可能多的占用CPU執行權
固定使用場景:IO密集型任務
如果任務在CPython中 是計算密集型 使用協程是無法提交效率的 反而因為切換任務導致效率降低 只能靠進程了
IO密集型 多線程會比多進程效率高 因為線程的開銷比進程小很多
本質上協程還是只有一個線程 所以 一旦遇到IO操作 整個線程就卡住了
協程僅在以下場景能夠提交效率
1.任務是IO密集型
2.一定要可以檢測IO操作并且在IO即將阻塞時 切換到計算任務 從而使得CPU盡可能多的執行你的線程
TCP的服務器端
10W線程 10個客戶端
from gevent import monkey,spawn;monkey.patch_all()import time def task():print("task run")time.sleep(3)print("task over")def task2():print("task2 run")time.sleep(3)print("task2 over")spawn(task) spawn(task2)print("over") time.sleep(2) 協程join的用法
greenlet 是對yield進行了封裝
簡化了書寫 但是他不能檢測IO操作 # import time # # def task1(): # for i in range(10000): # 1+1 # yield # # def task2(): # for i in range(10000): # g=task1() # 1+1 # next(g) # # # start_time=time.time() # task2() # print(time.time()-start_time)
形式 打補丁的代碼必須放在模塊之前 import gevent def task():.... g=gevent.spawn(task) g.join() print('over')
協程創建完成后必須調用join函數 否則任務不會開啟 可以理解為線程中的start函數
# import gevent.monkey # gevent.monkey.patch_all() # import socket # import time # # def task(): # s = socket.socket() # s.bind(("127.0.0.1",8989)) # s.listen() # while True: # c,addr = s.accept() # g2 = gevent.spawn(handler,c) # # g2.join() # # def handler(c): # while True: # print(c.recv(1024).decode("utf-8")) # # # # def task2(): # # while True: # # time.sleep(1) # # print("-----------") # # # g1 = gevent.spawn(task) # g1.join() gevent
阻塞:程序遇到了IO操作 導致代碼無法繼續執行 交出了COU執行權
非阻塞:沒有IO操作 或者即使遇到IO操作 也不阻塞代碼執行
阻塞 就緒 運行
指的是應用程序所處的狀態
寫程序時 盡量減少IO操作
同步 異步
同步:發起一個任務后,必須原地等待任務執行結束 拿到一個明確的結果
異步:發起一個任務后,不需要等待,代碼繼續往下執行
指的是任務的發起方式
異步任務的效率高于同步
場景:當你的任務是不需要立即獲取結果的 并且還有其他任務需要處理 那就發起異步任務
如何發起異步任務:多進程 多線程 # from multiprocessing import Process # from threading import Thread # # import time # # def task(): # time.sleep(2) # print("run....") # # # if __name__ == '__main__': # p = Thread(target=task) # p.start() # print("over") # from concurrent.futures import ProcessPoolExecutor # # # import time,random # # # def task(num): # time.sleep(random.randint(1,3)) # print(num**2) # return num**2 # # if __name__ == '__main__': # # pool=ProcessPoolExecutor() # # fs=[] # for i in range(6): # f=pool.submit(task,i) # fs.append(f) # # # 方式1 # # 按照順序來獲取結果將導致一些任務可能已經完成了但是沒有及時獲取到結果 # # 例如第二個任務先執行完畢 但是必須要等到第一個執行結束才能拿到結果 # # for f in fs: # # print(f.result()) # # # shutdown 關閉進程池 會阻塞知道所有進程任務全部執行完畢 # # pool.shutdown(wait=True) # for f in fs: # print(f.result()) # #關閉之后不能提交新任務 # # pool.submit(task,10) # # print('over') View Code
?
同步和阻塞的區別:
阻塞一定意味著CPU切走了
而同步有可能是因為計算任務比較耗時
獲取異步任務結果的方式
爬蟲:
1.獲取到HTML文檔
2.從文檔中取出需要的數據 # from concurrent.futures import ThreadPoolExecutor # import requests # import threading # # # #生產 # def get_data(url): # print("%s正在處理%s" % (threading.current_thread().name, url)) # resp = requests.get(url) # print("%s獲取完成" % url) # # 方式3 直接調用處理函數 生產和消費者 強行耦合在一起 # # parser_data(resp.text,url) # # return resp.text,url # # #消費 # def parser_data(f): # res = f.result() # print("解析長度為:%s,地址:%s" % (len(res[0]),res[1])) # print("當前線程:%s" % threading.current_thread().name) # # # # 要爬取的地址列表 # urls = ["https://www.baidu.com/","https://www.bilibili.com","https://www.csdn.net"] # # pool = ThreadPoolExecutor() # # # # fs = [] # for url in urls: # f = pool.submit(get_data,url) #提交任務 # f.add_done_callback(parser_data) # 綁定一個回調函數# 方式1 把并發變成了串行 # data = f.result() # 獲取結果 # parser_data(data) # 解析結果 # fs.append(f)# 方式2 必須等待全部完成 不能及時處理 # pool.shutdown() # for f in fs: # data = f.result() # parser_data(data[0],data[1]) 爬蟲
?
異步回調
回調:就是回調函數
給異步任務綁定一個函數 當任務完成時會自動調用該函數
具體使用:
當你往poll中添加了一個異步任務,會返回一個表示結果的對象
有一個對象綁定方法 add_done_callback(綁定一個回調函數 會在任務結束后自動調入) 需要一個函數作為參數
注意: 回調函數 必須有且只有一個參數 就是對象本身
通過對象.result來獲取結果
回調函數交給子線程執行 誰有空誰處理
優點:不用原地等待 任務結果可以立即獲取到 # from threading import Thread # from multiprocessing import Process # # def task(callback): # print("run......") # callback() # # # def callback(): # print("這是回調函數.....") # # # if __name__ == '__main__': # t = Process(target=task,args=(callback,)) # t.start() # # from concurrent.futures import ProcessPoolExecutor # import os # # # def task(): # print("task....run") # # def call_back(obj): # print(os.getpid()) # print(obj.result()) # # if __name__ == '__main__': # print("主:",os.getpid()) # pool = ProcessPoolExecutor() # f = pool.submit(task) # f.add_done_callback(call_back) 如何為線程進程綁定回調函數
?
線程隊列: import queue queue該模塊下提供了一些常用的數據容器 但是他們僅僅是容器 每一個數據共享這個提點 from queue import Queue,LifoQueue,PriorityQueue # q = Queue() # q.put(1) # q.put(2) # print(q.get()) # print(q.get()) # print(q.get(timeout=2))
Lifoqueue() 后進先出 跟堆棧相似 # q = LifoQueue() # q.put(1) # # q.put(2) # # print(q.get())
優先級隊列
需要傳入一個元組類型 第一個是優先級 第二個是值 # q = PriorityQueue() # # q.put((-111111,"abc")) # # q.put((2,"abc")) # q.put(("a","hello world1")) # q.put(("A","hello world2")) # # q.put(("c","hello world")) # print(q.get())
?
事件是一個通知信息 表示什么時間發生了什么事情
用于線程間通訊
線程間 本來就是數據共享的 也就是說 即使沒有事件這個東西 也是沒有問題的
線程之間,執行流程是完全獨立的 一些時候可能需要知道另一個線程發生了什么然后采取一些行動
事件其實就是幫你維護了一個bool值
在bool為True之前 wait函數將一直阻塞 這樣一來就避免了不斷地詢問對方的狀態
假設有兩條線程 一個用于開啟服務器 一個用于連接服務器
連接服務器一定要保證 服務器已經開啟成功了 服務器啟動需要花費一些時間
from threading import Thread,Event import time,random # 不使用事件 # 默認未啟動 # is_boot=False # # def boot_server(): # global is_boot # print('啟動服務器') # time.sleep(random.randint(1,3)) # print('服務器啟動成功') # is_boot=True # # # def connect_server(): # while True: # if is_boot: # print('連接成功!') # break # else: # print('連接失敗') # time.sleep(1) # # t1 = Thread(target=boot_server) # t1.start() # # t2 = Thread(target=connect_server) # t2.start()# 使用事件 # 一個事件 # boot = Event() # # def boot_server(): # print("正在啟動服務器.......") # time.sleep(random.randint(2,5)) # print("服務器啟動成功.......") # boot.set() # # # def connect_server(): # print("開始嘗試連接.....") # boot.wait() # 是一個阻塞函數 會一直等到set()函數被調用 # print("連接服務器成功!") # # t1 = Thread(target=boot_server) # t1.start() # # t2 = Thread(target=connect_server) # t2.start() 事件
協程就是要用單線程實現并發
GIL導致多個線程不能并行 效率低
Cpython中多個線程不能并行
既然多個線程也不乏提高執行效率 還有沒有必要開線程
例子:只有一個崗位 但是有十個任務 請十個人也提高不了效率 反而增加了系統開銷
現在只有一個線程 那要如何并發的處理十個任務
一個線程如何能并發?
多道技術
切換+保存狀態
首先任務其實就是一堆代碼 一堆代碼可以組成一個函數
如何能使得可以再多個函數之間進行切換
協程指的就是一個線程完成并發執行
用于提高效率 在檢測到IO操作時 切換到其他的非IO操作 這樣一來 在操作系統眼里程序依舊沒有阻塞
力求盡可能多的占用CPU執行權
固定使用場景:IO密集型任務
如果任務在CPython中 是計算密集型 使用協程是無法提交效率的 反而因為切換任務導致效率降低 只能靠進程了
IO密集型 多線程會比多進程效率高 因為線程的開銷比進程小很多
本質上協程還是只有一個線程 所以 一旦遇到IO操作 整個線程就卡住了
協程僅在以下場景能夠提交效率
1.任務是IO密集型
2.一定要可以檢測IO操作并且在IO即將阻塞時 切換到計算任務 從而使得CPU盡可能多的執行你的線程
TCP的服務器端
10W線程 10個客戶端
from gevent import monkey,spawn;monkey.patch_all()import time def task():print("task run")time.sleep(3)print("task over")def task2():print("task2 run")time.sleep(3)print("task2 over")spawn(task) spawn(task2)print("over") time.sleep(2) 協程join的用法
?
greenlet 是對yield進行了封裝
簡化了書寫 但是他不能檢測IO操作 # import time # # def task1(): # for i in range(10000): # 1+1 # yield # # def task2(): # for i in range(10000): # g=task1() # 1+1 # next(g) # # # start_time=time.time() # task2() # print(time.time()-start_time)
?
greenlet 無法檢測IO操作 gevent 即可單線程實現并發 又可以檢測IO操作 import greenlet import timedef task1():print("task1 run....")time.sleep(20)g2.switch()print("task1 over....")def task2():print("task2 run....")print("task2 over....")g1.switch()g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch() # greenlet 無法檢測IO操作 # gevent 即可單線程實現并發又可以檢測IO操作 # import gevent # import gevent.monkey # # 打補丁的代碼必須放在 導入模塊之前 # # # import time # import threading # # # def task1(a,b): # print(a,b) # # for i in range(1000): # print("task1") # print(threading.current_thread()) # time.sleep(1) # # # def task2(): # # for i in range(1000): # print("task2") # print(threading.current_thread()) # # # g1 = gevent.spawn(task1,123,321) # g2 = gevent.spawn(task2) # # # 協程創建完成后 必須調用join函數 否則任務不會開啟 可以理解為線程中的start函數 # # g.start() # # # g1.join() # # g2.join() # gevent.joinall([g1,g2]) # print("over") View Code形式 打補丁的代碼必須放在模塊之前 import gevent def task():.... g=gevent.spawn(task) g.join() print('over')
協程創建完成后必須調用join函數 否則任務不會開啟 可以理解為線程中的start函數
# import gevent.monkey # gevent.monkey.patch_all() # import socket # import time # # def task(): # s = socket.socket() # s.bind(("127.0.0.1",8989)) # s.listen() # while True: # c,addr = s.accept() # g2 = gevent.spawn(handler,c) # # g2.join() # # def handler(c): # while True: # print(c.recv(1024).decode("utf-8")) # # # # def task2(): # # while True: # # time.sleep(1) # # print("-----------") # # # g1 = gevent.spawn(task) # g1.join() gevent
?
?轉載于:https://www.cnblogs.com/gengbinjia/p/10511707.html
總結
以上是生活随笔為你收集整理的同步异步 阻塞 非阻塞 异步调用 线程队列 协程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C语言(第二章):数据类型、运算符、表达
- 下一篇: python学习—python中的引用本