mysql 连接查询_Swoole 实战:MySQL 查询器的实现(协程连接池)
Swoole 實戰:MySQL 查詢器的實現(協程連接池)
需求分析
本篇我們將通過 Swoole 實現一個自帶連接池的 MySQL 查詢器:
1. 支持通過鏈式調用構造并執行 SQL 語句;
2. 支持連接池技術;
3. 支持多協程事務并發執行(協程安全性);
4. 支持連接對象的健康檢測;
5. 支持連接對象斷線重連;
6. 程序需要可擴展,為未來的改造留好擴展點;
使用示例
查詢:
$query->select(['uid', 'name'])
->from('users u')
->join('auth_users au', "u.uid=au.uid")
->where(['uid' => $uid])
->groupBy("u.phone")
->having("count(u.phone)>1")
->orderBy("u.uid desc")
->limit(10, 0)
->list();
插入:
$query->insert('users')
->values(
[
[
'name' => 'linvanda',
'phone' => '18687664562',
'nickname' => '林子',
],
[
'name' => 'xiake',
'phone' => '18989876543',
'nickname' => '俠客',
],
])->execute();// 這里是批量插入,不需要批量插入的話,傳入一維數組即可
// 延遲插入$query->insert('users')
->delayed()
->values(
[
'name' => 'linvanda',
'phone' => '18687664562',
'nickname' => '林子',
])->execute();
更新:
$query->update('users u')
->join('auth_users au', "u.uid=au.uid")
->set(['u.name' => '粽子'])
->where("u.uid=:uid", ['uid' => 123])
->execute();
刪除:
$query->delete('users')
->where("uid=:uid", ['uid' => 123])
->execute();
事務:
$query->begin();$query->update('users u')
->join('auth_users au', "u.uid=au.uid")
->set(['u.name' => '粽子'])
->where("u.uid=:uid", ['uid' => 123])
->execute();...$query->commit();
模塊設計
1. 查詢模塊:
查詢器(Query,入口)
SQL構造器(Builder)
2. 事務模塊:
事務接口(ITransaction)
協程版事務類(CoTransaction)
協程上下文(TContext)
3. 連接池模塊:
連接池接口(IPool)
協程連接池類(CoPool)
4. 數據庫連接(驅動)模塊:
連接接口(IConnector)
連接生成器接口(IConnectorBuilder)
協程連接類(CoConnector)
協程連接生成器(CoConnectorBuilder)
數據庫連接配置類(DBConfig)
數據庫連接(統計)信息類(ConnectorInfo)
我們希望通過統一的入口對外提供服務,將復雜性隱藏在內部。該統一入口由查詢模塊提供。該模塊由查詢器和SQL 構造器構成,其中查詢器作為外界唯一入口,而構造器是一個 Trait,因為這樣可以讓外界通過查詢器入口直接使用構造器提供的 SQL 組裝功能。
查詢器通過事務模塊執行 SQL。這里的事務有兩個層面含義:數據庫操作的事務性(顯式或隱式事務,由 CoTransaction 類保障),以及多協程下的執行環境隔離性(由 TContext 類保障)。
事務模塊需要通過數據庫連接對象執行具體的 SQL。連接對象由連接池模塊提供。
連接池模塊維護(創建、回收、銷毀)數據庫連接對象,具體是通過數據庫連接模塊的連接生成器生成新數據庫連接。
模塊之間依賴于接口而非具體實現:查詢模塊依賴事務模塊的 ITransaction 接口;事務模塊依賴連接池模塊的 IPool 接口和數據庫連接模塊的 IConnector 接口;連接池模塊依賴數據庫連接模塊的 IConnectorBuilder 接口。
UML 類圖
下面,我們分模塊具體講解。
入口
由查詢模塊對外提供統一的使用入口。查詢模塊由兩部分構成:查詢器和 SQL 構造器。為了讓調用方可以直接通過查詢器來構造 SQL(而不用先實例化一個構造器構造 SQL 然后傳給查詢器),我將構造器設計成 Trait 供查詢器 Query 使用。
該入口類做了以下幾件事情:
· 提供 list()、one()、page()、execute() 等方法執行 SQL 語句,其內部是通過 transaction 實現的;
· 通過 Builder 這個 Trait 對外提供 SQL 構造功能;
· 委托 transaction 實現事務功能;
構造器主要提供和 SQL 子句對應的方法來構造和編譯 SQL,并提供對原生 SQL 的支持。
該構造器并未對所有的 SQL 語句做方法上的實現(比如子查詢),只對最常用的功能提供了支持,復雜的 SQL 建議直接寫 SQL 語句(一些框架對復雜 SQL 構造也提供了方法級別的支持,但這其實會帶來使用和維護上的復雜性,它導致 SQL 不夠直觀)。
完整的查詢模塊代碼
事務
事務是集中管理 SQL 執行上下文的地方,所有的 SQL 都是在事務中執行的(沒有調 begin() 則是隱式事務)。
我們的查詢器是協程安全的,即一個 Query 實例可以在多個協程中并發執行事務而不會相互影響。協程安全性是通過事務模塊保證的,這里需要處理兩個維度的“事務”:數據庫維度和協程維度。不但需要保證數據庫事務的完整執行,還要保證多個協程間的 SQL 執行不會相互影響。
我們先看一個多協程并發執行事務的例子(在兩個子協程中使用同一個 Query 實例執行事務:先從數據庫查詢用戶信息,然后更新姓名):
$query = new Query(...);
for ($i = 0; $i < 2; $i++) {
go(function () use ($query) {
$query->begin();
$user = $query->select("uid,name")->from("users")->where("phone=:phone", ["phone" => "13908987654"])->one();
$query->update('users')->set(['name' => "李四"])->where("uid=:uid", ['uid' => $user['uid']])->execute();
$query->commit();
});}
上面代碼執行步驟如圖:
在上圖兩個協程不斷切換過程中,各自的事務是在獨立執行的,互不影響。
現實中,我們會在倉儲中使用查詢器,每個倉儲持有一個查詢器實例,而倉儲是單例模式,多協程共享的,因而查詢器也是多協程共享的。如下:
/**
* MySQL 倉儲基類
* 倉儲是單例模式(通過容器實現單例),多協程會共享同一個倉儲實例
*/abstract class MySQLRepository extends Repository implements ITransactional{
/**
* 查詢器
*/
protected $query;
public function __construct()
{
if (!$this->dbAlias()) {
throw new Exception('dbName can not be null');
}
// 通過工廠創建查詢器實例
$this->query = MySQLFactory::build($this->dbAlias());
}
...}
事務模塊是如何實現協程并發事務的隔離性呢?我們用協程上下文 TContext 類實現協程間數據的隔離,事務類 CoTransaction 持有 TContext 實例,事務中所有的狀態信息都通過 TContext 存取,以實現協程間狀態數據互不影響。
我們先看看協程上下文類:
class TContext implements ArrayAccess{
private $container = [];
...
public function offsetGet($offset)
{
if (!isset($this->container[Co::getuid()])) {
return null;
}
return $this->container[Co::getuid()][$offset] ?? null;
}
public function offsetSet($offset, $value)
{
$cuid = Co::getuid();
if (!isset($this->container[$cuid])) {
$this->init();
}
$this->container[$cuid][$offset] = $value;
}
private function init()
{
$this->container[Co::getuid()] = [];
// 協程退出時需要清理當前協程上下文
Co::defer(function () {
unset($this->container[Co::getuid()]);
});
}}
協程上下文內部通過 $container 數組維護每個協程的數據。該類實現了 ArrayAccess 接口,可以通過下標訪問,如:
// 創建上下文實例$context = new TContext();// 設置當前協程的數據$context["model"] = "write";// 訪問當前協程的數據$context["model"];
再看看事務。
事務接口定義:
interface ITransaction{
public function begin(string $model = 'write', bool $isImplicit = false): bool;
/**
* 發送 SQL 指令
*/
public function command(string $preSql, array $params = []);
/**
* 提交事務
* @param bool $isImplicit 是否隱式事務,隱式事務不會向 MySQL 提交 commit (要求數據庫服務器開啟了自動提交的配置)
* @return bool
* @throws Exception
*/
public function commit(bool $isImplicit = false): bool;
public function rollback(): bool;
/**
* 獲取或設置當前事務執行模式
* @param string 讀/寫模式 read/write
* @return string 當前事務執行模式
*/
public function model(?string $model = null): string;
...
/**
* 獲取一次事務中執行的 SQL 列表
* @return array
*/
public function sql():array;}
上面接口定義了事務管理器的主要工作:開啟事務、執行 SQL、提交/回滾事務以及和本次事務執行相關的信息。
我們再來看看它的實現類 CoTransaction,該類是整個查詢器中最重要的類,我們把整個類的代碼完整貼出來:
/**
* 協程版事務管理器
* 注意:事務開啟直到提交/回滾的過程中會一直占用某個 IConnector 實例,如果有很多長事務,則會很快耗完連接池資源
*/class CoTransaction implements ITransaction{
private $pool;
// 事務的所有狀態信息(運行狀態、SQL、運行模式、運行結果等)都是存儲在上下文中
private $context;
/**
* 創建事務實例時需要提供連接池,并在內部創建該事物的協程上下文實例
*/
public function __construct(IPool $pool)
{
$this->pool = $pool;
$this->context = new TContext();
}
public function __destruct()
{
// 如果事務沒有結束,則回滾
if ($this->isRunning()) {
$this->rollback();
}
}
/**
* 開啟事務
*/
public function begin(string $model = 'write', bool $isImplicit = false): bool
{
// 如果事務已經開啟了,則直接返回
if ($this->isRunning()) {
return true;
}
// 事務模式(決定從讀連接池還是寫連接池拿連接對象)
$this->model($model);
// 設置事務運行狀態
$this->isRunning(true);
// 獲取數據庫連接
try {
if (!($connector = $this->connector())) {
throw new ConnectException("獲取連接失敗");
}
} catch (Exception $exception) {
$this->isRunning(false);
throw new TransactionException($exception->getMessage(), $exception->getCode());
}
// 開啟新事務前,需要清除上一次事務的數據
$this->resetLastExecInfo();
$this->clearSQL();
// 調用數據庫連接對象的 begin 方法開始事務(如果是隱式事務則不調用)
return $isImplicit || $connector->begin();
}
/**
* 執行 SQL 指令
* 如果是隱式事務,則在該方法中自動調用 begin 和 commit 方法
*/
public function command(string $preSql, array $params = [])
{
if (!$preSql) {
return false;
}
// 是否隱式事務:外界沒有調用 begin 而是直接調用 command 則為隱式事務
$isImplicit = !$this->isRunning();
// 如果是隱式事務,則需要自動開啟事務
if ($isImplicit && !$this->begin($this->calcModelFromSQL($preSql), true)) {
return false;
}
// 執行 SQL
$result = $this->exec([$preSql, $params]);
// 隱式事務需要及時提交
if ($isImplicit && !$this->commit($isImplicit)) {
return false;
}
return $result;
}
/**
* 提交事務
*/
public function commit(bool $isImplicit = false): bool
{
if (!$this->isRunning()) {
return true;
}
$result = true;
if (!$isImplicit) {
// 顯式事務才需要真正提交到 MySQL 服務器
if ($conn = $this->connector(false)) {
$result = $conn->commit();
if ($result === false) {
// 執行失敗,試圖回滾
$this->rollback();
return false;
}
} else {
return false;
}
}
// 釋放事務占用的資源
$this->releaseTransResource();
return $result;
}
/**
* 回滾事務
* 無論是提交還是回滾,都需要釋放本次事務占用的資源
*/
public function rollback(): bool
{
if (!$this->isRunning()) {
return true;
}
if ($conn = $this->connector(false)) {
$conn->rollback();
}
$this->releaseTransResource();
return true;
}
/**
* 獲取或設置當前事務執行模式
*/
public function model(?string $model = null): string
{
// 事務處于開啟狀態時不允許切換運行模式
if (!isset($model) || $this->isRunning()) {
return $this->context['model'];
}
$this->context['model'] = $model === 'read' ? 'read' : 'write';
return $model;
}
public function lastInsertId()
{
return $this->getLastExecInfo('insert_id');
}
public function affectedRows()
{
return $this->getLastExecInfo('affected_rows');
}
public function lastError()
{
return $this->getLastExecInfo('error');
}
public function lastErrorNo()
{
return $this->getLastExecInfo('error_no');
}
/**
* 本次事務執行的所有 SQL
* 該版本并沒有做記錄
*/
public function sql(): array
{
return $this->context['sql'] ?? [];
}
/**
* 釋放當前事務占用的資源
*/
private function releaseTransResource()
{
// 保存本次事務相關執行結果供外界查詢使用
$this->saveLastExecInfo();
// 歸還連接資源
$this->giveBackConnector();
unset($this->context['model']);
$this->isRunning(false);
}
/**
* 保存事務最終執行的一些信息
*/
private function saveLastExecInfo()
{
if ($conn = $this->connector(false)) {
$this->context['last_exec_info'] = [
'insert_id' => $conn->insertId(),
'error' => $conn->lastError(),
'error_no' => $conn->lastErrorNo(),
'affected_rows' => $conn->affectedRows(),
];
} else {
$this->context['last_exec_info'] = [];
}
}
private function resetLastExecInfo()
{
unset($this->context['last_exec_info']);
}
private function getLastExecInfo(string $key)
{
return isset($this->context['last_exec_info']) ? $this->context['last_exec_info'][$key] : '';
}
/**
* 執行指令池中的指令
* @param $sqlInfo
* @return mixed
* @throws
*/
private function exec(array $sqlInfo)
{
if (!$sqlInfo || !$this->isRunning()) {
return true;
}
return $this->connector()->query($sqlInfo[0], $sqlInfo[1]);
}
private function clearSQL()
{
unset($this->context['sql']);
}
private function calcModelFromSQL(string $sql): string
{
if (preg_match('/^(update|replace|delete|insert|drop|grant|truncate|alter|create)s/i', trim($sql))) {
return 'write';
}
return 'read';
}
/**
* 獲取連接資源
*/
private function connector(bool $usePool = true)
{
if ($connector = $this->context['connector']) {
return $connector;
}
if (!$usePool) {
return null;
}
$this->context['connector'] = $this->pool->getConnector($this->model());
return $this->context['connector'];
}
/**
* 歸還連接資源
*/
private function giveBackConnector()
{
if ($this->context['connector']) {
$this->pool->pushConnector($this->context['connector']);
}
unset($this->context['connector']);
}
private function isRunning(?bool $val = null)
{
if (isset($val)) {
$this->context['is_running'] = $val;
} else {
return $this->context['is_running'] ?? false;
}
}}
該類中,一次 SQL 執行(無論是顯式事務還是隱式事務)的步驟:
begin -> exec -> commit/rollback
1. begin:
判斷是否可開啟新事務(如果已有事務在運行,則不可開啟);
設置事務執行模式(read/write);
將當前事務狀態設置為 running;
獲取連接對象;
清理本事務實例中上次事務的痕跡(上下文、SQL);
調連接對象的 begin 啟動數據庫事務;
2. exec:
調用連接對象的 query 方法執行 SQL(prepare 模式);
3. commit:
判斷當前狀態是否可提交(running 狀態才可以提交);
調用連接對象的 commit 方法提交數據庫事務(如果失敗則走回滾);
釋放本次事務占用的資源(保存本次事務執行的相關信息、歸還連接對象、清除上下文里面相關信息)
4. rollback:
判斷當前狀態是否可回滾;
調用連接對象的 rollback 回滾數據庫事務;
釋放本次事務占用的資源(同上);
優化:
類 CoTransaction 依賴 IPool 連接池,這種設計并不合理(違反了迪米特法則)。從邏輯上說,事務管理類真正依賴的是連接對象,而非連接池對象,因而事務模塊應該依賴連接模塊而不是連接池模塊。讓事務管理類依賴連接池,一方面向事務模塊暴露了連接管理的細節, 另一方面意味著如果使用該事務管理類,就必須使用連接池技術。
一種優化方案是,在連接模塊提供一個連接管理類供外部(事務模塊)取還連接:
interface IConnectorManager{
public function getConnector() IConnector;
public function giveBackConnector(IConnector $conn);}
將 IConnectorManager 注入到 CoTransaction 中:
class CoTransaction implements ITransaction{
...
public function __construct(IConnectorManager $connMgr)
{
...
}}
連接管理器 IConnectorManager 承擔了工廠方法角色,至此,事務模塊僅依賴連接模塊,而不用依賴連接池。
連接池
連接池模塊由 IPool 接口和 CoPool 實現類組成。
連接池模塊和連接模塊之間的關系比較巧妙(上面優化后的方案)。從高層(接口層面)來說,連接池模塊依賴連接模塊:連接池操作(取還)IConnector 的實例;從實現上來說,連接模塊同時又依賴連接池模塊:PoolConnectorManager(使用連接池技術的連接管理器)依賴連接池模塊來操作連接對象(由于該依賴是實現層面的而非接口層面,因而它不是必然的,如果連接管理器不使用連接池技術則不需要依賴連接池模塊)。“連接管理器”這個角色很重要:它對外(事務模塊)屏蔽了連接池模塊的存在,代價是在內部引入了對連接池模塊的依賴(也就是用內部依賴換外部依賴)。
經過上面的分析我們得出,連接池模塊和連接模塊具有較強的耦合性,連接模塊可以對外屏蔽掉連接池模塊的存在,因而在設計上我們可以將這兩個模塊看成一個大模塊放在一個目錄下面,在該目錄下再細分成兩個內部模塊即可。
我們可以先去看看連接池接口:
這里有幾點需要注意:
1. 連接池使用的是偽單例模式,同一個生成器對應的是同一個連接池實例;
2. 連接池內部維護了讀寫兩個池子,生成器生成的讀寫連接對象分別放入對應的池子里面;
3. 從連接池取連接對象的時候,如果連接池為空,則根據情況決定是創建新連接還是等待。此處并非是在池子滿了的情況下就等待,而是會超額創建,為的是應對峰值等異常情況。當然一個優化點是,將溢出比例做成可配置的,由具體的項目決定溢出多少。另外,如果創建新連接的時候數據庫服務器報連接過多的錯誤,也需要轉為等待連接歸還;
4. 如果多次等待連接失敗(超時),則后面的請求會直接拋出異常(直到池子不為空)。這里有個優化點:目前的實現沒有區分是讀池子超時還是寫池子超時;
5. 歸還連接時,如果池子滿了,或者連接壽命到期了,則直接關閉連接;
后面在連接模塊會講解連接生成器,到時我們會知道一個連接池實例到底維護的是哪些連接對象。
連接
連接模塊負責和數據庫建立連接并發出 SQL 請求,其底層使用 Swoole 的 MySQL 驅動。連接模塊由連接對象和連接生成器構成,對外暴露 IConnector 和 IConnectorBuilder 接口。
(在我們的優化版本中,一方面引入了連接管理器 IConnectorManager,另一方面將連接模塊和連接池模塊合并成一個大模塊,因而整個連接模塊對外暴露的是 IConnectorManager 和 IConnector 兩個接口。)
連接對象的實現比較簡單,我們重點看下 CoConnector 里面查詢的處理:
該生成器是針對一主多從數據庫架構的(包括未走讀寫分離的),如果使用是是其他數據庫架構(如多主架構),則創建其他生成器即可。
同一套讀寫配置使用同一個生成器,對應的連接池也是同一個。
DBConfig 是一個 DTO 對象,不再闡述。
查詢器的組裝
使用工廠組裝查詢器實例:
class MySQLFactory{
/**
* @param string $dbAlias 數據庫配置別名,對應配置文件中數據庫配置的 key
*/
public static function build(string $dbAlias): Query
{
// 從配置文件獲取數據庫配置
$dbConf = Config::getInstance()->getConf("mysql.$dbAlias");
if (!$dbConf) {
throw new ConfigNotFoundException("mysql." . $dbAlias);
}
if (!isset($dbConf['read']) && !isset($dbConf['write'])) {
$writeConf = $dbConf;
$readConfs = [$writeConf];
} else {
$writeConf = $dbConf['write'] ?? [];
$readConfs = $dbConf['read'] ?? [$writeConf];
}
$writeConfObj = self::createConfObj($writeConf);
$readConfObjs = [];
foreach ($readConfs as $readConf) {
$readConfObjs[] = self::createConfObj($readConf);
}
// 創建生成器、連接池、事務管理器
// 在優化后版本中,用連接管理器代替連接池的位置即可
$mySQLBuilder = CoConnectorBuilder::instance($writeConfObj, $readConfObjs);
$pool = CoPool::instance($mySQLBuilder, $dbConf['pool']['size'] ?? 30);
$transaction = new CoTransaction($pool);
return new Query($transaction);
}
private static function createConfObj(array $config): DBConfig
{
if (!$config) {
throw new Exception("config is null");
}
return new DBConfig(
$config['host'],
$config['user'],
$config['password'],
$config['database'],
$config['port'] ?? 3306,
$config['timeout'] ?? 3,
$config['charset'] ?? 'utf8'
);
}}
至此,整個查詢器的編寫、創建和使用就完成了。
總結
1. 項目的開發需要劃分模塊,模塊之間盡量減少耦合,通過接口通信(模塊之間依賴接口而不是實現);
2. 如果兩個模塊之間具有強耦合性,則往往意味著兩者本身應該歸并到同一個模塊中,在其內部劃分子模塊,對外屏蔽內部細節,如本項目的連接模塊和連接池模塊;
3. 如果模塊之間存在不合常理的依賴關系,則意味著模塊劃分有問題,如本項目中的事務模塊依賴連接池模塊;
4. 有問題的模塊劃分往往違反第一點(也就是迪米特法則),會造成模塊暴露細節、過多的依賴關系,影響設計的靈活性、可擴展性,如本項目中事務模塊依賴連接池模塊(雖然是實現層面的依賴而非接口層面),造成要使用 CoTransaction 時必須同時使用連接池;
5. 編寫生產可用的項目時需要注意處理異常場景,如本項目中從連接池獲取連接對象,以及在連接對象上執行 SQL 時的斷線重連;
6. 設計本身是迭代式的,并非一蹴而就、一次性設計即可完成的,本項目在開發過程中已經經歷過幾次小重構,在本次分析時仍然發現一些設計上的缺陷。重構屬于項目開發的一部分;
優化版 UML 圖:
以上內容希望幫助到大家,很多PHPer在進階的時候總會遇到一些問題和瓶頸,業務代碼寫多了沒有方向感,不知道該從那里入手去提升,對此我整理了一些資料,包括但不限于:分布式架構、高可擴展、高性能、高并發、服務器性能調優、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優化、shell腳本、Docker、微服務、Nginx等多個知識點高級進階干貨需要的可以免費分享給大家,PHP進階學習交流群
關注:架構師學習路線圖,每日更新互聯網最新技術文章與你不斷前行,實戰資料,筆試面試。
總結
以上是生活随笔為你收集整理的mysql 连接查询_Swoole 实战:MySQL 查询器的实现(协程连接池)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python爬取jsp网页_帮MM用py
- 下一篇: 徐州有没有黑户低首付购车?