python压测接口_python的一个接口压测脚本
1 importrequests2 import queue #Queue模塊中提供了同步的、線程安全的隊列類,包括
3 #FIFO(先入先出)隊列Queue,LIFO(后入先出)隊列
4 #LifoQueue,和優先級隊列PriorityQueue。這些隊列都
5 #實現了鎖原語,可在多線程通信中直接使用。
6 importthreading7 importtime8
9 status_code_list =[]10 exec_time =011 classMyThreadPool:12 def __init__(self, maxsize): #定義隊列時有一個默認的參數
13 #maxsize, 如果不指定隊列的長度,即manxsize=0,那么隊列的長
14 #度為無限長,如果定義了大于0的值,那么隊列的長度就是maxsize。
15 self.maxsize =maxsize16 self._pool =queue.Queue(maxsize)17 #maxsize設置隊列的大小為pool的大小
18 for _ in range(maxsize): #為什么用一個下劃線,因為實際上這
19 #里沒用到這個變量,所以用一個符號就可以了。
20 self._pool.put(threading.Thread) #往pool里放線程數
21
22 defget_thread(self):23 returnself._pool.get()24
25 defadd_thread(self):26 self._pool.put(threading.Thread)27
28 defrequest_time(func):29 def inner(*args, **kwargs):30 globalexec_time31 start_time =time.time()32 func(*args, **kwargs)33 end_time =time.time()34 exec_time = end_time-start_time35
36 returninner37
38
39 defget_url(url):40 globalx,status_code_list41 headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36',42 }43 response = requests.get(url,headers=headers)44 code =response.status_code45 status_code_list.append(code)46 print(code)47 returncode48
49
50 def get_count(_url='http://news.baidu.com/sports',_count=100): #:param count: 每個線程請求的數量
51 globalstatus_code_list,url,count52 for i inrange(count):53 get_url(url)54
55 defrequest_status():56 count_num =len(status_code_list)57 set_code_list =set(status_code_list)58 status_dict ={}59 for i inset_code_list:60 status_dict[i] =str(status_code_list).count(str(i))61 echo_str(count_num, set_code_list, status_dict)62
63 defecho_str(count_num,set_code_list,status_dict):64 print('=======================================')65 print('請求總次數:%s'%count_num)66 print('請求時長:%s秒'%int(exec_time))67 second_request = count_num/int(exec_time)68 print('每秒請求約:%s次'%int(second_request))69 print('狀態碼 | 次數')70
71 for k,v instatus_dict.items():72 print(str(k)+'|'+str(v))73 print('=======================================')74
75
76 @request_time77 def run(url,thread_num=10,thread_pool=10):78 '''
79 :param thread_num: 總共執行的線程數(總的請求數=總共執行的線程數*每個線程循環請求的數量)80 :param thread_pool: 線程池數量81 :param url: 請求的域名地址82 '''
83 globalx,status_code_list84 pool =MyThreadPool(thread_pool)85 for i inrange(thread_num):86 t =pool.get_thread()87 obj = t(target=get_count)88 obj.start()89 obj.join()90
91
92 if __name__ == '__main__':93 count = 10 #單個線程的請求數
94 url = 'http://baidu.com'
95 run(url,100,100)96 request_status()
總結
以上是生活随笔為你收集整理的python压测接口_python的一个接口压测脚本的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: windows10安装python环境_
- 下一篇: 为什么vs会输出一个框作为结果_检测与分