Operators
branch
New in version 19.08.0-edge.
Returns: multiple queue channels or value channels, matching the source type
The branch operator forwards each item from a source channel to one of multiple output channels, based on selection criteria.
The selection criteria is a closure that defines, for each output channel, a unique label followed by a boolean expression. When an item is received, it is routed to the first output channel whose expression evaluates to true. For example:
Channel.of(1, 2, 3, 40, 50)
.branch {
small: it < 10
large: it > 10
}
.set { result }
result.small.view { "$it is small" }
result.large.view { "$it is large" }
1 is small
2 is small
3 is small
40 is large
50 is large
Note
The above output may be printed in any order since the two view
operations are executed asynchronously.
A fallback condition can be specified using true
as the last branch condition:
Channel.of(1, 2, 3, 40, 50)
.branch {
small: it < 10
large: it < 50
other: true
}
.set { result }
result.small.view { "$it is small" }
result.large.view { "$it is large" }
result.other.view { "$it is other" }
1 is small
2 is small
3 is small
40 is large
50 is other
The value emitted to each branch can be customized with an expression statement (or statements) after the branch condition:
Channel.of(1, 2, 3, 40, 50)
.branch {
foo: it < 10
return it+2
bar: it < 50
return it-2
other: true
return 0
}
.set { result }
result.foo.view { "$it is foo" }
result.bar.view { "$it is bar" }
result.other.view { "$it is other" }
3 is foo
4 is foo
5 is foo
38 is bar
0 is other
Tip
When the return
keyword is omitted, the value of the last expression statement is implicitly returned.
The branchCriteria()
method can be used to create a branch criteria as a variable that can be passed as an argument to any number of branch
operations, as shown below:
def criteria = branchCriteria {
small: it < 10
large: it > 10
}
Channel.of(1, 2, 30).branch(criteria).set { ch1 }
Channel.of(10, 20, 3).branch(criteria).set { ch2 }
ch1.small.view { "$it is small" }
ch1.large.view { "$it is large" }
ch2.small.view { "$it is small" }
ch2.large.view { "$it is large" }
1 is small
2 is small
3 is small
20 is large
30 is large
buffer
Returns: queue channel
The buffer
operator collects items from a source channel into subsets and emits each subset separately.
This operator has multiple variants:
buffer( closingCondition )
Emits each subset when closingCondition is satisfied. The closing condition can be a literal value, a regular expression, a type qualifier (i.e. Java class), or a boolean predicate. For example:
Channel.of( 1, 2, 3, 1, 2, 3 )
    .buffer { it == 2 }
    .view()
[1, 2]
[3, 1, 2]
buffer( openingCondition, closingCondition )
Creates a new subset when openingCondition is satisfied and emits the subset when closingCondition is satisfied. The opening and closing conditions can each be a literal value, a regular expression, a type qualifier (i.e. Java class), or a boolean predicate. For example:
// emits bundles starting with `2` and ending with `4`
Channel.of( 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2 )
    .buffer( 2, 4 )
    .view()
[2, 3, 4]
[2, 3, 4]
buffer( size: n, remainder: true | false )
Emits a new subset for every n items. By default, any remaining items are discarded. For example:
Channel.of( 1, 2, 3, 1, 2, 3, 1 )
    .buffer( size: 2 )
    .view()
[1, 2]
[3, 1]
[2, 3]
The remainder option can be used to emit any remaining items as a partial subset:
Channel.of( 1, 2, 3, 1, 2, 3, 1 )
    .buffer( size: 2, remainder: true )
    .view()
[1, 2]
[3, 1]
[2, 3]
[1]
buffer( size: n, skip: m, remainder: true | false )
Emits a new subset for every n items, skipping m items before collecting each subset. For example:
Channel.of( 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2 )
    .buffer( size: 3, skip: 2 )
    .view()
[3, 4, 5]
[3, 4, 5]
The remainder option can be used to emit any remaining items as a partial subset.
See also: collate
collate
Returns: queue channel
The collate
operator collects items from a source channel into groups of N items.
This operator has multiple variants:
collate( size, remainder = true )
Collects items into groups of size items:
Channel.of(1, 2, 3, 1, 2, 3, 1)
    .collate( 3 )
    .view()
[1, 2, 3]
[1, 2, 3]
[1]
By default, any remaining items are emitted as a partial group. You can specify false as the second parameter to discard them instead:
Channel.of(1, 2, 3, 1, 2, 3, 1)
    .collate( 3, false )
    .view()
[1, 2, 3]
[1, 2, 3]
Note
This version of collate is equivalent to buffer( size: n, remainder: true | false ).
collate( size, step, remainder = true )
Collects items into groups of size items using a sliding window that moves by step items at a time:
Channel.of(1, 2, 3, 4)
    .collate( 3, 1 )
    .view()
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]
You can specify
false
as the third parameter to discard any remaining items.
See also: buffer
collect
Returns: value channel
The collect
operator collects all items from a source channel into a list and emits it as a single item:
Channel.of( 1, 2, 3, 4 )
.collect()
.view()
[1, 2, 3, 4]
An optional closure can be used to transform each item before it is collected:
Channel.of( 'hello', 'ciao', 'bonjour' )
.collect { it.length() }
.view()
[5, 4, 7]
Available options:
flat
When true, nested list structures are flattened and their items are collected individually (default: true).
sort
When true, the collected items are sorted by their natural ordering (default: false). Can also be a closure or a Comparator which defines how items are compared during sorting.
See also: toList, toSortedList
collectFile
Returns: queue channel
The collectFile
operator collects the items from a source channel and saves them to one or more files, emitting the collected file(s).
This operator has multiple variants:
collectFile( name: '...', [options] )
Collects the items and saves them to a single file specified by the name option:
Channel.of('alpha', 'beta', 'gamma')
    .collectFile(name: 'sample.txt', newLine: true)
    .subscribe {
        println "Entries are saved to file: $it"
        println "File content is: ${it.text}"
    }
collectFile( closure, [options] )
Collects the items into groups and saves each group to a file, using a grouping criteria. The grouping criteria is a closure that maps each item to a pair, where the first element is the file name for the group and the second element is the content to be appended to that file. For example:
Channel.of('Hola', 'Ciao', 'Hello', 'Bonjour', 'Halo')
    .collectFile { item -> [ "${item[0]}.txt", item + '\n' ] }
    .subscribe {
        println "File '${it.name}' contains:"
        println it.text
    }
File 'B.txt' contains:
Bonjour
File 'C.txt' contains:
Ciao
File 'H.txt' contains:
Halo
Hola
Hello
When the items from the source channel are files, the grouping criteria can be omitted. In this case, the items will be grouped by their source filename.
The following example shows how to use a closure to collect and sort all sequences in a FASTA file from shortest to longest:
Channel
.fromPath('/data/sequences.fa')
.splitFasta( record: [id: true, sequence: true] )
.collectFile( name: 'result.fa', sort: { it.size() } ) {
it.sequence
}
.view { it.text }
Warning
The collectFile
operator needs to store files in a temporary directory that is automatically deleted on workflow completion. For performance reasons, this directory is located in the machine’s local storage, and it should have as much free space as the data that is being collected. The tempDir
option can be used to specify a different temporary directory.
Available options:
cache
Controls the caching ability of the collectFile operator when using the resume feature. It follows the same semantics as the cache directive (default: true).
keepHeader
Prepend the resulting file with the header of the first collected file. The number of header lines can be specified with the skip option (default: 0), which determines how many lines to remove from every collected file except the first (from which no lines are removed).
name
Name of the file where all received values are stored.
newLine
Appends a newline character automatically after each entry (default: false).
seed
A value or a map of values used to initialize the file content.
skip
Skip the first n lines, e.g. skip: 1 (default: 0).
sort
Defines the sorting criteria of the content in the resulting file(s). Can be one of the following values:
false: Disable content sorting. Entries are appended as they are produced.
true: Order the content by the entry's natural ordering, i.e. numerical for numbers, lexicographic for strings, etc. See the Java documentation for more information.
'index': Order the content by the incremental index number assigned to each entry as it is collected.
'hash': (default) Order the content by the hash number associated with each entry.
'deep': Similar to the previous, but the hash number is computed from the actual entry content, e.g. when the entry is a file, the hash is computed from the file content.
A custom sorting criteria can be specified with a Closure or a Comparator object.
The file content is sorted in such a way that it does not depend on the order in which entries were added, which guarantees that it is consistent (i.e. does not change) across different executions with the same data.
storeDir
Folder where the resulting file(s) are stored.
tempDir
Folder where temporary files, used by the collecting process, are stored.
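The interplay of the keepHeader and skip options can be illustrated with a short sketch. Note that the glob pattern and file names below are hypothetical, not taken from the examples above:

```nextflow
// Merge several CSV files into one, keeping only the first file's header.
// 'results/*.csv' and 'all_samples.csv' are hypothetical names.
Channel
    .fromPath('results/*.csv')
    .collectFile(name: 'all_samples.csv', keepHeader: true, skip: 1)
    .view { file -> file.text }
```

With skip: 1, one header line is taken from the first collected file and removed from every subsequent file, so the merged file contains a single header.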
combine
Returns: queue channel
The combine
operator produces the combinations (i.e. cross product, “Cartesian” product) of two source channels, or a channel and a list (as the right operand), emitting each combination separately.
For example:
numbers = Channel.of(1, 2, 3)
words = Channel.of('hello', 'ciao')
numbers
.combine(words)
.view()
[1, hello]
[2, hello]
[3, hello]
[1, ciao]
[2, ciao]
[3, ciao]
The by
option can be used to combine items that share a matching key. The value should be the zero-based index of the tuple, or a list of indices. For example:
source = Channel.of( [1, 'alpha'], [2, 'beta'] )
target = Channel.of( [1, 'x'], [1, 'y'], [1, 'z'], [2, 'p'], [2, 'q'], [2, 't'] )
source.combine(target, by: 0).view()
[1, alpha, x]
[1, alpha, y]
[1, alpha, z]
[2, beta, p]
[2, beta, q]
[2, beta, t]
Note
The combine operator is similar to cross and join, making them easy to confuse. Their differences can be summarized as follows:
combine and cross both produce an outer product or cross product, whereas join produces an inner product.
combine filters pairs with a matching key only if the by option is used, whereas cross always filters pairs with a matching key.
combine with the by option merges and flattens each pair, whereas cross does not. Compare the examples for combine and cross to see this difference.
concat
Returns: queue channel
The concat
operator emits the items from two or more source channels into a single output channel. Each source channel is emitted in the order in which it was specified.
In other words, given N channels, the items from the i+1-th channel are emitted only after all of the items from the i-th channel have been emitted.
For example:
a = Channel.of( 'a', 'b', 'c' )
b = Channel.of( 1, 2, 3 )
c = Channel.of( 'p', 'q' )
c.concat( b, a ).view()
p
q
1
2
3
a
b
c
See also: mix
count
Returns: value channel
The count
operator computes the total number of items in a source channel and emits it:
Channel.of(9, 1, 7, 5)
.count()
.view()
4
An optional filter can be provided to select which items to count. The selection criteria can be a literal value, a regular expression, a type qualifier (i.e. Java class), or a boolean predicate. For example:
Channel.of(4, 1, 7, 1, 1)
.count(1)
.view()
3
Channel.of('a', 'c', 'c', 'q', 'b')
.count( ~/c/ )
.view()
2
Channel.of('a', 'c', 'c', 'q', 'b')
.count { it <= 'c' }
.view()
4
countFasta
Returns: value channel
Counts the total number of records in a channel of FASTA files, equivalent to splitFasta | count
. See splitFasta for the list of available options.
countFastq
Returns: value channel
Counts the total number of records in a channel of FASTQ files, equivalent to splitFastq | count
. See splitFastq for the list of available options.
countJson
Returns: value channel
Counts the total number of records in a channel of JSON files, equivalent to splitJson | count
. See splitJson for the list of available options.
countLines
Returns: value channel
Counts the total number of lines in a channel of text files, equivalent to splitText | count
. See splitText for the list of available options.
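As a minimal sketch of the count* family, the following counts the lines of a text file (the path is hypothetical):

```nextflow
// Equivalent to: Channel.fromPath(...).splitText().count()
Channel
    .fromPath('data/sample.txt')   // hypothetical path
    .countLines()
    .view { n -> "Total lines: $n" }
```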
cross
Returns: queue channel
The cross
operator emits every pairwise combination of two channels for which the pair has a matching key.
By default, the key is defined as the first entry in a list or map, or the value itself for any other data type. For example:
source = Channel.of( [1, 'alpha'], [2, 'beta'] )
target = Channel.of( [1, 'x'], [1, 'y'], [1, 'z'], [2, 'p'], [2, 'q'], [2, 't'] )
source.cross(target).view()
[[1, alpha], [1, x]]
[[1, alpha], [1, y]]
[[1, alpha], [1, z]]
[[2, beta], [2, p]]
[[2, beta], [2, q]]
[[2, beta], [2, t]]
An optional closure can be used to define the matching key for each item:
source = Channel.of( [1, 'alpha'], [2, 'beta'] )
target = Channel.of( [1, 'a'], [1, 'b'], [2, 'a'], [2, 'b'] )
source
    .cross(target) { it[1][0] }
    .view()
[[1, alpha], [1, a]]
[[1, alpha], [2, a]]
[[2, beta], [1, b]]
[[2, beta], [2, b]]
There are two important caveats when using the cross operator:
The operator is not commutative, i.e. a.cross(b) is not the same as b.cross(a).
Each source channel should not emit any items with duplicate keys, i.e. each item should have a unique key.
See also: combine
distinct
Returns: queue channel or value channel, matching the source type
The distinct
operator forwards a source channel with consecutively repeated items removed, such that each emitted item is different from the preceding one:
Channel.of( 1, 1, 2, 2, 2, 3, 1, 1, 2, 2, 3 )
.distinct()
.view()
1
2
3
1
2
3
An optional closure can be used to transform each value before it is evaluated for distinct-ness:
Channel.of( 1, 1, 2, 2, 2, 3, 1, 1, 2, 4, 6 )
.distinct { it % 2 }
.view()
1
2
3
2
See also: unique
dump
Returns: queue channel or value channel, matching the source type
The dump
operator prints each item in a source channel when the pipeline is executed with the -dump-channels
command-line option, otherwise it does nothing. It is a useful way to inspect and debug channels quickly without having to modify the pipeline script.
The tag
option can be used to select which channels to dump:
Channel.of( 1, 2, 3 )
.map { it+1 }
.dump(tag: 'foo')
Channel.of( 1, 2, 3 )
.map { it^2 }
.dump(tag: 'bar')
Then, you can run your pipeline with -dump-channels foo
or -dump-channels bar
to dump the content of either channel. Multiple tag names can be specified as a comma-separated list.
Available options:
pretty
New in version 22.10.0.
When true, format the output as pretty-printed JSON (default: false).
tag
Associate the channel with a tag that can be specified with the -dump-channels option to select which channels to dump.
filter
Returns: queue channel or value channel, matching the source type
The filter
operator emits the items from a source channel that satisfy a condition, discarding all other items. The filter condition can be a literal value, a regular expression, a type qualifier (i.e. Java class), or a boolean predicate.
The following example filters a channel with a regular expression that only matches strings beginning with a
:
Channel.of( 'a', 'b', 'aa', 'bc', 3, 4.5 )
.filter( ~/^a.*/ )
.view()
a
aa
The following example filters a channel with the Number
type qualifier so that only numbers are emitted:
Channel.of( 'a', 'b', 'aa', 'bc', 3, 4.5 )
.filter( Number )
.view()
3
4.5
The following example filters a channel using a boolean predicate, which is a closure that returns a boolean value. In this case, the predicate is used to select only odd numbers:
Channel.of( 1, 2, 3, 4, 5 )
.filter { it % 2 == 1 }
.view()
1
3
5
first
Returns: value channel
The first
operator emits the first item in a source channel, or the first item that matches a condition. The condition can be a regular expression, a type qualifier (i.e. Java class), or a boolean predicate. For example:
// no condition is specified, emits the very first item: 1
Channel.of( 1, 2, 3 )
.first()
.view()
// emits the first item matching the regular expression: 'aa'
Channel.of( 'a', 'aa', 'aaa' )
.first( ~/aa.*/ )
.view()
// emits the first String value: 'a'
Channel.of( 1, 2, 'a', 'b', 3 )
.first( String )
.view()
// emits the first item for which the predicate evaluates to true: 4
Channel.of( 1, 2, 3, 4, 5 )
.first { it > 3 }
.view()
flatMap
Returns: queue channel
The flatMap
operator applies a mapping function to each item from a source channel.
When the mapping function returns a list, each element in the list is emitted separately:
Channel.of( 1, 2, 3 )
.flatMap { n -> [ n, n*2, n*3 ] }
.view()
1
2
3
2
4
6
3
6
9
When the mapping function returns a map, each key-value pair in the map is emitted separately:
Channel.of( 1, 2, 3 )
.flatMap { n -> [ number: n, square: n*n, cube: n*n*n ] }
.view { "${it.key}: ${it.value}" }
number: 1
square: 1
cube: 1
number: 2
square: 4
cube: 8
number: 3
square: 9
cube: 27
flatten
Returns: queue channel
The flatten
operator flattens each item from a source channel that is a list or other collection, such that each element in each collection is emitted separately:
Channel.of( [1, [2, 3]], 4, [5, [6]] )
.flatten()
.view()
1
2
3
4
5
6
As shown in the above example, deeply nested collections are also flattened.
See also: flatMap
groupTuple
Returns: queue channel
The groupTuple
operator collects lists (i.e. tuples) from a source channel into groups based on a grouping key. A new tuple is emitted for each distinct key.
To be more precise, the operator transforms a sequence of tuples like (K, V, W, ..) into a sequence of tuples like (K, list(V), list(W), ..).
For example:
Channel.of( [1, 'A'], [1, 'B'], [2, 'C'], [3, 'B'], [1, 'C'], [2, 'A'], [3, 'D'] )
.groupTuple()
.view()
[1, [A, B, C]]
[2, [C, A]]
[3, [B, D]]
By default, the first element of each tuple is used as the grouping key. The by
option can be used to specify a different index, or list of indices. For example, to group by the second element of each tuple:
Channel.of( [1, 'A'], [1, 'B'], [2, 'C'], [3, 'B'], [1, 'C'], [2, 'A'], [3, 'D'] )
.groupTuple(by: 1)
.view()
[[1, 2], A]
[[1, 3], B]
[[2, 1], C]
[[3], D]
By default, if you don’t specify a size, the groupTuple
operator will not emit any groups until all inputs have been received. If possible, you should always try to specify the number of expected elements in each group using the size
option, so that each group can be emitted as soon as it’s ready. In cases where the size of each group varies based on the grouping key, you can use the built-in groupKey()
function, which allows you to define a different expected size for each group:
chr_frequency = ["chr1": 2, "chr2": 3]
Channel.of(
['region1', 'chr1', '/path/to/region1_chr1.vcf'],
['region2', 'chr1', '/path/to/region2_chr1.vcf'],
['region1', 'chr2', '/path/to/region1_chr2.vcf'],
['region2', 'chr2', '/path/to/region2_chr2.vcf'],
['region3', 'chr2', '/path/to/region3_chr2.vcf']
)
.map { region, chr, vcf -> tuple( groupKey(chr, chr_frequency[chr]), vcf ) }
.groupTuple()
.view()
[chr1, [/path/to/region1_chr1.vcf, /path/to/region2_chr1.vcf]]
[chr2, [/path/to/region1_chr2.vcf, /path/to/region2_chr2.vcf, /path/to/region3_chr2.vcf]]
Available options:
by
The zero-based index of the element to use as the grouping key. Can also be a list of indices, e.g. by: [0,2] (default: [0]).
remainder
When true, incomplete tuples (i.e. groups with fewer than size items) are emitted as partial groups, otherwise they are discarded (default: false). This option can only be used with size.
size
The required number of items for each group. When a group reaches the required size, it is emitted.
sort
Defines the sorting criteria for the grouped items. Can be one of the following values:
false: No sorting is applied (default).
true: Order the grouped items by the item's natural ordering, i.e. numerical for numbers, lexicographic for strings, etc. See the Java documentation for more information.
'hash': Order the grouped items by the hash number associated with each entry.
'deep': Similar to the previous, but the hash number is computed from the actual entry content, e.g. when the item is a file, the hash is computed from the file content.
A custom sorting criteria, used to order the nested list elements of each tuple, can also be specified with a Closure or a Comparator object.
ifEmpty
Returns: queue channel or value channel, matching the source type
The ifEmpty
operator emits a source channel, or a default value if the source channel is empty (doesn’t emit any value):
Channel.of(1, 2, 3)
    .ifEmpty('Hello')
    .view()
1
2
3
Channel.empty()
    .ifEmpty('Hello')
    .view()
Hello
The default value can also be a closure, in which case the closure is evaluated and the result is emitted when the source channel is empty.
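A minimal sketch of a closure default, which is evaluated lazily only when the source turns out to be empty:

```nextflow
Channel.empty()
    .ifEmpty { 'generated-' + System.currentTimeMillis() }  // evaluated only if the channel is empty
    .view()
```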
See also: empty channel factory
join
Returns: queue channel
The join
operator emits the inner product of two source channels using a matching key.
To be more precise, the operator transforms a sequence of tuples like (K, V1, V2, ..) and (K, W1, W2, ..) into a sequence of tuples like (K, V1, V2, .., W1, W2, ..). It is equivalent to an inner join in SQL, or an outer join when remainder is true.
For example:
left = Channel.of( ['X', 1], ['Y', 2], ['Z', 3], ['P', 7] )
right = Channel.of( ['Z', 6], ['Y', 5], ['X', 4] )
left.join(right).view()
[Z, 3, 6]
[Y, 2, 5]
[X, 1, 4]
By default, the first element of each item is used as the key. The by
option can be used to specify a different index, or list of indices.
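For example, when the matching key is the second element of each tuple, it can be selected with by: 1 (a minimal sketch, with hypothetical values):

```nextflow
left  = Channel.of( [1, 'X'], [2, 'Y'] )
right = Channel.of( [6, 'X'], [5, 'Y'] )
left.join(right, by: 1).view()
```

The matching key is moved to the front of each joined tuple, so this emits tuples like [X, 1, 6] and [Y, 2, 5] (in no particular order).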
By default, unmatched items are discarded. The remainder
option can be used to emit them at the end:
left = Channel.of( ['X', 1], ['Y', 2], ['Z', 3], ['P', 7] )
right = Channel.of( ['Z', 6], ['Y', 5], ['X', 4] )
left.join(right, remainder: true).view()
[Y, 2, 5]
[Z, 3, 6]
[X, 1, 4]
[P, 7, null]
Available options:
by
The zero-based index of each item to use as the matching key. Can also be a list of indices, e.g. by: [0, 2] (default: [0]).
failOnDuplicate
When true, an error is reported when the operator receives multiple items from the same channel with the same key (default: false). Set to true if strict mode is enabled.
failOnMismatch
When true, an error is reported when the operator receives an item from one channel for which there is no matching item from the other channel (default: false). Set to true if strict mode is enabled. This option cannot be used with remainder.
remainder
When true, unmatched items are emitted at the end, otherwise they are discarded (default: false).
last
Returns: value channel
The last
operator emits the last item from a source channel:
Channel.of( 1, 2, 3, 4, 5, 6 )
.last()
.view()
6
map
Returns: queue channel or value channel, matching the source type
The map
operator applies a mapping function to each item from a source channel:
Channel.of( 1, 2, 3, 4, 5 )
.map { it * it }
.view()
1
4
9
16
25
max
Returns: value channel
The max
operator emits the item with the greatest value from a source channel:
Channel.of( 8, 6, 2, 5 )
.max()
.view { "Max value is $it" }
Max value is 8
An optional closure can be used to control how the items are compared. The closure can be a mapping function, which transforms each item before it is compared, or a comparator function, which defines how to compare two items more generally.
The following examples show how to find the longest string in a channel:
// mapping function
Channel.of( "hello", "hi", "hey" )
.max { it.size() }
.view()
hello
// comparator function
Channel.of( "hello", "hi", "hey" )
.max { a, b -> a.size() <=> b.size() }
.view()
hello
merge
Returns: queue channel or value channel, matching the source type
The merge
operator joins the items from two or more channels into a new channel:
odds = Channel.of(1, 3, 5, 7, 9)
evens = Channel.of(2, 4, 6)
odds
.merge( evens )
.view()
[1, 2]
[3, 4]
[5, 6]
An optional closure can be used to control how two items are merged:
odds = Channel.of(1, 3, 5, 7, 9)
evens = Channel.of(2, 4, 6)
odds
.merge( evens ) { a, b -> tuple(b*b, a) }
.view()
[4, 1]
[16, 3]
[36, 5]
The merge
operator may return a queue channel or value channel depending on the inputs:
If the first argument is a queue channel, the merge operator will return a queue channel, merging as many values as are available across all inputs. Value channels are re-used for each merged value.
If the first argument is a value channel, the merge operator will return a value channel, merging the first value from each input, regardless of whether any queue channel inputs have additional values.
Danger
In general, the use of the merge
operator is discouraged. Processes and channel operators are not guaranteed to emit items in the order that they were received, as they are executed concurrently. Therefore, if you try to merge output channels from different processes, the resulting channel may be different on each run, which will cause resumed runs to not work properly.
You should always use a matching key (e.g. sample ID) to merge multiple channels, so that they are combined in a deterministic way. For this purpose, you can use the join operator.
min
Returns: value channel
The min
operator emits the item with the lowest value from a source channel:
Channel.of( 8, 6, 2, 5 )
.min()
.view { "Min value is $it" }
Min value is 2
An optional closure can be used to control how the items are compared. The closure can be a mapping function, which transforms each item before it is compared, or a comparator function, which defines how to compare two items more generally.
The following examples show how to find the shortest string in a channel:
// mapping function
Channel.of( "hello", "hi", "hey" )
.min { it.size() }
.view()
hi
// comparator function
Channel.of( "hello", "hi", "hey" )
.min { a, b -> a.size() <=> b.size() }
.view()
hi
mix
Returns: queue channel
The mix
operator emits the items from two or more source channels into a single output channel:
c1 = Channel.of( 1, 2, 3 )
c2 = Channel.of( 'a', 'b' )
c3 = Channel.of( 'z' )
c1.mix(c2, c3).view()
1
2
3
a
b
z
The items in the mixed output channel may appear in any order, regardless of which source channel they came from. Thus, the previous example could also output the following:
z
1
a
2
b
3
See also: concat
multiMap
New in version 19.11.0-edge.
Returns: multiple queue channels or value channels, matching the source type
The multiMap
operator applies a set of mapping functions to a source channel, producing a separate output channel for each mapping function.
The multi-map criteria is a closure that defines, for each output channel, a label followed by a mapping expression.
For example:
Channel.of( 1, 2, 3, 4 )
.multiMap { it ->
foo: it + 1
bar: it * it
}
.set { result }
result.foo.view { "foo $it" }
result.bar.view { "bar $it" }
foo 2
foo 3
foo 4
foo 5
bar 1
bar 4
bar 9
bar 16
Multiple labels can share the same mapping expression using the following shorthand:
Channel.of( 1, 2, 3 )
.multiMap { it -> foo: bar: it }
.set { result }
result.foo.view { "foo $it" }
result.bar.view { "bar $it" }
bar 1
bar 2
bar 3
foo 1
foo 2
foo 3
The above example creates two channels as before, but now they both receive the same items.
You can use the multiMapCriteria()
method to create a multi-map criteria as a variable that can be passed as an argument to any number of multiMap
operations, as shown below:
def criteria = multiMapCriteria {
small: [it, it < 10]
large: [it, it > 10]
}
Channel.of(1, 2, 30).multiMap(criteria).set { ch1 }
Channel.of(10, 20, 1).multiMap(criteria).set { ch2 }
ch1.small.view { v, is_small -> "ch1: $v is small: $is_small" }
ch1.large.view { v, is_large -> "ch1: $v is large: $is_large" }
ch2.small.view { v, is_small -> "ch2: $v is small: $is_small" }
ch2.large.view { v, is_large -> "ch2: $v is large: $is_large" }
Note
If you use multiMap
to split a tuple or map into multiple channels, it is recommended that you retain a matching key (e.g. sample ID) with each new channel, so that you can re-combine these channels later on if needed. In general, you should not expect to be able to merge channels correctly without a matching key, due to the concurrent nature of Nextflow pipelines.
randomSample
Returns: queue channel
The randomSample
operator emits a randomly-selected subset of items from a source channel:
Channel.of( 1..100 )
.randomSample( 10 )
.view()
The above snippet will print 10 randomly-selected numbers between 1 and 100 (without replacement).
An optional second parameter can be used to set the initial seed for the random number generator, which ensures that the randomSample
operator produces the same pseudo-random sequence across runs:
Channel.of( 1..100 )
.randomSample( 10, 234 )
.view()
The above example will print 10 randomly-selected numbers between 1 and 100 (without replacement). Each subsequent script execution will produce the same sequence.
reduce
Returns: value channel
The reduce
operator applies an accumulator function sequentially to each item in a source channel, and emits the final accumulated value. The accumulator function takes two parameters – the accumulated value and the i-th emitted item – and it should return the accumulated result, which is passed to the next invocation with the i+1-th item. This process is repeated for each item in the source channel.
For example:
Channel.of( 1, 2, 3, 4, 5 )
.reduce { a, b ->
println "a: $a b: $b"
a + b
}
.view { "result = $it" }
a: 1 b: 2
a: 3 b: 3
a: 6 b: 4
a: 10 b: 5
result = 15
By default, the first item is used as the initial accumulated value. You can optionally specify a different initial value as shown below:
Channel.of( 1, 2, 3, 4, 5 )
.reduce( 'result:' ) { accum, v ->
println accum
accum + ' ' + v
}
.view { "final $it" }
result:
result: 1
result: 1 2
result: 1 2 3
result: 1 2 3 4
final result: 1 2 3 4 5
set
Returns: nothing
The set
operator assigns a source channel to a variable, whose name is specified as a closure parameter:
Channel.of(10, 20, 30).set { my_channel }
Using set
is semantically equivalent to assigning a variable:
my_channel = Channel.of(10, 20, 30)
See also: tap
splitCsv
Returns: queue channel
The splitCsv
operator parses and splits CSV-formatted text from a source channel into records, or groups of records with a given size.
For example:
Channel.of( '10,20,30\n70,80,90' )
.splitCsv()
.view { row -> "${row[0]} - ${row[1]} - ${row[2]}" }
10 - 20 - 30
70 - 80 - 90
The above example shows how CSV text is parsed and split into individual rows, where each row is simply a list of columns.
When the CSV begins with a header line defining the column names, and the header
option is true
, each row is returned as a map instead:
Channel.of( 'alpha,beta,gamma\n10,20,30\n70,80,90' )
.splitCsv( header: true )
.view { row -> "${row.alpha} - ${row.beta} - ${row.gamma}" }
10 - 20 - 30
70 - 80 - 90
The header
option can also just be a list of columns:
Channel.of( 'alpha,beta,gamma\n10,20,30\n70,80,90' )
.splitCsv( header: ['col1', 'col2', 'col3'], skip: 1 )
.view { row -> "${row.col1} - ${row.col2} - ${row.col3}" }
10 - 20 - 30
70 - 80 - 90
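The by option can also be used to group the parsed rows into chunks of a given size, e.g. (a minimal sketch):

```nextflow
Channel.of( 'a,1\nb,2\nc,3\nd,4' )
    .splitCsv( by: 2 )
    .view()
```

Each emitted item is then a list of two rows rather than a single row.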
Available options:
by
When specified, group rows into chunks of the given size (default: none).
charset
Parse the content with the specified charset, e.g. UTF-8. See the list of standard charsets for available options.
decompress
When true, decompress the content using the GZIP format before processing it (default: false). Files with the .gz extension are decompressed automatically.
elem
The index of the element to split when the source items are lists or tuples (default: first file object or first element).
header
When true, the first line is used as the column names (default: false). Can also be a list of column names.
limit
Limits the number of records to retrieve for each source item (default: no limit).
quote
The character used to quote values (default: '' or "").
sep
The character used to separate values (default: ,).
skip
Number of lines to ignore from the beginning when parsing the CSV text (default: 0).
strip
When true, remove leading and trailing blanks from values (default: false).
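A minimal sketch of the by option, using inline CSV text for illustration: with by: 2, each emitted item is a list of two parsed rows rather than a single row.
Channel.of( 'a,1\nb,2\nc,3\nd,4' )
    .splitCsv( by: 2 )
    .view()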
splitFasta
Returns: queue channel
The splitFasta
operator splits FASTA-formatted text from a source channel into individual sequences.
The by
option can be used to group sequences into chunks of a given size. The following example shows how to read a FASTA file and split it into chunks of 10 sequences each:
Channel
.fromPath('misc/sample.fa')
.splitFasta( by: 10 )
.view()
Warning
Chunks are stored in memory by default. When splitting large files, specify file: true
to save the chunks into files in order to avoid running out of memory. See the list of options below for details.
The record
option can be used to split FASTA content into records instead of text chunks. Each record is a map that allows you to access the FASTA sequence data with ease. For example:
Channel
.fromPath('misc/sample.fa')
.splitFasta( record: [id: true, seqString: true] )
.filter { record -> record.id =~ /^ENST0.*/ }
.view { record -> record.seqString }
The above example loads the misc/sample.fa
file, splits it into records containing the id
and seqString
fields (i.e. the sequence id and the sequence data), filters records by their ID, and finally prints the sequence string of each record.
Available options:
by
Defines the number of sequences in each chunk (default: 1).
charset
Parse the content with the specified charset, e.g. UTF-8. See the list of standard charsets for available options.
compress
When true, resulting file chunks are GZIP compressed (default: false). The .gz suffix is automatically added to chunk file names.
decompress
When true, decompress the content using the GZIP format before processing it (default: false). Files with the .gz extension are decompressed automatically.
elem
The index of the element to split when the source items are lists or tuples (default: first file object or first element).
file
When true, saves each split to a file. Use a string instead of true to create split files with a specific name (the split index number is automatically added). Alternatively, set this option to an existing directory to save the split files into that directory.
limit
Limits the number of sequences to retrieve for each source item (default: no limit).
record
Parse each entry in the FASTA file into a record. The following fields are available:
id: The FASTA sequence identifier, i.e. the word following the > symbol up to the first blank or newline character
header: The first line in a FASTA sequence without the > character
desc: The text in the FASTA header following the ID value
text: The complete FASTA sequence including the header
seqString: The sequence data as a single-line string, i.e. containing no newline characters
sequence: The sequence data as a multi-line string, i.e. always ending with a newline character
width: Defines the length of a single line when the sequence field is used, after which the sequence data continues on a new line
size
Defines the size of the expected chunks as a memory unit, e.g. 1.MB.
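As the warning above suggests, the by and file options can be combined so that chunks are written to disk instead of kept in memory. A minimal sketch, reusing the misc/sample.fa path from the earlier examples:
Channel
    .fromPath('misc/sample.fa')
    .splitFasta( by: 1000, file: true )
    .view()
Each emitted item is a file containing up to 1000 sequences, rather than an in-memory text chunk.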
See also: countFasta
splitFastq
Returns: queue channel
The splitFastq
operator splits FASTQ-formatted text from a source channel into individual sequences.
The by
option can be used to group sequences into chunks of a given size. The following example shows how to read a FASTQ file and split it into chunks of 10 sequences each:
Channel
.fromPath('misc/sample.fastq')
.splitFastq( by: 10 )
.view()
Warning
Chunks are stored in memory by default. When splitting large files, specify file: true
to save the chunks into files in order to avoid running out of memory. See the list of options below for details.
The record
option can be used to split FASTQ content into records instead of text chunks. Each record is a map that allows you to access the FASTQ sequence data with ease. For example:
Channel
.fromPath('misc/sample.fastq')
.splitFastq( record: true )
.view { record -> record.readHeader }
The pe
option can be used to split paired-end FASTQ files. The source channel must emit tuples containing the file pairs. For example:
Channel
.fromFilePairs('/my/data/SRR*_{1,2}.fastq', flat: true)
.splitFastq(by: 100_000, pe: true, file: true)
.view()
Note
Channel.fromFilePairs()
requires the flat: true
option in order to emit the file pairs as separate elements in the produced tuples.
Note
This operator assumes that the paired-end reads appear in the same order in both files and that the two files contain the same number of reads.
Available options:
by
Defines the number of sequences in each chunk (default: 1).
charset
Parse the content with the specified charset, e.g. UTF-8. See the list of standard charsets for available options.
compress
When true, resulting file chunks are GZIP compressed (default: false). The .gz suffix is automatically added to chunk file names.
decompress
When true, decompress the content using the GZIP format before processing it (default: false). Files with the .gz extension are decompressed automatically.
elem
The index of the element to split when the source items are lists or tuples (default: first file object or first element).
file
When true, saves each split to a file. Use a string instead of true to create split files with a specific name (the split index number is automatically added). Alternatively, set this option to an existing directory to save the split files into that directory.
limit
Limits the number of sequences to retrieve for each source item (default: no limit).
pe
When true, splits paired-end read files. Items emitted by the source channel must be tuples containing the file pairs.
record
Parse each entry in the FASTQ file into a record. The following fields are available:
readHeader: Sequence header (without the @ prefix)
readString: The raw sequence data
qualityHeader: Base quality header (it may be empty)
qualityString: Quality values for the sequence
See also: countFastq
splitJson
Returns: queue channel
The splitJson
operator splits JSON-formatted text from a source channel into individual records.
If the source item is a JSON array, each element of the array will be emitted:
// Example with a JSON array
Channel.of('[1, null, ["A", {}], true]')
.splitJson()
.view{"Item: ${it}"}
Item: 1
Item: null
Item: [A, [:]]
Item: true
If the source item is a JSON object, each key-value pair will be emitted as a map with the properties key
and value
:
// Example with a JSON object
Channel.of('{"A": 1, "B": [1, 2, 3], "C": {"D": null}}')
.splitJson()
.view{"Item: ${it}"}
Item: [key:A, value:1]
Item: [key:B, value:[1, 2, 3]]
Item: [key:C, value:[D:null]]
The path
option can be used to query a section of the JSON document to parse and split:
Channel.of('{"A": 1, "B": [2, 3, {"C": {"D": null, "E": 4, "F": 5}}]}')
.splitJson(path: 'B[2].C')
.view{"Item: ${it}"}
Item: [key:D, value:null]
Item: [key:E, value:4]
Item: [key:F, value:5]
Available options:
limit
Limits the number of records to retrieve for each source item (default: no limit).
path
Defines a query for a section of each source item to parse and split. The expression is a path similar to JSONPath. The empty string (the default) selects the document root. An integer in brackets is a zero-based index into a JSON array. A string preceded by a dot (.) is a key in a JSON object.
See also: countJson
splitText
Returns: queue channel
The splitText
operator splits multi-line text content from a source channel into chunks of N lines:
Channel
.fromPath('/some/path/*.txt')
.splitText()
.view()
The above example loads a collection of text files, splits the content of each file into individual lines, and prints each line.
The by
option can be used to emit chunks of N lines:
Channel
.fromPath('/some/path/*.txt')
.splitText( by: 10 )
.subscribe {
print it
print "--- end of the chunk ---\n"
}
An optional closure can be used to transform each text chunk produced by the operator. The following example shows how to split text files into chunks of 10 lines and transform them to uppercase letters:
Channel
.fromPath('/some/path/*.txt')
.splitText( by: 10 ) { it.toUpperCase() }
.view()
Note
Text chunks returned by the splitText
operator are always terminated by a \n
newline character.
Available options:
by
Defines the number of lines in each chunk (default: 1).
charset
Parse the content with the specified charset, e.g. UTF-8. See the list of standard charsets for available options.
compress
When true, resulting file chunks are GZIP compressed (default: false). The .gz suffix is automatically added to chunk file names.
decompress
When true, decompress the content using the GZIP format before processing it (default: false). Files with the .gz extension are decompressed automatically.
elem
The index of the element to split when the source items are lists or tuples (default: first file object or first element).
file
When true, saves each split to a file. Use a string instead of true to create split files with a specific name (the split index number is automatically added). Alternatively, set this option to an existing directory to save the split files into that directory.
keepHeader
Parses the first line as a header and prepends it to each emitted chunk (default: false).
limit
Limits the number of lines to retrieve for each source item (default: no limit).
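The keepHeader option is handy for column-oriented text such as TSV. A minimal sketch, assuming a hypothetical data/records.tsv file whose first line is a header:
Channel
    .fromPath('data/records.tsv')
    .splitText( by: 100, keepHeader: true, file: true )
    .view()
Each 100-line chunk file starts with a copy of the header line, so downstream tools can parse every chunk the same way as the original file.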
See also: countLines
subscribe
Returns: the source channel
The subscribe
operator invokes a custom function for each item from a source channel:
// define a channel emitting three values
source = Channel.of( 'alpha', 'beta', 'delta' )
// subscribe to the channel with a function that prints each value
source.subscribe { println "Got: $it" }
Got: alpha
Got: beta
Got: delta
The closure parameter can be defined explicitly if needed, using a name other than it
and, optionally, the expected type:
Channel
.of( 'alpha', 'beta', 'lambda' )
.subscribe { String str ->
println "Got: ${str}; len: ${str.size()}"
}
Got: alpha; len: 5
Got: beta; len: 4
Got: lambda; len: 6
The subscribe
operator supports multiple types of event handlers:
Channel.of( 1, 2, 3 )
.subscribe onNext: { println it }, onComplete: { println 'Done' }
1
2
3
Done
Available options:
onNext
Closure that is invoked when an item is emitted. Equivalent to providing a closure as the first argument.
onComplete
Closure that is invoked after the last item is emitted by the channel.
onError
Closure that is invoked when an exception is raised while handling the onNext event. No further calls to onNext or onComplete will be made. The closure receives the Throwable that caused the error as its parameter.
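A minimal sketch of the onError handler: dividing by zero raises an exception inside the onNext closure, after which no further items are handled and onComplete is not invoked.
Channel.of( 1, 0, 2 )
    .subscribe(
        onNext:  { println( 10 / it ) },
        onError: { e -> println "Failed: ${e.message}" }
    )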
sum
Returns: value channel
The sum
operator emits the sum of all items in a source channel:
Channel.of( 8, 6, 2, 5 )
.sum()
.view { "The sum is $it" }
The sum is 21
An optional closure can be used to transform each item before it is added to the sum:
Channel.of( 4, 1, 7, 5 )
.sum { it * it }
.view { "Square: $it" }
Square: 91
take
Returns: queue channel
The take
operator takes the first N items from a source channel:
Channel.of( 1..10 )
.take( 3 )
.view()
1
2
3
Tip
Specifying a size of -1
causes the operator to take all values.
See also: until
tap
Returns: queue channel or value channel, matching the source type
The tap
operator assigns a source channel to a variable, and emits the source channel. It is a useful way to extract intermediate output channels from a chain of operators. For example:
Channel.of( 'a', 'b', 'c' )
.tap { log1 }
.map { it * 2 }
.tap { log2 }
.map { it.toUpperCase() }
.view { "Result: $it" }
log1.view { "Log 1: $it" }
log2.view { "Log 2: $it" }
Result: AA
Result: BB
Result: CC
Log 1: a
Log 1: b
Log 1: c
Log 2: aa
Log 2: bb
Log 2: cc
See also: set
toInteger
Returns: queue channel or value channel, matching the source type
The toInteger
operator converts string values from a source channel to integer values:
Channel.of( '1', '7', '12' )
.toInteger()
.sum()
.view()
20
Note
toInteger
is equivalent to:
map { it -> it as Integer }
Note
You can also use toLong
, toFloat
, and toDouble
to convert to other numerical types.
toList
Returns: value channel
The toList
operator collects all the items from a source channel into a list and emits the list as a single item:
Channel.of( 1, 2, 3, 4 )
.toList()
.view()
[1, 2, 3, 4]
Note
There are two main differences between toList
and collect
:
When there is no input,
toList
emits an empty list whereascollect
emits nothing.By default,
collect
flattens list items by one level.
In other words, toList
is equivalent to:
collect(flat: false).ifEmpty([])
See also: collect
toSortedList
Returns: value channel
The toSortedList
operator collects all the items from a source channel into a sorted list and emits the list as a single item:
Channel.of( 3, 2, 1, 4 )
.toSortedList()
.view()
[1, 2, 3, 4]
An optional closure can be used to control how items are compared when sorting. For example, to sort tuples by their second element in descending order:
Channel.of( ['homer', 5], ['bart', 2], ['lisa', 10], ['marge', 3], ['maggie', 7] )
.toSortedList { a, b -> b[1] <=> a[1] }
.view()
[[lisa, 10], [maggie, 7], [homer, 5], [marge, 3], [bart, 2]]
Note
toSortedList
is equivalent to:
collect(flat: false, sort: true).ifEmpty([])
See also: collect
transpose
Returns: queue channel
The transpose
operator “transposes” each tuple from a source channel by flattening any nested list in each tuple, emitting each nested item separately.
To be more precise, the operator transforms a sequence of tuples like (K, list(V), list(W), ..) into a sequence of tuples like (K, V, W, ..).
For example:
Channel.of(
[1, ['A', 'B', 'C']],
[2, ['C', 'A']],
[3, ['B', 'D']]
)
.transpose()
.view()
[1, A]
[1, B]
[1, C]
[2, C]
[2, A]
[3, B]
[3, D]
If a source item has more than one nested list, the lists are transposed together: corresponding elements from each list are combined with the leading element, and a new tuple is emitted only when it is complete:
Channel.of(
[1, [1], ['A']],
[2, [1, 2], ['B', 'C']],
[3, [1, 2, 3], ['D', 'E']]
)
.transpose()
.view()
[1, 1, A]
[2, 1, B]
[2, 2, C]
[3, 1, D]
[3, 2, E]
The remainder
option can be used to emit any incomplete items:
Channel.of(
[1, [1], ['A']],
[2, [1, 2], ['B', 'C']],
[3, [1, 2, 3], ['D', 'E']]
)
.transpose(remainder: true)
.view()
[1, 1, A]
[2, 1, B]
[2, 2, C]
[3, 1, D]
[3, 2, E]
[3, 3, null]
Available options:
by
The zero-based index of the element to be transposed. Can also be a list of indices, e.g. by: [0, 2]. By default, every list element is transposed.
remainder
When true, incomplete tuples are emitted with null values for missing elements; otherwise they are discarded (default: false).
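A minimal sketch of the by option: transposing only the nested list at index 1 leaves the element at index 2 untouched, repeating it in each emitted tuple.
Channel.of(
        [1, ['A', 'B'], 'x'],
        [2, ['C'], 'y']
    )
    .transpose(by: 1)
    .view()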
See also: groupTuple
unique
Returns: queue channel or value channel, matching the source type
The unique
operator emits the unique items from a source channel:
Channel.of( 1, 1, 2, 2, 2, 3, 1, 1, 2, 2, 3 )
.unique()
.view()
1
2
3
An optional closure can be used to transform each item before it is evaluated for uniqueness:
Channel.of( 1, 1, 2, 2, 2, 3, 1, 1, 2, 4, 6 )
.unique { it % 2 }
.view()
1
2
Note
The difference between unique
and distinct
is that unique
removes all duplicate values, whereas distinct
removes only consecutive duplicate values. As a result, unique
must process the entire source channel before it can emit anything, whereas distinct
can emit each value immediately.
See also: distinct
until
Returns: queue channel or value channel, matching the source type
The until
operator emits each item from a source channel until a stopping condition is satisfied:
Channel.of( 3, 2, 1, 5, 1, 5 )
.until { it == 5 }
.view()
3
2
1
See also: take
view
Returns: queue channel or value channel, matching the source type
The view
operator prints each item from a source channel to standard output:
Channel.of(1, 2, 3).view()
1
2
3
An optional closure can be used to transform each item before it is printed:
Channel.of(1, 2, 3)
.map { it -> [it, it*it] }
.view { num, sqr -> "The square of $num is $sqr" }
The square of 1 is 1
The square of 2 is 4
The square of 3 is 9
The view
operator also emits every item that it receives, allowing it to be chained with other operators.
Available options:
newLine
Print each item to a separate line (default: true).
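A minimal sketch of the newLine option, printing the items without a line break between them:
Channel.of( 'a', 'b', 'c' )
    .view(newLine: false)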