【eBPF-02】入门:基于 BCC 框架的程序进阶
本文是 eBPF 系列的第二篇文章,我們來學(xué)習(xí) eBPF BCC 框架的進階用法,對上一篇文章中的代碼進行升級,動態(tài)輸出進程運行時的參數(shù)情況。
主要內(nèi)容包括:
- 通過
kprobe掛載內(nèi)核事件的 eBPF 程序要如何編寫?- 通過
tracepoint掛載內(nèi)核事件的 eBPF 程序要如何編寫?- eBPF 的程序事件類型有哪些?
在開始之前,我們來回顧一下前一篇文章的內(nèi)容。
前一篇文章介紹了如何通過 BCC 框架來編寫一個簡單的 eBPF 程序。在內(nèi)核空間,使用 c 程序?qū)崿F(xiàn) eBPF 的核心邏輯;在用戶空間,使用 python 腳本作為 eBPF 程序的控制、加載和展示。其中,內(nèi)核態(tài)通過若干 eBPF helper 函數(shù),獲取內(nèi)核觀測數(shù)據(jù),并通過 PERF 區(qū)域,將這些數(shù)據(jù)傳遞到用戶空間;用戶態(tài)使用attach_kprobe() 將內(nèi)核 eBPF 函數(shù)綁定到某個內(nèi)核事件上。
整個流程如下圖所示:
在上面的實現(xiàn)過程中,用戶態(tài)通過 kprobe 的方式,為某個內(nèi)核事件掛載自定義處理邏輯(圖中是指定了內(nèi)核中 do_execve 函數(shù))。通過這種方式,我們能夠監(jiān)測絕大部分的內(nèi)核函數(shù),這正是 eBPF 技術(shù)牛逼的原因。
對于這種 kprobe 類型的 eBPF 程序,我們再來看一個例子(改編自 Brendan Gregg 大神的 execsnoop 工具:https://github.com/iovisor/bcc/blob/master/tools/execsnoop.py )
1 進程執(zhí)行參數(shù)的監(jiān)控
接下來,我們要對上圖中的工具再次進行功能升級,我希望這個工具在運行時,能夠輸出當前執(zhí)行進程的參數(shù)信息。
如果將 eBPF 程序等同于 C 程序來看,這個問題似乎沒那么困難。何以見得?
1.1 分析
sys_execve 系統(tǒng)調(diào)用的函數(shù)簽名為:int execve(const char *filename, char *const argv[], char *const envp[]), 其中,argv[]便記錄了進程執(zhí)行的參數(shù)。我們大可以像提取 filename 的方式那樣,提取 argv[],并將其傳入到用戶空間中。
但實際上,eBPF 程序與 C 程序并不等同。eBPF 編程中有 “兩座大山” 般的限制,分別是:
限制一:eBPF 程序運行棧僅有 512 字節(jié)。
限制二:eBPF 程序可以調(diào)用的接口極其有限。
因此,如果我們想嘗試在 512 字節(jié)的 eBPF 運行棧中完整拼接整理不定長的 argv[] 參數(shù)列表,是根本不可能的。
基于以上分析,本文給出一個比較合理的解決方案:
Q:如何防止運行棧爆棧?
1)既然運行棧有大小限制,不如直接將拼接操作轉(zhuǎn)移到用戶態(tài)完成。eBPF 程序只需要將 argv[] 數(shù)組中每個 argv 傳輸?shù)接脩魬B(tài)程序中。
2)對于長度過長的 argv,沒辦法了,只能手動截斷了。
Q:用戶態(tài)何時進行參數(shù)拼接?何時進行參數(shù)展示?
1)既然需要用戶態(tài)完成拼接,那么,可以分為兩個階段。STEP-1,僅專注字符串的拼接;STEP-2,僅專注字符串展示。
2)對于 execve 系統(tǒng)調(diào)用,我們可以在 enter 時執(zhí)行 STEP-1 操作,在 exit 是執(zhí)行 STEP-2 操作。
接下來更新代碼。
1.2 定義
首先,對于用于交互的結(jié)構(gòu)體,增加兩個個字段,其一用于記錄 execve 調(diào)用的每個參數(shù),其二用于記錄 eBPF 執(zhí)行的階段;同時,去掉冗余字段 fname
#define ARGSIZE 128
#define MAXARG 60
enum event_step {
STEP_1, // STEP 1: 執(zhí)行 argv 拼接
STEP_2, // STEP 2: 執(zhí)行 argv 展示
};
struct data_t {
u32 pid;
enum event_step step; // 記錄 eBPF 執(zhí)行階段
char comm[TASK_COMM_LEN];
char argv[ARGSIZE]; // 記錄每一個參數(shù)
};
定義 BPF_PERF_OUTPUT:
BPF_PERF_OUTPUT(events);
1.3 處理
實現(xiàn) execve 系統(tǒng)調(diào)用 enter 和 exit 回調(diào)函數(shù):
// exter execve
int syscall__execve(struct pt_regs *ctx, const char __user *filename,
const char __user *const __user *__argv,
const char __user *const __user *__envp) {
struct data_t data = {};
// 設(shè)置 step = STEP 1
data.step = STEP_1;
// 設(shè)置 pid
data.pid = bpf_get_current_pid_tgid() >> 32;
// 設(shè)置 comm
bpf_get_current_comm(&data.comm, sizeof(data.comm));
// 設(shè)置每一個 argv,并導(dǎo)出
...
return 0;
}
// exit execve
int do_ret_sys_execve(struct pt_regs *ctx) {
struct data_t data = {};
// 設(shè)置 step = STEP 1
data.step = STEP_2;
// 設(shè)置 pid
data.pid = bpf_get_current_pid_tgid() >> 32;
// 設(shè)置 comm
bpf_get_current_comm(&data.comm, sizeof(data.comm));
// 提交 perf
events.perf_submit(ctx, &data, sizeof(data));
return 0;
}
注意,這里 bpf_get_current_pid_tgid() 輔助函數(shù)返回值高 32 為內(nèi)核視角下的 process ID(用戶視角下為 TID),低 32 位為內(nèi)核視角下的 thread group ID(用戶視角下的 PID)。這里右移 32 位,是獲取用戶視角的 PID。
1.4 綁定
用戶態(tài)綁定 kprobe 事件:
b = BPF(src_file="execsnoop.c")
execve_fnname = b.get_syscall_fnname("execve")
# enter 事件
b.attach_kprobe(event=execve_fnname, fn_name="syscall__execve")
# exit 事件
b.attach_kretprobe(event=execve_fnname, fn_name="do_ret_sys_execve")
1.5 難點
內(nèi)核態(tài)如何設(shè)置并導(dǎo)出每一個 argv[]?
// 字符串提交
static int __submit_arg(struct pt_regs *ctx, void *ptr, struct data_t *data) {
// 提交 perf 之前,需要拷貝到用戶態(tài)變量中
bpf_probe_read_user(data->argv, sizeof(data->argv), ptr);
// 將這個 argv 提交
events.perf_submit(ctx, data, sizeof(struct data_t));
return 1;
}
// 字符串控制
static int submit_arg(struct pt_regs *ctx, void *ptr, struct data_t *data) {
const char *argp = NULL;
bpf_probe_read_user(&argp, sizeof(argp), ptr);
// 是否到達末尾字符串
if (argp) {
return __submit_arg(ctx, (void *)(argp), data);
}
return 0;
}
int syscall__execve(struct pt_regs *ctx, const char __user *filename,
const char __user *const __user *__argv,
const char __user *const __user *__envp) {
// 設(shè)置過程
...
// (A) 設(shè)置每一個 argv,并導(dǎo)出
#pragma unroll
for (int i = 1; i < MAXARG; i++) {
if (submit_arg(ctx, (void *)&__argv[i], &data) == 0)
goto out;
}
// (B) 如果當前的 argv[] 太長了,進行截斷操作
char ellipsis[] = "...";
__submit_arg(ctx, (void *)ellipsis, &data);
out:
return 0;
}
關(guān)注核心的兩個步驟:
(A) MAXARG 代表一個 argv[] 的最大監(jiān)測數(shù)量。首先要遍歷這個 argv[] 的每一個字符串,如果這個字符不為 NULL(說明沒有到當前 argv[] 結(jié)尾)或不超過最大值 MAXARG,那么將每個字符串提交到 PERF 區(qū)域。
注意:
低版本(5.3 以前)的 eBPF 程序不支持循環(huán)。5.3 版本后也僅支持有界循環(huán)。在低版本的 eBPF 中使用循環(huán)有一個小技巧,那就是通過#pragma unroll進行編譯器循環(huán)展開預(yù)處理。
(B) 如果超過了這個最大數(shù)量 MAXARG,后面及時再有參數(shù),也進行截斷處理。
1.6 拼接
用戶態(tài)獲取和拼接參數(shù)列表是基于 eBPF 階段的。
from collections import defaultdict
argv = defaultdict(list)
class EventStep(object):
STEP_1 = 0
STEP_2 = 1
# PERF 事件回調(diào)處理
def print_event(cpu, data, size):
event = b["events"].event(data)
# STEP 1:拼接
if event.step == EventStep.STEP_1:
argv[event.pid].append(event.argv)
# STEP 2:顯示
elif event.step == EventStep.STEP_2:
argv_text = b' '.join(argv[event.pid]).replace(b'\n', b'\\n')
printb(b"%-16s %-7d %s" % (event.comm, event.pid, argv_text))
try:
del(argv[event.pid])
except Exception:
pass
# 綁定 PERF 事件回調(diào)處理
b["events"].open_perf_buffer(print_event)
while 1:
try:
b.perf_buffer_poll()
except KeyboardInterrupt:
exit()
用戶態(tài)程序需要注意:event 事件通過 PERF 獲取的結(jié)構(gòu)數(shù)據(jù)為 Byte 類型,需要通過 decode('utf-8')/encode() 與 str 類型進行轉(zhuǎn)換。
1.7 完整代碼和運行效果
// execsnoop.c
#include <linux/sched.h>
#include <linux/fs.h>
#define ARGSIZE 128
#define MAXARG 60
enum event_step {
STEP_1,
STEP_2,
};
struct data_t {
u32 pid;
enum event_step step;
char comm[TASK_COMM_LEN];
char argv[ARGSIZE];
};
BPF_PERF_OUTPUT(events);
static int __submit_arg(struct pt_regs *ctx, void *ptr, struct data_t *data) {
bpf_probe_read_user(data->argv, sizeof(data->argv), ptr);
events.perf_submit(ctx, data, sizeof(struct data_t));
return 1;
}
static int submit_arg(struct pt_regs *ctx, void *ptr, struct data_t *data) {
const char *argp = NULL;
bpf_probe_read_user(&argp, sizeof(argp), ptr);
if (argp) {
return __submit_arg(ctx, (void *)(argp), data);
}
return 0;
}
// exter execve
int syscall__execve(struct pt_regs *ctx, const char __user *filename,
const char __user *const __user *__argv,
const char __user *const __user *__envp) {
struct data_t data = {};
data.step = STEP_1;
data.pid = bpf_get_current_pid_tgid() >> 32;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
#pragma unroll
for (int i = 1; i < MAXARG; i++) {
if (submit_arg(ctx, (void *)&__argv[i], &data) == 0)
goto out;
}
char ellipsis[] = "...";
__submit_arg(ctx, (void *)ellipsis, &data);
out:
return 0;
}
// exit execve
int do_ret_sys_execve(struct pt_regs *ctx) {
struct data_t data = {};
data.step = STEP_2;
data.pid = bpf_get_current_pid_tgid() >> 32;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
events.perf_submit(ctx, &data, sizeof(data));
return 0;
}
# execsnoop.py
#!/usr/bin/python3
from bcc import BPF
from bcc.utils import printb
from collections import defaultdict
argv = defaultdict(list)
class EventStep(object):
STEP_1 = 0
STEP_2 = 1
b = BPF(src_file="execsnoop.c")
execve_fnname = b.get_syscall_fnname("execve")
b.attach_kprobe(event=execve_fnname, fn_name="syscall__execve")
b.attach_kretprobe(event=execve_fnname, fn_name="do_ret_sys_execve")
print("%-7s %-16s %s" % ("PID", "PCOMM", "ARGS"))
# process event
def print_event(cpu, data, size):
event = b["events"].event(data)
fname = ""
if event.step == EventStep.STEP_1:
argv[event.pid].append(event.argv)
elif event.step == EventStep.STEP_2:
argv_text = b' '.join(argv[event.pid]).replace(b'\n', b'\\n')
printb(b"%-7d %-16s %s" % (event.pid, event.comm, argv_text))
try:
del(argv[event.pid])
except Exception:
pass
# loop with callback to print_event
b["events"].open_perf_buffer(print_event)
while 1:
try:
b.perf_buffer_poll()
except KeyboardInterrupt:
exit()
運行效果:
2 Tracepoint 追蹤點
前文提到過,kprobe 方式,幾乎可以使 eBPF 掛載到內(nèi)核中任意一個函數(shù)事件上,隨著內(nèi)核函數(shù)的執(zhí)行而觸發(fā)。但是,由于不同的內(nèi)核版本,其某個具體函數(shù)的定義、參數(shù)和實現(xiàn)可能會有所不同(kprobe 實現(xiàn)的事件處理函數(shù)要求和掛載點函數(shù)擁有相同的參數(shù))。因此,使用 kprobe 方式實現(xiàn)的 eBPF 程序可能無法在其他內(nèi)核的主機上運行。此外,kprobe 無法掛載到靜態(tài)函數(shù)或內(nèi)聯(lián)函數(shù)上。而出于性能考慮,大部分網(wǎng)絡(luò)相關(guān)的內(nèi)層函數(shù)都是內(nèi)聯(lián)或者靜態(tài)的,因此,kprobe 方式在這些領(lǐng)域也只能望洋興嘆了。
上述兩點,均為 kprobe 方式的局限性,它并不具備很好的可移植性。于是,從 Linux 內(nèi)核 4.7 開始,能讓 eBPF 使用的 tracepoint 出現(xiàn)了(官方文檔)。tracepoint 是由內(nèi)核開發(fā)人員在代碼中設(shè)置的靜態(tài) hook 點,具有穩(wěn)定的 API 接口,不會隨著內(nèi)核版本的變化而變化。但由于 tracepoint 是需要內(nèi)核研發(fā)人員參數(shù)編寫,其數(shù)量有限,并不是所有的內(nèi)核函數(shù)中都具有類似的跟蹤點,所以從靈活性上不如 kprobes 這種方式。
2.1 kprobe 和 tracepoint 對比
在 3.10 內(nèi)核中,kprobe 與 tracepoint 方式對比如下:
| 內(nèi)容 | kprobe | tracepoint |
|---|---|---|
| 追蹤類型 | 動態(tài) | 靜態(tài) |
| Hook 點數(shù)量 | 100000+ | 1200+ |
| 穩(wěn)定的 API | 否 | 是 |
可以使用以下命令查看系統(tǒng)支持的 tracepoint,支持 grep 檢索。
perf list
perf list | grep execve
上面的執(zhí)行結(jié)果可以看到,execve系統(tǒng)調(diào)用具有兩個 syscalls 類型的靜態(tài)跟蹤點,并且,tracepoint 已經(jīng)對 enter 和 exit 做了區(qū)分,其功能基本等同于 kprobe/kretprobe。
在使用 tracepoint 之前,我們需要了解 tracepoint 相關(guān)參數(shù)的格式。syscalls:sys_enter_execve 格式定義在 /sys/kernel/debug/tracing/events/syscalls/sys_enter_execve/format 文件中。
# 查看 syscalls:sys_enter_execve 參數(shù)
cat /sys/kernel/debug/tracing/events/syscalls/sys_enter_execve/format
2.2 重構(gòu)代碼
接下來,使用 tracepoint 方式重構(gòu)第 1 節(jié)的代碼,如下:
// execsnoop.c
#include <linux/sched.h>
#include <linux/fs.h>
#define ARGSIZE 128
#define MAXARG 60
enum event_step {
STEP_1,
STEP_2,
};
struct data_t {
u32 pid;
char comm[TASK_COMM_LEN];
enum event_step step;
char argv[ARGSIZE];
};
BPF_PERF_OUTPUT(events);
static int __submit_arg(struct pt_regs *ctx, void *ptr, struct data_t *data) {
bpf_probe_read_user(data->argv, sizeof(data->argv), ptr);
events.perf_submit(ctx, data, sizeof(struct data_t));
return 1;
}
static int submit_arg(struct pt_regs *ctx, void *ptr, struct data_t *data) {
const char *argp = NULL;
bpf_probe_read_user(&argp, sizeof(argp), ptr);
if (argp) {
return __submit_arg(ctx, (void *)(argp), data);
}
return 0;
}
// (A) sys_enter_execve tracepoint
TRACEPOINT_PROBE(syscalls, sys_enter_execve) {
struct data_t data = {};
const char **argv = (const char **) (args->argv);
data.step = STEP_1;
data.pid = bpf_get_current_pid_tgid() >> 32;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
#pragma unroll
for (int i = 1; i < MAXARG; i++) {
// (B) args 強制轉(zhuǎn)換為 ctx
if (submit_arg((struct pt_regs *)args, (void *)&argv[i], &data) == 0)
goto out;
}
char ellipsis[] = "...";
__submit_arg((struct pt_regs *)args, (void *)ellipsis, &data);
out:
return 0;
}
// sys_exit_execve tracepoint
TRACEPOINT_PROBE(syscalls, sys_exit_execve) {
struct data_t data = {};
data.step = STEP_2;
data.pid = bpf_get_current_pid_tgid() >> 32;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
events.perf_submit(args, &data, sizeof(data));
return 0;
}
# execsnoop.py
#!/usr/bin/python3
from bcc import BPF
from bcc.utils import printb
from collections import defaultdict
argv = defaultdict(list)
class EventStep(object):
STEP_1 = 0
STEP_2 = 1
# (C) 不再通過 kprobe 綁定
b = BPF(src_file="execsnoop.c")
print("%-7s %-16s %s" % ("PID", "PCOMM", "ARGS"))
# process event
def print_event(cpu, data, size):
event = b["events"].event(data)
if event.step == EventStep.STEP_1:
argv[event.pid].append(event.argv)
elif event.step == EventStep.STEP_2:
argv_text = b' '.join(argv[event.pid]).replace(b'\n', b'\\n')
printb(b"%-7d %-16s %s" % (event.pid, event.comm, argv_text))
try:
del(argv[event.pid])
except Exception:
pass
# loop with callback to print_event
b["events"].open_perf_buffer(print_event)
while 1:
try:
b.perf_buffer_poll()
except KeyboardInterrupt:
exit()
注意:
A)一個 tracepoint 定義接收兩個參數(shù),TRACEPOINT_PROBE(syscalls, sys_enter_execve) 第一個為子系統(tǒng)名稱,第二個為事件名稱。
B)tracepoint 中的所有參數(shù)都會包含在一個固定名稱的 args 的結(jié)構(gòu)體中。args 類型為 struct tracepoint__syscalls__sys_enter_open,其第一個字段為 u64 __do_not_use__;,該字段為 ctx 的保留位置。因此,args 可以被強制轉(zhuǎn)換為 ctx。
ctx是啥?在《Linux 內(nèi)核觀測技術(shù) BPF》一書中,
ctx被稱為“上下文”,提供了訪問內(nèi)核正在處理的信息。我們可以通過PT_REGS_RC(ctx)來獲取當前函數(shù)的返回值。
C)用戶態(tài)代碼不再需要 attach_kprobe 手動綁定。
3 eBPF 程序事件類型
像是 kprobe、tracepoint 將 eBPF 程序掛載到內(nèi)核事件的方式,可以暫且被稱為 eBPF 事件類型。事實上,除了以上列出的兩種,eBPF 事件類型還有很多,選取其中一些列舉如下:
-
kprobes/kretprobes:內(nèi)核函數(shù)事件。不再贅述。 -
tracepoint:內(nèi)核跟蹤點事件。不再贅述。 -
uprobes/uretprobes:用戶空間函數(shù)事件,可以綁定監(jiān)聽一個用戶空間的函數(shù)。 -
USDT probes:用戶自定義的靜態(tài)追蹤點。用戶可以在用戶空間的程序中插入靜態(tài)追蹤點,用于掛載 eBPF。 -
LSM Probes:LSM Hook 掛載點。需要內(nèi)核版本 5.7 以上。
由于篇幅限制,不再列舉其他 eBPF 事件類型了,后面如果有精力,再補一篇文章。
4 總結(jié)
本文在前一篇文章的基礎(chǔ)上,對進程執(zhí)行監(jiān)控工具(execsnoop)進行了升級,實時打印進程執(zhí)行時傳入的參數(shù)列表;并通過 kprobe 和 tracepoint 兩種方式,綁定 eBPF 程序,給出了代碼實現(xiàn)。同時,對這兩種 eBPF 事件類型進行了簡單比較。顯然,在你手動開發(fā)一個 eBPF 程序時,建議使用 tracepoint,以追求更好的穩(wěn)定性和可移植性。文章的最后,簡單列出了一些支持的 eBPF 事件類型。
以上拋磚引玉,如有不正確指出,請大家及時斧正。如果你喜歡這篇文章,請點個推薦吧!
總結(jié)
以上是生活随笔為你收集整理的【eBPF-02】入门:基于 BCC 框架的程序进阶的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 突破限制:解密《气球塔防6》英雄解锁方法
- 下一篇: 网线水晶头排序的口诀是什么