Workflows

In Nextflow, a workflow is a composition of processes and dataflow logic (i.e. channels and operators).

The workflow definition starts with the keyword workflow, followed by an optional name, and finally the workflow body delimited by curly braces. A basic workflow looks like the following example:

workflow {
    foo()
}

Where foo could be a function, a process, or another workflow.

Workflows are lazily executed, which means that Nextflow parses the entire workflow structure first, and then executes the entire workflow at once. The order in which a task is executed is determined only by its dependencies, so a task will be executed as soon as all of its required inputs are available.

The syntax of a workflow is defined as follows:

workflow [ name ] {

    take:
    < workflow inputs >

    main:
    < dataflow statements >

    emit:
    < workflow outputs >

}

Tip

The main: label can be omitted if there are no take: or emit: blocks.

Note

Workflows were introduced in DSL2. If you are still using DSL1, see the Migrating from DSL 1 page to learn how to migrate your Nextflow pipelines to DSL2.

Implicit workflow

A script can define a single workflow without a name (also known as the implicit workflow), which is the default entrypoint of the script. The -entry command line option can be used to execute a different workflow as the entrypoint at runtime.

Note

Implicit workflow definitions are ignored when a script is included as a module. This way, a script can be written such that it can be either imported as a module or executed as a pipeline.

Named workflows

A named workflow is a workflow that can be invoked from other workflows. For example:

workflow my_pipeline {
    foo()
    bar( foo.out.collect() )
}

workflow {
    my_pipeline()
}

The above snippet defines a workflow named my_pipeline, that can be invoked from another workflow as my_pipeline(), just like any other function or process.

Using variables and params

A workflow can access any variable or parameter defined in the global scope:

params.data = '/some/data/file'

workflow {
    if( params.data )
        bar(params.data)
    else
        bar(foo())
}

Tip

The use of global variables and params in named workflows is discouraged because it breaks the modularity of the workflow. As a best practice, every workflow input should be explicitly defined as such in the take: block, and params should only be used in the implicit workflow.

Workflow inputs (take)

A workflow can declare one or more input channels using the take keyword. For example:

workflow my_pipeline {
    take:
    data1
    data2

    main:
    foo(data1, data2)
    bar(foo.out)
}

Warning

When the take keyword is used, the beginning of the workflow body must be defined with the main keyword.

Inputs can be specified like arguments when invoking the workflow:

workflow {
    my_pipeline( channel.from('/some/data') )
}

Workflow outputs (emit)

A workflow can declare one or more output channels using the emit keyword. For example:

workflow my_pipeline {
    main:
    foo(data)
    bar(foo.out)

    emit:
    bar.out
}

When invoking the workflow, the output channel(s) can be accessed using the out property, i.e. my_pipeline.out. When multiple output channels are declared, use the array bracket notation or the assignment syntax to access each output channel as described for process outputs.

Named outputs

If an output channel is assigned to an identifier in the emit block, the identifier can be used to reference the channel from the calling workflow. For example:

workflow my_pipeline {
    main:
    foo(data)
    bar(foo.out)

    emit:
    my_data = bar.out
}

The result of the above workflow can be accessed using my_pipeline.out.my_data.

Invoking processes

A process can be invoked like a function in a workflow definition, passing the expected input channels like function arguments. For example:

process foo {
    output:
    path 'foo.txt'

    script:
    """
    your_command > foo.txt
    """
}

process bar {
    input:
    path x

    output:
    path 'bar.txt'

    script:
    """
    another_command $x > bar.txt
    """
}

workflow {
    data = channel.fromPath('/some/path/*.txt')
    foo()
    bar(data)
}

Warning

A process can be only be invoked once in a single workflow, however you can get around this restriction by using Module aliases.

Process composition

Processes with matching input/output declarations can be composed so that the output of the first process is passed as input to the second process. The previous example can be rewritten as follows:

workflow {
    bar(foo())
}

Process outputs

A process output can be accessed using the out attribute on the corresponding process object. For example:

workflow {
    foo()
    bar(foo.out)
    bar.out.view()
}

When a process defines multiple output channels, each output can be accessed by index (out[0], out[1], etc.) or by name (see below).

The process output(s) can also be accessed like the return value of a function:

workflow {
    f_out = foo()
    (b1, b2) = bar(f_out)
    b1.view()
}

Named outputs

