python locust 能压测数据库_深入浅出 Locust 实现
寫在前面
本文將嘗試通過一篇文章講清楚開源壓測工具——Locust的原理和實現過程,聚焦在實現上,不拘泥在一堆源碼中(本身Locust的源碼就比較簡單)。本人并不是Locust鐵粉,只是對它的實現方式感興趣,所以jmeter、wrk的粉絲們就不要來battle我了。在我看來,工具都有各自的優勢和缺陷,區別在于使用的人,能否選擇最合適的工具達到目的,當然你可以掌握所有的主流工具,對于特定的任務,哪個簡單用哪個,也可以鐘情于一個工具,信手拈來,只要你能在預期時間內完成也無可厚非。
網上有很多關于Locust源碼實現的講解,但是我覺得都太片面了,并且沒有講清楚,恰巧Locust在上個月(5月)發布了1.x版本,整個重構力度還是蠻大的,因此我想通過這篇文章,介紹它最新的設計以及展示最全面的Locust實現,讓大家感受到Locust的簡單與優雅(整個Locust項目不過4M大小)。
一、架構與核心類
基本介紹
Locust是開源、使用Python開發、基于事件、支持分布式并且提供Web UI進行測試執行和結果展示的性能測試工具。
Locust的主要特性有兩個:
模擬用戶操作:支持多協議,Locust可以用于壓測任意協議類型的系統
并發機制:摒棄了進程和線程,采用協程(gevent)的機制,單臺測試機可以產生數千并發壓力
Locust使用了以下幾個核心庫:
1) gevent
gevent是一種基于協程的Python網絡庫,它用到Greenlet提供的,封裝了libevent事件循環的高層同步API。
2) flask
Python編寫的輕量級Web應用框架。
3) requests
Python Http庫
4) msgpack-python
MessagePack是一種快速、緊湊的二進制序列化格式,適用于類似JSON的數據格式。msgpack-python主要提供MessagePack數據序列化及反序列化的方法。
5) pyzmq
pyzmq是zeromq(一種通信隊列)的Python實現,主要用來實現Locust的分布式模式運行
系統架構及對比
我們知道,完整的壓測系統應該包含以下組件:
相比于主流的壓測系統LoadRunner和Jmeter,Locust是一個更為純粹的開源壓測系統,框架本身的結構并不復雜,甚至只提供了最基礎的組件,但也正因為如此,Locust具有極高的可編程性和擴展性。對于測試開發同學來說,可以比較輕松地使用Locust實現對任意協議的壓測。
工具簡單對比
LoadRunner
Jmeter
Locust
壓力生成器
√
√
√
負載控制器
√
√
√
系統資源監控器
√
x
x
結果采集器
√
√
√
結果分析器
√
√
√
上表簡單展示了幾個工具包含的壓測組件,Locust的架構非常簡單,部分組件的實現甚至都不完善,比如結果分析器,Locust本身并沒有很詳細的測試報告。但這并不妨礙它成為優秀的開源框架。
Locust的架構
Locust 最近發布了1.x 版本,代碼進行了重構,重封裝,對很多類進行了重命名,本文盡量使用新名詞,但涉及的舊名詞不影響理解。
locust架構上使用master-slave模型,支持單機和分布式
master和slave(即worker)使用 ZeroMQ 協議通訊
提供web頁面管理master,從而控制slave,同時展示壓測過程和匯總結果
可選no-web模式(一般用于調試)
基于Python本身已經支持跨平臺
為了減少CPython的GIL限制,充分利用多核CPU,建議單機啟動CPU核數的slave(多進程)。
核心類
上面展示的Locust架構,從代碼層面來看究竟是如何實現的呢?下面我們就來窺視一番:
簡單來說,Locust的代碼分為以下模塊:
Runner-執行器,Locust的核心類,定義了整個框架的執行邏輯,實現了Master、Slave(worker)等執行器
User-壓測用例,提供了HttpUser壓測http協議,用戶可以定義事務,斷言等,也可以實現特定協議的User
Socket-通信器,提供了分布式模式下master和slave的交互方式
RequestStats-采集、分析器,定義了結果分析和數據上報格式
EventHook-事件鉤子,通過預先定義的事件使得我們可以在這些事件發生時(比如slave上報)做一些額外的操作
WebUI-提供web界面的操作臺和壓測過程展示
下面我們看看核心類的主要成員變量和方法
用戶定義的User類作為Runner的user_classes傳入
master的client_listener監聽施壓端client消息
slave的worker方法監聽master消息
slave的stats_reporter方法上報壓測數據,默認3s上報一次
slave的start啟動協程,使用master分配的并發數開始壓測
slave默認1s上報一次心跳,如果master超過3s未收到某個slave的心跳則會將其標記為missing狀態
TaskSet和User持有client,可以在類中直接發起客戶端請求,client可以自己實現,Locust只實現了HttpUser
二、Runner的狀態與通信機制
接下來我們繼續了解Locust分布式壓測的核心:Runner的狀態和通信機制。我們知道Locust等壓測工具支持分布式壓測,就是說理論上可以通過不斷添加壓力機(worker)提高并發數量,這個機制讓使用者可以自由地增減機器資源,從而達到期望的施壓能力。
Runner狀態機
在分布式場景下,除了數據一致性,狀態同步也是非常重要的。在Locust的master-slave架構下,需要管理master和slave的狀態,不僅為了控制壓測的開始或停止,也是為了掌握當前的壓力機情況。那么都有哪些狀態?
狀態
說明
ready
準備就緒,master和slave啟動后默認狀態
hatching
正在孵化壓力機,對master來說正在告訴slave們開始干活,對slave來說是過渡狀態,因為它們馬上要running
running
正在壓測
cleanup
當發生GreenletExit時的狀態,一般不會出現
stopping
表示正在通知slave們停止,只有master有這個狀態
stopped
壓測已經停止
missing
狀態丟失,只有slave有的狀態,默認3秒如果master沒有收到slave的心跳就會認為它missing了,一般是進程沒有正常退出導致
Runner的狀態不多,但是在壓測過程中起到非常重要的作用,狀態之間是按約定的方式進行扭轉的,我們使用Locust的web界面管理master的狀態,master根據我們的操作通過通信機制推進slave的狀態。
通信機制
Master與Slave之間是通過Zeromq建立的TCP連接進行通信的(一對多)。
ZeroMQ(簡稱ZMQ)是一個基于消息隊列的多線程網絡庫,其對套接字類型、連接處理、幀、甚至路由的底層細節進行抽象,提供跨越多種傳輸協議的套接字。
ZMQ是網絡通信中新的一層,介于應用層和傳輸層之間(按照TCP/IP劃分),其是一個可伸縮層,可并行運行,分散在分布式系統間。
master與各個slave各維持一個TCP連接,在每個連接中,master下發的命令,slave上報的信息等自由地的傳輸著。
消息格式
class Message(object):
def __init__(self, message_type, data, node_id):
self.type = message_type
self.data = data
self.node_id = node_id
def serialize(self):
return msgpack.dumps((self.type, self.data, self.node_id))
@classmethod
def unserialize(cls, data):
msg = cls(*msgpack.loads(data, raw=False))
return msg
其中message_type指明消息類型,data是實際的消息內容,node_id指明機器ID。Locust使用msgpack做序列化與反序列化處理。
消息類型和結構
master和slave之間的消息類型不過10種,其中大部分消息由slave上報給master,下方表格可以清楚的看到,實現一套分布式系統并沒有那么復雜。
序號
message_type
發送者
data格式
發送時機
1
client_ready
slave
空
slave啟動后或壓測停止完成
2
hatching
slave
空
接受到master的hatch先發送一個確認
3
hatch_complete
slave
{“count”:n}
所有并發用戶已經孵化完成
4
client_stopped
slave
空
停止所有并發用戶后
5
heatbeat
slave
{“state”: x,”current_cpu_usage”:x}
默認每3秒上報一次心跳
6
stats
slave
{“stats”:[], “stats_total”:{}, “errors”:{},”user_count”:x}
每3秒上報一次壓測信息
7
exception
slave
{“msg”:x, “traceback”:x}
TaskSet.run出現異常
8
hatch
master
{“hatch_rate”:x, “num_users”:x, “host”:x, “stop_timeout”:x}
開始swarm
9
stop
master
空
點擊stop
10
quit
master,slave
空
手動或其他方式退出的時候
可以看到上面有一種非常重要的消息類型——stats,壓測的結果采集都封裝在這個消息里。
三、結果采集器
統計對象
從上面我們知道有一個非常重要的消息類型——stats,這個是slave給master發送的消息,默認每3秒鐘上報一次。stats消息的結構如下所示:
{
"stats": [],
"stats_total": {},
"errors": {},
"user_count": 10
}
實際上,slave也是持有著類似如上json格式的三個對象:
RequestStats
StatsEntry
StatsError
其中每一個locust進程會維護一個全局RequestStats單例global_stats,這個實例包含一個StatsEntry實例total(對應json的stats_total),以及兩個字典entries(對應json的stats)和errors(對應json的errors),其中entries字典key為(name,method),對應值為一個StatsEntry實例,另一個errors字典的key為(name,method,error),對應值為一個StatsError實例。可見,global_stats包含了單個slave匯總的信息,以及各個請求url或name的統計信息。在分布式的場景下,每個slave都會維護一個global_stats, 在一個上報周期達到后將信息發送到master,發送完成后就會重置所有數據,進入下一個周期的統計。
上圖中紅色的字段是slave真正上報給master的數據。
統計過程
那么slave是如何統計消息,又究竟需要上報什么內容給master?master又是如何匯總的呢?下面我們來看看整個統計過程:
在每一次請求后,無論是成功還是失敗,都會觸發對應的request_success或者request_failure事件,stats.py文件中的數據統計模塊訂閱了對應的事件,會調用global_stats對數據進行統計。
在slave的一個上報周期達到后,觸發on_report_to_master事件,此時global_stats就會依次調用以下方法對數據進行序列化:
serialize_stats
total.get_stripped_report
serialize_errors
其實也就是對上面提到的total和兩個字典中的內容進行序列化,其實就是轉為json字符串。
def on_report_to_master(client_id, data):
data["stats"] = global_stats.serialize_stats()
data["stats_total"] = global_stats.total.get_stripped_report()
data["errors"] = global_stats.serialize_errors()
global_stats.errors = {}
下圖是斷點過程看到的stats消息內容(在msgpack序列化之前):
每秒請求數和響應時間及其對應請求個數
StatsEntry有兩個比較重要的對象,分別是num_reqs_per_sec和response_times,它們都是字典類型,其中num_reqs_per_sec的key是秒時間戳,顯示當前秒完成了多少個請求,統計的時間是完成請求的時刻,比如如果一個請求從第0秒開始,第3秒完成,那么這個請求統計在第3秒的時間戳上,這個對象可以很方便的計算出rps。response_times的key是響應時間,單位是豪秒,為了防止key過多,做了取整,比如147 取 150, 3432 取 3400 和 58760 取 59000,這個是為了方便獲得類似90%請求的完成時間(小于等于該時間),99%請求的完成時間,下面具體的源碼:
def _log_time_of_request(self, current_time):
t = int(current_time)
self.num_reqs_per_sec[t] = self.num_reqs_per_sec.setdefault(t, 0) + 1
self.last_request_timestamp = current_time
def _log_response_time(self, response_time):
if response_time is None:
self.num_none_requests += 1
return
self.total_response_time += response_time
if self.min_response_time is None:
self.min_response_time = response_time
self.min_response_time = min(self.min_response_time, response_time)
self.max_response_time = max(self.max_response_time, response_time)
# to avoid to much data that has to be transfered to the master node when
# running in distributed mode, we save the response time rounded in a dict
# so that 147 becomes 150, 3432 becomes 3400 and 58760 becomes 59000
if response_time < 100:
rounded_response_time = response_time
elif response_time < 1000:
rounded_response_time = int(round(response_time, -1))
elif response_time < 10000:
rounded_response_time = int(round(response_time, -2))
else:
rounded_response_time = int(round(response_time, -3))
# increase request count for the rounded key in response time dict
self.response_times.setdefault(rounded_response_time, 0)
self.response_times[rounded_response_time] += 1
master匯總信息
slave的每一個stats消息到達master后,都會觸發master的slave_report事件,master也擁有自己的global_stats,因此只需要將對應的信息進行累加(可以理解是所有slave對應內容的匯總)。具體在StatsEntry的extend方法:
def extend(self, other):
"""
Extend the data from the current StatsEntry with the stats from another
StatsEntry instance.
"""
if self.last_request_timestamp is not None and other.last_request_timestamp is not None:
self.last_request_timestamp = max(self.last_request_timestamp, other.last_request_timestamp)
elif other.last_request_timestamp is not None:
self.last_request_timestamp = other.last_request_timestamp
self.start_time = min(self.start_time, other.start_time)
self.num_requests = self.num_requests + other.num_requests
self.num_none_requests = self.num_none_requests + other.num_none_requests
self.num_failures = self.num_failures + other.num_failures
self.total_response_time = self.total_response_time + other.total_response_time
self.max_response_time = max(self.max_response_time, other.max_response_time)
if self.min_response_time is not None and other.min_response_time is not None:
self.min_response_time = min(self.min_response_time, other.min_response_time)
elif other.min_response_time is not None:
# this means self.min_response_time is None, so we can safely replace it
self.min_response_time = other.min_response_time
self.total_content_length = self.total_content_length + other.total_content_length
for key in other.response_times:
self.response_times[key] = self.response_times.get(key, 0) + other.response_times[key]
for key in other.num_reqs_per_sec:
self.num_reqs_per_sec[key] = self.num_reqs_per_sec.get(key, 0) + other.num_reqs_per_sec[key]
for key in other.num_fail_per_sec:
self.num_fail_per_sec[key] = self.num_fail_per_sec.get(key, 0) + other.num_fail_per_sec[key]
核心指標
Locust核心的指標其實就4個:
并發數
RPS
響應時間
異常率
我們還是回到文章開頭的那個json:
{
"stats": [],
"stats_total": {},
"errors": {},
"user_count": 10
}
結合上方Locust的壓測過程截圖,我們可以看到,各個接口的指標其實就是stats對象里的各個字段,而最下方匯總的Aggregated這一行則對應stats_total的各個字段,盡管這個json只是slave單個stats消息的內容,卻也是最終要顯示的內容,只是master對各個消息做了匯總而已。匯總的方式也相當簡單,請見上方的StatsEntry的extend方法。
因為master和web模塊是一起部署的,因此web可以直接使用master的global_stats對象并展示其內容,可以做到動態顯示。
本文講解的eventHook基于Locust的1.x版本。
四、事件鉤子
Locust中的事件鉤子,我認為設計得非常巧妙,所以最后再來講一下它。event.py模塊包含了兩個類,一個是事件鉤子定義類EventHook,一個是事件鉤子類型類Events,為不同的事件提供hook。事件處理函數注冊相應的hook以后,我們可以很方便的的基于event觸發處理函數,實現事件驅動。
EventHook定義了三個方法:
def add_listener(self, handler):
self._handlers.append(handler)
return handler
def remove_listener(self, handler):
self._handlers.remove(handler)
def fire(self, *, reverse=False, **kwargs):
if reverse:
handlers = reversed(self._handlers)
else:
handlers = self._handlers
for handler in handlers:
handler(**kwargs)
可以看到add_listener、remove_listener的作用是注冊或刪除監聽函數,fire方法是觸發處理函數。EventHook的實現相比Locust 0.x版本有較大改變(不再使用Python的內置魔方方法)。
Events中包含了11個事件鉤子,分別是:
事件鉤子的實現原理
事件鉤子的原理可以簡單理解成,1.定義處理函數 ——> 2.注冊到某個eventHook ——> 3.在某個時機觸發eventHook ——> 4.該eventHook遍歷執行所有處理函數。在代碼層面就是定義函數,然后add_listener,最后在想要的位置觸發eventHook的fire方法并傳入定義好的參數,這里參數是固定的,不能隨意傳入,之所以每個處理函數都能對參數進行修改,是因為這里的參數傳遞是『引用傳遞』,明白了這一點就算是掌握了EventHook的基本原理。
其中Locust本身預定義并注冊了一些處理函數,比如事件鉤子report_to_master、worker_report都有對應的處理函數,實現分布式模式下數據上報時數據的構造和計算,比如事件鉤子init,初始化master的WebUI。
事件鉤子的作用
那么,事件鉤子究竟有什么作用?
在我看來有以下作用:
代碼解耦,比如解耦Runner和Stats模塊。
提供擴展性,通過預置的鉤子和觸發點,為使用者提供了可擴展的特性。
舉擴展性的例子,使用者可以很輕松地:
往worker上報的stats消息添加自定義數據
在Locust實例啟動時在WebUI中添加自定義接口
在每次請求成功或失敗之后做一些額外的事情
如何使用鉤子
在Locust 1.x版本之前,使用以下方法定義和注冊鉤子:
def on_report_to_master(client_id, data):
data["stats"] = global_stats.serialize_stats()
data["stats_total"] = global_stats.total.get_stripped_report()
data["errors"] = global_stats.serialize_errors()
global_stats.errors = {}
def on_slave_report(client_id, data):
for stats_data in data["stats"]:
entry = StatsEntry.unserialize(stats_data)
request_key = (entry.name, entry.method)
if not request_key in global_stats.entries:
global_stats.entries[request_key] = StatsEntry(global_stats, entry.name, entry.method)
global_stats.entries[request_key].extend(entry)
...
events.report_to_master += on_report_to_master
events.slave_report += on_slave_report
在1.x及之后的版本有兩種使用方式:
# 方式一與之前類似
def on_report_to_master(client_id, data):
data["stats"] = stats.serialize_stats()
data["stats_total"] = stats.total.get_stripped_report()
data["errors"] = stats.serialize_errors()
stats.errors = {}
def on_worker_report(client_id, data):
for stats_data in data["stats"]:
entry = StatsEntry.unserialize(stats_data)
request_key = (entry.name, entry.method)
if not request_key in stats.entries:
stats.entries[request_key] = StatsEntry(stats, entry.name, entry.method, use_response_times_cache=True)
stats.entries[request_key].extend(entry)
for error_key, error in data["errors"].items():
if error_key not in stats.errors:
stats.errors[error_key] = StatsError.from_dict(error)
else:
stats.errors[error_key].occurrences += error["occurrences"]
stats.total.extend(StatsEntry.unserialize(data["stats_total"]))
events.report_to_master.add_listener(on_report_to_master)
events.worker_report.add_listener(on_worker_report)
# 方式二,使用裝飾器
@events.report_to_master.add_listener
def on_report_to_master(client_id, data):
"""
This event is triggered on the worker instances every time a stats report is
to be sent to the locust master. It will allow us to add our extra content-length
data to the dict that is being sent, and then we clear the local stats in the worker.
"""
data["content-length"] = stats["content-length"]
stats["content-length"] = 0
@events.worker_report.add_listener
def on_worker_report(client_id, data):
"""
This event is triggered on the master instance when a new stats report arrives
from a worker. Here we just add the content-length to the master's aggregated
stats dict.
"""
stats["content-length"] += data["content-length"]
第二種方式更加簡潔,不容易忘記注冊處理函數。
寫在最后
首先需要聲明的是,本人并不是性能測試方向的專家,只是做過一些性能測試,搗鼓過一些工具,之所以寫出這篇文章,初衷只是為了在組內分享。了解Locust的人,很少會真的用他來進行大型業務場景壓測,一是它的壓測性能受限,二是它沒有很好的測試報告,現在這些在我看來已經不是問題,我想在后面分兩篇文章介紹,敬請期待:
一、如何提高Locust的壓測性能——boomer
二、重新定義Locust的測試報告
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的python locust 能压测数据库_深入浅出 Locust 实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 和搜狗输入法快捷键冲突_电脑输入法怎么设
- 下一篇: pythonexcel表格教程_pyth