上下文管理、线程池、redis订阅和发布
一:上下文管理:
對(duì)于一些對(duì)象在使用之后,需要關(guān)閉操作的。比如說(shuō):socket、mysql數(shù)據(jù)庫(kù)連接、文件句柄等。
都可以用上下文來(lái)管理。
語(yǔ)法結(jié)構(gòu):
1 Typical usage: 2 3 @contextmanager 4 def some_generator(<arguments>): 5 <setup> 6 try: 7 yield <value> 8 finally: 9 <cleanup> 10 11 This makes this: 12 13 with some_generator(<arguments>) as <variable>: 14 <body>?
code:
1 import socket 2 import contextlib 3 4 5 @contextlib.contextmanager 6 def sock_server(host,port): 7 sk=socket.socket() 8 sk.bind((host,port)) 9 sk.listen(4) 10 try: 11 yield sk 12 finally: 13 sk.close() 14 15 with sock_server("127.0.0.1",22) as soc: 16 print(soc)執(zhí)行順序:
解釋:python從上到下依次解釋:
1、當(dāng)?shù)絯ith的時(shí)候,執(zhí)行with內(nèi)socket_server("127.0.0.1",22),跳到
2、被contextlib.contextmanager裝飾的函數(shù)。
3、依次執(zhí)行函數(shù)socket_server到y(tǒng)ield 并把sk返回給第4步的sco變量
4、然后執(zhí)行with下面的代碼塊,執(zhí)行print語(yǔ)句。
5、當(dāng)with語(yǔ)句的代碼塊執(zhí)行完。跳到第3步的yeild。
6、執(zhí)行finally語(yǔ)句里的代碼塊。
二:線程池(threadpool)
自己版本:
1 #!/bin/env python 2 #author:evil_liu 3 #date:2016-7-21 4 #description: thread pool 5 6 import threading 7 import time 8 import queue 9 10 class Thread_Poll: 11 ''' 12 功能:該類(lèi)主要實(shí)現(xiàn)多線程,以及線程復(fù)用。 13 ''' 14 def __init__(self,task_num,max_size): 15 ''' 16 功能:該函數(shù)是初始化線程池對(duì)象。 17 :param task_num: 任務(wù)數(shù)量。 18 :param max_size: 線程數(shù)量。 19 :return:無(wú)。 20 ''' 21 self.task_num=task_num 22 self.max_size=max_size 23 self.q=queue.Queue(task_num)#設(shè)置任務(wù)隊(duì)列的。 24 self.thread_list=[] 25 self.res_q=queue.Queue()#設(shè)置結(jié)果隊(duì)列。 26 27 def run(self,func,i,call_back=None): 28 ''' 29 功能:該函數(shù)是線程池運(yùn)行主函數(shù)。 30 :param func: 傳入任務(wù)主函數(shù)。 31 :param *args: 任務(wù)函數(shù)參數(shù),需要是元組形式。 32 :param call_back: 回調(diào)函數(shù)。 33 :return: 無(wú)。 34 ''' 35 if len(self.thread_list)<self.max_size:#如果目前線程數(shù)小于我們定義的線程的個(gè)數(shù),進(jìn)行創(chuàng)建。 36 self.creat_thread() 37 misson=(func,i,call_back)#往任務(wù)隊(duì)列放任務(wù)。 38 self.q.put(misson) 39 40 def creat_thread(self): 41 ''' 42 功能:該函數(shù)主要是創(chuàng)建線程,并調(diào)用call方法。 43 :return: 無(wú)。 44 ''' 45 t=threading.Thread(target=self.call)#創(chuàng)建線程 46 t.start() 47 48 def call(self): 49 ''' 50 功能:該函數(shù)是線程循環(huán)執(zhí)行任務(wù)函數(shù)。 51 :return: 無(wú)。 52 ''' 53 cur_thread=threading.currentThread 54 self.thread_list.append(cur_thread) 55 event=self.q.get() 56 while True: 57 func,args,cal_ba=event#獲取任務(wù)函數(shù)。 58 try: 59 res=func(*args)#執(zhí)行任務(wù)函數(shù)。注意參數(shù)形式是元組形式。 60 flag="OK" 61 except Exception as e: 62 print(e) 63 res=False 64 flag="fail" 65 self.res(res,flag)#調(diào)用回調(diào)函數(shù),將執(zhí)行結(jié)果返回到隊(duì)列中。 66 try: 67 event=self.q.get(timeout=2)#如果任務(wù)隊(duì)列為空,獲取任務(wù)超時(shí)2s超過(guò)2s線程停止執(zhí)行任務(wù),并退出。 68 except Exception: 69 self.thread_list.remove(cur_thread) 70 break 71 def res(self,res,status): 72 ''' 73 功能:該方法主要是將執(zhí)行結(jié)果方法隊(duì)列中。 74 :param res: 任務(wù)函數(shù)的執(zhí)行結(jié)果。 75 :param status: 執(zhí)行任務(wù)函數(shù)的結(jié)果,成功還是失敗。 76 :return: 無(wú)。 77 ''' 78 da_res=(res,status) 79 self.res_q.put(da_res) 80 81 def task(x,y): 82 ''' 83 功能:該函數(shù)主要需要執(zhí)行函數(shù)。 84 :param x: 參數(shù)。 85 :return: 返回值1,表示執(zhí)行成功。 86 ''' 87 print(x) 88 return x+y 89 def wri_fil(x): 90 ''' 91 功能:該函數(shù)主要講結(jié)果隊(duì)列中的結(jié)果寫(xiě)入文件中。 92 :param x: 任務(wù)長(zhǎng)度。 93 :return: 無(wú)。 94 ''' 95 while True:#將執(zhí)行結(jié)果,從隊(duì)列中獲取結(jié)果并將結(jié)果寫(xiě)入文件中。 96 time.sleep(1) 97 if pool.res_q.qsize()==x:#當(dāng)隊(duì)列當(dāng)前的長(zhǎng)度等于任務(wù)執(zhí)行次數(shù),表示任務(wù)執(zhí)行完成。 98 with open('1.txt','w') as f1: 99 for i in range(pool.res_q.qsize()): 100 try: 101 data=pool.res_q.get(timeout=2) 102 f1.write('mission result:%s,status:%s\n'%data) 103 except Exception: 104 break 105 break 106 else: 107 continue 108 if __name__ == '__main__': 109 pool=Thread_Poll(10,5)#初始化線程池對(duì)象。 110 for i in range(10):#循環(huán)任務(wù)。 111 pool.run(task,(1,2)) 112 wri_fil(10)?
老師版本:注意老師在創(chuàng)建線程的時(shí)候,如果此時(shí)任務(wù)隊(duì)列中沒(méi)有任務(wù)的時(shí)候,不會(huì)創(chuàng)建其他線程。在線程執(zhí)行完任務(wù)之后,將線程加入空閑線程的列表中,然后讓當(dāng)前線程去隊(duì)列里獲取任務(wù),利用queue里的get()方法阻塞的作用的,如果一直阻塞的話,
然后表示空閑的列表中的加入的線程 一直有,此時(shí)表示創(chuàng)建線程數(shù)已經(jīng)滿足任務(wù)需求,如果不阻塞則空閑線程列表里沒(méi)有空余線程。而是獲取任務(wù),執(zhí)行任務(wù)。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import queue 5 import threading 6 import contextlib 7 import time 8 9 StopEvent = object() 10 11 12 class ThreadPool(object): 13 14 def __init__(self, max_num, max_task_num = None): 15 if max_task_num: 16 self.q = queue.Queue(max_task_num) 17 else: 18 self.q = queue.Queue() 19 self.max_num = max_num 20 self.cancel = False 21 self.terminal = False 22 self.generate_list = [] 23 self.free_list = [] 24 25 def run(self, func, args, callback=None): 26 """ 27 線程池執(zhí)行一個(gè)任務(wù) 28 :param func: 任務(wù)函數(shù) 29 :param args: 任務(wù)函數(shù)所需參數(shù) 30 :param callback: 任務(wù)執(zhí)行失敗或成功后執(zhí)行的回調(diào)函數(shù),回調(diào)函數(shù)有兩個(gè)參數(shù)1、任務(wù)函數(shù)執(zhí)行狀態(tài);2、任務(wù)函數(shù)返回值(默認(rèn)為None,即:不執(zhí)行回調(diào)函數(shù)) 31 :return: 如果線程池已經(jīng)終止,則返回True否則None 32 """ 33 if self.cancel: 34 return 35 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 36 self.generate_thread() 37 w = (func, args, callback,) 38 self.q.put(w) 39 40 def generate_thread(self): 41 """ 42 創(chuàng)建一個(gè)線程 43 """ 44 t = threading.Thread(target=self.call) 45 t.start() 46 47 def call(self): 48 """ 49 循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù) 50 """ 51 current_thread = threading.currentThread() 52 self.generate_list.append(current_thread) 53 54 event = self.q.get() 55 while event != StopEvent: 56 57 func, arguments, callback = event 58 try: 59 result = func(*arguments) 60 success = True 61 except Exception as e: 62 success = False 63 result = None 64 65 if callback is not None: 66 try: 67 callback(success, result) 68 except Exception as e: 69 pass 70 71 with self.worker_state(self.free_list, current_thread): 72 if self.terminal: 73 event = StopEvent 74 else: 75 event = self.q.get() 76 else: 77 78 self.generate_list.remove(current_thread) 79 80 def close(self): 81 """ 82 執(zhí)行完所有的任務(wù)后,所有線程停止 83 """ 84 self.cancel = True 85 full_size = len(self.generate_list) 86 while full_size: 87 self.q.put(StopEvent) 88 full_size -= 1 89 90 def terminate(self): 91 """ 92 無(wú)論是否還有任務(wù),終止線程 93 """ 94 self.terminal = True 95 96 while self.generate_list: 97 self.q.put(StopEvent) 98 99 self.q.queue.clear() 100 101 @contextlib.contextmanager 102 def worker_state(self, state_list, worker_thread): 103 """ 104 用于記錄線程中正在等待的線程數(shù) 105 """ 106 state_list.append(worker_thread) 107 try: 108 yield 109 finally: 110 state_list.remove(worker_thread) 111 112 113 114 # How to use 115 116 117 pool = ThreadPool(5) 118 119 def callback(status, result): 120 # status, execute action status 121 # result, execute action return value 122 pass 123 124 125 def action(i): 126 print(i) 127 128 for i in range(30): 129 ret = pool.run(action, (i,), callback) 130 131 time.sleep(5) 132 print(len(pool.generate_list), len(pool.free_list)) 133 print(len(pool.generate_list), len(pool.free_list)) 134 # pool.close() 135 # pool.terminate()?
轉(zhuǎn)載于:https://www.cnblogs.com/evilliu/p/5724933.html
總結(jié)
以上是生活随笔為你收集整理的上下文管理、线程池、redis订阅和发布的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 川p是哪里的车牌 川p车牌的区域归属及相
- 下一篇: 06款的大众速腾,播放车载U盘音乐怎么调