The emit option can be added to the process output definition to assign a name identifier. This name can be used to reference the channel from the calling workflow. For example:

process foo {
    output:
    path '*.bam', emit: samples_bam

    '''
    your_command --here
    '''
}

workflow {
    foo()
    foo.out.samples_bam.view()
}

When referencing a named output directly from the process invocation, you can use a more concise syntax:

workflow {
    ch_samples = foo().samples_bam
}

See naming process outputs for more details.

Named stdout

The emit option can also be used to name a stdout output. However, while process output options are usually prefixed with a comma, this is not the case for stdout. This is because stdout does not have an argument like other types.

process sayHello {
    input:
    val cheers

    output:
    stdout emit: verbiage

    script:
    """
    echo -n $cheers
    """
}

workflow {
    things = channel.of('Hello world!', 'Yo, dude!', 'Duck!')
    sayHello(things)
    sayHello.out.verbiage.view()
}

Invoking workflows

Named workflows can be invoked and composed just like any other process or function.

workflow flow1 {
    take: data
    main:
        foo(data)
        bar(foo.out)
    emit:
        bar.out
}

workflow flow2 {
    take: data
    main:
        foo(data)
        baz(foo.out)
    emit:
        baz.out
}

workflow {
    take: data
    main:
        flow1(data)
        flow2(flow1.out)
}

Note

Each workflow invocation has its own scope. As a result, the same process can be invoked in two different workflow scopes, like foo in the above snippet, which is used in both flow1 and flow2. The workflow execution path, along with the process names, determines the fully qualified process name that is used to distinguish the different process invocations, i.e. flow1:foo and flow2:foo in the above example.

Tip

The fully qualified process name can be used as a process selector in a Nextflow configuration file, and it takes priority over the simple process name.

Special operators

Pipe |

The | pipe operator can be used to compose Nextflow processes and operators. For example:

process foo {
    input:
    val data

    output:
    val result

    exec:
    result = "$data world"
}

workflow {
   channel.from('Hello','Hola','Ciao') | foo | map { it.toUpperCase() } | view
}

The above snippet defines a process named foo and invokes it with the data channel. The result is then piped to the map operator, which converts each string to uppercase, and finally to the view operator which prints it.

Tip

Statements can also be split across multiple lines for better readability:

workflow {
    channel.from('Hello','Hola','Ciao')
      | foo
      | map { it.toUpperCase() }
      | view
}

And &

The & and operator can be used to feed multiple processes with the same channel(s). For example:

process foo {
    input:
    val data

    output:
    val result

    exec:
    result = "$data world"
}

process bar {
    input:
    val data

    output:
    val result

    exec:
    result = data.toUpperCase()
}

workflow {
    channel.from('Hello')
      | map { it.reverse() }
      | (foo & bar)
      | mix
      | view
}

In the above snippet, the initial channel is piped to the map operator, which reverses the string value. Then, the result is passed to the processes foo and bar, which are executed in parallel. Each process outputs a channel, and the two channels are combined using the mix operator. Finally, the result is printed using the view operator.

Publishing outputs

New in version 24.04.0.

Note

This feature requires the nextflow.preview.output feature flag to be enabled.

A script may define the set of outputs that should be published by the implicit workflow, known as the workflow output definition:

workflow {
    foo(bar())
}

output {
    directory 'results'
}

The output definition must be defined after the implicit workflow.

Publishing channels

Processes and workflows can each define a publish section which maps channels to publish targets. For example:

process foo {
    // ...

    output:
    path 'result.txt', emit: results

    publish:
    results >> 'foo'

    // ...
}

workflow foobar {
    main:
    foo(data)
    bar(foo.out)

    publish:
    foo.out >> 'foobar/foo'

    emit:
    bar.out
}

In the above example, the output results of process foo is published to the target foo/ by default. However, when the workflow foobar invokes process foo, it publishes foo.out (i.e. foo.out.results) to the target foobar/foo/, overriding the default target defined by foo.

In a process, any output with an emit name can be published. In a workflow, any channel defined in the workflow, including process and subworkflow outputs, can be published.

Note

If the publish source is a process/workflow output (e.g. foo.out) with multiple channels, each channel will be published. Individual output channels can also be published by index or name (e.g. foo.out[0] or foo.out.results).

As shown in the example, workflows can override the publish targets of process and subworkflow outputs. This way, each process and workflow can define some sensible defaults for publishing, which can be overridden by calling workflows as needed.

