Nextflow patterns
Nextflow patterns
1 Basic Patterns
1.1 Channel duplication
P:需要在兩個或多個進程中使用相同的通道作為輸入
S:使用into運算符創建源通道的兩個(或更多)副本。然后,使用新通道作為流程的輸入。
代碼:
Channel.fromPath('prots/*_?.fa').into { prot1_ch; prot2_ch }process foo {input: file x from prot1_chscript:"""echo your_command --input $x""" }process bar {input: file x from prot2_chscript:"""your_command --input $x""" }2 Scatter executions
2.1 Process per file path
P:需要為每個匹配 glob 模式的文件執行一個任務
S:使用Channel.fromPath方法創建一個通道,發出與 glob 模式匹配的所有文件。然后,使用通道作為執行任務的流程的輸入。
代碼:
Channel.fromPath('reads/*_1.fq.gz').set{ samples_ch }process foo {input:file x from samples_chscript:"""your_command --input $x""" }2.2 Process per file chunk
P:需要將一個或多個輸入文件拆分為塊并為每個文件執行一項任務
S:使用splitText運算符將文件拆分為給定大小的塊。然后將結果通道用作執行任務的流程的輸入
代碼:
Channel.fromPath('poem.txt').splitText(by: 5).set{ chunks_ch }process foo {echo trueinput:file x from chunks_chscript:"""rev $x | rev""" }2.3 Process per file pairs
P:需要將文件處理到按對分組的目錄中
S:使用Channel.fromFilePairs方法創建一個通道,該通道發出與 glob 模式匹配的文件對。該模式必須匹配成對文件名中的公共前綴。匹配文件作為元組發出,其中第一個元素是匹配文件的分組鍵,第二個元素是文件對本身。
代碼:
Channel.fromFilePairs('reads/*_{1,2}.fq.gz').set { samples_ch }process foo {input:set sampleId, file(reads) from samples_chscript:"""your_command --sample $sampleId --reads $reads""" }- 自定義分組策略
需要時,可以定義自定義分組策略。一個常見的用例是對齊 BAM 文件 ( sample1.bam) 隨附的索引文件。困難在于索引有時會被調用sample1.bai,有時sample1.bam.bai取決于所使用的軟件。下面的例子可以適應這兩種情況。
代碼:
Channel.fromFilePairs('alignment/*.{bam,bai}') { file -> file.name.replaceAll(/.bam|.bai$/,'') }.set { samples_ch }process foo {input:set sampleId, file(bam) from samples_chscript:"""your_command --sample $sampleId --bam ${sampleId}.bam""" }2.4 Process per file range
P:需要在具有共同索引范圍的兩個或更多系列文件上執行任務
S:使用from方法定義重復執行任務的范圍,然后將其與map運算符鏈接以將每個索引與相應的輸入文件相關聯。最后使用結果通道作為過程的輸入
代碼:
Channel.from(1..23).map { chr -> tuple("sample$chr", file("/some/path/foo.${chr}.indels.vcf"), file("/other/path/foo.snvs.${chr}.vcf")) }.set { pairs_ch }process foo {tag "$sampleId"input:set sampleId, file(indels), file(snps) from pairs_ch"""echo foo_command --this $indels --that $snps""" }2.5 Process per CSV record
P:需要為一個或多個 CSV 文件中的每條記錄執行一項任務
S:使用splitCsv運算符逐行讀取 CSV 文件,然后使用map運算符返回每行所需字段的元組,并使用該file函數將任何字符串路徑轉換為文件路徑對象。最后使用結果通道作為過程的輸入
index.csv
| FC816RLABXX | read/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_1.fq.gz | read/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_2.fq.gz |
| FC812MWABXX | read/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_1.fq.gz | read110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_2.fq.gz |
| FC81DE8ABXX | read/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_1.fq.gz | read/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_2.fq.gz |
| FC81DB5ABXX | read/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_1.fq.gz | read/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_2.fq.gz |
| FC819P0ABXX | read/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_1.fq.gz | read/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_2.fq.gz |
代碼:
params.index = 'index.csv'Channel.fromPath(params.index).splitCsv(header:true).map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }.set { samples_ch }process foo {input:set sampleId, file(read1), file(read2) from samples_chscript:"""echo your_command --sample $sampleId --reads $read1 $read2""" }2.6 Process per file output
P:工作流中的任務一次生成兩個或更多文件。下游任務需要獨立處理這些文件中的每一個
S:使用flatten運算符將上游進程的輸出轉換為單獨發送到每個文件的通道。然后將此通道用作下游過程的輸入
代碼:
process foo {output:file '*.txt' into foo_chscript:'''echo Hello there! > file1.txtecho What a beautiful day > file2.txtecho I wish you are having fun1 > file3.txt''' }process bar {input:file x from foo_ch.flatten()script:"""cat $x""" }3 Gather results
3.1 Process all outputs altogether
P:需要完全處理上游任務的所有輸出
S:使用collect運算符收集上游任務產生的所有輸出,并將它們作為唯一輸出發出。然后使用結果通道作為過程的輸入輸入
代碼:
Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }process foo {input:file x from samples_choutput:file 'file.fq' into unzipped_chscript:"""< $x zcat > file.fq""" }process bar {echo trueinput:file '*.fq' from unzipped_ch.collect()"""cat *.fq""" }3.2 Process outputs into groups
P:需要在同一批次中處理文件名中具有匹配鍵的所有文件
S:使用map運算符將每個文件關聯一個從文件名中提取的鍵。然后將結果通道與groupTuple運算符鏈接起來,將所有具有匹配鍵的文件組合在一起。最后使用結果通道作為過程的輸入。
代碼:
Channel.fromPath('reads/*').map { file ->def key = file.name.toString().tokenize('_').get(0)return tuple(key, file)}.groupTuple().set{ groups_ch }process foo {input:set key, file(samples) from groups_chscript:"""echo your_command --batch $key --input $samples""" }3.3 Collect outputs into a file
P:需要將上游進程生成的所有輸出文件連接到一個文件中
S:使用collectFile運算符將所有輸出文件合并為一個文件
代碼:
Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }process foo {input:file x from samples_choutput:file 'file.fq' into unzipped_chscript:"""< $x zcat > file.fq""" }unzipped_ch.collectFile().println{ it.text }4 Organize outputs
4.1 Store process outputs
P:需要將一個或多個進程的輸出存儲到您選擇的目錄結構中
S:使用publishDir指令設置一個自定義目錄,在該目錄中需要提供流程輸出
代碼:
params.reads = 'reads/*{1,2}.fq.gz' params.outdir = 'my-results'Channel.fromFilePairs(params.reads).set{ samples_ch }process foo {publishDir "$params.outdir/$sampleId"input:set sampleId, file(samples) from samples_choutput:file '*.fq'script:"""< ${samples[0]} zcat > sample_1.fq< ${samples[1]} zcat > sample_2.fq""" }4.2 Store outputs matching a glob pattern
P:工作流中的任務會創建下游任務所需的許多輸出文件。可根據文件名將其中一些文件存儲到單獨的目錄中
S:使用兩個或多個publishDir指令將輸出文件存儲到單獨的存儲路徑中。對于每個指令,使用選項指定一個不同的 glob 字符串,pattern僅將與提供的模式匹配的文件存儲到每個目錄中
代碼:
Channel.fromFilePairs(params.reads, flat: true).set{ samples_ch }process foo {publishDir "$params.outdir/$sampleId/counts", pattern: "*_counts.txt"publishDir "$params.outdir/$sampleId/outlooks", pattern: '*_outlook.txt'publishDir "$params.outdir/$sampleId/", pattern: '*.fq'input:set sampleId, file('sample1.fq.gz'), file('sample2.fq.gz') from samples_choutput:file "*"script:"""< sample1.fq.gz zcat > sample1.fq< sample2.fq.gz zcat > sample2.fqawk '{s++}END{print s/4}' sample1.fq > sample1_counts.txtawk '{s++}END{print s/4}' sample2.fq > sample2_counts.txthead -n 50 sample1.fq > sample1_outlook.txthead -n 50 sample2.fq > sample2_outlook.txt""" }4.3 Rename process outputs
P:需要將進程的輸出存儲到一個目錄中,為文件指定一個您選擇的名稱
S:publishDir 允許在過程輸出存儲在所選擇的目錄。指定saveAs參數為每個文件提供您選擇的名稱,證明自定義規則作為閉包(closure)
代碼:
process foo {publishDir 'results', saveAs: { filename -> "foo_$filename" }output:file '*.txt''''touch this.txttouch that.txt''' }- 若保存在子目錄中:
可以使用相同的模式將特定文件存儲在不同的目錄中,具體取決于實際名稱
代碼:
process foo {publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }output:file '*''''touch this.txttouch that.zip''' }5 Other
5.1 Get process work directory
P:需要當前任務工作目錄的顯式路徑
S:使用$PWDBash 變量或pwd命令檢索任務工作目錄路徑
代碼:
process foo {echo truescript:"""echo foo task path: \$PWD""" }process bar {echo truescript:'''echo bar task path: $PWD''' }注意:$當命令腳本包含在雙引號字符中時,請確保使用轉義變量占位符,如上在雙引號中的$前加上了\來進行了轉義
- 使用以下命令為相同的腳本提供一些輸入文件,以防止進程被執行
5.2 Ignore failing process
P:預期任務在特定條件下會失敗,由此希望忽略失敗并繼續執行工作流中的剩余任務
S:使用 process指令 errorStrategy 'ignore'忽略錯誤條件
代碼:
process foo {errorStrategy 'ignore'script:'''echo This is going to fail!exit 1''' }process bar {script:'''echo OK''' }5.3 Mock dependency
P:需要同步兩個沒有直接輸入輸出關系的進程bar的執行,以便該進程僅在 process 完成后執行foo
S:將foo產生標志值的通道添加到進程的輸出中。然后將此通道用作進程的輸入以bar在其他進程完成時觸發其執行
代碼:
Channel.fromPath('.data/reads/*.fq.gz').set{ reads_ch }process foo {output:val true into done_chscript:"""your_command_here""" }process bar {input:val flag from done_chfile fq from reads_chscript:"""other_commad_here --reads $fq""" }6 Advanced patterns
6.1 Conditional resources definition
P:工作流中的任務需要使用一定量的計算資源,例如。內存取決于一個或多個輸入文件的大小或名稱。
S:使用閉包以動態方式聲明資源需求,例如memory,cpus等。閉包使用size流程定義中聲明的輸入的文件屬性(例如等),來計算所需的資源量。
代碼:
Channel.fromPath('reads/*_1.fq.gz').set { reads_ch }process foo {memory { reads.size() < 70.KB ? 1.GB : 5.GB }input:file reads from reads_ch"""your_command_here --in $reads""" }6.2 Conditional process executions
P:兩個不同的任務需要以互斥的方式執行,那么第三個任務應該對前一次執行的結果進行后處理。
S:使用when語句有條件地執行兩個不同的進程。每個進程聲明自己的輸出通道。然后使用mix運算符創建一個新通道,該通道將發出兩個進程產生的輸出,并將其用作第三個進程的輸入。
代碼:
params.flag = falseprocess foo {output:file 'x.txt' into foo_chwhen:!params.flagscript:'''echo foo > x.txt''' }process bar {output:file 'x.txt' into bar_chwhen:params.flagscript:'''echo bar > x.txt''' }process omega {echo trueinput:file x from foo_ch.mix(bar_ch)script:"""cat $x""" }可根據flag的不同來選擇不同的進程執行,執行foo和omega:
nextflow run patterns/conditional-process.nf執行bar和omega:
nextflow run patterns/conditional-process.nf --flag結果:
peng@sin-try2:~/patterns$ nextflow run conditional-process.nf N E X T F L O W ~ version 21.04.0-edge Launching `conditional-process.nf` [soggy_hypatia] - revision: 1b07ba0b38 executor > local (2) [82/ea867d] process > foo [100%] 1 of 1 ? [- ] process > bar - [38/be26fb] process > omega (1) [100%] 1 of 1 ? foopeng@sin-try2:~/patterns$ nextflow run conditional-process.nf --flag N E X T F L O W ~ version 21.04.0-edge Launching `conditional-process.nf` [astonishing_goldwasser] - revision: 1b07ba0b38 executor > local (2) [- ] process > foo - [7f/158b8c] process > bar [100%] 1 of 1 ? [05/b06073] process > omega (1) [100%] 1 of 1 ? bar- 有條件地正常(使用數據)或作為空通道創建輸入 通道。使用各個輸入通道的過程僅在通道被填充時才會執行。每個進程仍然聲明自己的輸出通道。然后使用mix運算符創建一個新通道,該通道將發出兩個進程產生的輸出,并將其用作第三個進程的輸入
代碼:
params.flag = false(foo_inch, bar_inch) = ( params.flag? [ Channel.empty(), Channel.from(1,2,3) ]: [ Channel.from(4,5,6), Channel.empty() ] )process foo {input:val(f) from foo_inchoutput:file 'x.txt' into foo_chscript:"""echo $f > x.txt""" }process bar {input:val(b) from bar_inchoutput:file 'x.txt' into bar_chscript:"""echo $b > x.txt""" }process omega {echo trueinput:file x from foo_ch.mix(bar_ch)script:"""cat $x""" }運行結果:
peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf N E X T F L O W ~ version 21.04.0-edge Launching `conditional-process2.nf` [naughty_minsky] - revision: 296937c5d2 executor > local (6) [0b/8183d2] process > foo (3) [100%] 3 of 3 ? [- ] process > bar - [e8/e9334f] process > omega (3) [100%] 3 of 3 ? 654peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf --flag N E X T F L O W ~ version 21.04.0-edge Launching `conditional-process2.nf` [disturbed_goldwasser] - revision: 296937c5d2 executor > local (6) [- ] process > foo - [aa/907692] process > bar (3) [100%] 3 of 3 ? [c1/e15b2f] process > omega (3) [100%] 3 of 3 ? 1236.3 Skip process execution
P:工作流程中有兩個連續的任務,當指定了可選標志時,不應執行第一個任務,其輸入由第二個任務處理。
S:使用條件表達式中創建的空通道,在指定可選參數時跳過第一個執行流程。然后將第二進程的輸入定義為第一進程的輸出(當被執行時)與輸入信道的mix
代碼:
params.skip = false params.input = "$baseDir/sample.fq.gz"Channel.fromPath(params.input).set{ input_ch }(foo_ch, bar_ch) = ( params.skip? [Channel.empty(), input_ch]: [input_ch, Channel.empty()] )process foo {input:file x from foo_choutput:file('*.fastq') into optional_chscript:"""< $x zcat > ${x.simpleName}.fastq""" }process bar {echo trueinput:file x from bar_ch.mix(optional_ch)"""echo your_command --input $x""" }6.4 Feedback loop☆
P:需要重復執行一個或多個任務,使用輸出作為新迭代的輸入,直到達到特定條件(i=o=i…)
S:使用迭代循環中最后一個進程的輸出作為第一個進程的輸入。使用Channel.create方法顯式創建輸出通道。然后將過程輸入定義為初始輸入和過程輸出的mix,該過程輸出應用于定義終止條件的until運算符
代碼;
params.input = 'hello.txt'condition = { it.readLines().size()>3 } feedback_ch = Channel.create() input_ch = Channel.fromPath(params.input).mix( feedback_ch.until(condition) )process foo {input:file x from input_choutput:file 'foo.txt' into foo_chscript:"""cat $x > foo.txt""" }process bar {input:file x from foo_choutput:file 'bar.txt' into feedback_chfile 'bar.txt' into result_chscript:"""cat $x > bar.txtecho World >> bar.txt""" }result_ch.last().println { "Result:\n${it.text.indent(' ')}" }6.5 Optional input
P:一個或多個進程有一個可選的輸入文件
S:使用特殊的文件名來標記文件參數的缺失
代碼:
params.inputs = 'prots/*{1,2,3}.fa' params.filter = 'NO_FILE'prots_ch = Channel.fromPath(params.inputs) opt_file = file(params.filter)process foo {input:file seq from prots_chfile opt from opt_filescript:def filter = opt.name != 'NO_FILE' ? "--filter $opt" : ''"""your_commad --input $seq $filter""" }6.6 Optional output
P:在某些情況下,工作流中的任務預計不會創建輸出文件
S:將此類輸出聲明為optional文件
代碼:
process foo {output:file 'foo.txt' optional true into foo_chscript:'''your_command''' }6.7 Execute when empty
P:如果通道為空,您需要執行一個過程
S:使用ifEmpty運算符發出標記值以觸發流程的執行
代碼:
process foo {input:val x from ch.ifEmpty { 'EMPTY' }when:x == 'EMPTY'script:'''your_command''' }總結
以上是生活随笔為你收集整理的Nextflow patterns的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 区分度评估指标-KS
- 下一篇: MC34063在扩展后的降压应用