關于python3中的并發編程,經過這些天的學習,歸納如下:
#practice21:多線程
線程的定義
方法一:直接Thread()構造 方法二:構造Thread的子類
from urllib.request
import urlretrieve
import csv
from xml.etree.ElementTree
import ElementTree,Element
from threading
import Thread
def download (sid,filename) :'''下載csv文件''' url =
'http://quotes.money.163.com/service/chddata.html?code=%s&start=20150104&end=20160108' % str(sid)response = urlretrieve(url,filename)
def convert (filename) :'''csv文件轉換為xml文件''' with open(filename,
'rt' ,encoding=
'GB2312' )
as rf:
if rf:reader = csv.reader(rf)header = next(reader)root = Element(
'data' )
for row
in reader:line = Element(
'row' )root.append(line)
for key,value
in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write(
'%s.xml' % filename,encoding=
'utf-8' )
def handle (sid) :print(
"downloading %s :" % str(sid))download(sid,
'demo%s.csv' % str(sid))print(
"converting %s :" % str(sid))convert(
'demo%s.csv' % str(sid))
threads = []
for i
in range(
1000001 ,
1000010 ):t = Thread(target=handle,args=(i,))threads.append(t)t.start()
for t
in threads:t.join()print(
"main thread" )
class Mythread (Thread) :def __init__ (self,sid) :Thread.__init__(self)self.sid = sid
def run (self) :handle(self.sid)print(
'*' *
20 )
threads = []
for i
in range(
1000001 ,
1000010 ):t = Mythread(i)threads.append(t)t.start()
for t
in threads:t.join()print(
"main thread" )
執行結果:
downloading 1000001 :
downloading 1000002 :
downloading 1000003 :
downloading 1000004 :
downloading 1000005 :
downloading 1000006 :
downloading 1000007 :
downloading 1000008 :
downloading 1000009 :
converting 1000003 :
converting 1000006 :
converting 1000004 :
converting 1000009 :
converting 1000001 :
converting 1000005 :
converting 1000008 :
converting 1000002 :
converting 1000007 :
main thread
***** ***** ***** *****
downloading 1000001 :
downloading 1000002 :
downloading 1000003 :
downloading 1000004 :
downloading 1000005 :
downloading 1000006 :
downloading 1000007 :
downloading 1000008 :
downloading 1000009 :
converting 1000003 :
converting 1000002 :
converting 1000005 :
converting 1000004 :
converting 1000001 :
converting 1000009 :
converting 1000008 :
converting 1000006 :
converting 1000007 :
main thread
[Finished in 0.9s]
#practice22:線程間通信
.Queue,該隊列是線程安全的; 一個進程內的多個線程共用地址空間,這是線程間通信的基本依據; 本例采用生產者/消費者模型,有多個生產者和一個消費者,每個生產者占用一個線程 消費者只有一個,故必須使用循環來處理生產者生產的數據
from urllib.request
import urlretrieve
import csv
from xml.etree.ElementTree
import ElementTree,Element
from threading
import Thread
from queue
import Queue
class DownloadThread (Thread) :'''下載線程''' def __init__ (self,sid,queue) :Thread.__init__(self)self.sid = sidself.filename =
'demo{}' .format(str(sid))self.queue = queue
def download (self,sid,filename) :'''下載csv文件''' url =
'http://quotes.money.163.com/service/chddata.html?code=%s&start=20150104&end=20160108' % str(sid)response = urlretrieve(url,filename)
def run (self) :print(
"downloading %s :" % str(self.sid))self.download(self.sid,self.filename)self.queue.put(self.filename)
class ConvertThread (Thread) :'''轉換現場''' def __init__ (self,queue) :Thread.__init__(self)self.queue = queue
def convert (self,filename) :'''csv文件轉換為xml文件''' with open(filename,
'rt' ,encoding=
'GB2312' )
as rf:
if rf:reader = csv.reader(rf)header = next(reader)root = Element(
'data' )
for row
in reader:line = Element(
'row' )root.append(line)
for key,value
in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write(
'%s.xml' % filename,encoding=
'utf-8' )
def run (self) :while True :filename = self.queue.get()
if filename ==
False :
break print(
"converting %s :" % str(filename))self.convert(filename)
if __name__ ==
'__main__' :q = Queue()threads = []
for i
in range(
1000001 ,
1000010 ):t = DownloadThread(i,q)threads.append(t)t.start()ct = ConvertThread(q)ct.start()
for i
in threads:i.join()q.put(
False )
程序優化:(三處) 1. StringIO的使用替代文件 2. sid的構造 3. 列表推導式構造線程列表
from urllib.request
import urlretrieve
import csv
from xml.etree.ElementTree
import ElementTree,Element
from threading
import Thread
from queue
import Queue
from io
import StringIO
import requests
class DownloadThread (Thread) :'''下載線程''' def __init__ (self,sid,queue) :Thread.__init__(self)self.sid = sidself.filename =
'demo{}' .format(str(sid))self.queue = queue
def download (self,sid) :'''下載csv文件''' url =
'http://quotes.money.163.com/service/chddata.html?code=1%s&start=20150104&end=20160108' % str(sid).rjust(
6 ,
'0' )response = requests.get(url)self.data = StringIO(response.text)
def run (self) :print(
"downloading %s :" % str(self.sid))self.download(self.sid)self.queue.put((self.sid,self.data))
class ConvertThread (Thread) :'''轉換現場''' def __init__ (self,queue) :Thread.__init__(self)self.queue = queue
def convert (self,sid,data) :'''csv文件轉換為xml文件''' if data:reader = csv.reader(data)header = next(reader)root = Element(
'data' )
for row
in reader:line = Element(
'row' )root.append(line)
for key,value
in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write(
'1%s.xml' % str(sid).rjust(
6 ,
'0' ),encoding=
'utf-8' )
def run (self) :while True :sid,data = self.queue.get()
if data ==
False :
break print(
"converting %s :" % str(sid))self.convert(sid,data)
if __name__ ==
'__main__' :q = Queue()threads = [DownloadThread(i,q)
for i
in range(
1 ,
10 )]
for thread
in threads:thread.start()ct = ConvertThread(q)ct.start()
for i
in threads:i.join()q.put((
100 ,
False ))
感覺還不是很熟練,來個實例:程序設計要求如下:
1、調用OTCBTC的API,獲取所有買家、賣家出價數據
2、涉及的幣種有:EOS、ETH、BCH、NEO
3、將獲取到的json數據轉換成xml格式并保存
4、要求使用多線程
from threading
import Thread
import requests
from xml.etree.ElementTree
import ElementTree,Element
from queue
import Queue
class DownloadThread (Thread) :'''下載當前某種貨幣的賣單與買單''' def __init__ (self,coin_id,queue) :Thread.__init__(self)self.coin_id = coin_idself.queue = queueself.url =
"https://bb.otcbtc.com/api/v2/depth?market=%s&limit=1000" self.url %= coin_id
def download (self,url) :'''下載json數據,存儲為data''' response = requests.get(url)
return response.json()
def run (self) :print(
'downloading %s' % self.coin_id)data = self.download(self.url)self.queue.put((self.coin_id,data))
class ConvertThread (Thread) :'''把請求響應轉化為xml文件''' def __init__ (self,queue) :Thread.__init__(self)self.queue = queue
def setchildtree (self,superelement_tag,spec_dict) :'''構建asks tree或者bids tree. superelement_tag是子樹的根節點名,spec_dict是整個json字符串轉換后的python字典''' e =Element(superelement_tag)
for list_item
in spec_dict[superelement_tag]:e1 = Element(
'item' )e.append(e1)e2_price = Element(
'price' )e2_price.text = list_item[
0 ]e1.append(e2_price)e2_volumn = Element(
'volumn' )e2_volumn.text = list_item[
1 ]e1.append(e2_volumn)
return e
def convert (self,coin_id,spec_dict) :'''將請求響應body的字典轉換為xml文件''' root = Element(
'data' )e_timestamp = Element(
'timestamp' )e_timestamp.text = str(spec_dict[
'timestamp' ])root.append(e_timestamp)asks_childtree = self.setchildtree(
'asks' ,spec_dict)root.append(asks_childtree)bids_childtree = self.setchildtree(
'bids' ,spec_dict)root.append(bids_childtree)et = ElementTree(root)et.write(
'%s.xml' % coin_id)
def run (self) :while True :coin_id,data = self.queue.get()
if data ==
False :
break print(
'converting %s' % coin_id)self.convert(coin_id,data)
if __name__ ==
'__main__' :queue = Queue()markets = [
'eosbtc' ,
'ethbtc' ,
'bchbtc' ,
'neobtc' ]threads = [DownloadThread(market,queue)
for market
in markets]
for thread
in threads:thread.start()ct = ConvertThread(queue)ct.start()
for thread
in threads:thread.join()queue.put((
'xxx' ,
False ))
#practice23:線程間事件通知
1、 Event的使用 + Event.wait與Event.set
from threading
import Event,Thread
def f (e) :print(
'hello' )e.wait()print(
'world' )e = Event()
t = Thread(target=f,args=(e,))
t.start()
可以看出,e.wait方法相當于阻塞函數,阻塞程序繼續執行,直到等到觸發信號e.set()
從運行框可以看出,程序并未執行完。
from threading
import Event,Thread
def f (e) :print(
'hello' )e.wait()print(
'world' )e = Event()
t = Thread(target=f,args=(e,))
t.start()
e.set()
由于e.set(),線程被觸發繼續執行,程序最后運行完退出。
Event對象調用一對wait/set方法后就不能再次調用這對方法了,若想再次調用,必須先對Event對象調用clear方法!
from threading
import Event,Thread
def f (e) :while True :print(
'hello' )e.wait()print(
'world' )e = Event()
t = Thread(target=f,args=(e,))
t.start()
e.set()
由于e.set()使得線程內的阻塞函數e.wait()失效,故循環無限往復
from threading
import Event,Thread
import time
def f (e) :while True :print(
'hello' )e.wait()e.clear()print(
'world' )e = Event()
t = Thread(target=f,args=(e,))
t.start()
e.set()
time.sleep(
1 )
print(
'*' *
40 )
e.set()
主線程與子線程共同維護Event對象e
e.start()啟動子線程,對應輸出hello,然后開始阻塞;主線程e.set()結束子線程的阻塞,e.clear()使得e.start()可以重新生效,輸出world與hello,然循環再次被e.wait()阻塞;
等待一秒,e.set()使得阻塞再次被解除!
2、 實例:
要求: + 多線程下載股票csv數據(生產者) + 單線程轉換為xml文件(消費者) + 單線程打包xml文件(每當生成3個xml文件便打包為一個tar.gz包)
import csv
from xml.etree.ElementTree
import ElementTree,Element
from threading
import Thread,Event
from queue
import Queue
from io
import StringIO
import requests
import os
import tarfile
class DownloadThread (Thread) :'''下載線程''' def __init__ (self,sid,queue) :Thread.__init__(self)self.sid = sidself.filename =
'demo{}' .format(str(sid))self.queue = queue
def download (self,sid) :'''下載csv文件''' url =
'http://quotes.money.163.com/service/chddata.html?code=1%s&start=20150104&end=20160108' % str(sid).rjust(
6 ,
'0' )response = requests.get(url)self.data = StringIO(response.text)
def run (self) :print(
"downloading %s :" % str(self.sid))self.download(self.sid)self.queue.put((self.sid,self.data))
class ConvertThread (Thread) :'''轉換線程''' def __init__ (self,queue,cevent,tevent) :'''轉換線程與打包線程共同維護兩個事件:轉換事件cevent與打包事件tevent''' Thread.__init__(self)self.queue = queueself.cevent = ceventself.tevent = teventself.count =
0 def convert (self,sid,data) :'''csv文件轉換為xml文件''' if data:reader = csv.reader(data)header = next(reader)root = Element(
'data' )
for row
in reader:line = Element(
'row' )root.append(line)
for key,value
in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write(
'1%s.xml' % str(sid).rjust(
6 ,
'0' ),encoding=
'utf-8' )
def run (self) :while True :sid,data = self.queue.get()
if data ==
False :
global tarstoptarstop =
True self.tevent.set()
break print(
"converting %s :" % str(sid))self.convert(sid,data)self.count +=
1 if self.count ==
3 :self.count =
0 self.tevent.set()self.cevent.wait()self.cevent.clear()
class TarThread (Thread) :'''打包線程''' def __init__ (self,cevent,tevent) :'''轉換線程與打包線程共同維護兩個事件:轉換事件cevent與打包事件tevent''' Thread.__init__(self)self.count =
0 self.cevent = ceventself.tevent = tevent
def tar (self) :'''尋找當前文件夾下xml文件,生成打包文件,同時將源文件刪除!''' self.count +=
1 filename =
'%s.tar.gz' % str(self.count)
with tarfile.open(filename,
'w:gz' )
as tar:
for file
in os.listdir(
'.' ):
if file.endswith(
'.xml' ):tar.add(file)os.remove(file)
if not tar.members:os.remove(filename)
def run (self) :global tarstop
while not tarstop
and True :self.tevent.wait()self.tar()self.tevent.clear()self.cevent.set()
if __name__ ==
'__main__' :dcqueue = Queue()tarstop =
False cevent,tevent = Event(),Event()threads = [DownloadThread(i,dcqueue)
for i
in range(
1 ,
11 )]ct = ConvertThread(dcqueue,cevent,tevent)tt = TarThread(cevent,tevent)
for thread
in threads:thread.start()ct.start()tt.start()
for i
in threads:i.join()dcqueue.put((
100 ,
False ))
不足之處:tar線程 最終的退出方式使用了全局變量,不太優雅;守護線程感覺又不滿足條件
#practice24:線程池
concurrent.futures 函數庫有一個 ThreadPoolExecutor 類,可以構建多線程(異步執行多個調用)。
1、 多線程的使用方法
from concurrent.futures
import ThreadPoolExecutor
def handle (a,b) :print(
'hello world' ,str(a*b))
return a*b
executor = ThreadPoolExecutor(max_workers=
3 )
future = executor.submit(handle,
3 ,
4 )
result = future.result()
print(result)
for result
in executor.map(handle,[
1 ,
2 ,
3 ],[
1 ,
2 ,
3 ]):print(result)
2、實例
要求: 1. 構建echo TCP服務器,響應客戶端的請求,即直接返回客戶端發來的數據。 2. TCP服務器開啟10個線程異步處理客戶端請求。 3. 構建echo客戶端,發送請求驗證多線程。
import socket
from concurrent.futures
import ThreadPoolExecutorHOST =
'localhost'
PORT =
12345 def handle_request (conn) :with conn
as subsock:
while True :data = subsock.recv(
1024 )
if not data:
break subsock.sendall(data)
def server (address) :pool = ThreadPoolExecutor(
10 )ip,port = address
with socket.socket()
as s:s.bind(address)s.listen(
5 )
while True :conn,address = s.accept()print(
'Client ' + ip +
":" + str(port) +
' connected' )pool.submit(handle_request,conn)server((
'' ,
12345 ))
import socket
def run_sockets (addr) :with socket.socket()
as s:s.connect(addr)s.sendall(
b'hello world' )data = s.recv(
1024 )print(data)
for i
in range(
7 ):run_sockets((
'localhost' ,
12345 ))
【運行結果】
先運行服務端代碼,作為服務器是無限循環,等待請求
sendall,recv,accept無數據時都會阻塞
必須先運行服務器代碼,再運行多個客戶端!
#practice25:多進程
1、 多進程的定義
from multiprocessing
import Process
def f (a,b) :print(a*b)p = Process(target=f,args=(
1 ,
5 ))
p.start()
print(
'main process' )
p.join()
print(
'main1 process' )
2、 進程間通信
from multiprocessing
import Process,Queue,Pipe
q = Queue()
def f (q) :print(
'hello' )print(q.get())print(
'world' )p = Process(target=f,args=(q,))
p.start()
q.put(
'yes it is' )
from multiprocessing
import Process,Pipe
def f (c) :data = c.recv()print(data)c1,c2 = Pipe()
p = Process(target=f,args=(c2,))
p.start()
c1.send(
'hello world' )
3、 多進程使用場景:cpu密集型操作
from threading
import Thread
from multiprocessing
import Process
def isarmstrong (n) :'''求n是不是水仙花數,返回bool結果(無須關注具體算法)''' a,t = [],n
while t >
0 :a.append(t %
10 )t /=
10 k = len(a)
return sum(x * k
for x
in a) == n
def findarmstrong (a,b) :'''在a-b間尋找水仙花樹''' result = [x
for x
in range(a,b)
if isarmstrong(x)]print(result)
def run_multithreads (*args) :'''采用多線程處理尋找水仙花樹的任務,args傳入的是多個查找范圍''' threads = [Thread(target=findarmstrong,args=(a,b))
for a,b
in args]
for thread
in threads:thread.start()
for thread
in threads:thread.join()
def run_multiprocess (*args) :'''采用多線程處理尋找水仙花樹的任務,args傳入的是多個查找范圍''' proceses = [Process(target=findarmstrong,args=(a,b))
for a,b
in args]
for process
in proceses:process.start()
for process
in proceses:process.join()
if __name__ ==
'__main__' :
import timestart = time.time()run_multithreads((
200000 ,
300000 ),(
300000 ,
400000 ))end = time.time()print(end-start)
多進程明顯比多線程快
總結
以上是生活随笔 為你收集整理的python3练习题:并发编程(21-25) 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。