11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查
11. 用Rust手把手編寫一個(gè)wmproxy(代理,內(nèi)網(wǎng)穿透等), 實(shí)現(xiàn)健康檢查
項(xiàng)目 ++wmproxy++
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
健康檢查的意義
健康檢查維持著系統(tǒng)的穩(wěn)定運(yùn)行, 極大的加速著服務(wù)的響應(yīng)時(shí)間, 并保證服務(wù)器不會(huì)把消息包轉(zhuǎn)發(fā)到不能響應(yīng)的服務(wù)器上, 從而使系統(tǒng)快速穩(wěn)定的運(yùn)轉(zhuǎn)
在LINUX系統(tǒng)中,系統(tǒng)默認(rèn)TCP建立連接超時(shí)時(shí)間為127秒。通常網(wǎng)絡(luò)不可達(dá)或者網(wǎng)絡(luò)連接被拒絕或者網(wǎng)絡(luò)連接超時(shí)需要耗時(shí)的時(shí)長較長。此時(shí)會(huì)超成服務(wù)器的響應(yīng)時(shí)間變長很多,而且重復(fù)發(fā)起不可達(dá)的連接嘗試也在耗著大量的IO資源。
當(dāng)健康檢查介入后,如果短時(shí)間內(nèi)多次建立連接失敗,則暫時(shí)判定該地址不可達(dá),狀態(tài)設(shè)置為不可達(dá)。如果此時(shí)接收到該地址的請求時(shí)直接返回錯(cuò)誤。大大提高了響應(yīng)的時(shí)間。
所以健康檢查是必不可少的存在。
如何實(shí)現(xiàn)
由于健康狀態(tài)需要調(diào)用的地方可能在任意處需要發(fā)起連接的地方,如果通過參數(shù)透傳也會(huì)涉及到多線程的數(shù)據(jù)共用,如Arc<Mutex<Data>>,取用的時(shí)候也是要通過鎖共用,且編碼的復(fù)雜度和理解成本急劇升高,所以此處健康檢查選用的是多線程共用的靜態(tài)處理變量。
Rust中的靜態(tài)變量
在Rust中,全局變量可以分為兩種:
- 編譯期初始化的全局變量
- 運(yùn)行期初始化的全局變量
編譯期初始化的全局變量有:
const創(chuàng)建的常量,如 const MAX_ID:usize=usize::MAX/2;
static創(chuàng)建的靜態(tài)變量,如 static mut REQUEST_RECV:usize=0;
運(yùn)行期初始化的全局變量有lazy_static用于懶初始化。例如:
lazy_static! {
static ref HEALTH_CHECK: RwLock<HealthCheck> = RwLock::new(HealthCheck::new(60, 3, 2));
}
此外還有
- 實(shí)現(xiàn)你自己的運(yùn)行時(shí)初始化:
std::sync::Once + static mut T - 單線程運(yùn)行時(shí)初始化的特殊情況:
thread_local
我們此處維持一個(gè)HealthCheck的全局變量,因?yàn)槌绦蚴嵌嗑€程,用thread_local,無法共用其它線程的檢測,不條例預(yù)期,所以此處用讀寫鎖來保證全局變量的正確性,讀寫鎖的特點(diǎn)是允許存在多個(gè)讀,但如果獲取寫必須保證唯一。
源碼解析,暫時(shí)不做主動(dòng)性的健康檢查
接下來我們看HealthCheck的定義
pub struct HealthCheck {
/// 健康檢查的重置時(shí)間, 失敗超過該時(shí)間會(huì)重新檢查, 統(tǒng)一單位秒
fail_timeout: usize,
/// 最大失敗次數(shù), 一定時(shí)間內(nèi)超過該次數(shù)認(rèn)為不可訪問
max_fails: usize,
/// 最小上線次數(shù), 到達(dá)這個(gè)次數(shù)被認(rèn)為存活
min_rises: usize,
/// 記錄跟地址相關(guān)的信息
health_map: HashMap<SocketAddr, HealthRecord>,
}
/// 每個(gè)SocketAddr的記錄值
struct HealthRecord {
/// 最后的記錄時(shí)間
last_record: Instant,
/// 失敗的恢復(fù)時(shí)間
fail_timeout: Duration,
/// 當(dāng)前連續(xù)失敗的次數(shù)
fall_times: usize,
/// 當(dāng)前連續(xù)成功的次數(shù)
rise_times: usize,
/// 當(dāng)前的狀態(tài)
failed: bool,
}
主要有最后記錄時(shí)間,失敗次數(shù),成功次數(shù),最大失敗懲罰時(shí)間等元素組成
我們通過函數(shù)is_fall_down判定是否是異常狀態(tài),未檢查前默認(rèn)為正常狀態(tài),超出一定時(shí)間后,解除異常狀態(tài)。
/// 檢測狀態(tài)是否能連接
pub fn is_fall_down(addr: &SocketAddr) -> bool {
// 只讀,獲取讀鎖
if let Ok(h) = HEALTH_CHECK.read() {
if !h.health_map.contains_key(addr) {
return false;
}
let value = h.health_map.get(&addr).unwrap();
if Instant::now().duration_since(value.last_record) > value.fail_timeout {
return false;
}
h.health_map[addr].failed
} else {
false
}
}
如果連接TCP失敗則調(diào)用add_fall_down將該地址失敗連接次數(shù)+1,如果失敗次數(shù)達(dá)到最大失敗次數(shù)將狀態(tài)置為不可用。
/// 失敗時(shí)調(diào)用
pub fn add_fall_down(addr: SocketAddr) {
// 需要寫入,獲取寫入鎖
if let Ok(mut h) = HEALTH_CHECK.write() {
if !h.health_map.contains_key(&addr) {
let mut health = HealthRecord::new(h.fail_timeout);
health.fall_times = 1;
h.health_map.insert(addr, health);
} else {
let max_fails = h.max_fails;
let value = h.health_map.get_mut(&addr).unwrap();
// 超出最大的失敗時(shí)長,重新計(jì)算狀態(tài)
if Instant::now().duration_since(value.last_record) > value.fail_timeout {
value.clear_status();
}
value.last_record = Instant::now();
value.fall_times += 1;
value.rise_times = 0;
if value.fall_times >= max_fails {
value.failed = true;
}
}
}
}
如果連接TCP成功則調(diào)用add_rise_up將該地址成功連接次數(shù)+1,如果成功次數(shù)達(dá)到最小次數(shù)將狀態(tài)置為不可用。
/// 成功時(shí)調(diào)用
pub fn add_rise_up(addr: SocketAddr) {
// 需要寫入,獲取寫入鎖
if let Ok(mut h) = HEALTH_CHECK.write() {
if !h.health_map.contains_key(&addr) {
let mut health = HealthRecord::new(h.fail_timeout);
health.rise_times = 1;
h.health_map.insert(addr, health);
} else {
let min_rises = h.min_rises;
let value = h.health_map.get_mut(&addr).unwrap();
// 超出最大的失敗時(shí)長,重新計(jì)算狀態(tài)
if Instant::now().duration_since(value.last_record) > value.fail_timeout {
value.clear_status();
}
value.last_record = Instant::now();
value.rise_times += 1;
value.fall_times = 0;
if value.rise_times >= min_rises {
value.failed = false;
}
}
}
}
接下來我們將TcpStream::connect函數(shù)統(tǒng)一替換成HealthCheck::connect外部修改幾乎為0,可實(shí)現(xiàn)開啟健康檢查,后續(xù)還會(huì)有主動(dòng)式的健康檢查。
pub async fn connect<A>(addr: &A) -> io::Result<TcpStream>
where
A: ToSocketAddrs,
{
let addrs = addr.to_socket_addrs()?;
let mut last_err = None;
for addr in addrs {
// 健康檢查失敗,直接返回錯(cuò)誤
if Self::is_fall_down(&addr) {
last_err = Some(io::Error::new(io::ErrorKind::Other, "health check falldown"));
} else {
match TcpStream::connect(&addr).await {
Ok(stream) =>
{
Self::add_rise_up(addr);
return Ok(stream)
},
Err(e) => {
Self::add_fall_down(addr);
last_err = Some(e)
},
}
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
}
效果
在前三次請求的時(shí)候,將花費(fèi)5秒左右才拋出拒絕鏈接的錯(cuò)誤
connect server Err(Os { code: 10061, kind: ConnectionRefused, message: "由于目標(biāo)計(jì)算機(jī)積極拒絕,無
法連接。" })
可以發(fā)現(xiàn)三次之后,將會(huì)快速的拋出錯(cuò)誤,達(dá)成健康檢查的目標(biāo)
connect server Err(Custom { kind: Other, error: "health check falldown" })
此時(shí)被動(dòng)式的健康檢查已完成,后續(xù)按需要的話將按需看是否實(shí)現(xiàn)主動(dòng)式的健康檢查。
總結(jié)
以上是生活随笔為你收集整理的11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jQury+Ajax与C#后台交换数据
- 下一篇: web开发技术点解析