【说解】在shell中通过mkfifo创建命名管道来控制多个进程并发执行
背景:
工作中有兩個異地機房需要傳數(shù)據(jù),數(shù)據(jù)全名很規(guī)范,在某個目錄下命名為統(tǒng)一的前綴加上編號。如/path/from/file.{1..100}。而機房間的專線對單個scp進程的傳輸速度是有限制的,比如最大在100Mb/s,如果直接啟動100個scp,則又會遇到ssh的并發(fā)連接數(shù)限制。
所以需要控制并發(fā)數(shù),即不超過ssh的并發(fā)限制,又要讓單網(wǎng)卡上的帶寬接近飽和,盡快完成傳輸(假設(shè)專線帶寬遠大于單機網(wǎng)卡帶寬)
實現(xiàn)
之前知道通過mkfifo創(chuàng)建一個命名管道,可以實現(xiàn)對并發(fā)的控制。現(xiàn)在來實現(xiàn)一個。
在此之前,如果對mkfifo不了解,可以參考這個連接,作者寫得很詳細,我就不造輪子了。
這里直接給出代碼,并做一些解釋。因為單進程的帶寬如上所述,所以考慮9個并發(fā)。代碼如下:
1 #!/bin/bash
2
3 your_func()
4 { # use your cmd or func instead of sleep here. don't end with background(&)
5 date +%s
6 echo "scp HOSTNAME:/home/USER/path/from/file.$1 REMOTE_HOST:/home/USER/path/to/"
7 sleep 2
8 }
9
10 concurrent()
11 { # from $1 to $2, (included $1,$2 itself), con-current $3 cmd
12 start=$1 && end=$2 && cur_num=$3
13
14 # ff_file which is opened by fd 4 will be really removed after script stopped
15 mkfifo ./fifo.$$ && exec 4<> ./fifo.$$ && rm -f ./fifo.$$
16
17 # initial fifo: write $cur_num line to $ff_file
18 for ((i=$start; i<$cur_num+$start; i++)); do
19 echo "init time add $i" >&4
20 done
21
22 for((i=$start; i<=$end; i++)); do
23 read -u 4 # read from mkfifo file
24 { # REPLY is var for read
25 echo -e "-- current loop: [cmd id: $i ; fifo id: $REPLY ]"
26
27 your_func $i
28 echo "real time add $(($i+$cur_num))" 1>&4 # write to $ff_file
29 } & # & to backgroud each process in {}
30 done
31 wait # wait all con-current cmd in { } been running over
32 }
33
34 concurrent 0 8 3
上面以3為并發(fā)數(shù),執(zhí)行0到8號共9次,以便顯示如下執(zhí)行結(jié)果。
1 bash concurrent.sh 2 -- current loop: [cmd id: 0 ; fifo id: init time add 0 ] 3 -- current loop: [cmd id: 1 ; fifo id: init time add 1 ] 4 -- current loop: [cmd id: 2 ; fifo id: init time add 2 ] 5 1453518400 6 1453518400 7 scp HOSTNAME:/home/USER/path/from/file.0 REMOTE_HOST:/home/USER/path/to/ 8 scp HOSTNAME:/home/USER/path/from/file.2 REMOTE_HOST:/home/USER/path/to/ 9 1453518400 10 scp HOSTNAME:/home/USER/path/from/file.1 REMOTE_HOST:/home/USER/path/to/ 11 -- current loop: [cmd id: 3 ; fifo id: real time add 3 ] 12 -- current loop: [cmd id: 4 ; fifo id: real time add 5 ] 13 -- current loop: [cmd id: 5 ; fifo id: real time add 4 ] 14 1453518402 15 scp HOSTNAME:/home/USER/path/from/file.3 REMOTE_HOST:/home/USER/path/to/ 16 1453518402 17 1453518402 18 scp HOSTNAME:/home/USER/path/from/file.5 REMOTE_HOST:/home/USER/path/to/ 19 scp HOSTNAME:/home/USER/path/from/file.4 REMOTE_HOST:/home/USER/path/to/ 20 -- current loop: [cmd id: 6 ; fifo id: real time add 6 ] 21 -- current loop: [cmd id: 7 ; fifo id: real time add 7 ] 22 -- current loop: [cmd id: 8 ; fifo id: real time add 8 ] 23 1453518404 24 scp HOSTNAME:/home/USER/path/from/file.6 REMOTE_HOST:/home/USER/path/to/ 25 1453518404 26 1453518404 27 scp HOSTNAME:/home/USER/path/from/file.7 REMOTE_HOST:/home/USER/path/to/ 28 scp HOSTNAME:/home/USER/path/from/file.8 REMOTE_HOST:/home/USER/path/to/
從date輸出的時間上,可以看出,每2秒會執(zhí)行3個并發(fā)。
說明
整體過程
設(shè)N的值為并發(fā)數(shù)。通過在fifo中初始化N行內(nèi)容(可以為空值),再利用fifo的特性,從fifo中每讀一行,啟動一次your_func調(diào)用,當(dāng)fifo讀完N次時,fifo為空。再讀時就會阻塞。這樣開始執(zhí)行時就是N個并發(fā)(1-N)。
當(dāng)并發(fā)執(zhí)行的進程your_func,任意一個完成操作時,下一步會招待如下語句:
echo "real time add $(($i+$cur_num))" 1>&4
這樣就對fifo新寫入了一行,前面被阻塞的第N+1號待執(zhí)行的進程read成功,開始進入{}語句塊執(zhí)行。這樣通過read fifo的阻塞功能,實現(xiàn)了并發(fā)數(shù)的控制。
需要注意的是,當(dāng)并發(fā)數(shù)較大時,多個并發(fā)進程即使在使用sleep相同秒數(shù)模擬時,也會存在進程調(diào)度的順序問題,因而并不是按啟動順序結(jié)束的,可能會后啟動的進程先結(jié)束。
從而導(dǎo)致如下語句所示的輸出中,兩個數(shù)字并不一定是相等的。并發(fā)數(shù)越大,這種差異性越大。
-- current loop: [cmd id: 8 ; fifo id: real time add 9 ]
自定義函數(shù)
修改自定義函數(shù)your_func,這個函數(shù)實際只需要一行就完成了。
your_func()
{ # use your cmd or func instead of sleep here. don't end with background(&)
date +%s
scp HOSTNAME:/home/USER/path/from/file.$1 REMOTE_HOST:/home/USER/path/to/
}
需要注意的是,scp命令最后不需要添加壓后臺的&符號。因為在上一級就已經(jīng)壓后臺并發(fā)了。
再來說明concurrent函數(shù)的第14行。
exec digit<> filename
這是一個平常很少使用到的命令。特別是‘<>’這個符號。既然不明白我們來查一下系統(tǒng)幫助。
man bash
# search 'exec '
Opening File Descriptors for Reading and Writing
The redirection operator
[n]<>word
causes the file whose name is the expansion of word to be opened for both reading and writing on file
descriptor n, or on file descriptor 0 if n is not specified. If the file does not exist, it is created.
通過man bash來搜索exec加空格,會找到對exec的說明。注意如果直接man exec,會搜索到linux programer's manual,是對execl, execlp, execle, execv, execvp, execvpe - execute a file這一堆系統(tǒng)函數(shù)的調(diào)用說明。
還要注意哦,4<> 這幾個字符不要加空格,必然連著寫。word前可以加空格。
rm file
mkfifo先創(chuàng)建管道文件,再通過exec將該文件綁定到文件描述符4。也許你在疑惑后面的rm操作。其實當(dāng)該文件綁定到文件描述符后,內(nèi)核已經(jīng)通過open系統(tǒng)調(diào)用打開了該文件,這個時候執(zhí)行rm操作,刪除的是文件的Inode,但concurrent函數(shù)已經(jīng)連接到文件的block塊區(qū)。
如果你遇到過這樣的情況,你就明白了:如果線上的nginx日志是沒有切分的,access.log會越來越大,這時你直接rmaccess.log文件后,文件不見了,但df查看系統(tǒng)并沒有釋放磁盤空間。這就是因為rm只是刪除了inode,但這之前nginx早已經(jīng)通過open打開了這個文件,nginx進程的進程控制塊中的文件描述符表中對應(yīng)的fd,已經(jīng)有相應(yīng)的文件指針指向了該文件在內(nèi)存中的文件表,以及其在內(nèi)存中的v節(jié)點表,并最終指向文件的實際存儲塊。因此nginx依然可以繼續(xù)寫日志,磁盤還在被寫入。只有重啟或者reload,讓進程重新讀一次配置,重新打開一遍相應(yīng)的文件時,才會發(fā)現(xiàn)該文件不存在的,并新建該文件。而這時因為Inode節(jié)點已經(jīng)釋放,再用df查看時就能看到可用空間增大了。
不懂可以參考APUE的圖3.1及想著說明。
因此14行的rm并不影響后繼腳本執(zhí)行,直到腳本結(jié)束,系統(tǒng)收回所有文件描述符。
初始化
18-20行在做初始化管道的工作。其中讀取管道有兩類寫法:
1 # style 1 2 for ((i=$start; i<$cur_num+$start; i++)); do 3 echo "init time add $i" >&4 4 done 5 6 # style 2 7 for ((i=$start; i<$cur_num+$start; i++)); do 8 echo "init time add $i" 9 done >&4
差別就是‘>&4’ 這幾個字符放在echo語句后面,還是放在done后面,兩者都可以,前者針對echo語句,后者針對整個for循環(huán)。
同理,在下一個for循環(huán)中,read命令也有兩種方式:
# style 1
for((i=$start; i<=$end; i++)); do
read -u 4
{
your_func $i
echo "real time add $(($i+$cur_num))" 1>&4 # write to $ff_file
} &
done
# style 2
for((i=$start; i<=$end; i++)); do
read
{
your_func $i
echo "real time add $(($i+$cur_num))" 1>&4 # write to $ff_file
} &
done <&4
關(guān)于REPLY
再解釋一下REPLY變量。這是上述循環(huán)中,用來存放read命令從fifo中讀到的內(nèi)容。其實在整個腳本中,是不需要關(guān)注這個點的。不過這里隨帶也解釋一下。
通過能fifo的不斷讀寫,才實現(xiàn)了echo如下語句:
-- current loop: [cmd id: 7 ; fifo id: real time add 7 ]
如何了解到REPLY呢?我們又得man一下了。為了找到read的參數(shù)。先man read發(fā)現(xiàn)不對。再如下查找,因為read是bash自建命令。
1 man bash 2 # search 'Shell Variables' 3 4 REPLY Set to the line of input read by the read builtin command when no arguments are supplied.
總結(jié)
以上是生活随笔為你收集整理的【说解】在shell中通过mkfifo创建命名管道来控制多个进程并发执行的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 密码学专题 非对称加密算法指令概述 DS
- 下一篇: 微信公众号python人工智能回复_py