swoole mysql 并发_Swoole4 如何打造高并发的PHP7协程Mysql连接池?
一、數(shù)據(jù)庫連接池基本概念
所謂的數(shù)據(jù)庫連接池,一般指的就是程序和數(shù)據(jù)庫保持一定數(shù)量的數(shù)據(jù)庫連接不斷開,并且各請(qǐng)求的連接可以相互復(fù)用,減少重復(fù)新建數(shù)據(jù)庫連接的消耗和避免在高并發(fā)的情況下出現(xiàn)數(shù)據(jù)庫max connections等錯(cuò)誤。自己總結(jié)一下,如果要實(shí)現(xiàn)一個(gè)數(shù)據(jù)庫連接池,一般有幾個(gè)特點(diǎn):連接復(fù)用,不同的請(qǐng)求連接,可以放回池中,等待下個(gè)請(qǐng)求發(fā)分配和調(diào)用
連接數(shù)量一般維持min-max的最大最少值之間
對(duì)于空閑連接的回收
可以抗一定程度的高并發(fā),也就是說當(dāng)一次并發(fā)請(qǐng)求完池中所有的連接時(shí),獲取不到連接的請(qǐng)求可等待其他連接的釋放
總結(jié)幾個(gè)特性后,一個(gè)基本連接池,大致要實(shí)現(xiàn)下圖功能:
創(chuàng)建連接:連接池啟動(dòng)后,初始化一定的空閑連接,指定為最少的連接min。當(dāng)連接池為空,不夠用時(shí),創(chuàng)建新的連接放到池里,但不能超過指定的最大連接max數(shù)量。
連接釋放:每次使用完連接,一定要調(diào)用釋放方法,把連接放回池中,給其他程序或請(qǐng)求使用。
連接分配:連接池中用pop和push的方式對(duì)等入隊(duì)和出隊(duì)分配與回收。能實(shí)現(xiàn)阻塞分配,也就是在池空并且已創(chuàng)建數(shù)量大于max,阻塞一定時(shí)間等待其他請(qǐng)求的連接釋放,超時(shí)則返回null。
連接管理:對(duì)連接池中的連接,定時(shí)檢活和釋放空閑連接等
二、Fpm+數(shù)據(jù)庫長連接的實(shí)現(xiàn)利用fpm實(shí)現(xiàn):例如你要實(shí)例一個(gè)100連接數(shù)的池,開啟100個(gè)空閑fpm,然后每個(gè)fpm的連接都是數(shù)據(jù)庫長連接。一般pm.max_spare_servers = 8這個(gè)配置項(xiàng)就是維持連接池的空閑數(shù)量,然后pm.max_children = 50就是最大的連接數(shù)量。和fpm的進(jìn)程數(shù)量一致。
三、基于swoole的實(shí)現(xiàn)swoole簡單介紹(更多參閱swoole官網(wǎng))
swoole是一個(gè)PHP實(shí)現(xiàn)異步網(wǎng)絡(luò)通信的引擎或者擴(kuò)展,其中實(shí)現(xiàn)了很多傳統(tǒng)PHP-fpm沒有的東西,例如異步的客戶端,異步Io,常駐內(nèi)存,協(xié)程等等,一個(gè)個(gè)優(yōu)秀的擴(kuò)展,其中異步和協(xié)程等概念能應(yīng)用于高并發(fā)場景。缺點(diǎn)是文檔和入門的門檻都比較高,需要排坑。附上swoole的運(yùn)行流程和進(jìn)程結(jié)構(gòu)圖:
運(yùn)行流程圖
進(jìn)程/線程架構(gòu)圖
基于swoole現(xiàn)實(shí)時(shí)的注意事項(xiàng)
首先,為了減少大家對(duì)之后運(yùn)行示例代碼產(chǎn)生不必要的天坑,先把注意事項(xiàng)和場景問題放前面:
1、程序中使用了協(xié)程的通信管道channel(與go的chan差不多的),其中swoole2是不支持chan->pop($timeout)中timeout超時(shí)等待的,所以必須用swoole4版本
3、筆者使用的環(huán)境為:PHP 7.1.18和swoole4作為此次開發(fā)的環(huán)境基于swoole現(xiàn)實(shí)連接池的方法
首先,此次利用swoole實(shí)現(xiàn)連接池,運(yùn)用到swoole以下技術(shù)或者概念
1、連接變量池,這里可以看做一個(gè)數(shù)組或者隊(duì)列,利用swoole全局變量的常駐內(nèi)存特性,只要變量沒主動(dòng)unset掉,數(shù)組或隊(duì)列中的連接對(duì)象可以一直保持,不釋放。
2、協(xié)程。協(xié)程是純用戶狀態(tài)的線程,通過協(xié)作的方式而不是搶占的方式來切換。首先此次的連接池兩處用到協(xié)程:一個(gè)是mysql的協(xié)程客戶端,為什么要用協(xié)程客戶端,因?yàn)槿绻怯猛娇蛻舳薖DO,在一個(gè)進(jìn)程處理內(nèi),就算有幾百個(gè)連接池,swoole worker進(jìn)程中用普通的PDO方式,隨便并發(fā)多少個(gè)請(qǐng)求,每一個(gè)請(qǐng)求都只能等上一個(gè)請(qǐng)求執(zhí)行完畢,woker才處理下一個(gè)請(qǐng)求,這里就算阻塞了。為了讓一個(gè)worker支持阻塞切換出cpu去處理其他請(qǐng)求,所以要用到協(xié)程的協(xié)助切換,或者異步客戶端也可以,但是異步客戶端使用起來嵌套太多,很不方便。swoole協(xié)程可以無感知的用同步的代碼編寫方式達(dá)到異步IO的效果和性能。
第二個(gè)是底層實(shí)現(xiàn)了協(xié)程切換和調(diào)度的channel,以下詳述什么是channel
3、Coroutine/channel通道,類似于go語言的chan,支持多生產(chǎn)者協(xié)程和多消費(fèi)者協(xié)程。底層自動(dòng)實(shí)現(xiàn)了協(xié)程的切換和調(diào)度。高并發(fā)時(shí),容易出連接池為空時(shí),如果用一般的array或者splqueue()作為介質(zhì)存儲(chǔ)連接對(duì)象變量,不能產(chǎn)生阻塞等待其他請(qǐng)求釋放的效果,也就是說只能直接返回null.。所以這里用了一個(gè)swoole4協(xié)程中很牛逼的channel通過管道作為存儲(chǔ)介質(zhì),它的出隊(duì)方法pop($timeout)可以指定阻塞等待指定時(shí)間后返回。注意,是swoole2是沒有超時(shí)timeout的參數(shù),不適用此場景。在go語言中,如果chan等待或者push了沒有消費(fèi)或者生產(chǎn)一對(duì)一的情況,是會(huì)發(fā)生死鎖。所以swoole4的timeout應(yīng)該是為了避免無限等待為空channel情況而產(chǎn)生。
channel切換的例子:
$chan = new Channel();
go(function()use($chan)
{ echo"我是第一個(gè)協(xié)程,等待3秒內(nèi)有push就執(zhí)行返回" . PHP_EOL;
$p = $chan->pop(2);#1
echo"pop返回結(jié)果" . PHP_EOL;
var_dump($p);
});
go(function()use($chan){
co::sleep(1);#2
$chan->push(1);
});
echo"main" . PHP_EOL;
#1處代碼會(huì)首先執(zhí)行,然后遇到pop(),因?yàn)閏hannel還是空,會(huì)等待2s。此時(shí)協(xié)程會(huì)讓出cpu,跳到第二個(gè)協(xié)程執(zhí)行,然后#2出睡眠1秒,push變量1進(jìn)去channel后返回#1處繼續(xù)執(zhí)行,成功取車通過中剛push的值1.運(yùn)行結(jié)果為:
如果把#2處的睡眠時(shí)間換成大于pop()的等待時(shí)間,結(jié)果是:
根據(jù)這些特性最終實(shí)現(xiàn)連接池的抽象封裝類為:
/**
* 連接池封裝.
* User: user
* Date: 2018/9/1
* Time: 13:36
*/
use Swoole\Coroutine\Channel;
abstract class AbstractPool
{
private $min;//最少連接數(shù)
private $max;//最大連接數(shù)
private $count;//當(dāng)前連接數(shù)
private $connections;//連接池組
protected $spareTime;//用于空閑連接回收判斷
//數(shù)據(jù)庫配置
protected $dbConfig = array(
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 2,
);
private $inited = false;
protected abstract function createDb();
public function __construct()
{
$this->min = 10;
$this->max = 100;
$this->spareTime = 10 * 3600;
$this->connections = new Channel($this->max + 1);
}
protected function createObject()
{
$obj = null;
$db = $this->createDb();
if ($db) {
$obj = [
'last_used_time' => time(),
'db' => $db,
];
}
return $obj;
}
/**
* 初始換最小數(shù)量連接池
* @return $this|null
*/
public function init()
{
if ($this->inited) {
return null;
}
for ($i = 0; $i < $this->min; $i++) {
$obj = $this->createObject();
$this->count++;
$this->connections->push($obj);
}
return $this;
}
public function getConnection($timeOut = 3)
{
$obj = null;
if ($this->connections->isEmpty()) {
if ($this->count < $this->max) {//連接數(shù)沒達(dá)到最大,新建連接入池
$this->count++;
$obj = $this->createObject();
} else {
$obj = $this->connections->pop($timeOut);//timeout為出隊(duì)的最大的等待時(shí)間
}
} else {
$obj = $this->connections->pop($timeOut);
}
return $obj;
}
public function free($obj)
{
if ($obj) {
$this->connections->push($obj);
}
}
/**
* 處理空閑連接
*/
public function gcSpareObject()
{
//大約2分鐘檢測一次連接
swoole_timer_tick(120000, function () {
$list = [];
/*echo "開始檢測回收空閑鏈接" . $this->connections->length() . PHP_EOL;*/
if ($this->connections->length() < intval($this->max * 0.5)) {
echo "請(qǐng)求連接數(shù)還比較多,暫不回收空閑連接\n";
}#1
while (true) {
if (!$this->connections->isEmpty()) {
$obj = $this->connections->pop(0.001);
$last_used_time = $obj['last_used_time'];
if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {//回收
$this->count--;
} else {
array_push($list, $obj);
}
} else {
break;
}
}
foreach ($list as $item) {
$this->connections->push($item);
}
unset($list);
});
}
}同步PDO客戶端下實(shí)現(xiàn)
<<?php
/**
* 數(shù)據(jù)庫連接池PDO方式
* User: user
* Date: 2018/9/8
* Time: 11:30
*/
require "AbstractPool.php";
class MysqlPoolPdo extends AbstractPool
{
protected $dbConfig = array(
'host' => 'mysql:host=10.0.2.2:3306;dbname=test',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 2,
);
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolPdo();
}
return self::$instance;
}
protected function createDb()
{
return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']);
}
}
$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on("WorkerStart", function () {
MysqlPoolPdo::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
$db = null;
$obj = MysqlPoolPdo::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj ? $obj['db'] : null;
}
if ($db) {
$db->query("select sleep(2)");
$ret = $db->query("select * from guestbook limit 1");
MysqlPoolPdo::getInstance()->free($obj);
$response->end(json_encode($ret));
}
});
$httpServer->start();
代碼調(diào)用過程詳解:
1、server啟動(dòng)時(shí),調(diào)用init()方法初始化最少數(shù)量(min指定)的連接對(duì)象,放進(jìn)類型為channelle的connections對(duì)象中。在init中循環(huán)調(diào)用中,依賴了createObject()返回連接對(duì)象,而createObject()
中是調(diào)用了本來實(shí)現(xiàn)的抽象方法,初始化返回一個(gè)PDO db連接。所以此時(shí),連接池connections中有min個(gè)對(duì)象。
2、server監(jiān)聽用戶請(qǐng)求,當(dāng)接收發(fā)請(qǐng)求時(shí),調(diào)用連接數(shù)的getConnection()方法從connections通道中pop()一個(gè)對(duì)象。此時(shí)如果并發(fā)了10個(gè)請(qǐng)求,server因?yàn)榕渲昧?個(gè)worker,所以再pop到一個(gè)對(duì)象返回時(shí),遇到sleep()的查詢,因?yàn)橛玫倪B接對(duì)象是pdo的查詢,此時(shí)的woker進(jìn)程只能等待,完成后才能進(jìn)入下一個(gè)請(qǐng)求。因此,池中的其余連接其實(shí)是多余的,同步客戶端的請(qǐng)求速度只能和woker的數(shù)量有關(guān)。
3、查詢結(jié)束后,調(diào)用free()方法把連接對(duì)象放回connections池中。
ab -c 10 -n 10運(yùn)行的結(jié)果,單個(gè)worker處理,select sleep(2) 查詢睡眠2s,同步客戶端方式總共運(yùn)行時(shí)間為20s以上,而且mysql的連接始終維持在一條。結(jié)果如下:
協(xié)程客戶端Coroutine\MySQL方式的調(diào)用
/**
* 數(shù)據(jù)庫連接池協(xié)程方式
* User: user
* Date: 2018/9/8
* Time: 11:30
*/
require "AbstractPool.php";
class MysqlPoolCoroutine extends AbstractPool
{
protected $dbConfig = array(
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 10,
);
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolCoroutine();
}
return self::$instance;
}
protected function createDb()
{
$db = new Swoole\Coroutine\Mysql();
$db->connect(
$this->dbConfig
);
return $db;
}
}
$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on("WorkerStart", function () {
//MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();
MysqlPoolCoroutine::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
$db = null;
$obj = MysqlPoolCoroutine::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj ? $obj['db'] : null;
}
if ($db) {
$db->query("select sleep(2)");
$ret = $db->query("select * from guestbook limit 1");
MysqlPoolCoroutine::getInstance()->free($obj);
$response->end(json_encode($ret));
}
});
$httpServer->start();
代碼調(diào)用過程詳解
1、同樣的,協(xié)程客戶端方式下的調(diào)用,也是實(shí)現(xiàn)了之前封裝好的連接池類AbstractPool.php。只是createDb()的抽象方法用了swoole內(nèi)置的協(xié)程客戶端去實(shí)現(xiàn)。
2、server啟動(dòng)后,初始化都和同步一樣。不一樣的在獲取連接對(duì)象的時(shí)候,此時(shí)如果并發(fā)了10個(gè)請(qǐng)求,同樣是配置了1個(gè)worker進(jìn)程在處理,但是在第一請(qǐng)求到達(dá),pop出池中的一個(gè)連接對(duì)象,執(zhí)行到query()方法,遇上sleep阻塞時(shí),此時(shí),woker進(jìn)程不是在等待select的完成,而是切換到另外的協(xié)程去處理下一個(gè)請(qǐng)求。完成后同樣釋放對(duì)象到池中。當(dāng)中有重點(diǎn)解釋的代碼段中g(shù)etConnection()中。
public function getConnection($timeOut = 3)
{
$obj = null;
if ($this->connections->isEmpty()) {
if ($this->count < $this->max) {//連接數(shù)沒達(dá)到最大,新建連接入池
$this->count++;
$obj = $this->createObject();#1
} else {
$obj = $this->connections->pop($timeOut);#2
}
} else {
$obj = $this->connections->pop($timeOut);#3
}
return $obj;
}
當(dāng)調(diào)用到getConnection()時(shí),如果此時(shí)由于大量并發(fā)請(qǐng)求過多,連接池connections為空,而沒達(dá)到最大連接max數(shù)量時(shí)時(shí),代碼運(yùn)行到#1處,調(diào)用了createObject(),新建連接返回;但如果連接池connections為空,而到達(dá)了最大連接數(shù)max時(shí),代碼運(yùn)行到了#2處,也就是$this->connections->pop($timeOut),此時(shí)會(huì)阻塞$timeOut的時(shí)間,如果期間有鏈接釋放了,會(huì)成功獲取到,然后協(xié)程返回。超時(shí)沒獲取到,則返回false。
3、最后說一下協(xié)程Mysql客戶端一項(xiàng)重要配置,那就是代碼里$dbConfig中timeout值的配置。這個(gè)配置是意思是最長的查詢等待時(shí)間??梢钥匆粋€(gè)例子說明下:
go(function () {
$start = microtime(true);
$db = new Swoole\Coroutine\MySQL();
$db->connect([
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'timeout' => 4#1
]);
$db->query("select sleep(5)");
echo "我是第一個(gè)sleep五秒之后\n";
$ret = $db->query("select user from guestbook limit 1");#2
var_dump($ret);
$use = microtime(true) - $start;
echo "協(xié)程mysql輸出用時(shí):" . $use . PHP_EOL;
});
#1處代碼,如果timeout配了4s查詢超時(shí),而第一條查詢select sleep(5)阻塞后,協(xié)程切換到下一條sql的執(zhí)行,其實(shí)$db并不能執(zhí)行成功,因?yàn)橛靡粋€(gè)連接,同一個(gè)協(xié)程中,其實(shí)執(zhí)行是同步的,所以此時(shí)第二條查詢?cè)诘却?s超時(shí)后,沒獲取到db的連接執(zhí)行,就會(huì)執(zhí)行失敗。而如果第一條查詢執(zhí)行的時(shí)間少于這個(gè)timeout,那么會(huì)執(zhí)行查詢成功。猜猜上面執(zhí)行用時(shí)多少?結(jié)果如下:
如果把timeout換成6s呢,結(jié)果如下:
所以要注意的是,協(xié)程的客戶端內(nèi)執(zhí)行其實(shí)是同步的,不要理解為異步,它只是遇到IO阻塞時(shí)能讓出執(zhí)行權(quán),切換到其他協(xié)程而已,不能和異步混淆。
ab -c 10 -n 10運(yùn)行的結(jié)果,單個(gè)worker處理,select sleep(2) 查詢睡眠2s,協(xié)程客戶端方式總共運(yùn)行時(shí)間為2s多。結(jié)果如下:
數(shù)據(jù)庫此時(shí)的連接數(shù)為10條(show full PROCESSLIST):
再嘗試 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多個(gè)并發(fā)的處理,時(shí)間是20多秒,mysql連接數(shù)達(dá)到指定的最大值100個(gè)。結(jié)果如下:
四、后言
現(xiàn)在連接池基本實(shí)現(xiàn)了高并發(fā)時(shí)的連接分配和控制,但是還有一些細(xì)節(jié)要處理,例如:并發(fā)時(shí),建立了max個(gè)池對(duì)象,不能一直在池中維護(hù)這么多,要在請(qǐng)求空閑時(shí),把連接池的數(shù)量維持在一個(gè)空閑值內(nèi)。這里是簡單做了gcSpareObject()的方法實(shí)現(xiàn)空閑處理。直接在初始化woker的時(shí)候調(diào)用:MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();就會(huì)定時(shí)檢測回收。問題是如何判斷程序比較空閑,值得再去優(yōu)化。
定時(shí)檢測連接時(shí)候是活的,剔除死鏈
假如程序忘記調(diào)用free()釋放對(duì)象到池,是否有更好方法避免這種情況?
本書做了一個(gè)很好的歸納。同時(shí)也歡迎點(diǎn)擊:正在跳轉(zhuǎn)加入,任何問題可以一起討論,也歡迎大家一起踴躍發(fā)言!
總結(jié)
以上是生活随笔為你收集整理的swoole mysql 并发_Swoole4 如何打造高并发的PHP7协程Mysql连接池?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linus启动mysql失败_Linux
- 下一篇: c3p0 mysql 连接池配置文件_数