python用input输入列表有缺陷_Python 三程三器的那些事
裝飾器
1、什么是裝飾器
裝飾器本質是函數,用來給其他函數添加新的功能
特點:不修改調用方式、不修改源代碼
2、裝飾器的作用
裝飾器作用:本質是函數(裝飾其他函數)就是為其他函數添加其他功能
裝飾器必須準尋得原則:
不能修改被裝飾函數的源代碼、不能修改被裝飾函數的調用方式
實現裝飾器知識儲備:
函數即“變量”
高階函數
嵌套函數 高階函數+潛逃函數=》裝飾器
3、使用高階函數模仿裝飾器功能
1.定義:把一個函數名當做實參傳給另一個函數
2.返回值中包含函數名
3.下面使用高階函數雖然可以實現裝飾器的一些功能,但是違反了裝飾器不能改變調用方式的原則,
以前使用bar()現在將調用方式改編成了test1(bar)就是將bar的函數名當做變量傳給了test1()
#! /usr/bin/env python#-*- coding: utf-8 -*-
importtimedeftimer(func):
start_time=time.time()
func()print '函數執行時間為', time.time() -start_timedeftest():print '開始執行test'time.sleep(3)print 'test執行結束'timer(test)'''開始執行test
test執行結束
函數執行時間為 3.00332999229'''
改變了調用方式
4.高階函數——不修改高階函數的調用方式增加新的功能(但是無法傳參數)
注:bar = test2(bar) 等價于:@timer重新將函數名bar賦值,將原函數bar的內存地址當做實參傳遞該函數test2(),再將test2()賦值給bar
importtimedefbar():
time.sleep(3)print("in the bar")deftest2(func):print(func)returnfunc
bar=test2(bar)
bar()
不改變調用方式
5.嵌套函數
嵌套函數:在一個函數中嵌套另一個函數,并在函數內部調用
deffoo():print("in the foo")defbar():print("in the bar")
bar()
foo()
嵌套函數
4、能夠適應90%的業務需求
在裝飾器中?@timer等價于?test1=timer(test1)
在timer()函數中返回值是returndeco
所以timer(test1)作用是將函數test1內存地址當做參數傳遞給timer()
timer() 函數最后將運行后的函數deco內存地址作為返回值返回
test1=timer(test1)作用就是將將deco函數內存地址賦值給test1,所以最后運行test1()就相當于運行deco()
所以最后調用時給test2()傳入參數就相當于給deco傳入參數
importtimedef timer(func): #timer(test1) func=test1
def deco(*args,**kwargs):
start_time=time.time()
func(*args,**kwargs) #run test1
stop_time =time.time()print("running time is %s"%(stop_time-start_time))returndeco
@timer#test1=timer(test1)
deftest1():
time.sleep(3)print("in the test1")
@timerdeftest2(name):print("in the test2",name)
test1()
test2("tom")
裝飾器1
5、對特定網頁進行身份驗證
importtime
user,passwd= 'aaa','123'
defauth(func):def wrapper(*args,**kwargs):
username= input("Username:").strip()
password= input("Password:").strip()if user == username and password ==passwd:print("User has passed authentication")
res= func(*args,**kwargs) #這里執行func()相當于執行調用的函數如home()
return res #為了獲得home()函數返回值,可以將執行結果賦值給res然后返回print(home())結果是"from home"而不是"None"了
else:
exit("Invalid username or password")returnwrapperdefindex():print("welcome to index page")
@authdefhome():print("welcome to home page")return "from home"@authdefbbs():print("welcome to bbs page")
index()print(home()) #在這里調用home()相當于調用wrapper()
bbs()
裝飾器2
6、實現對不同網頁不同方式的身份認證
@auth(auth_type="local")代碼作用
在上面的代碼中使用@auth相當于先將home函數的內存地址當做變量傳入auth()函數,執行結果后home()相當于wrapper()
而在這里驗證的時候猶豫@auth(auth_type="local")中有()括號,那么就相當于將執行auth()函數而且是將auth_type="local當做參數傳入到auth()函數執行
所以outer_wrapper函數也會執行,outer_wrapper函數的執行結果返回的就是wrapper()函數的內存地址
所以最終結果同樣是執行home()函數就相當于執行wrapper函數
但是有所不同的是著這個版本中我們可以在外層的auth函數中傳入新的參數幫組我們根據需求判斷
importtime
user,passwd= 'aaa','123'
defauth(auth_type):print("auth func:",auth_type)defouter_wrapper(func):def wrapper(*args, **kwargs):print("wrapper func args:", *args, **kwargs)if auth_type == "local":
username= input("Username:").strip()
password= input("Password:").strip()if user == username and passwd ==password:print("\033[32;1mUser has passed authentication\033[0m")
res= func(*args, **kwargs) #from home
print("---after authenticaion")returnreselse:
exit("\033[31;1mInvalid username or password\033[0m")elif auth_type == "ldap":print("搞毛線ldap,不會。。。。")returnwrapperreturnouter_wrapperdefindex():print("welcome to index page")
@auth(auth_type="local") #home = wrapper()
defhome():print("welcome to home page")return "from home"@auth(auth_type="ldap")defbbs():print("welcome to bbs page")
index()print(home()) #wrapper()
bbs()
裝飾器3
#! /usr/bin/env python#-*- coding: utf-8 -*-
importtimedefauth(auth_type):print("auth func:",auth_type)defouter_wrapper(func):def wrapper(*args, **kwargs):print("wrapper func args:", *args, **kwargs)print('運行前')
func(*args, **kwargs)print('運行后')returnwrapperreturnouter_wrapper
@auth(auth_type="local") #home = wrapper()
defhome():print("welcome to home page")return "from home"home()
三級裝飾器簡寫
7、使用閉包實現裝飾器功能
閉包概念:
在一個外函數中定義了一個內函數,內函數里運用了外函數的臨時變量,并且外函數的返回值是內函數的引用,這樣就構成了一個閉包
一般情況下,在我們認知當中,如果一個函數結束,函數的內部所有東西都會釋放掉,還給內存,局部變量都會消失。
但是閉包是一種特殊情況,如果外函數在結束的時候發現有自己的臨時變量將來會在內部函數中用到,就把這個臨時變量綁定給了內部函數,然后自己再結束。
#! /usr/bin/env python#-*- coding: utf-8 -*-
importtimedef timer(func): #timer(test1) func=test1
def deco(*args,**kwargs): ## 函數嵌套
start_time =time.time()
func(*args,**kwargs) #跨域訪問,引用了外部變量func (func實質是函數內存地址)
stop_time =time.time()print "running time is %s"%(stop_time-start_time)return deco #內層函數作為外層函數返回值
deftest(name):print "in the test2",name
time.sleep(2)
test= timer(test) #等價于 ==》 @timer語法糖
test("tom")'''運行結果:
in the test2 tom
running time is 2.00302696228'''
閉包實現裝飾器功能
生成器
1、什么是生成器
生成器就是一個特殊的迭代器
一個有yield關鍵字的函數就是一個生成器
生成器是這樣一個函數,它記住上一次返回時在函數體中的位置。
對生成器函數的第二次(或第 n 次)調用跳轉至該函數中間,而上次調用的所有局部變量都保持不變。
2、定義
生成器,即生成一個容器。
在Python中,一邊循環,一邊計算的機制,稱為生成器。
生成器可以理解為一種數據類型,這種數據類型自動實現了迭代器協議(其他數據類型需要調用自己的內置iter()方法或__iter__()的內置函數),
所以,生成器就是一個可迭代對象。
3、生成器哪些場景應用
生成器是一個概念,我們平常寫代碼可能用的并不多,但是python源碼大量使用
比如我們tornado框架就是基于 生成器+協程
在我們代碼中使用舉例
比如我們要生成一百萬個數據,如果用生成器非常節省空間,用列表浪費大量空間
importtime
t1=time.time()
g= (i for i in range(100000000))
t2=time.time()
lst= [i for i in range(100000000)]
t3=time.time()print('生成器時間:',t2 - t1) #生成器時間: 0.0
print('列表時間:',t3 - t2) #列表時間: 5.821957349777222
4、生成器的作用
通過列表生成式,我們可以直接創建一個列表,但是,受到內存限制,列表容量肯定是有限的。
而且,創建一個包含100萬個元素的列表,不僅占用很大的存儲空間,如果我們僅僅需要訪問前面幾個元素,那后面絕大多數元素占用的空間都白白浪費了。
所以,如果列表元素可以按照某種算法推算出來,那我們是否可以在循環的過程中不斷推算出后續的元素呢?
這樣就不必創建完整的list,從而節省大量的空間。在Python中,這種一邊循環一邊計算的機制,稱為生成器:generator。
要創建一個generator,有很多種方法,第一種方法很簡單,只要把一個列表生成式的[]改成(),就創建了一個generator:
print( [i*2 for i in range(10)] ) #列表生成式: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print( (i*2 for i in range(10)) ) #生 成 器: at 0x005A3690>
我們可以直接打印出list的每一個元素,但我們怎么打印出generator的每一個元素呢?
如果要一個一個打印出來,可以通過next()函數獲得generator的下一個返回值:
g = (i*2 for i in range(10))print( g.__next__() ) #0
print( g.__next__() ) #2
5、生成器工作原理
生成器是這樣一個函數,它記住上一次返回時在函數體中的位置。
對生成器函數的第二次(或第?n?次)調用跳轉至該函數中間,而上次調用的所有局部變量都保持不變。
生成器不僅“記住”了它數據狀態;生成器還“記住”了它在流控制構造(在命令式編程中,這種構造不只是數據值)中的位置。
生成器是一個函數,而且函數的參數都會保留。
迭代到下一次的調用時,所使用的參數都是第一次所保留下的,即是說,在整個所有函數調用的參數都是第一次所調用時保留的,而不是新創建的
6、yield生成器運行機制
在Python中,yield就是這樣的一個生成器。
當你問生成器要一個數時,生成器會執行,直至出現?yield?語句,生成器把yield?的參數給你,之后生成器就不會往下繼續運行。
當你問他要下一個數時,他會從上次的狀態開始運行,直至出現yield語句,把參數給你,之后停下。如此反復
在python中,當你定義一個函數,使用了yield關鍵字時,這個函數就是一個生成器
它的執行會和其他普通的函數有很多不同,函數返回的是一個對象,而不是你平常所用return語句那樣,能得到結果值。如果想取得值,那得調用next()函數
每當調用一次迭代器的next函數,生成器函數運行到yield之處,返回yield后面的值且在這個地方暫停,所有的狀態都會被保持住,直到下次next函數被調用,或者碰到異常循環退出。
deffib(max_num):
a,b= 1,1
while a
a,b=b,a+b
g= fib(10) #生成一個生成器:[1,2, 3, 5, 8, 13]
print(g.__next__()) #第一次調用返回:1
print(list(g)) #把剩下元素變成列表:[2, 3, 5, 8, 13]
yield實現fib數
7、yield實現單線程下的并發效果
yield相當于 return 返回一個值,并且記住這個返回的位置,下次迭代時,代碼從yield的下一條語句開始執行。
send() 和next()一樣,都能讓生成器繼續往下走一步(下次遇到yield停),但send()能傳一個值,這個值作為yield表達式整體的結果
defconsumer(name):print("%s 準備吃包子啦!" %name)whileTrue:
baozi= yield
print("包子[%s]來了,被[%s]吃了!" %(baozi,name))
c= consumer("Tom")
c.__next__()
b1= "韭菜餡包子"c.send(b1)#c.send(b1)作用:#c.send()的作用是給yied的傳遞一個值,并且每次調用c.send()的同時自動調用一次__next__
'''運行結果:
Tom 準備吃包子啦!
包子[韭菜餡包子]來了,被[Tom]吃了!'''
一次調用
importtimedefconsumer(name):print("%s 準備吃包子啦!" %name)whileTrue:
baozi= yield
print("包子[%s]來了,被[%s]吃了!" %(baozi,name))defproducer(name):
c= consumer('A')
c2= consumer('B')
c.__next__()
c2.__next__()print("老子開始準備做包子啦!")for i in range(10):
time.sleep(1)print("做了2個包子!")
c.send(i)
c2.send(i)
producer("alex")'''運行結果:
A 準備吃包子啦!
B 準備吃包子啦!
老子開始準備做包子啦!
做了2個包子!
包子[0]來了,被[A]吃了!
包子[0]來了,被[B]吃了!
做了2個包子!
包子[1]來了,被[A]吃了!
包子[1]來了,被[B]吃了!
做了2個包子!
包子[2]來了,被[A]吃了!
包子[2]來了,被[B]吃了!
做了2個包子!
包子[3]來了,被[A]吃了!
包子[3]來了,被[B]吃了!
做了2個包子!
包子[4]來了,被[A]吃了!
包子[4]來了,被[B]吃了!
做了2個包子!
包子[5]來了,被[A]吃了!
包子[5]來了,被[B]吃了!'''
for 循環調用
迭代器
1、什么是迭代器
迭代器是訪問集合內元素的一種方法
總是從集合內第一個元素訪問,直到所有元素都被訪問過結束,當調用 __next__而元素返回會引發一個,StopIteration異常
有兩個方法:_iter_ _next_
_iter_ : 返回迭代器自身
_next_: 返回下一個元素
2、定義:
迭代器是訪問集合內元素的一種方式。迭代器對象從集合的第一個元素開始訪問,直到所有的元素都被訪問一遍后結束。
3、迭代器和可迭代對象
凡是可作用于for循環的對象都是可迭代的(Iterable)類型;
凡是可作用于next()函數的對象都是迭代器(Iterator)類型,它們表示一個惰性計算的序列;
集合數據類型如list、dict、str等是可迭代的但不是迭代器,不過可以通過iter()函數獲得一個Iterator對象。
Python的for循環本質上就是通過不斷調用next()函數實現的
總結:?一個實現了__iter__方法的對象是可迭代的,一個實現next方法的對象是迭代器
4、迭代器的兩個方法
迭代器僅是一容器對象,它實現了迭代器協議。它有兩個基本方法
__next__方法:返回容器的下一個元素
__iter__方法:返回迭代器自身
迭代器是訪問集合內元素的一種方式。迭代器對象從集合的第一個元素開始訪問,直到所有的元素都被訪問一遍后結束。
__iter__方法會返回一個迭代器(iterator),所謂的迭代器就是具有next方法的對象。
在調用next方法時,迭代器會返回它的下一個值,如果next方法被調用,但迭代器中沒有值可以返就會引發一個StopIteration異常
a = iter([1,2,]) #生成一個迭代器
print(a.__next__())print(a.__next__())print(a.__next__()) #在這一步會引發 “StopIteration” 的異常
5、判斷是迭代器和可迭代對象
注:列表,元組,字典是可迭代的但不是迭代器
from collections importIterableprint(isinstance([],Iterable)) #True
print(isinstance({},Iterable)) #True
print(isinstance((),Iterable)) #True
print(isinstance("aaa",Iterable)) #True
print(isinstance((x for x in range(10)),Iterable)) #True
相關代碼
6、列表不是迭代器,只有生成器是迭代器
from collections importIterator
t= [1,2,3,4]print(isinstance(t,Iterator)) #False
t1 =iter(t)print(isinstance(t1,Iterator)) #True
相關代碼
7、自定義迭代器
#! /usr/bin/env python#-*- coding: utf-8 -*-
classMyRange(object):def __init__(self, n):
self.idx=0
self.n=ndef __iter__(self):returnselfdefnext(self):if self.idx
val=self.idx
self.idx+= 1
returnself.n[val]else:raiseStopIteration()
l= [4,5,6,7,8]
obj=MyRange(l)print obj.next() #4
print obj.next() #5
print obj.next() #6
自定義迭代器
8、迭代器與生成器
#! /usr/bin/env python#-*- coding: utf-8 -*
l = [1,2,3,4,5] #列表是一個可迭代對象,不是一個迭代器
print dir(l) #所以 l 中有 __iter__() 方法,沒有 __next__()方法
iter_obj = l.__iter__() #__iter__()方法返回迭代器對象本身(這個迭代器對象就會有 next 方法了)
print '###################################\n'
print iter_obj.next() #1
print iter_obj.next() #2
print iter_obj.next() #3
相關代碼
進程與線程的簡介
1、什么是進程(process)?(進程是資源集合)
2、進程是資源分配的最小單位( 內存、cpu、網絡、io)
3、一個運行起來的程序就是一個進程
什么是程序(程序是我們存儲在硬盤里的代碼)
硬盤(256G)、內存條(8G)
當我們雙擊圖標,打開程序的時候,實際上就是通過I/O操作(讀寫)內存條里面
內存條就是我們所指的資源
CPU分時
CPU比你的手速快多了,分時處理每個線程,但是由于太快然你覺得每個線程都是獨占cpu
cpu是計算,只有時間片到了,獲取cpu,線程真正執行
當你想使用 網絡、磁盤等資源的時候,需要cpu的調度
進程具有獨立的內存空間,所以沒有辦法相互通信
進程如何通信
進程queue(父子進程通信)
pipe(同一程序下兩個進程通信)
managers(同一程序下多個進程通信)
RabbitMQ、redis等(不同程序間通信)
為什么需要進程池
一次性開啟指定數量的進程
如果有十個進程,有一百個任務,一次可以處理多少個(一次性只能處理十個)
防止進程開啟數量過多導致服務器壓力過大
2、定義:進程是資源分配最小單位
當一個可執行程序被系統執行(分配內存資源)就變成了一個進程
程序并不能單獨運行,只有將程序裝載到內存中,系統為它分配資源才能運行,這種執行的程序就稱之為進程
程序和進程的區別就在于:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬于動態概念
在多道編程中,我們允許多個程序同時加載到內存中,在操作系統的調度下,可以實現并發地執行。
進程的出現讓每個用戶感覺到自己獨享CPU,因此,進程就是為了在CPU上實現多道編程而提出的。
進程之間有自己獨立的內存,各進程之間不能相互訪問
創建一個新線程很簡單,創建新進程需要對父進程進行復制
多道編程: 在計算機內存中同時存放幾道相互獨立的程序,他們共享系統資源,相互穿插運行
單道編程: 計算機內存中只允許一個的程序運行
3、進程并發性:
在一個系統中,同時會存在多個進程被加載到內存中,同處于開始到結束之間的狀態
對于一個單CPU系統來說,程序同時處于運行狀態只是一種宏觀上的概念
他們雖然都已經開始運行,但就微觀而言,任意時刻,CPU上運行的程序只有一個
由于操作系統分時,讓每個進程都覺得自己獨占CPU等資源
注:如果是多核CPU(處理器)實際上是可以實現正在意義的同一時間點有多個線程同時運行
4、線程并發性:
操作系統將時間劃分為很多時間段,盡可能的均勻分配給每一個線程。
獲取到時間片的線程被CPU執行,其他則一直在等待,所以微觀上是走走停停,宏觀上都在運行。
多核CPU情況:
如果你的程序的線程數少于CPU的核心數,且系統此時沒有其他進程同時運行,那么這個程序的每個線程會享有一個CPU,
當同時運行的線程數多于CPU核心數時,CPU會采用一定的調度算法每隔一段時間就將這些線程調入或調出CPU
以確保每個線程都能分享一部分CPU時間,實現多線程并發。
5、有了進程為什么還要線程?
1.進程優點:
提供了多道編程,讓我們感覺我們每個人都擁有自己的CPU和其他資源,可以提高計算機的利用率
2. 進程的兩個重要缺點
進程只能在一個時間干一件事,如果想同時干兩件事或多件事,進程就無能為力了。
進程在執行的過程中如果阻塞,即使進程中有些工作不依賴于輸入的數據,也將無法執行(例如等待輸入,整個進程就會掛起)。
例如,我們在使用qq聊天, qq做為一個獨立進程如果同一時間只能干一件事,那他如何實現在同一時刻 即能監聽鍵盤輸入、又能監聽其它人給你發的消息
你會說,操作系統不是有分時么?分時是指在不同進程間的分時呀
即操作系統處理一會你的qq任務,又切換到word文檔任務上了,每個cpu時間片分給你的qq程序時,你的qq還是只能同時干一件事呀
6、什么是線程(thread)(線程是操作系統最小的調度單位)
定義:
線程是操作系統調度的最小單位
它被包含在進程之中,是進程中的實際運作單位
進程本身是無法自己執行的,要操作cpu,必須創建一個線程,線程是一系列指令的集合
線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位
一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發多個線程,每條線程并行執行不同的任務
無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行
進程本身是無法自己執行的,要操作cpu,必須創建一個線程,線程是一系列指令的集合
所有在同一個進程里的線程是共享同一塊內存空間的,不同進程間內存空間不同
同一個進程中的各線程可以相互訪問資源,線程可以操作同進程中的其他線程,但進程僅能操作子進程
兩個進程想通信,必須要通過一個中間代理
對主線程的修改可能回影響其他子線程,對主進程修改不會影響其他進程因為進程間內存相互獨立,但是同一進程下的線程共享內存
7、進程和線程的區別
啟動一個線程比啟動一個進程快,運行速度沒有可比性。
先有一個進程然后才能有線程。
進程包含線程
線程共享內存空間
進程內存是獨立的(不可互相訪問)
進程可以生成子進程,子進程之間互相不能互相訪問(相當于在父級進程克隆兩個子進程)
在一個進程里面線程之間可以交流。兩個進程想通信,必須通過一個中間代理來實現
創建新線程很簡單,創建新進程需要對其父進程進行克隆。
一個線程可以控制或操作同一個進程里面的其它線程。但進程只能操作子進程。
父進程可以修改不影響子進程,但不能修改。
線程可以幫助應用程序同時做幾件事
8、進程和程序的區別
程序只是一個普通文件,是一個機器代碼指令和數據的集合,所以,程序是一個靜態的實體
而進程是程序運行在數據集上的動態過程,進程是一個動態實體,它應創建而產生,應調度執行因等待資?源或事件而被處于等待狀態,因完成任務而被撤消
進程是系統進行資源分配和調度的一個獨立單位
一個程序對應多個進程,一個進程為多個程序服務(兩者之間是多對多的關系)
一個程序執行在不同的數據集上就成為不同的進程,可以用進程控制塊來唯一地標識每個進程
多線程
Python多線程編程中常用方法:
join()方法:如果一個線程或者在函數執行的過程中調用另一個線程,并且希望待其完成操作后才能執行,那么在調用線程的時就可以使用被調線程的join方法join([timeout]) timeout:可選參數,線程運行的最長時間
isAlive()方法:查看線程是否還在運行
getName()方法:獲得線程名
setDaemon()方法:主線程退出時,需要子線程隨主線程退出,則設置子線程的setDaemon()
GIL全局解釋器鎖:
在python全局解釋器下,保證同一時間只有一個線程運行
防止多個線程都修改數據
線程鎖(互斥鎖):
GIL鎖只能保證同一時間只能有一個線程對某個資源操作,但當上一個線程還未執行完畢時可能就會釋放GIL,其他線程就可以操作了
線程鎖本質把線程中的數據加了一把互斥鎖
mysql中共享鎖 & 互斥鎖
mysql共享鎖:共享鎖,所有線程都能讀,而不能寫
mysql排它鎖:排它,任何線程讀取這個這個數據的權利都沒有
加上線程鎖之后所有其他線程,讀都不能讀這個數據
有了GIL全局解釋器鎖為什么還需要線程鎖
因為cpu是分時使用的
1、線程2種調用方式:直接調用, 繼承式調用
importthreadingimporttimedef sayhi(num): #定義每個線程要運行的函數
print("running on number:%s" %num)
time.sleep(3)#1、target=sayhi :sayhi是定義的一個函數的名字#2、args=(1,) : 括號內寫的是函數的參數
t1 = threading.Thread(target=sayhi, args=(1,)) #生成一個線程實例
t2 = threading.Thread(target=sayhi, args=(2,)) #生成另一個線程實例
t1.start()#啟動線程
t2.start() #啟動另一個線程
print(t1.getName()) #獲取線程名
print(t2.getName())
直接調用
importthreadingimporttimeclassMyThread(threading.Thread):def __init__(self,num):
threading.Thread.__init__(self)
self.num=numdef run(self):#定義每個線程要運行的函數
print("running on number:%s" %self.num)
time.sleep(3)if __name__ == '__main__':
t1= MyThread(1)
t2= MyThread(2)
t1.start()
t2.start()
繼承式調用
2、for循環同時啟動多個線程
說明:下面利用for循環同時啟動50個線程并行執行,執行時間是3秒而不是所有線程執行時間的總和
importthreadingimporttimedef sayhi(num): #定義每個線程要運行的函數
print("running on number:%s" %num)
time.sleep(3)for i in range(50):
t= threading.Thread(target=sayhi,args=('t-%s'%i,))
t.start()
for循環啟動多個線程
3、t.join(): 實現所有線程都執行結束后再執行主線程
說明:在4中雖然可以實現50個線程同時并發執行,但是主線程不會等待子線程結束在這里我們可以使用t.join()指定等待某個線程結束的結果
importthreadingimporttime
start_time=time.time()def sayhi(num): #定義每個線程要運行的函數
print("running on number:%s" %num)
time.sleep(3)
t_objs= [] #將進程實例對象存儲在這個列表中
for i in range(50):
t= threading.Thread(target=sayhi,args=('t-%s'%i,))
t.start()#啟動一個線程,程序不會阻塞
t_objs.append(t)print(threading.active_count()) #打印當前活躍進程數量
for t in t_objs: #利用for循環等待上面50個進程全部結束
t.join() #阻塞某個程序
print(threading.current_thread()) #打印執行這個命令進程
print("----------------all threads has finished.....")print(threading.active_count())print('cost time:',time.time() - start_time)
t.join() 主線程等待子線程
4、setDaemon(): 守護線程,主線程退出時,需要子線程隨主線程退出
importthreadingimporttime
start_time=time.time()def sayhi(num): #定義每個線程要運行的函數
print("running on number:%s" %num)
time.sleep(3)for i in range(50):
t= threading.Thread(target=sayhi,args=('t-%s'%i,))
t.setDaemon(True)#把當前線程變成守護線程,必須在t.start()前設置
t.start() #啟動一個線程,程序不會阻塞
print('cost time:',time.time() - start_time)
守護線程
注:因為剛剛創建的線程是守護線程,所以主線程結束后子線程就結束了,運行時間不是3秒而是0.01秒
5、GIL鎖和用戶鎖(Global Interpreter Lock 全局解釋器鎖)
全局解釋器鎖:保證同一時間僅有一個線程對資源有操作權限
作用:在一個進程內,同一時刻只能有一個線程通過GIL鎖 被CUP調用,切換條件:I/O操作、固定時間(系統決定)
說明:python多線程中GIL鎖只是在CPU操作時(如:計算)才是串行的,其他都是并行的,所以比串行快很多
為了解決不同線程同時訪問同一資源時,數據保護問題,而產生了GIL
GIL在解釋器的層面限制了程序在同一時間只有一個線程被CPU實際執行,而不管你的程序里實際開了多少條線程
為了解決這個問題,CPython自己定義了一個全局解釋器鎖,同一時間僅僅有一個線程可以拿到這個數據
python之所以會產生這種不好的狀況是因為python啟用一個線程是調用操作系統原生線程,就是C接口
但是這僅僅是CPython這個版本的問題,在PyPy,中就沒有這種缺陷
用戶鎖:線程鎖(互斥鎖Mutex)? :當前線程還未操作完成前其他所有線程都無法對其操作,即使已經釋放了GIL鎖
在有GIL鎖時為何還需要用戶鎖
GIL鎖只能保證同一時間只能有一個線程對某個資源操作,但當上一個線程還未執行完畢時可能就會釋放GIL,其他線程就可以操作了
線程鎖的原理
當一個線程對某個資源進行CPU計算的操作時加一個線程鎖,只有當前線程計算完成主動釋放鎖,其他線程才能對其操作
這樣就可以防止還未計算完成,釋放GIL鎖后其他線程對這個資源操作導致混亂問題
importtimeimportthreading
lock= threading.Lock() #1 生成全局鎖
defaddNum():global num #2 在每個線程中都獲取這個全局變量
print('--get num:',num )
time.sleep(1)
lock.acquire()#3 修改數據前加鎖
num -= 1 #4 對此公共變量進行-1操作
lock.release() #5 修改后釋放
用戶鎖使用舉例
在有GIL的情況下執行 count = count + 1 會出錯原因解析,用線程鎖解決方法
#1)第一步:count = 0 count初始值為0
#2)第二步:線程1要執行對count加1的操作首先申請GIL全局解釋器鎖
#3)第三步:調用操作系統原生線程在操作系統中執行
#4)第四步:count加1還未執行完畢,時間到了被要求釋放GIL
#5)第五步:線程1釋放了GIL后線程2此時也要對count進行操作,此時線程1還未執行完,所以count還是0
#6)第六步:線程2此時拿到count = 0后也要對count進行加1操作,假如線程2執行很快,一次就完成了
#count加1的操作,那么count此時就從0變成了1
#7)第七步:線程2執行完加1后就賦值count=1并釋放GIL
#8)第八步:線程2執行完后cpu又交給了線程1,線程1根據上下文繼續執行count加1操作,先拿到GIL
#鎖,完成加1操作,由于線程1先拿到的數據count=0,執行完加1后結果還是1
#9)第九步:線程1將count=1在次賦值給count并釋放GIL鎖,此時連個線程都對數據加1,但是值最終是1
報錯原因分析
使用線程鎖解決上面問題的原理
在GIL鎖中再加一個線程鎖,線程鎖是用戶層面的鎖
線程鎖就是一個線程在對數據操作前加一把鎖,防止其他線程復制或者操作這個數據
只有這個線程對數據操作完畢后才會釋放這個鎖,其他線程才能操作這個數據
定義一個線程鎖非常簡單只用三步
1 >> lock = threading.Lock() #定義一把鎖
2 >> lock.acquire()#對數據操作前加鎖防止數據被另一線程操作
3 >> lock.release()#對數據操作完成后釋放鎖
6、死鎖
死鎖定義:
兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象若無外力作用,它們都將無法推進去。
死鎖舉例:
啟動5個線程,執行run方法,假如thread1首先搶到了A鎖,此時thread1沒有釋放A鎖,緊接著執行代碼mutexB.acquire(),搶到了B鎖,
在搶B鎖時候,沒有其他線程與thread1爭搶,因為A鎖沒有釋放,其他線程只能等待
thread1執行完func1函數,然后執行func2函數,此時thread1拿到B鎖,然后執行time.sleep(2),此時不會釋放B鎖
在thread1執行func2的同時thread2開始執行func1獲取到了A鎖,然后繼續要獲取B鎖
不幸的是B鎖還被thread1占用,thread1占用B鎖時還需要同時獲取A鎖才能向下執行,但是此時發現A鎖已經被thread2暫用,這樣就死鎖了
from threading importThread,Lockimporttime
mutexA=Lock()
mutexB=Lock()classMyThread(Thread):defrun(self):
self.func1()
self.func2()deffunc1(self):
mutexA.acquire()print('\033[41m%s 拿到A鎖\033[0m' %self.name)
mutexB.acquire()print('\033[42m%s 拿到B鎖\033[0m' %self.name)
mutexB.release()
mutexA.release()deffunc2(self):
mutexB.acquire()print('\033[43m%s 拿到B鎖\033[0m' %self.name)
time.sleep(2)
mutexA.acquire()print('\033[44m%s 拿到A鎖\033[0m' %self.name)
mutexA.release()
mutexB.release()if __name__ == '__main__':for i in range(2):
t=MyThread()
t.start()#運行結果:輸出下面結果后程序卡死,不再向下進行了#Thread-1 拿到A鎖#Thread-1 拿到B鎖#Thread-1 拿到B鎖#Thread-2 拿到A鎖
產生死鎖代碼
7、遞歸鎖:lock = threading.RLock()? 解決死鎖問題
遞歸鎖的作用是同一線程中多次請求同一資源,但是不會參數死鎖。
這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。
直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
from threading importThread,Lock,RLockimporttime
mutexA=mutexB=RLock()classMyThread(Thread):defrun(self):
self.f1()
self.f2()deff1(self):
mutexA.acquire()print('%s 拿到A鎖' %self.name)
mutexB.acquire()print('%s 拿到B鎖' %self.name)
mutexB.release()
mutexA.release()deff2(self):
mutexB.acquire()print('%s 拿到B鎖' %self.name)
time.sleep(0.1)
mutexA.acquire()print('%s 拿到A鎖' %self.name)
mutexA.release()
mutexB.release()if __name__ == '__main__':for i in range(5):
t=MyThread()
t.start()#下面是運行結果:不會產生死鎖#Thread-1 拿到A鎖#Thread-1 拿到B鎖#Thread-1 拿到B鎖#Thread-1 拿到A鎖#Thread-2 拿到A鎖#Thread-2 拿到B鎖#Thread-2 拿到B鎖#Thread-2 拿到A鎖#Thread-4 拿到A鎖#Thread-4 拿到B鎖#Thread-4 拿到B鎖#Thread-4 拿到A鎖#Thread-3 拿到A鎖#Thread-3 拿到B鎖#Thread-3 拿到B鎖#Thread-3 拿到A鎖#Thread-5 拿到A鎖#Thread-5 拿到B鎖#Thread-5 拿到B鎖#Thread-5 拿到A鎖
如果使用RLock代替Lock,則不會發生死鎖
8、Semaphore(信號量)
互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據
比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去
作用就是同一時刻允許運行的線程數量
#import threading,time#def run(n):#semaphore.acquire()#time.sleep(1)#print("run the thread: %s\n" %n)#semaphore.release()# #if __name__ == '__main__':#semaphore = threading.BoundedSemaphore(5) #最多允許5個線程同時運行#for i in range(22):#t = threading.Thread(target=run,args=(i,))#t.start()# #while threading.active_count() != 1:#pass #print threading.active_count()#else:#print('----all threads done---')
#代碼結果說明:這里可以清晰看到運行時0-4是同時運行的沒有順序,而且是前五個,#表示再semaphore這個信號量的定義下程序同時僅能執行5個線程
信號量舉例
9、events總共就只有四個方法
1. event.set() : #設置標志位
2. event.clear() : #清除標志位
3. event.wait() : #等待標志被設定
4. event.is_set() : #判斷標志位是否被設定
importtime,threading
event=threading.Event()#第一:寫一個紅綠燈的死循環
deflighter():
count=0
event.set()#1先設置為綠燈
whileTrue:if count > 5 and count <10: #2改成紅燈
event.clear() #3把標志位清了
print("red light is on.....")elif count > 10:
event.set()#4再設置標志位,變綠燈
count =0else:print("green light is on.....")
time.sleep(1)
count+= 1
#第二:寫一個車的死循環
defcar(name):whileTrue:if event.is_set(): #設置了標志位代表綠燈
print("[%s] is running"%name)
time.sleep(1)else:print('[%s] sees red light, waiting......'%name)
event.wait()print('[%s] green light is on,start going.....'%name)
light= threading.Thread(target=lighter,)
light.start()
car1= threading.Thread(target=car,args=("Tesla",))
car1.start()
events(紅綠燈例子)
進程
多線程和多進程各自應用場景
I/O操作不占用CPU(從硬盤,網路讀入數據等)
計算占用CPU,這種情況最好不用多線程
python多線程不適合CPU密集型的任務,適合I/O密集型的任務
python的多進程適合CPU密集型任務
一次性起多個進程,并在進程中調用線程
importmultiprocessing,time,threading#3 被多線程調用的函數
defthread_run():print(threading.get_ident()) #打印線程id號
time.sleep(2)#2 被多進程調用的函數,以及在這個函數中起一個進程
defrun(name):
time.sleep(2)print("hello",name)
t= threading.Thread(target=thread_run,) #在進程調用的函數中啟用一個線程
t.start()#1 一次性啟動多個進程
if __name__ == '__main__':for i in range(10):
p= multiprocessing.Process(target=run,args=('bob %s'%i,)) #啟用一個多線程
p.start()
一次性起多個進程,并在進程中調用線程
進程間互相訪問數據的三種方法
注:不同進程間內存是不共享的,所以互相之間不能訪問對方數據
在父進程中定義隊列q,使用父進程啟用一個子進程,子進程中無法操作父進程的q
from multiprocessing importProcessimportqueueimportthreadingdeff():
q.put([42, None, 'hello'])if __name__ == '__main__':
q= queue.Queue() #1 在父進程中定義一個隊列實例q
#p = threading.Thread(target=f,) #在線程程中就可以相互訪問,線程中內存共享
p = Process(target=f,) #2 在父進程中起一個子進程 p,在子進程中使用父進程的q會報錯
p.start()print(q.get())
p.join()
子進程無法訪問父進程數據舉例
利用Queues實現父進程到子進程(或子進程間)的數據傳遞
我們以前學的queue是線程queue.Queue()只有在同一個進程的線程間才能訪問
如果兩個進程間想要通信必須要使用進程Queue,用法和多線程的相同
queue.Queue()是線程q不可以傳遞給子進程,但是Queue是進程q,父進程會將進程q克隆了一份給子進程
既然是兩個q為什么在子進程中在q中放入一個數據在父進程中可以取出來呢? 其實原因是這樣的:
子進程向q中放入數據的時候,用pickle序列化將數據放到一個中間地方(翻譯),翻譯又把子進程放
入的數據用pickle反序列化給父進程,父進程就可以訪問這個q了,這樣就實現了進程間的數據通信了
在多線程中兩個線程可以修改同一份數據,而Queue僅僅實現了進程間的數據傳遞
from multiprocessing importProcess, Queuedef f(qq): #將符進程中的q傳遞過來叫qq
qq.put([42, None, 'hello']) #此時子進程就可以使用符進程中的q
if __name__ == '__main__':
q= Queue() #使用Queue()在父進程中定義一個隊列實例q
p = Process(target=f, args=(q,)) #在父進程中起一個子進程 p,將父進程剛定義的q傳遞給子進程p
p.start()print(q.get())
p.join()#運行結果: [42, None, 'hello']
Queues實現父子進程間傳遞數據
使用管道pipe實現兩個進程間數據傳遞
說明:其實pip實現進程間通信就好像一條電話線一樣,一個在電話線這頭發送,一個在電話線那頭接收
from multiprocessing importProcess, Pipedeff(conn):
conn.send([42, None, 'hello']) #3 子進程發送數據,就像socket一樣
print("son process recv:", conn.recv())
conn.close()if __name__ == '__main__':
parent_conn, child_conn=Pipe()#1 生成一個管道實例,實例一生成就會生成兩個返回對象,一個是管道這頭,一個是管道那頭
p = Process(target=f, args=(child_conn,)) #2 啟動一個子進程將管道其中一頭傳遞給子進程
p.start()print(parent_conn.recv()) #4 父進程收消息 # prints "[42, None, 'hello']"
parent_conn.send('i am parent process')
p.join()#運行結果:#[42, None, 'hello']#son process recv: i am parent process
pip實現進程間通信
Managers實現很多進程間數據共享
說明:manager實質和Queue一樣,啟用是個線程其實就是將字典或者列表copy十份
from multiprocessing importProcess, Managerimportosdeff(d, l):
d[1] = '1' #是個進程對字典放入的是同一個值,所以看上去效果不明顯
l.append(os.getpid()) #將這是個進程的進程id放入列表中
if __name__ == '__main__':
with Manager() as manager:#1 將Manager()賦值給manager
d = manager.dict() #2 定義一個可以在多個進程間可以共享的字典
l = manager.list(range(5)) #3 定義一個可以在多個進程間可以共享的列表,默認寫五個數據
p_list =[]for i in range(10): #生成是個進程
p = Process(target=f, args=(d, l)) #將剛剛生成的可共享字典和列表傳遞給子進程
p.start()
p_list.append(p)for res inp_list:
res.join()print(d)print(l)
managers實現進程間數據共享
進程之間需要鎖的原因
說明:雖然每個進程是獨立運行的,但是他們共享同一塊屏幕,如果大家都在屏幕打數據就會打亂了
from multiprocessing importProcess, Lockdeff(l, i):
l.acquire()#一個進程要打印數據時先鎖定
print('hello world', i)
l.release()#打印完畢后就釋放這把鎖
if __name__ == '__main__':
lock= Lock() #先生成一把鎖
for num in range(5):
Process(target=f, args=(lock, num)).start()#運行結果:#hello world 4#hello world 0#hello world 2#hello world 3#hello world 1
進程鎖
進程池
進程池的作用就是限制同一時間可以啟動進程的=數量
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
apply: 多個進程異步執行,一個一個的執行
apply_async: 多個進程同步執行,同時執行多個進程
from multiprocessing importProcess,Poolimporttime,osdeffoo(i):
time.sleep(2)print("in the process",os.getpid()) #打印子進程的pid
return i+100
defcall(arg):print('-->exec done:',arg,os.getpid())if __name__ == '__main__':
pool= Pool(3) #進程池最多允許5個進程放入進程池
print("主進程pid:",os.getpid()) #打印父進程的pid
for i in range(10):#用法1 callback作用是指定只有當Foo運行結束后就執行callback調用的函數,父進程調用的callback函數
pool.apply_async(func=foo, args=(i,),callback=call)#用法2 串行 啟動進程不在用Process而是直接用pool.apply()
#pool.apply(func=foo, args=(i,))
print('end')
pool.close()#關閉pool
pool.join() #進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
進程池
僵尸進程
僵尸進程定義
僵尸進程產生的原因就是父進程產生子進程后,子進程先于父進程退出
但是父進程由于種種原因,并沒有處理子進程發送的退出信號,那么這個子進程就會成為僵尸進程。
用python寫一個僵尸進程
#!/usr/bin/env python#coding=utf8
importos, sys, time#產生子進程
pid =os.fork()if pid ==0:#子進程退出
sys.exit(0)#父進程休息30秒
time.sleep(30)#先產生一個子進程,子進程退出,父進程休息30秒,那就會產生一個僵尸進程
defunct.py
[root@linux-node4 ~]#ps -ef| grep defunct
root 110401 96083 0 19:11 pts/2 00:00:00python defunct.py
root110402 110401 0 19:11 pts/2 00:00:00 [python] root110406 96105 0 19:11 pts/3 00:00:00 grep --color=auto defunct
ps -ef| grep defunct 在linux下查看僵尸進程
協程(Coroutine)
1、什么是協程(進入上一次調用的狀態)
協程,又稱微線程,纖程,協程是一種用戶態的輕量級線程。
線程的切換會保存到CPU的棧里,協程擁有自己的寄存器上下文和棧,
協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當于進入上一次調用的狀態
協程最主要的作用是在單線程的條件下實現并發的效果,但實際上還是串行的(像yield一樣)
2、協程的好處
無需線程上下文切換的開銷(可以理解為協程切換就是在不同函數間切換,不用像線程那樣切換上下文CPU)
不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突
用法:最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。
3、協程缺點
無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上
線程阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
4、使用yield實現協程相同效果
importtimeimportqueuedefconsumer(name):print("--->starting eating baozi...")whileTrue:
new_baozi= yield #只要遇到yield程序就返回,yield還可以接收數據
print("[%s] is eating baozi %s" %(name, new_baozi))
time.sleep(1)defproducer():
r= con.__next__() #直接調用消費者的__next__方法
r = con2.__next__() #函數里面有yield第一次加括號調用會變成一個生成器函數不執行,運行next才執行
n =0while n < 5:
n+= 1con.send(n)#send恢復生成器同時并傳遞一個值給yield
con2.send(n)print("\033[32;1m[producer]\033[0m is making baozi %s" %n)if __name__ == '__main__':
con= consumer("c1")
con2= consumer("c2")
p= producer()
yield模擬實現協程效果
5、協程為何能處理大并發1:Greenlet遇到I/O手動切換
協程之所以快是因為遇到I/O操作就切換(最后只有CPU運算)
這里先演示用greenlet實現手動的對各個協程之間切換
其實Gevent模塊僅僅是對greenlet的再封裝,將I/O間的手動切換變成自動切換
from greenlet importgreenletdeftest1():print(12) #4 gr1會調用test1()先打印12
gr2.switch() #5 然后gr2.switch()就會切換到gr2這個協程
print(34) #8 由于在test2()切換到了gr1,所以gr1又從上次停止的位置開始執行
gr2.switch() #9 在這里又切換到gr2,會再次切換到test2()中執行
deftest2():print(56) #6 啟動gr2后會調用test2()打印56
gr1.switch() #7 然后又切換到gr1
print(78) #10 切換到gr2后會接著上次執行,打印78
gr1= greenlet(test1) #1 啟動一個協程gr1
gr2 = greenlet(test2) #2 啟動第二個協程gr2
gr1.switch() #3 首先gr1.switch() 就會去執行gr1這個協程
Greenlet遇到I/O手動切換
6、協程為何能處理大并發2:Gevent遇到I/O自動切換
Gevent 是一個第三方庫,可以輕松通過gevent實現并發同步或異步編程
在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程
Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
Gevent原理是只要遇到I/O操作就會自動切換到下一個協程
7、Gevent實現簡單的自動切換小例子
注:在Gevent模仿I/O切換的時候,只要遇到I/O就會切換,哪怕gevent.sleep(0)也要切換一次
importgeventdeffunc1():print('\033[31;1m第一次打印\033[0m')
gevent.sleep(2) #為什么用gevent.sleep()而不是time.sleep()因為是為了模仿I/O
print('\033[31;1m第六次打印\033[0m')deffunc2():print('\033[32;1m第二次打印\033[0m')
gevent.sleep(1)print('\033[32;1m第四次打印\033[0m')deffunc3():print('\033[32;1m第三次打印\033[0m')
gevent.sleep(1)print('\033[32;1m第五次打印\033[0m')
gevent.joinall([#將要啟動的多個協程放到event.joinall的列表中,即可實現自動切換
gevent.spawn(func1), #gevent.spawn(func1)啟動這個協程
gevent.spawn(func2),
gevent.spawn(func3),
])#運行結果:#第一次打印#第二次打印#第三次打印#第四次打印#第五次打印#第六次打印
Gevent實現簡單的自動切換小例子
8、使用Gevent實現并發下載網頁與串行下載網頁時間比較
from urllib importrequestimportgevent,timefrom gevent importmonkey
monkey.patch_all()#把當前程序所有的I/O操作給我單獨做上標記
deff(url):print('GET: %s' %url)
resp=request.urlopen(url)
data=resp.read()print('%d bytes received from %s.' %(len(data), url))#1 并發執行部分
time_binxing =time.time()
gevent.joinall([
gevent.spawn(f,'https://www.python.org/'),
gevent.spawn(f,'https://www.yahoo.com/'),
gevent.spawn(f,'https://github.com/'),
])print("并行時間:",time.time()-time_binxing)#2 串行部分
time_chuanxing =time.time()
urls=['https://www.python.org/','https://www.yahoo.com/','https://github.com/',
]for url inurls:
f(url)print("串行時間:",time.time()-time_chuanxing)#注:為什么要在文件開通使用monkey.patch_all()#1. 因為有很多模塊在使用I / O操作時Gevent是無法捕獲的,所以為了使Gevent能夠識別出程序中的I / O操作。#2. 就必須使用Gevent模塊的monkey模塊,把當前程序所有的I / O操作給我單獨做上標記#3.使用monkey做標記僅用兩步即可:
第一步(導入monkey模塊): from gevent importmonkey
第二步(聲明做標記) : monkey.patch_all()
并行串行時間比較
說明:monkey.patch_all()猴子補丁作用
用過gevent就會知道,會在最開頭的地方gevent.monkey.patch_all();
作用是把標準庫中的thread/socket等給替換掉.這樣我們在后面使用socket的時候可以跟平常一樣使用,無需修改任何代碼,但是它變成非阻塞的了.
9、通過gevent自己實現單線程下的多socket并發
importgeventfrom gevent import socket,monkey #下面使用的socket是Gevent的socket,實際測試monkey沒用#monkey.patch_all()
defserver(port):
s=socket.socket()
s.bind(('0.0.0.0',port))
s.listen(5)whileTrue:
cli,addr=s.accept()
gevent.spawn(handle_request,cli)defhandle_request(conn):try:whileTrue:
data= conn.recv(1024)print('recv:',data)
conn.send(data)if notdata:
conn.shutdown(socket.SHUT_WR)exceptException as e:print(e)finally:
conn.close()if __name__=='__main__':
server(8001)
server端
importsocket
HOST= 'localhost' #The remote host
PORT = 8001 #The same port as used by the server
s =socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))whileTrue:
msg= bytes(input(">>:"),encoding="utf8").strip()if len(msg) == 0:continues.sendall(msg)
data= s.recv(1024)print('Received', repr(data))
s.close()
client端
10、協程本質原理
協程1通過os去讀一個file,這個時候就是一個io操作,在調用os的接口前,就會有一個列表
協程1的這個操作就會被注冊到這個列表中,然后就切換到其他協程去處理;
等待os拿到要讀file后,也會把這個文件句柄放在這個列表中
然后等待在切換到協程1的時候,協程1就可以直接從列表中拿到數據,這樣就可以實現不阻塞了
epoll返回給協程的任務列表在內核態,協程在用戶態,用戶態協程是不能直接訪問內核態的任務列表的,所以需要拷貝整個內核態的任務列表到用戶態,供協程去訪問和查詢
11、epoll處理 I/O 請求原理
epoll() 中內核則維護一個鏈表,epoll_wait 直接檢查鏈表是不是空就知道是否有文件描述符準備好了。
在內核實現中 epoll 是根據每個 sockfd 上面的與設備驅動程序建立起來的回調函數實現的。
某個 sockfd 上的事件發生時,與它對應的回調函數就會被調用,來把這個 sockfd 加入鏈表,其他處于“空閑的”狀態的則不會。
epoll上面鏈表中獲取文件描述,這里使用內存映射(mmap)技術, 避免了復制大量文件描述符帶來的開銷
內存映射(mmap):內存映射文件,是由一個文件到一塊內存的映射,將不必再對文件執行I/O操作
12、select處理協程
拷貝所有的文件描述符給協程,不論這些任務的是否就緒,都會被返回
那么協程就只能for循環去查找自己的文件描述符,也就是任務列表,select的兼容性非常好,支持linux和windows
13、select、epool、pool
I/O的實質是什么?
I/O的實質是將硬盤中的數據,或收到的數據實現從內核態 copy到 用戶態的過程
本文討論的背景是Linux環境下的network IO。
比如微信讀取本地硬盤的過程
微信進程會發送一個讀取硬盤的請求----》操作系統
只有內核才能夠讀取硬盤中的數據---》數據返回給微信程序(看上去就好像是微信直接讀取)
用戶態 & 內核態
系統空間分為兩個部分,一部分是內核態,一部分是用戶態的部分
內核態:內核態的空間資源只有操作系統能夠訪問
用戶態:我們寫的普通程序使用的空間
select
只能處理1024個連接(每一個請求都可以理解為一個連接)
不能告訴用戶程序,哪一個連接是活躍的
pool
只是取消了最大1024個活躍的限制
不能告訴用戶程序,哪一個連接是活躍的
epool
不僅取消了1024這個最大連接限制
而且能告訴用戶程序哪一個是活躍的
select實現單線程下的多并發(必須是非阻塞模式)
importselectimportsocketimportqueue
server=socket.socket()
server.bind(("localhost",9999))
server.listen(1000)
server.setblocking(False)#設置非阻塞模式,recv沒數據不阻塞,server.accept不阻塞但報錯
msg_dic ={}#因為剛開沒有連接可以監控,所以將server自己交給內核監測,只要server自己活動了就代表有人連我了
inputs = [server,] #有多少連接需要監測就必須放到inputs列表中,將列表交給select相當于交給內核
outputs =[]#第一個inputs是指定要內核監控那些鏈接,鏈接中只要有一個有數據就返回所有連接#第二個outputs是存放還未發送的數據,下次就會發送#第三個inputs也是監控所有連接,但是只有連接出問題是才返回所有連接
whileTrue:
readable,writeable,exceptional=select.select(inputs, outputs, inputs)print(readable,writeable,exceptional)for r inreadable:if r is server: #代表來了一個新鏈接
conn,addr =server.accept()print("來了一個新鏈接:",addr)
inputs.append(conn)#因為這個新建立的鏈接還沒發數據過來,現在收就報錯
#所以要想實現這個客戶端發數據server端知道,就需要讓這個select再監測這個
msg_dic[conn] = queue.Queue() #為每個鏈接都建立一個隊列,里面存返回給客戶端數據
else: #代表有客戶端發數據過來
data = r.recv(1024)print("收到數據",data)
msg_dic[r].put(data)
outputs.append(r)#放入返回的連接隊列里
#r.send(data)
#print('send done')
for w inwriteable:
data_to_client=msg_dic[w].get()
w.send(data_to_client)#返回給客戶端原數據
outputs.remove(w) #確保下次循環的時候writeable,不返回這個已經處理完的鏈接啦
for e in exceptional: #有斷開的連接就從各個列表中刪除
if e inoutputs:
outputs.remove(e)
inputs.remove()del msg_dic[e]
select實現單線程下的多并發: server端
importsocket
HOST= 'localhost' #The remote host
PORT = 9999 #The same port as used by the server
s =socket.socket(socket.AF_INET, socket.SOCK_STREAM)print("s",s)
s.connect((HOST, PORT))whileTrue:
msg= bytes(input(">>:"),encoding="utf8").strip()if len(msg) == 0:continues.sendall(msg)
data= s.recv(1024)print('Received', repr(data))
s.close()
select實現單線程下的多并發: client
說明:server.setblocking(False)設置成非阻塞模式作用
python默認情況下所有的socket都是blocking,即阻塞然后等待I/O操作完成,接收數據
當使用協程實現單線程并發效果時需要設置成非租塞模式,不等待I/O操作
使用selector模塊實現單線程下的多并發效果
作用:selector的實質是對select,poll,epoll的封裝,他默認使用epoll,但是如果系統不支持就用select
importselectors,socket
sel= selectors.DefaultSelector() #1 生成一個selector對象
def accept(sock, mask): #只要來一個新鏈接就調用accept
conn, addr = sock.accept() #創建這個鏈接
conn.setblocking(False) #6 把這個鏈接設置為非阻塞模式
sel.register(conn, selectors.EVENT_READ, read) #只有把活動連接注冊到sel中sel才會去檢測它
#7 把新建立的鏈接conn又放到selector注冊對象sel里了,這時的回調函數變成read了
#這時如果再活動就會調用read了,執行完accept后就會返回到events = sel.select()繼續監測
def read(conn, mask): #2第二次卡住: 客戶端連接成功就卡在這里,等待客戶端發送數據
try: #如果客戶端斷開后,收數據就會引發ConnectionResetError異常
data = conn.recv(1024) #Should be ready
ifdata:
conn.send(data)#Hope it won't block
exceptConnectionResetError as e:print('closing', conn)
sel.unregister(conn)#取消注冊,關閉鏈接
conn.close()
sock=socket.socket()
sock.bind(('localhost', 9999))
sock.listen(100)
sock.setblocking(False)#2 設置為非阻塞模式
#3 將前面寫的sock(server實例)注冊到selselector對象中,讓selector對象sel監測自己,自己活躍說明有鏈接或者發送數據
sel.register(sock, selectors.EVENT_READ, accept)while True: #第一次有活動可定有新鏈接了,只要有新鏈接就會調用accept方法建立鏈接
print("監測活躍:新鏈接或者已連接發送數據")
events= sel.select() #4 第一次卡住: 運行服務端就會卡在這里等待客戶端連接
#這里雖然寫的select但是可能是epoll看系統支持什么
#當有連接過來時,就會將連接實例賦值給events
for key, mask in events: #for循環這個events,默認是阻塞的,只要不阻塞肯定有新的連接
callback = key.data #這里的key.data就是回調函數內存地址(accept或者read)
callback(key.fileobj, mask) #5 key.fileobj是連接的socket實例conn和addr, mask=1 不知道什么
#callback(key.fileobj, mask)是執行實例的回調函數
#如果是新連接回調函數是accept函數,如果已連接發數據回調函數是read#注:for循環中key包含以下內容#SelectorKey( #fileobj是連接實例:conn,addr#fileobj = ,#fd = 320, #文件描述符#events = 1,#data = )) #回調函數內存地址
selector實現單線程下的多并發: server端
importsocket
HOST= 'localhost' #The remote host
PORT = 9999 #The same port as used by the server
s =socket.socket(socket.AF_INET, socket.SOCK_STREAM)print("s",s)
s.connect((HOST, PORT))whileTrue:
msg= bytes(input(">>:"),encoding="utf8").strip()if len(msg) == 0:continues.sendall(msg)
data= s.recv(1024)print('Received', repr(data))
s.close()
selector實現單線程下的多并發: client端
#使用selector的幾個關鍵步驟#1)sel = selectors.DefaultSelector() # 生成一個selector對象#2)sock.setblocking(False) # 設置為非阻塞模式#3)sel.register(sock, selectors.EVENT_READ, accept)## 將前面寫的sock(server實例)注冊到selselector對象中,讓selector對象sel監測,accept是回調函數#4)events = sel.select() # 1第一次卡住: 運行服務端就會卡在這里等待客戶端連接#5)callback(key.fileobj, mask) # key.fileobj是連接的socket實例conn, mask=1 不知道什么東東#6)sel.register(conn, selectors.EVENT_READ, read) # 將回調函數從accept變成read#當客戶端第一次連接時會使用accept作為回調函數,連接成功后就使用read變成回調函數#7)當執行完accept函數后就再次回到events = sel.select()#第一次卡住的地方等待活躍數據#8)如果此時活躍的是已經連接的客戶端,會調用callback(key.fileobj, mask),因為客戶端第一次連接的時候調用#的是accept方法,執行了sel.register(conn, selectors.EVENT_READ, read),所以回調函數已經變成了read,所#以如果是客戶端發送數據過來調用的是read方法,而不是accept方法#9)如果在第七步中活躍的是一個新連接,那么回調函數依然是accept,就會重復上面步驟建立一個新連接
#注:無論是新連接還舊鏈接發送數據過來,實質上沒有太多區別,都是使用callback(key.fileobj, mask)調用回調函數#1.但是如果是第一次連接回調函數是accept,在調用完accept后就將回調函數變成了read,執行完accept函#后會回到第一次卡住的地方events = sel.select(),監測活躍的連接#2.如果活躍的連接是已經連接的客戶端發送數據,就會調用read函數去接收數據,運行完read后也會回到#events = sel.select(),監測活躍的連接
使用selector的幾個關鍵步驟
Python進程池和線程池(ThreadPoolExecutor&ProcessPoolExecutor)
Python標準庫為我們提供了threading和multiprocessing模塊編寫相應的多線程/多進程代碼
但是當項目達到一定的規模,頻繁創建/銷毀進程或者線程是非常消耗資源的,這個時候我們就要編寫自己的線程池/進程池,以空間換時間。
但從Python3.2開始,標準庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,
實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持。
Executor和Future
1. Executor
concurrent.futures模塊的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。
但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用
我們可以將相應的tasks直接放入線程池/進程池,不需要維護Queue來操心死鎖的問題,線程池/進程池會自動幫我們調度。
2. Future
Future你可以把它理解為一個在未來完成的操作,這是異步編程的基礎,
傳統編程模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,
而Future的引入幫助我們在等待的這段時間可以完成其他的操作。
ThreadPoolExecutor(線程池)
from concurrent.futures importThreadPoolExecutorimporttimedefreturn_future_result(message):
time.sleep(2)returnmessage
pool= ThreadPoolExecutor(max_workers=2) #創建一個最大可容納2個task的線程池
future1 = pool.submit(return_future_result, ("hello")) #往線程池里面加入一個task
future2 = pool.submit(return_future_result, ("world")) #往線程池里面加入一個task
print(future1.done()) #判斷task1是否結束
time.sleep(3)print(future2.done()) #判斷task2是否結束
print(future1.result()) #查看task1返回的結果
print(future2.result()) #查看task2返回的結果
#運行結果:#False # 這個False與下面的True會等待3秒#True # 后面三個輸出都是一起打出來的#hello#world
使用submit來操作線程池/進程池
importconcurrent.futuresimporturllib.request
URLS= ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']defload_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:returnconn.read()#We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:#Start the load operations and mark each future with its URL
#future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} # 這一句相當于下面for循環獲取的字典
future_to_url ={}for url inURLS:
future_to_url[executor.submit(load_url,url,60)] = url #{'future對象':'url'} future對象作為key,url作為value
for future in concurrent.futures.as_completed(future_to_url): #as_completed返回已經有返回結果的future對象
url = future_to_url[future] #通過future對象獲取對應的url
try:
data= future.result() #獲取future對象的返回結果
exceptException as exc:print('%r generated an exception: %s' %(url, exc))else:print('%r page is %d bytes' % (url, len(data)))
使用for循環使用線程池,并將future對象加入字典中
from concurrent.futures importThreadPoolExecutor#創建線程池
executor = ThreadPoolExecutor(10)deftest_function(num1,num2):return "%s + %s = %s"%(num1,num2,num1+num2)
result_iterators= executor.map(test_function,[1,2,3],[5,6,7])for result inresult_iterators:print(result)#1 + 5 = 6#2 + 6 = 8#3 + 7 = 10
最簡單map舉例
importconcurrent.futuresimporturllib.request
URLS= ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']defload_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:returnconn.read()#We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_dic={}for url, data inzip(URLS, executor.map(load_url, URLS)):print('%r page is %d bytes' %(url, len(data)))
future_dic[url]= data #{'url':'執行結果'} url作為key,執行結果作為value
#'http://httpbin.org' page is 13011 bytes#'http://example.com/' page is 1270 bytes#'https://api.github.com/' page is 2039 bytes
使用map同時獲取多個頁面中的數據
使用線程池、進程池、協程向多個url并發獲取頁面數據比較
特點:
進程:啟用進程非常浪費資源
線程:線程多,并且在阻塞過程中無法執行其他任務
協程:gevent只用起一個線程,當請求發出去后gevent就不管,永遠就只有一個線程工作,誰先回來先處理
使用for循環串行拿取頁面數據(第四:性能最差)
importrequests
url_list=['https://www.baidu.com','http://dig.chouti.com/',
]for url inurl_list:
result=requests.get(url)print(result.text)
使用for循環串行拿取頁面數據(效果最差)
進程池實現并發(第三)
缺點:啟用進程非常浪費資源
importrequestsfrom concurrent.futures importProcessPoolExecutordeffetch_request(url):
result=requests.get(url)print(result.text)
url_list=['https://www.baidu.com','https://www.google.com/', #google頁面會卡住,知道頁面超時后這個進程才結束
'http://dig.chouti.com/', #chouti頁面內容會直接返回,不會等待Google頁面的返回
]if __name__ == '__main__':
pool= ProcessPoolExecutor(10) #創建線程池
for url inurl_list:
pool.submit(fetch_request,url)#去線程池中獲取一個進程,進程去執行fetch_request方法
pool.shutdown(False)
使用進程池實現并發
線程池實現并發(第二)
缺點: 創建一個新線程將消耗大量的計算資源,并且在阻塞過程中無法執行其他任務。
例: 比如線程池中10個線程同時去10個url獲取數據,當數據還沒來時這些線程全部都在等待,不做事。
importrequestsfrom concurrent.futures importThreadPoolExecutordeffetch_request(url):
result=requests.get(url)print(result.text)
url_list=['https://www.baidu.com','https://www.google.com/', #google頁面會卡住,知道頁面超時后這個進程才結束
'http://dig.chouti.com/', #chouti頁面內容會直接返回,不會等待Google頁面的返回
]
pool= ThreadPoolExecutor(10) #創建一個線程池,最多開10個線程
for url inurl_list:
pool.submit(fetch_request,url)#去線程池中獲取一個線程,線程去執行fetch_request方法
pool.shutdown(True)#主線程自己關閉,讓子線程自己拿任務執行
使用線程池實現并發
from concurrent.futures importThreadPoolExecutorimportrequestsdeffetch_async(url):
response=requests.get(url)returnresponse.textdefcallback(future):print(future.result())
url_list= ['http://www.github.com', 'http://www.bing.com']
pool= ThreadPoolExecutor(5)for url inurl_list:
v=pool.submit(fetch_async, url)
v.add_done_callback(callback)
pool.shutdown(wait=True)
多線程+回調函數執行
協程:微線程實現異步(第一:性能最好)
特點 :gevent只用起一個線程,當請求發出去后gevent就不管,永遠就只有一個線程工作,誰先回來先處理
importgeventimportrequestsfrom gevent importmonkey
monkey.patch_all()#這些請求誰先回來就先處理誰
deffetch_async(method, url, req_kwargs):
response= requests.request(method=method, url=url, **req_kwargs)print(response.url, response.content)###### 發送請求 #####
gevent.joinall([
gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
gevent.spawn(fetch_async, method='get', url='https://www.google.com/', req_kwargs={}),
gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
])
協程:微線程實現異步
總結
以上是生活随笔為你收集整理的python用input输入列表有缺陷_Python 三程三器的那些事的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 信用良好房贷被拒的原因
- 下一篇: asa防火墙升级固件_奇淫巧技 | 在路