Operators (typed)
New in version 26.04.0.
This page describes the core operators that are recommended for use with static typing.
collect
Channel<E> collect() -> Value<Bag<E>>
The collect operator collects all values from a source channel into a collection and emits it as a dataflow value:
channel.of( 1, 2, 3, 4 )
.collect()
.view()
[1, 2, 3, 4]
combine
Channel<L> combine( other: Channel<R> ) -> Channel<Tuple>
Channel<L> combine( other: Value<R> ) -> Channel<Tuple>
The combine operator emits every pairwise combination of a source channel with another channel or dataflow value:
numbers = channel.of(1, 2, 3)
words = channel.of('hello', 'ciao')
numbers
.combine(words)
.view()
[1, hello]
[2, hello]
[3, hello]
[1, ciao]
[2, ciao]
[3, ciao]
Tuples in both the left- and right-hand sources are flattened in the combined tuple. For example, tuple(1, 2) and tuple('red', 'blue') are combined as tuple(1, 2, 'red', 'blue').
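As a minimal sketch of the second signature, the example below combines a channel with a dataflow value created by channel.value. The variable names are illustrative. Going by the description above, each number should be paired with the single value:

```nextflow
numbers = channel.of(1, 2, 3)
greeting = channel.value('hello')  // a dataflow value rather than a channel

numbers.combine(greeting).view()
```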
Channel<Record> combine( [opts] ) -> Channel<Record>
When the combine operator is called on a channel of records with named arguments, the named arguments are appended to each record in the source channel. Each named argument can be a value or dataflow value.
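A minimal sketch of this form, assuming hypothetical field names (id, genome) chosen only for illustration. Per the description above, each emitted record should carry its original id field plus the appended genome field:

```nextflow
// hypothetical sample records; the field names are illustrative
samples = channel.of( record(id: 'A'), record(id: 'B') )

// 'genome' is appended to every record; here it is a plain value,
// but a dataflow value (e.g. from channel.value) could be passed instead
samples.combine(genome: 'hg38').view()
```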
filter
Channel<E> filter( condition: (E) -> Boolean ) -> Channel<E>
The filter operator emits the values from a source channel that satisfy a condition, discarding all other values:
channel.of( 1, 2, 3, 4, 5 )
.filter { v -> v % 2 == 1 }
.view()
1
3
5
flatMap
Channel<E> flatMap( transform: (E) -> Iterable<R> ) -> Channel<R>
The flatMap operator applies a mapping function to each value from a source channel. The mapping function should return a collection, and each element in the collection is emitted separately.
For example:
channel.of( 1, 2, 3 )
.flatMap { n -> [ n, n*2, n*3 ] }
.view()
1
2
3
2
4
6
3
6
9
groupBy
Channel<Tuple<K, V>> groupBy() -> Channel<Tuple<K, Bag<V>>>
Channel<Tuple<K, Integer, V>> groupBy() -> Channel<Tuple<K, Bag<V>>>
The groupBy operator collects values from a source channel into groups based on a grouping key. A tuple is emitted for each group, containing the grouping key and collection of values.
The source channel should supply either 2-tuples of the form (<key>, <value>) or 3-tuples of the form (<key>, <size>, <value>).
If the source tuples do not specify a size, groupBy will not emit any groups until all inputs have been received:
channel.of( tuple(1, 'A'), tuple(1, 'B'), tuple(2, 'C'), tuple(3, 'B'), tuple(1, 'C'), tuple(2, 'A'), tuple(3, 'D') )
.groupBy()
.map { key, values -> tuple(key, values.toSorted()) }
.view()
[1, [A, B, C]]
[2, [A, C]]
[3, [B, D]]
If the source tuples do specify a size, then groupBy will emit each group as soon as it is ready:
channel.of(
    tuple('chr1', ['/path/to/region1_chr1.vcf', '/path/to/region2_chr1.vcf']),
    tuple('chr2', ['/path/to/region1_chr2.vcf', '/path/to/region2_chr2.vcf', '/path/to/region3_chr2.vcf']),
)
.flatMap { chr, vcfs ->
    vcfs.collect { vcf ->
        tuple(chr, vcfs.size(), vcf) // preserve group size
    }
}
.view { v -> "scattered: ${v}" }
.groupBy()
.map { key, values -> tuple(key, values.toSorted()) }
.view { v -> "gathered: ${v}" }
scattered: [chr1, 2, /path/to/region1_chr1.vcf]
scattered: [chr1, 2, /path/to/region2_chr1.vcf]
scattered: [chr2, 3, /path/to/region1_chr2.vcf]
scattered: [chr2, 3, /path/to/region2_chr2.vcf]
scattered: [chr2, 3, /path/to/region3_chr2.vcf]
gathered: [chr1, [/path/to/region1_chr1.vcf, /path/to/region2_chr1.vcf]]
gathered: [chr2, [/path/to/region1_chr2.vcf, /path/to/region2_chr2.vcf, /path/to/region3_chr2.vcf]]
Note
When specifying the group size, make sure that the number of inputs for a given group matches the specified size for that group. Otherwise, the run will fail.
join
Channel<Record> join( other: Channel<Record>, [opts] ) -> Channel<Record>
The join operator emits the relational join of two channels of records, using a matching key given by the by option:
left = channel.of( record(id: 'X', a: 1), record(id: 'Y', a: 2), record(id: 'Z', a: 3), record(id: 'P', a: 7) )
right = channel.of( record(id: 'Z', b: 6), record(id: 'Y', b: 5), record(id: 'X', b: 4) )
left.join(right, by: 'id').view()
[id:Z, a:3, b:6]
[id:Y, a:2, b:5]
[id:X, a:1, b:4]
Duplicate matching keys are handled by emitting each matching combination (like a relational join):
left = channel.of( record(id: 'X', a: 1), record(id: 'X', a: 3) )
right = channel.of( record(id: 'X', b: 2), record(id: 'X', b: 4) )
left.join(right, by: 'id').view()
[id:X, a:1, b:2]
[id:X, a:1, b:4]
[id:X, a:3, b:2]
[id:X, a:3, b:4]
By default, unmatched values are discarded. The remainder option can be used to emit them at the end:
left = channel.of( record(id: 'X', a: 1), record(id: 'Y', a: 2), record(id: 'Z', a: 3), record(id: 'P', a: 7) )
right = channel.of( record(id: 'Z', b: 6), record(id: 'Y', b: 5), record(id: 'X', b: 4), record(id: 'Q', b: 8) )
left.join(right, by: 'id', remainder: true).view()
[id:Y, a:2, b:5]
[id:Z, a:3, b:6]
[id:X, a:1, b:4]
[id:P, a:7]
[id:Q, b:8]
Available options:
by: String
This option is required. The record field to use as the matching key.
remainder: Boolean
When true, unmatched values are emitted at the end, otherwise they are discarded (default: false).
map
Channel<E> map( transform: (E) -> R ) -> Channel<R>
The map operator applies a mapping function to each value from a source channel:
channel.of( 1, 2, 3, 4, 5 )
.map { v -> v * v }
.view()
1
4
9
16
25
mix
Channel<E> mix( other: Channel<E> ) -> Channel<E>
Channel<E> mix( other: Value<E> ) -> Channel<E>
The mix operator emits the values from a channel and another channel or dataflow value into a single output channel:
c1 = channel.of( '1', '2', '3' )
c2 = channel.of( 'a', 'b' )
v3 = channel.value( 'z' )
c1.mix(c2).mix(v3).view()
1
2
3
a
b
z
The values in the output channel may be emitted in any order, for example:
z
1
a
2
b
3
reduce
Channel<E> reduce( seed: R, accumulator: (R, E) -> R ) -> Value<R>
Channel<E> reduce( accumulator: (E, E) -> E ) -> Value<E>
The reduce operator applies an accumulator function sequentially to each value in a source channel, and emits the final accumulated value.
The accumulator function takes two parameters – the accumulated value and the i-th emitted value – and it should return the accumulated result, which is passed to the next invocation with the i+1-th value. This process is repeated for each value in the source channel.
For example:
channel.of( 1, 2, 3, 4, 5 )
.reduce { a, b ->
    println "a: $a b: $b"
    a + b
}
.view { result -> "result = $result" }
a: 1 b: 2
a: 3 b: 3
a: 6 b: 4
a: 10 b: 5
result = 15
By default, the first value is used as the initial accumulated value (the seed). You can optionally specify a different initial value as shown below:
channel.of( 1, 2, 3, 4, 5 )
.reduce( 'result:' ) { acc, v ->
    println acc
    acc + ' ' + v
}
.view { result -> "final $result" }
result:
result: 1
result: 1 2
result: 1 2 3
result: 1 2 3 4
final result: 1 2 3 4 5
subscribe
Channel<E> subscribe( action: (E) -> () )
Channel<E> subscribe( [opts] )
The subscribe operator invokes a custom function for each value from a source channel:
source = channel.of( 'alpha', 'beta', 'delta' )
source.subscribe { str ->
    println "Got: ${str}; len: ${str.length()}"
}
Got: alpha; len: 5
Got: beta; len: 4
Got: delta; len: 5
The subscribe operator supports multiple types of event handlers:
channel.of( 1, 2, 3 ).subscribe(
    onNext: { v -> println v },
    onComplete: { println 'Done' }
)
1
2
3
Done
Note
Unlike most operators, subscribe does not return anything. It should only be used for side effects, such as printing to the console, writing to a file, or making HTTP requests.
Available options:
onNext: (E) -> ()
Closure that is invoked when a value is emitted. Equivalent to providing a single closure argument.
onComplete: () -> ()
Closure that is invoked after the last value is emitted by the channel.
onError: (T) -> ()
Closure that is invoked when an exception is raised while handling the onNext event. Once it has been invoked, no further calls are made to onNext or onComplete. The closure takes as its parameter the Throwable that caused the error.
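As a sketch, the three handlers listed above can be supplied together. The printed messages are illustrative, and the text of any error message depends on the exception that was raised:

```nextflow
channel.of( 1, 2, 3 ).subscribe(
    onNext: { v -> println "Got: ${v}" },
    onError: { e -> println "Failed: ${e.message}" },  // e is the Throwable that caused the error
    onComplete: { println 'Done' }
)
```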
unique
Channel<E> unique( transform: (E) -> ? ) -> Channel<E>
Channel<E> unique() -> Channel<E>
The unique operator emits the unique values from a source channel:
channel.of( 1, 1, 2, 2, 2, 3, 1, 1, 2, 2, 3 )
.unique()
.view()
1
2
3
An optional closure can be used to transform each value before it is evaluated for uniqueness:
channel.of( 1, 1, 2, 2, 2, 3, 1, 1, 2, 4, 6 )
.unique { v -> v % 2 }
.view()
1
2
until
Channel<E> until( condition: (E) -> Boolean ) -> Channel<E>
The until operator emits each value from a source channel until a stopping condition is satisfied:
channel.of( 3, 2, 1, 5, 1, 5 )
.until { v -> v == 5 }
.view()
3
2
1
view
Channel<E> view( transform: (E) -> String, [opts] ) -> Channel<E>
Channel<E> view( [opts] ) -> Channel<E>
The view operator prints each value from a source channel to standard output:
channel.of(1, 2, 3).view()
1
2
3
An optional closure can be used to transform each value before it is printed:
channel.of(1, 2, 3)
.map { v -> [v, v*v] }
.view { num, sqr -> "The square of $num is $sqr" }
The square of 1 is 1
The square of 2 is 4
The square of 3 is 9
The tag option can be used to print the channel only when the -dump-channels command-line option is specified with the given tag:
channel.of( 1, 2, 3 )
.map { v -> v + 1 }
.view(tag: 'plus1')
channel.of( 1, 2, 3 )
.map { v -> v ** 2 }
.view(tag: 'exp2')
You can run this script with -dump-channels plus1 or -dump-channels exp2 to print either channel, or -dump-channels plus1,exp2 to print both.
The view operator also emits every value that it receives, allowing it to be chained with other operators.
Available options:
newLine: Boolean
Print each value to a separate line (default: true).
tag: String
Print the channel values only when -dump-channels is specified on the command line with the given tag.