Add New Functionality

Implement New Steps

uap can easily be extended by implementing new source or processing steps. This requires basic Python programming skills. New steps are added to uap by placing a single Python file into one of these folders in the uap installation directory:

include/sources

Place source step files here

include/steps

Place processing step files here

Let’s walk through how to implement such a uap step.

Step 1: Import Statements and Logger

At the beginning of every step file, import the required modules and create a logger object.

# First import standard libraries
import os
from logging import getLogger

# Secondly import third party libraries
import yaml

# Thirdly import local application files
from abstract_step import AbstractStep # or AbstractSourceStep
from uaperrors import UAPError

# Get application wide logger
logger = getLogger("uap_logger")

Essential imports are from logging import getLogger, from abstract_step import ..., and from uaperrors import UAPError. The logger can be used to log messages at different verbosity levels (e.g. logger.debug, logger.info, logger.warning). UAPError can be used to raise uap-specific errors with raise UAPError(<message>). AbstractStep and AbstractSourceStep are the available parent classes from which the new implementation must inherit its methods.
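For illustration, here is a minimal sketch of how the logger and UAPError are typically used inside a step (all_files is a placeholder for a list of input files, as assembled in the example developed below):

# Log a progress message at 'info' verbosity
logger.info("Collected %d input files." % len(all_files))

# Abort with a uap-specific error, e.g. when the input is invalid
if not all_files:
    raise UAPError("No input files found on connection 'in/text'.")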

Step 2: Class Definition

Now you need to define a class (which inherits either from AbstractStep or AbstractSourceStep) and its __init__ method.

class ConcatenateFiles(AbstractStep):
    # Overwrite initialisation
    def __init__(self, pipeline):
        # Call super classes initialisation
        super(ConcatenateFiles, self).__init__(pipeline)


The new class needs to be derived from either AbstractStep, for processing steps, or AbstractSourceStep, for source steps.

Step 3: __init__ Method

The __init__ method is the place where you should declare:

Tools via self.require_tool('tool_name'):

Steps usually require tools to perform their task. Each tool that is going to be used by a step needs to be requested via the method require_tool('tool_name'). uap tests the existence of the required tools whenever it constructs the directed acyclic graph (DAG) of the analysis. The test is based on the information provided in the tools section of the analysis configuration. An entry for tool_name has to exist and provide the information needed to verify the tool's accessibility.
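As a minimal sketch of this pairing: request the tool in __init__ and retrieve it in runs() via get_tool() (the full example below does the same):

    def __init__(self, pipeline):
        super(ConcatenateFiles, self).__init__(pipeline)
        # Request the tool; uap verifies it against the 'tools'
        # section of the configuration when it builds the DAG
        self.require_tool('cat')

    def runs(self, cc):
        # Retrieve the requested tool when assembling a command
        cat = [self.get_tool('cat')]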

Connections via add_connection(...):

Connections are defined by the method add_connection(...). They are used to transfer data from one step to another. If a step defines an output connection out/something and a subsequent step defines an input connection named in/something, then the files belonging to out/something will be available via the connection in/something.

Please name connections in a way that describes the data itself and NOT the data type. For instance, prefer in/genome over in/fasta.

The first parameter of the method must be the tag; all other parameters should be passed with their respective keywords:

  1. tag

    The name of the connection. It must start with in/ to declare an input connection or out/ to declare an output connection.

  2. optional (Boolean, default False)

    Defines if the connection is mandatory (False) or optional (True). A mandatory connection will be checked more rigorously.

  3. format

    Describes the expected file formats to aid the user.

  4. description

    Describes the content to aid the user. The string is used for the documentation generated with Sphinx, so reStructuredText markup can be used.

Options via self.add_option():

Options allow the user to influence the commands executed by a step. It is advisable to provide as many meaningful options as possible to keep steps flexible. Options are defined via the method add_option().

The add_option() method allows you to specify various properties of the option. The method parameters are:

  1. key

    The name of the option (if possible, include the name of the tool this option influences, e.g. dd-blocksize to set the blocksize of dd).

  2. option_type

    The option type has to be at least one of int, float, str, bool, list, or dict.

  3. optional (Boolean, default False)

    Defines if the option is mandatory (False) or optional (True).

  4. choices

    List of valid values for the option.

  5. default (default None)

    Defines the default value for the option.

  6. description

    The description of the functionality of the option. The string is used for the documentation generated with Sphinx, so reStructuredText markup can be used.

For the ConcatenateFiles example, the remainder of the __init__ method declares the connections, tools, and options like this:

        # Define connections
        self.add_connection('in/text', optional=False,
            format=['txt', 'text'],
            description='Contains certain information.')
        self.add_connection('out/text', optional=False,
            format='txt', description='The result.')

        # Request tools
        self.require_tool('cat')

        # Options for workflow
        self.add_option('concatenate_all_files', bool, optional=False,
                        default=False, description="Concatenate all files from "
                        "all runs, if 'True'.")

        # Options for 'cat' (see manpage)
        self.add_option('show-all', bool, optional=True,
                        description="Show all characters")

        self.add_option('number-nonblank', int, optional=True,
                        description="number nonempty output lines, "
                        "overrides --number")

        self.add_option('show-ends', bool, optional=True,
                        description="display $ at end of each line")

        self.add_option("number", int, optional=True,
                        description="number all output lines")

        self.add_option("squeeze-blank", bool, optional=True,
                        description="suppress repeated empty output lines")

        self.add_option("show-tabs", bool, optional=True,
                        description="display TAB characters as ^I")

        self.add_option("show-nonprinting", bool, optional=True,
                         description="use ^ and M- notation, except for "
                         "LFD and TAB")


Step 4: runs Method

The runs method is where all the work is done. This method is handed an instance of the ConnectionsCollector, which can be used like a dictionary of dictionaries. The keys of the outer dictionary are the run IDs (often corresponding to the samples); its values are again dictionaries. The keys of these inner dictionaries are the connections, e.g. “in/text”, and the values are the corresponding files belonging to that connection.

Let’s inspect all the run IDs, connections, and input files we got from our upstream steps for all runs that provide the connection in/text. And let’s store all the files we receive in a list for later use.


    def runs(self, cc):
        all_files = list()
        # Let's inspect the cc data structure
        run_ids = cc.get_runs_with_connections('in/text')
        for run_id in run_ids:
            logger.info("Run ID: %s" % run_id)
            for connection in cc[run_id].keys():
                logger.info("Connection: %s" % connection)
                for in_file in cc[run_id][connection]:
                    logger.info("Input file: %s" % in_file)
                    # Collect all files
                    all_files.append(in_file)


At this point it comes in handy to assemble a list with all options for cat.


        # List with options for 'cat'
        cat_options = ['show-all', 'number-nonblank', 'show-ends', 'number',
                       'squeeze-blank', 'show-tabs', 'show-nonprinting']

        # Get all options which were set
        set_options = [option for option in cat_options
                       if self.is_option_set_in_config(option)]

        # Compile the list of options
        cat_option_list = list()
        for option in set_options:
            # bool options add only the flag (and only if set to True) ...
            if isinstance(self.get_option(option), bool):
                if self.get_option(option):
                    cat_option_list.append('--%s' % option)
            # ... the rest add the flag ...
            else:
                cat_option_list.append('--%s' % option)
                # ... and the value, cast to string
                cat_option_list.append(str(self.get_option(option)))


What should happen if we are told to concatenate all files from all input runs? We have to create a single run with a new run ID ‘all_files’. The run consists of an exec_group that runs the cat command.

Note

An exec_group is a list of commands which are executed in one go. You might create multiple exec_groups if you need to make sure one set of commands has finished before another set is started. An exec_group can contain commands and pipelines. They can be added like this:

# Add a single command
exec_group.add_command(...)

# Add a pipeline to an exec_group
pipe = exec_group.add_pipeline()
# Add a command to a pipeline
pipe.add_command(...)

The result of the concatenation is written to an output file. The run object needs to know about each output file that is going to be created.

Note

An output file is announced via the run object’s add_output_file(tag, out_path, in_paths) method. The method parameters are:

  1. tag: The name of the out connection e.g. ‘text’ for ‘out/text’

  2. out_path: The name of the output file (best practice is to add the run ID to the file name)

  3. in_paths: The input files this output file is based on


        # Okay let's concatenate all files we get
        if self.get_option('concatenate_all_files'):

            # New run named 'all_files' is created here
            run = self.declare_run('all_files')

            # Create an exec_group
            exec_group = run.new_exec_group()
            # Assemble the cat command
            cat = [self.get_tool('cat')]
            # Add the options to the command
            cat.extend(cat_option_list)
            cat.extend(all_files)

            # Now add the command to the execution group
            exec_group.add_command(
                cat,
                stdout_path=run.add_output_file(
                    'text',
                    'all_files_concatenated.txt',
                    all_files)
            )


What should happen if only the files of each input run have to be concatenated? We create a new run for each input run and concatenate all files that belong to it.

        # Concatenate all files from a run's 'in/text' connection
        else:
            # iterate over all run IDs ...
            for run_id in cc.keys():
                input_paths = cc[run_id]['in/text']
                # ... and declare a new run for each of them.
                run = self.declare_run(run_id)
                exec_group = run.new_exec_group()
                # Assemble the cat command
                cat = [self.get_tool('cat')]
                # Add the options to the command
                cat.extend(cat_option_list)
                cat.extend(input_paths)

                # Now add the command to the execution group
                exec_group.add_command(
                    cat,
                    stdout_path=run.add_output_file(
                        'text',
                        '%s_concatenated.txt' % run_id,
                        input_paths)
                )

That’s it. You created your first uap processing step.

Step 5: Add the new step to uap

You have to make the new step known to uap. Save the complete file into the folder matching the step type: processing step files belong in uap’s include/steps/ folder and source step files in uap’s include/sources/ folder.

You can check that your step is correctly “installed” by verifying that it is included in the list of all source and processing steps:

$ ls -la $(uap --path)/include/sources
... Lists all available source step files

$ ls -la $(uap --path)/include/steps
... Lists all available processing step files

You can also use uap’s steps subcommand to get information about installed steps.

If the step file exists at the correct location, the step can be used in an analysis configuration file.

A potential example YAML file named test.yaml could look like this:

destination_path: example-out/test/

steps:
    ##################
    ## Source steps ##
    ##################

    raw_file_source:
        pattern: example-data/text-files/*.txt
        group: (.*).txt

    ######################
    ## Processing steps ##
    ######################

    cat:
        _depends: raw_file_source
        _connect:
            in/text:
                - raw_file_source/raw
        concatenate_all_files: False

tools:
    cat:
        path: cat
        get_version: '--version'
        exit_code: 0

You need to create the destination path and some text files matching the pattern example-data/text-files/*.txt. Note how the _connect keyword wires the source step’s output into the cat step’s in/text connection. Now check the status of the configured analysis:

$ uap test.yaml status
Ready runs
----------
[r] cat/Hello_america
[r] cat/Hello_asia
[r] cat/Hello_europe
[r] cat/Hello_world

runs: 4 total, 4 ready

Best Practices

There are a couple of things you should keep in mind while implementing new steps or modifying existing ones:

  • NEVER remove files! If files need to be removed, report the issue and exit uap, or require the user to call a specific subcommand. Never delete files without the user’s permission.

  • Make sure errors already show up when the step’s runs() method is called. Stick to “fail early, fail often”. That way errors show up before jobs are submitted to the cluster, and cluster waiting time is not wasted.

  • Make sure that all tools which you request with self.require_tool() are also used in the runs() method. Use the __init__() method to request tools.

  • Always call os.path.abspath on file paths that are passed via an option, to enable file references relative to a uap config for your step (see the sketch after this list).

  • Make sure your disk access is as cluster-friendly as possible (which primarily means using large block sizes and preferably no seek operations). If possible, use pipelines to wrap your commands in pigz or dd commands. Make the used block size configurable. Although this is not possible in every case (for example when seeking in files is involved), it is straightforward with tools that read a continuous stream from stdin and write a continuous stream to stdout.

  • Always use os.path.join(...) to handle paths.

  • Use shell commands like mkfifo rather than Python library equivalents like os.mkfifo(). The mkfifo command is hashed, while a call to os.mkfifo() is not.

  • Keep your steps as flexible as possible. You don’t know what other users might need, so let them decide.
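A minimal sketch of the os.path.abspath practice, assuming a hypothetical file option named 'reference':

        # In __init__ (hypothetical option):
        self.add_option('reference', str, optional=True,
                        description='Path to a reference file.')

        # In runs(): resolve the path so that it also works when it was
        # given relative to the uap configuration
        if self.is_option_set_in_config('reference'):
            reference_path = os.path.abspath(self.get_option('reference'))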

Usage of dd and mkfifo

uap often relies on dd and FIFOs to process data with fewer disk read/write operations. Please provide a step option to adjust the dd blocksize (this option is usually called dd-blocksize). Create your steps in a way that they perform as few filesystem operations as possible; some systems might be very sensitive to huge numbers of read/write operations.
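As a minimal sketch of such a pipeline inside runs(), assuming the step required the tools dd and pigz in __init__, declared a dd-blocksize option, and that input_path, run, and run_id are set up as in the example above (it also assumes a pipeline’s final command accepts stdout_path just like exec_group.add_command):

        exec_group = run.new_exec_group()
        pipe = exec_group.add_pipeline()
        # Read the input with a configurable, large block size ...
        dd_in = [self.get_tool('dd'),
                 'ibs=%s' % self.get_option('dd-blocksize'),
                 'if=%s' % input_path]
        pipe.add_command(dd_in)
        # ... and compress the stream on the fly
        pigz = [self.get_tool('pigz'), '--stdout']
        pipe.add_command(
            pigz,
            stdout_path=run.add_output_file(
                'text', '%s.txt.gz' % run_id, [input_path])
        )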