python的进程线程和协程_python成长之路 :线程、进程和协程
python線程
進程與線程的歷史
我們都知道計算機是由硬件和軟件組成的。硬件中的CPU是計算機的核心,它承擔計算機的所有任務。 操作系統是運行在硬件之上的軟件,是計算機的管理者,它負責資源的管理和分配、任務的調度。 程序是運行在系統上的具有某種功能的軟件,比如說瀏覽器,音樂播放器等。 每次執行程序的時候,都會完成一定的功能,比如說瀏覽器幫我們打開網頁,為了保證其獨立性,就需要一個專門的管理和控制執行程序的數據結構——進程控制 塊。?進程就是一個程序在一個數據集上的一次動態執行過程。?進程一般由程序、數據集、進程控制塊三部分組成。我們編 寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程中所需要使用的資源;進程控制塊用來記錄進程的外部特征,描述進程的執行變化 過程,系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標志。
在早期的操作系統里,計算機只有一個核心,進程執行程序的最小單位,任務調度采用時間片輪轉的搶占式方式進行進程調度。每個進程都有各自的一塊獨立的內 存,保證進程彼此間的內存地址空間的隔離。 隨著計算機技術的發展,進程出現了很多弊端,一是進程的創建、撤銷和切換的開銷比較大,二是由于對稱多處理機(對稱多處理機 (SymmetricalMulti-Processing)又叫SMP,是指在一個計算機上匯集了一組處理器(多CPU),各CPU之間共享內存子系統 以及總線結構)的出現,可以滿足多個運行單位,而多進程并行開銷過大。 這個時候就引入了線程的概念。 線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程中的最小單元,由線程ID、程序計數器、寄存器集合 和堆棧共同組成。線程的引入減小了程序并發執行時的開銷,提高了操作系統的并發性能。 線程沒有自己的系統資源,只擁有在運行時必不可少的資源。但線程可以與同屬與同一進程的其他線程共享進程所擁有的其他資源。
進程與線程之間的關系
線程是屬于進程的,線程運行在進程空間內,同一進程所產生的線程共享同一內存空間,當進程退出時該進程所產生的線程都會被強制退出并清除。線程可與屬于同 一進程的其它線程共享進程所擁有的全部資源,但是其本身基本上不擁有系統資源,只擁有一點在運行中必不可少的信息(如程序計數器、一組寄存器和棧)。
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 importthreading4 importtime,datetime5
6 defshow(arg):7 time.sleep(5)8 print('thread'+str(arg),time.time())9
10
11 for i in range(4):12 t = threading.Thread(target=show, args=(i,))13 #t.setName('name%s' % i)
14 #print(t.name)
15 #t.setDaemon(False) #默認是False, 如果改成True,主線程不等待子線程
16
17 t.start()18 t.run() #立即執行
19 t.join(2)20 print('main thread stop')
上述代碼創建了10個“前臺”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。
更多方法:
start ? ? ? ? ? ?線程準備就緒,等待CPU調度
setName ? ? ?為線程設置名稱
getName ? ? ?獲取線程名稱
setDaemon ? 設置為后臺線程或前臺線程(默認為False)
如果是后臺線程,主線程執行過程中,后臺線程也在進行,主線程執行完畢后,后臺線程不論成功與否,均停止
如果是前臺線程,主線程執行過程中,前臺線程也在進行,主線程執行完畢后,等待前臺線程也執行完成后,程序停止
join ? ? ? ? ? ? ?逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義
run ? ? ? ? ? ? ?線程被cpu調度后自動執行線程對象的run方法
線程鎖threading.RLock和threading.Lock
我們使用線程對數據進行操作的時候,如果多個線程同時修改某個數據,可能會出現不可預料的結果,為了保證數據的準確性,引入了鎖的概念
例:假設列表A的所有元素就為0,當一個線程從前向后打印列表的所有元素,另外一個線程則從后向前修改列表的元素為1,那么輸出的時候,列表的元素就會一部分為0,一部分為1,這就導致了數據的不一致。鎖的出現解決了這個問題。
1 #未加鎖
2 importthreading3 importtime4
5 gl_num =06
7 defshow(arg):8 globalgl_num9 time.sleep(1)10 gl_num += 1
11 print(gl_num)12
13 for i in range(10):14 t = threading.Thread(target=show, args=(i,))15 t.start()16
17 print('main thread stop')
未加鎖代碼
1 線程鎖2 importthreading3 importtime4
5 gl_num =06
7 lock = threading.RLock() #獲取一個鎖的對象
8
9 defFunc():10 lock.acquire() #創建一把鎖
11 global gl_num #聲明全局變量
12 gl_num +=1
13 time.sleep(1)14 print(gl_num)15 lock.release() #更改完釋放鎖
16
17 for i in range(10):18 t = threading.Thread(target=Func) #利用線程執行,執行10次 結果1,2,3,4,5,6.。
19 t.start()
線程鎖
threading.Event
Event是線程間通信最間的機制之一:一個線程發送一個event信號,其他的線程則等待這個信號。用于主線程控制其他線程的執行。 Events 管理一個flag,這個flag可以使用set()設置成True或者使用clear()重置為False,wait()則用于阻塞,在flag為 True之前。flag默認為False。
Event.wait([timeout]) : 堵塞線程,直到Event對象內部標識位被設為True或超時(如果提供了參數timeout)。
Event.set() :將標識位設為Ture
Event.clear() : 將標識伴設為False。
Event.isSet() :判斷標識位是否為Ture。
1 event2 importthreading3
4
5 def do(event): #定義一個函數,傳遞參數envent,其實就是傳遞了一個對象
6 print 'start'
7 event.wait() #True則不阻塞
8 print 'execute'
9
10
11 event_obj =threading.Event()12 for i in range(10):13 t = threading.Thread(target=do, args=(event_obj,)) #傳遞執行函數和參數
14 t.start()15
16 event_obj.clear() #將“Flag”設置為False
17 inp = raw_input('input:')18 if inp == 'true':19 event_obj.set() #set默認為True
event
queue模塊
Queue 就是對隊列,它是線程安全的
舉例來說,我們去肯德基吃飯。廚房是給我們做飯的地方,前臺負責把廚房做好的飯賣給顧客,顧客則去前臺領取做好的飯。這里的前臺就相當于我們的隊列。
這個模型也叫生產者-消費者模型。
復制代碼
import queue
q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,為0 時,表示隊列長度無限制。
q.join() # 等到隊列為kong的時候,在執行別的操作
q.qsize() # 返回隊列的大小 (不可靠)
q.empty() # 當隊列為空的時候,返回True 否則返回False (不可靠)
q.full() # 當隊列滿的時候,返回True,否則返回False (不可靠)
q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,可以參數block默認為True,表示當隊列滿時,會等待隊列給出可用位置,
為False時為非阻塞,此時如果隊列已滿,會引發queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,過后,
如果隊列無法給出放入item的位置,則引發 queue.Full 異常
q.get(block=True, timeout=None) # 移除并返回隊列頭部的一個值,可選參數block默認為True,表示獲取值的時候,如果隊列為空,則阻塞,為False時,不阻塞,
若此時隊列為空,則引發 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,過后,如果隊列為空,則引發Empty異常。
q.put_nowait(item) # 等效于 put(item,block=False)
q.get_nowait() # 等效于 get(item,block=False)
生產者--消費者:
1 #!/usr/bin/env python
2 importQueue3 importthreading4
5
6 message = Queue.Queue(10)7
8
9 defproducer(i):10 whileTrue:11 message.put(i)12
13
14 defconsumer(i):15 whileTrue:16 msg =message.get()17
18
19 for i in range(12):20 t = threading.Thread(target=producer, args=(i,))21 t.start()22
23 for i in range(10):24 t = threading.Thread(target=consumer, args=(i,))25 t.start()
生產者消費者模型
線程池
1 importqueue2 importthreading3 importcontextlib4 importtime5
6 StopEvent =object()7
8
9 classThreadPool(object):10
11 def __init__(self, max_num):12 self.q =queue.Queue()13 self.max_num =max_num14
15 self.terminal =False16 self.generate_list =[]17 self.free_list =[]18
19 def run(self, func, args, callback=None):20 """
21 線程池執行一個任務22 :param func: 任務函數23 :param args: 任務函數所需參數24 :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數)25 :return: 如果線程池已經終止,則返回True否則None26 """
27
28 if len(self.free_list) == 0 and len(self.generate_list) <29 self.generate_thread w="(func," args callback self.q.put>
33 defgenerate_thread(self):34 """
35 創建一個線程36 """
37 t = threading.Thread(target=self.call)38 t.start()39
40 @contextlib.contextmanager41 defworker_state(self, xxx, val):42 xxx.append(val)43 try:44 yield
45 finally:46 xxx.remove(val)47
48 defcall(self):49 """
50 循環去獲取任務函數并執行任務函數51 """
52 current_thread =threading.currentThread53 self.generate_list.append(current_thread)54
55 event =self.q.get()56 while event !=StopEvent:57
58 func, arguments, callback =event59 try:60 result = func(*arguments)61 status =True62 exceptException as e:63 status =False64 result =e65
66 if callback is notNone:67 try:68 callback(status, result)69 exceptException as e:70 pass
71
72 if self.terminal: #False
73 event =StopEvent74 else:75 #self.free_list.append(current_thread)
76 #event = self.q.get()
77 #self.free_list.remove(current_thread)
78 with self.worker_state(self.free_list, current_thread):79 event =self.q.get()80 else:81 self.generate_list.remove(current_thread)82
83 defclose(self):84 num =len(self.generate_list)85 whilenum:86 self.q.put(StopEvent)87 num -= 1
88
89 #終止線程(清空隊列)
90 defterminate(self):91
92 self.terminal =True93
94 whileself.generate_list:95 self.q.put(StopEvent)96 self.q.empty()97 importtime98
99 defwork(i):100 print(i,"----")101
102 pool = ThreadPool(10)103 for item in range(50):104 pool.run(func=work, args=(item,))105
106 pool.terminate()
完整版線程池
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 importqueue4 importthreading5 importcontextlib6 importtime7
8 StopEvent = object() #定義標記的意義在于任務結束后退出的標記
9
10
11 classThreadPool(object):12
13 def __init__(self, max_num):14 self.q = queue.Queue() #定義隊列無限大
15 self.max_num = max_num #定義最大值
16
17 self.terminal = False #定義為false
18 self.generate_list = [] #多少個進程正在執行
19 self.free_list = [] #定義空閑列表--空閑線程 初始化各種屬性
20
21 def run(self, func, args, callback=None):22 """
23 線程池執行一個任務24 :param func: 任務函數25 :param args: 任務函數所需參數26 :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數)27 :return: 如果線程池已經終止,則返回True否則None28 """
29
30 if len(self.free_list) == 0 and len(self.generate_list) <31 self.generate_thread w="(func," args callback self.q.put>
35 defgenerate_thread(self):36 """
37 創建一個線程38 """
39 t = threading.Thread(target=self.call)#并執行call方法
40 t.start()41
42 defcall(self):43 """
44 循環去獲取任務函數并執行任務函數45 """
46 current_thread = threading.currentThread #拿到當前線程
47 self.generate_list.append(current_thread) #添加到正在使用線程隊列
48
49 event = self.q.get() #--這里是一個獲取到的元組w=()....
50 while event !=StopEvent:51
52 func, arguments, callback = event #w= (func, args, callback)
53 try:54 result = func(*arguments) #執行任務 ret = aaa() --def aaa(): return 1
55 status =True56 except Exception as e: #如果我這個任務報錯
57 status =False58 result =e59
60 if callback is not None: #這是一個返回值
61 try:62 callback(status, result)63 exceptException as e:64 pass
65
66 self.free_list.append(current_thread) #---記得看上下文代碼,執行完任務,把這個線程放到空閑隊列里面
67 event = self.q.get()#當前的狀態應該是沒任務,線程等待任務,不結束
68 self.free_list.remove(current_thread) #獲取任務移除休息線程
69 else:70 if event =StopEvent:71 self.generate_list.remove(current_thread) #移除當前正在運行的線程,等他運行完
72
73 defclose(self):74 num =len(self.generate_list)75 whilenum:76 self.q.put(StopEvent)#77 num -= 1
78
79
80 importtime81
82 defwork(i):83 print(i)84
85 pool = ThreadPool(10) #定義最大線程為10個,實例化,并初始化
86 for item in range(50): #創建了50個任務
87 pool.run(func=work, args=(item,)) #執行work函數,args是傳參
88
89 pool.close() #關閉線程池
理解注釋版線程池
Python 進程
1 from multiprocessing importProcess2 importthreading3 importtime4
5 deffoo(i):6 print 'say hi',i7
8 for i in range(10):9 p = Process(target=foo,args=(i,))10 p.start()
注意:由于進程之間的數據需要各自持有一份,所以創建進程需要的非常大的開銷。
multiprocessing模塊
multiprocessing是python的多進程管理包,和threading.Thread類似。直接從側面用subprocesses替換線程使用GIL的方式,由于這一點,multiprocessing模塊可以讓程序員在給定的機器上充分的利用CPU。
在multiprocessing中,通過創建Process對象生成進程,然后調用它的start()方法,
各種參數的意思:
注意:
start() 開始進程
setDaemon(True):守護進程:mutilprocess.setDaemon(True)
terminate():退出進程:最好使用 poison pill,強制的使用terminate()
is_alive(): 進程創建之后,可以使用multiprocessing對象的is_alive方法查看線程是否運行,為True則運行
join():阻塞當前進程,直到調用join方法的那個進程執行完,再繼續執行當前進程。|? 或者也可以理解為: 進程池中進程執行完畢后在關閉, 如果注釋, 那么程序直接關閉
守護進程
守護進程就是不阻擋主程序退出,自己干自己的?mutilprocess.setDaemon(True)
就這句
等待守護進程退出,要加上join,join可以傳入浮點數值,等待n久就不等了
importmultiprocessingimporttimeimportsysdefdaemon():
name=multiprocessing.current_process().nameprint 'Starting:', name
time.sleep(2)print 'Exiting :', namedefnon_daemon():
name=multiprocessing.current_process().nameprint 'Starting:', nameprint 'Exiting :', nameif __name__ == '__main__':
d= multiprocessing.Process(name='daemon',
target=daemon)
d.daemon=True
n= multiprocessing.Process(name='non-daemon',
target=non_daemon)
n.daemon=False
d.start()
n.start()
d.join(1)print 'd.is_alive()', d.is_alive()
n.join()
View Code
終止進程
最好使用 poison pill,強制的使用terminate()
注意 terminate之后要join,使其可以更新狀態
importmultiprocessingimporttimedefslow_worker():print 'Starting worker'time.sleep(0.1)print 'Finished worker'
if __name__ == '__main__':
p= multiprocessing.Process(target=slow_worker)print 'BEFORE:', p, p.is_alive()
p.start()print 'DURING:', p, p.is_alive()
p.terminate()print 'TERMINATED:', p, p.is_alive()
p.join()print 'JOINED:', p, p.is_alive()
簡單示例:
1 from multiprocessing importProcess2
3 deff(name):4 print('hello', name)5
6 if __name__ == '__main__':7 p = Process(target=f, args=('bob',))8 p.start()9 p.join()
join()方法的示例:
from multiprocessing importProcessimportos, time, randomdefr1(process_name):for i in range(5):print process_name, os.getpid() #打印出當前進程的id
time.sleep(random.random())defr2(process_name):for i in range(5):print process_name, os.getpid() #打印出當前進程的id
time.sleep(random.random())if __name__ == "__main__":print "main process run..."p1= Process(target=r1, args=('process_name1', ))
p2= Process(target=r2, args=('process_name2', ))
p1.start()
p2.start()#p1.join()
#p2.join()
print "main process runned all lines..."
進程數據共享
進程各自持有一份數據,默認無法共享數據
1 #!/usr/bin/env python
2 #coding:utf-8
3
4 from multiprocessing importProcess5 from multiprocessing importManager6
7 importtime8
9 li =[]10
11 deffoo(i):12 li.append(i)13 print 'say hi',li14
15 for i in range(10):16 p = Process(target=foo,args=(i,))17 p.start()18
19 print 'ending',li
進程間無數據共享
1 #方法一,Array
2 from multiprocessing importProcess,Array3 temp = Array('i', [11,22,33,44])4
5 defFoo(i):6 temp[i] = 100+i7 for item intemp:8 print i,'----->',item9
10 for i in range(2):11 p = Process(target=Foo,args=(i,))12 p.start()13
14 #方法二:manage.dict()共享數據
15 from multiprocessing importProcess,Manager16
17 manage =Manager()18 dic =manage.dict()19
20 defFoo(i):21 dic[i] = 100+i22 printdic.values()23
24 for i in range(2):25 p = Process(target=Foo,args=(i,))26 p.start()27 p.join()
1 'c': ctypes.c_char, 'u': ctypes.c_wchar,2 'b': ctypes.c_byte, 'B': ctypes.c_ubyte,3 'h': ctypes.c_short, 'H': ctypes.c_ushort,4 'i': ctypes.c_int, 'I': ctypes.c_uint,5 'l': ctypes.c_long, 'L': ctypes.c_ulong,6 'f': ctypes.c_float, 'd': ctypes.c_double
當創建進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢后,再賦值給原值。
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3
4 from multiprocessing importProcess, Array, RLock5
6 defFoo(lock,temp,i):7 """
8 將第0個數加1009 """
10 lock.acquire()11 temp[0] = 100+i12 for item intemp:13 print i,'----->',item14 lock.release()15
16 lock =RLock()17 temp = Array('i', [11, 22, 33, 44])18
19 for i in range(20):20 p = Process(target=Foo,args=(lock,temp,i,))21 p.start()
進程鎖實例
進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
apply
apply_async
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3 from multiprocessing importProcess,Pool4 importtime5
6 defFoo(i):7 time.sleep(2)8 return i+100
9
10 defBar(arg):11 printarg12
13 pool = Pool(5) #進程池14 #print pool.apply(Foo,(1,))
15 #print pool.apply_async(func =Foo, args=(1,)).get()
16
17 for i in range(10):18 pool.apply_async(func=Foo, args=(i,),callback=Bar)#但它是非阻塞且支持結果返回進行回調 (回調 ret = pool.apply....)19
20 print 'end'
21 pool.close()#關閉進程池,不再接受新的進程
22 pool.join()#進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
1 #apply和apply_async
2
3
4 from multiprocessing importPool5 importtime6
7 deff1(a):8 time.sleep(1)9 print(a)10 return 1000
11 deff2(arg):12 print(arg)13
14 if __name__ == '__main__':15 pool = Pool(5)16 for i in range(10):17 pool.apply_async(func=f1, args=(i,), callback=f2)18 #pool.apply(func=f1, args=(i,))
19 print('1111111')20
21 pool.close()22 pool.join()
window環境代碼
協程
線程和進程的操作是由程序觸發系統接口,最后的執行者是系統;協程的操作則是程序員。
協程存在的意義:對于多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用于協程;
greenlet
1 #!/usr/bin/env python
2 #-*- coding:utf-8 -*-
3
4
5 from greenlet importgreenlet6
7
8 deftest1():9 print 12
10 gr2.switch()11 print 34
12 gr2.switch()13
14
15 deftest2():16 print 56
17 gr1.switch()18 print 78
19
20 gr1 =greenlet(test1)21 gr2 =greenlet(test2)22 gr1.switch()
gevent
1 importgevent2
3 deffoo():4 print('Running in foo')5 gevent.sleep(0)6 print('Explicit context switch to foo again')7
8 defbar():9 print('Explicit context to bar')10 gevent.sleep(0)11 print('Implicit context switch back to bar')12
13 gevent.joinall([14 gevent.spawn(foo),15 gevent.spawn(bar),16 ])
遇到IO操作自動切換:
1 from gevent importmonkey; monkey.patch_all()2 importgevent3 importurllib24
5 deff(url):6 print('GET: %s' %url)7 resp =urllib2.urlopen(url)8 data =resp.read()9 print('%d bytes received from %s.' %(len(data), url))10
11 gevent.joinall([12 gevent.spawn(f, 'https://www.python.org/'),13 gevent.spawn(f, 'https://www.yahoo.com/'),14 gevent.spawn(f, 'https://github.com/'),15 ])
View Code
31>29>總結
以上是生活随笔為你收集整理的python的进程线程和协程_python成长之路 :线程、进程和协程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hive xmlserde_hive多分
- 下一篇: java 切换主线程_Java线程状态及