By default, all files emitted by the channel will be published into the specified directory. If a channel emits list values, any files in the list (including nested lists) will also be published. For example:

workflow {
    ch_samples = Channel.of(
        [ [id: 'sample1'], file('sample1.txt') ]
    )

    publish:
    ch_samples >> 'samples' // sample1.txt will be published
}

Publish directory

The directory statement is used to set the top-level publish directory of the workflow:

output {
    directory 'results'

    // ...
}

It is optional, and it defaults to the launch directory (workflow.launchDir). Published files will be saved within this directory.

Publish targets

A publish target is a name with a specific publish configuration. By default, when a channel is published to a target in the publish: section of a process or workflow, the target name is used as the publish path.

For example, given the following output definition:

workflow {
    ch_foo = foo()
    ch_bar = bar(ch_foo)

    publish:
    ch_foo >> 'foo'
    ch_bar >> 'bar'
}

output {
    directory 'results'
}

The following directory structure will be created:

results/
└── foo/
    └── ...
└── bar/
    └── ...

Note

The trailing slash in the target name is not required; it is only used to denote that the target name is intended to be used as the publish path.

Warning

The target name must not begin with a slash (/), it should be a relative path name.

Workflows can also disable publishing for specific channels by redirecting them to null:

workflow {
    ch_foo = foo()

    publish:
    ch_foo >> (params.save_foo ? 'foo' : null)
}

Publish targets can be customized in the output definition using a set of options similar to the publishDir directive.

For example:

output {
    directory 'results'
    mode 'copy'

    'foo' {
        mode 'link'
    }
}

In this example, all files will be copied by default, and files published to foo/ will be hard-linked, overriding the default option.

Available options:

contentType

Currently only supported for S3.

Specify the media type a.k.a. MIME type of published files (default: false). Can be a string (e.g. 'text/html'), or true to infer the content type from the file extension.

enabled

Enable or disable publishing (default: true).

ignoreErrors

When true, the workflow will not fail if a file can’t be published for some reason (default: false).

mode

The file publishing method (default: 'symlink'). The following options are available:

'copy'

Copy each file into the output directory.

'copyNoFollow'

Copy each file into the output directory without following symlinks, i.e. only the link is copied.

'link'

Create a hard link in the output directory for each file.

'move'

Move each file into the output directory.

Should only be used for files which are not used by downstream processes in the workflow.

'rellink'

Create a relative symbolic link in the output directory for each file.

'symlink'

Create an absolute symbolic link in the output directory for each output file.

overwrite

When true any existing file in the specified folder will be overwritten (default: 'standard'). The following options are available:

false

Never overwrite existing files.

true

Always overwrite existing files.

'deep'

Overwrite existing files when the file content is different.

'lenient'

Overwrite existing files when the file size is different.

'standard'

Overwrite existing files when the file size or last modified timestamp is different.

path

Specify the publish path relative to the output directory (default: the target name). Can only be specified within a target definition.

storageClass

Currently only supported for S3.

Specify the storage class for published files.

tags

Currently only supported for S3.

Specify arbitrary tags for published files. For example:

tags FOO: 'hello', BAR: 'world'

Index files

A publish target can create an index file of the values that were published. An index file is a useful way to save the metadata associated with files, and is more flexible than encoding metadata in the file path. Currently only CSV files are supported.

For example:

workflow {
    ch_foo = Channel.of(
        [id: 1, name: 'foo 1'],
        [id: 2, name: 'foo 2'],
        [id: 3, name: 'foo 3']
    )

    publish:
    ch_foo >> 'foo'
}

output {
    directory 'results'

    'foo' {
        index {
            path 'index.csv'
        }
    }
}

The above example will write the following CSV file to results/foo/index.csv:

"id","name"
"1","foo 1"
"2","foo 2"
"3","foo 3"

You can customize the index file by specifying options in a block, for example:

index {
    path 'index.csv'
    header ['name', 'extra_option']
    sep '\t'
    mapper { val -> val + [extra_option: 'bar'] }
}

The following options are available:

header

When true, the keys of the first record are used as the column names (default: false). Can also be a list of column names.

mapper

Closure which defines how to transform each published value into a CSV record. The closure should return a list or map. By default, no transformation is applied.

path

The name of the index file relative to the target path (required).

sep

The character used to separate values (default: ',').