API documentation¶
Pipeline-specific modules¶
abstract_step¶
Classes AbstractStep and AbstractSourceStep are defined here.
The class AbstractStep has to be inherited by all processing step classes. The class AbstractSourceStep has to be inherited by all source step classes.
Processing steps generate output files from input files whereas source steps only provide output files. Both step types may generates tasks, but only source steps can introduce files from outside the destination path into the pipeline.
-
class
abstract_step.
AbstractStep
(pipeline)[source]¶ -
add_connection
(connection, optional=False, format=None, description=None)[source]¶ Add a connection, which must start with ‘in/’ or ‘out/’. :type format: (str) Data format passed in the connection. :type description: (str) Explain the connection.
-
add_dependency
(parent)[source]¶ Add a parent step to this steps dependencies.
parent – parent step this step depends on
-
check_required_out_connections
()[source]¶ This functions tests if all required out connections were set by all runs.
-
declare_run
(run_id)[source]¶ Declare a run. Use it like this:
with self.declare_run(run_id) as run: # add output files and information to the run here
-
dependencies
¶ All steps this step depends on.
-
finalize
()[source]¶ Finalizes the step.
The intention is to make further changes to the step impossible, but apparently, it’s checked nowhere at the moment.
-
find_upstream_info_for_input_paths
(input_paths, key)[source]¶ Find a piece of public information in all upstream steps. If the information is not found or defined in more than one upstream step, this will crash.
-
get_in_connections
(with_optional=True, strip_prefix=False)[source]¶ Return all in-connections for this step
-
get_module_loads
()[source]¶ Return dictionary with module load commands to execute before starting any other command of this step
-
get_module_unloads
()[source]¶ Return dictionary with module unload commands to execute before starting any other command of this step
-
get_out_connections
(with_optional=True, strip_prefix=False)[source]¶ Return all out-connections for this step
-
get_post_commands
()[source]¶ Return dictionary with commands to execute after finishing any other command of this step
-
get_pre_commands
()[source]¶ Return dictionary with commands to execute before starting any other command of this step
-
get_run_ids_in_connections_input_files
()[source]¶ Return a dictionary with all run IDs from parent steps, the in connections they provide data for, and the names of the files:
run_id_1: in_connection_1: [input_path_1, input_path_2, ...] in_connection_2: ... run_id_2: ...
Format of
in_connection
:in/<connection>
. Input paths are absolute.
-
get_runs
()[source]¶ Getter method for runs of this step.
If there are no runs as this method is called, they are created here.
-
classmethod
get_step_class_for_key
(key)[source]¶ Returns a step (or source step) class for a given key which corresponds to the name of the module the class is defined in. Pass ‘cutadapt’ and you will get the cutadapt.Cutadapt class which you may then instantiate.
-
get_step_name
()[source]¶ Returns this steps name.
Returns the step name which is initially equal to the step type (== module name) but can be changed via set_step_name() or via the YAML configuration.
-
is_option_set_in_config
(key)[source]¶ Determine whether an optional option (that is, a non-required option) has been set in the configuration.
-
require_tool
(tool)[source]¶ Declare that this step requires an external tool. Query it later with get_tool().
-
run
(run_id)[source]¶ Create a temporary output directory and execute a run. After the run has finished, it is checked that all output files are in place and the output files are moved to the final output location. Finally, YAML annotations are written.
-
runs
(run_ids_connections_files)[source]¶ Abstract method this must be implemented by actual step.
Raise NotImplementedError if subclass does not override this method.
-
-
class
abstract_step.
AbstractSourceStep
(pipeline)[source]¶ A subclass all source steps inherit from and which distinguishes source steps from all real processing steps because they do not yield any tasks, because their “output files” are in fact files which are already there.
Note that the name might be a bit misleading because this class only applies to source steps which ‘serve’ existing files. A step which has no input but produces input data for other steps and actually has to do something for it, on the other hand, would be a normal AbstractStep subclass because it produces tasks.
pipeline¶
-
exception
pipeline.
ConfigurationException
(value)[source]¶ an exception class for reporting configuration errors
-
class
pipeline.
Pipeline
(**kwargs)[source]¶ The Pipeline class represents the entire processing pipeline which is defined and configured via the configuration file config.yaml.
Individual steps may be defined in a tree, and their combination with samples as generated by one or more source leads to an array of tasks.
-
all_tasks_topologically_sorted
¶ List of all tasks in topological order.
-
check_tools
()[source]¶ checks whether all tools references by the configuration are available and records their versions as determined by
[tool] --version
etc.
-
config
¶ Dictionary representation of configuration YAML file.
-
config_name
¶ Name of the YAML configuration file
-
file_dependencies
¶ This dict stores file dependencies within this pipeline, but regardless of step, output file tag or run ID. This dict has, for all output files generated by the pipeline, a set of input files that output file depends on.
-
file_dependencies_reverse
¶ This dict stores file dependencies within this pipeline, but regardless of step, output file tag or run ID. This dict has, for all input files required by the pipeline, a set of output files which are generated using this input file.
-
get_cluster_job_ids
()[source]¶ The argument less method returns a set the cluster job ids of all subbmited jobs.
-
get_task_with_list
(as_string=False, exclusive=False)[source]¶ Reruns a list of tasks, specified with the run argument.
-
git_tag
¶ use git diff to determine any changes in git directory if git is available
-
input_files_for_task_id
¶ This dict stores a set of input files for every task id in the pipeline.
-
known_config_keys
¶ A set of accepted keys in the config.
-
notify
(message, attachment=None)[source]¶ prints a notification to the screen and optionally delivers the message on additional channels (as defined by the configuration)
-
output_files_for_task_id
¶ This dict stores a set of output files for every task id in the pipeline.
-
setup_lmod
()[source]¶ If lmod is configured this functions sets the required environmental variables.
-
states
= {'BAD', 'CHANGED', 'EXECUTING', 'FINISHED', 'QUEUED', 'READY', 'VOLATILIZED', 'WAITING'}¶ Possible states a task can be in.
-
steps
¶ This dict stores step objects by their name. Each step knows his dependencies.
-
task_for_output_file
¶ This dict stores a task ID for every output file created by the pipeline.
-
task_for_task_id
¶ This dict stores task objects by task IDs.
-
task_id_for_output_file
¶ This dict stores a task ID for every output file created by the pipeline.
-
task_ids_for_input_file
¶ This dict stores a set of task IDs for every input file used in the pipeline.
-
tasks_in_step
¶ This dict stores tasks per step name.
-
topological_step_order
¶ List with topologically ordered steps.
-
used_tools
¶ A set that stores all tools used by some step.
-
-
pipeline.
check_tool
(args)[source]¶ A top-level function to be used a multiprocessing pool to retrieve tool information in parallel.
-
pipeline.
coreutils
= {'basename', 'cat', 'cp', 'cut', 'date', 'dd', 'dirname', 'du', 'head', 'ln', 'ls', 'mkdir', 'mkfifo', 'mv', 'paste', 'printf', 'pwd', 'rm', 'seq', 'sleep', 'sort', 'tail', 'tee', 'tr', 'uniq', 'wc'}¶ Some GNU Core Utilities that are configured by default to be callable by name and to ignore theire version in the output hash.
run¶
-
class
run.
Run
(step, run_id)[source]¶ The Run class is a helper class which represents a run in a step. Declare runs inside AbstractStep.runs() via:
with self.new_run(run_id) as run: # declare output files, private and public info here
After that, use the available methods to configure the run. The run has typically no information about input connections only about input files.
-
add_empty_output_connection
(tag)[source]¶ An empty output connection has ‘None’ as output file and ‘None’ as input file.
-
add_output_file
(tag, out_path, in_paths)[source]¶ Add an output file to this run. Output file names must be unique across all runs defined by a step, so it may be a good idea to include the run_id into the output filename.
- tag: You must specify the connection annotation which must have been
previously declared via AbstractStep.add_connection(“out/…”), but this doesn’t have to be done in the step constructor, it’s also possible in declare_runs() right before this method is called.
- out_path: The output file path, without a directory. The pipeline
assigns directories for you (this parameter must not contain a slash).
- in_paths: A list of input files this output file depends on. It is
crucial to get this right, so that the pipeline can determine which steps are up-to-date at any given time. You have to specify absolute paths here, including a directory, and you can obtain them via AbstractStep.run_ids_and_input_files_for_connection and related functions.
-
add_private_info
(key, value)[source]¶ Add private information to a run. Use this to store data which you will need when the run is executed. As opposed to public information, private information is not visible to subsequent steps.
You can store paths to input files here, but not paths to output files as their expected location is not defined until we’re in AbstractStep.execute (hint: they get written to a temporary directory inside execute()).
-
add_public_info
(key, value)[source]¶ Add public information to a run. For example, a FASTQ reader may store the index barcode here for subsequent steps to query via
AbstractStep.find_upstream_info()
.
-
add_temporary_directory
(prefix='', suffix='', designation=None)[source]¶ Convenience method for creation of temporary directories. Basically, just calls self.add_temporary_file(). The magic happens in ProcessPool.__exit__()
-
add_temporary_file
(prefix='temp', suffix='', designation=None)[source]¶ Returns the name of a temporary file.
-
annotation_written
¶ Flag to mark if an annotation file was written during this uap execution.
-
fsc
¶ A cache.
-
get_output_directory_du_jour_placeholder
()[source]¶ Used to return a placeholder for the temporary output directory, which needed to be replaced by the actual temp directory inside the abstract_step.execute() method.
-
get_output_files_abspath
()[source]¶ Return a dictionary of all defined output files, grouped by connection annotation:
annotation_1: out_path_1: [in_path_1, in_path_2, ...] out_path_2: ... annotation_2: ...
The
out_path
consists of the output directory du jour and the output file name.
Retrieve a set of output files of the given annotation, assigned to the same number of specified tags. If you have two ‘alignment’ output files and they are called out-a.txt and out-b.txt, you can use this function like this:
tags: [‘a’, ‘b’]
result: {‘a’: ‘out-a.txt’, ‘b’: ‘out-b.txt’}
-
get_private_info
(key)[source]¶ Query private information which must have been previously stored via ” “add_private_info().
-
get_public_info
(key)[source]¶ Query public information which must have been previously stored via ” “add_public_info().
-
get_run_structure
(*args, **kwargs)[source]¶ Returns a dictionary with all information known at run declaratuon time, relevant for its result and nothing more.
- Included are:
tool versions
commands and structure
output connections and files
parent run names and hashsum of their run_structure
- Should not include:
any absolute paths
parent hashes of source steps (absolute paths)
-
is_stale
(exec_ping_file=None)[source]¶ Returns time of inactivity if the ping file exists and is stale.
-
remove_temporary_paths
()[source]¶ Everything stored in self._temp_paths is examined and deleted if possible. The list elements are removed in LIFO order. Also, self._known_paths ‘type’ info is updated here. NOTE: Included additional stat checks to detect FIFOs as well as other special files.
-
update_public_info
(key, value)[source]¶ Update public information already existing in a run. For example, all steps which handle FASTQ files want to know how to distinguish between files of read 1 and files of read 2. So each step that provides FASTQ should update this information if the file names are altered. The stored information can be acquired via:
AbstractStep.find_upstream_info()
.
-
Miscellaneous modules¶
process_pool¶
-
class
process_pool.
ProcessPool
(run)[source]¶ The process pool provides an environment for launching and monitoring processes. You can launch any number of unrelated processes plus any number of pipelines in which several processes are chained together.
Use it like this:
with process_pool.ProcessPool(self) as pool: # launch processes or create pipelines here
When the scope opened by the with statement is left, all processes are launched and being watched. The process pool then waits until all processes have finished. You cannot launch a process pool within another process pool, but you can launch multiple pipeline and independent processes within a single process pool. Also, you can launch several process pools sequentially.
-
COPY_BLOCK_SIZE
= 4194304¶ When stdout or stderr streams should be written to output files, this is the buffer size which is used for writing.
-
class
Pipeline
(pool)[source]¶ This class can be used to chain multiple processes together.
Use it like this:
with pool.Pipeline(pool) as pipeline: # append processes to the pipeline here
-
SIGTERM_TIMEOUT
= 10¶ After a SIGTERM signal is issued, wait this many seconds before going postal.
-
TAIL_LENGTH
= 1024¶ Size of the tail which gets recorded from both stdout and stderr streams of every process launched with this class, in bytes.
-
classmethod
kill
()[source]¶ Kills all user-launched processes. After that, the remaining process will end and a report will be written.
-
classmethod
kill_all_child_processes
()[source]¶ Kill all child processes of this process by sending a SIGTERM to each of them. This includes all children which were not launched by this module, and their children etc.
-
launch
(args, stdout_path=None, stderr_path=None, hints={})[source]¶ Launch a process. Arguments, including the program itself, are passed in args. If the program is not a binary but a script which cannot be invoked directly from the command line, the first element of args must be a list like this: [‘python’, ‘script.py’].
Use stdout_path and stderr_path to redirect stdout and stderr streams to files. In any case, the output of both streams gets watched, the process pool calculates SHA256 checksums automatically and also keeps the last 1024 bytes of every stream. This may be useful if a process crashes and writes error messages to stderr in which case you can see them even if you didn’t redirect stderr to a log file.
Hints can be specified but are not essential. They help to determine the direction of arrows for the run annotation graphs rendered by GraphViz (sometimes, it’s not clear from the command line whether a certain file is an input or output file to a given process).
-
fscache¶
-
class
fscache.
FSCache
[source]¶ Use this class if you expect to make the same os.path.* calls many times during a short time. The first time you call a method with certain arguments, the call is made, but all subsequent calls are served from a cache.
Usage example:
# Instantiate a new file system cache. fsc = FSCache() # This call will stat the file system. print(fsc.exists('/home')) # This call will leave the file system alone, the cached result will be returned. print(fsc.exists('/home'))
You may call any method which is available in os.path.
misc¶
-
misc.
append_suffix_to_path
(path, suffix)[source]¶ Append a suffix to a path, for example:
path: /home/michael/chocolate-cookies.txt.gz
suffix: done right
result: /home/michael/chocolate-cookies-done-right.txt.gz
-
misc.
assign_strings
(paths, tags)[source]¶ Assign N strings (path names, for example) to N tags. Example:
paths = [‘RIB0000794-cutadapt-R1.fastq.gz’, ‘RIB0000794-cutadapt-R2.fastq.gz’]
tags = [‘R1’, ‘R2’]
result = { ‘R1’: ‘RIB0000794-cutadapt-R1.fastq.gz’, ‘R2’: ‘RIB0000794-cutadapt-R2.fastq.gz’ }
If this is not possible without ambiguities, a StandardError is thrown. Attention: The number of paths must be equal to the number of tags, a 1:1 relation is returned, if possible.
-
misc.
bytes_to_str
(num)[source]¶ Convert a number representing a number of bytes into a human-readable string such as “4.7 GB”
-
misc.
duration_to_str
(duration, long=False)[source]¶ Minor adjustment for Python’s duration to string conversion, removed microsecond accuracy and replaces ‘days’ with ‘d’
-
misc.
natsorted
(l)[source]¶ Return a ‘naturally sorted’ permutation of l.
Credits: http://www.codinghorror.com/blog/2007/12/sorting-for-humans-natural-sort-order.html
connections_collector¶
-
class
connections_collector.
ConnectionsCollector
(step_name=None)[source]¶ A ConnectionsCollector helps to collect and query file connections. An instance cc is generated for each step and passed to its
runs
method. For backwards compatibility reasons it can be queryed like a dictionarycc[run_id][connection]
.cc
can be used in the course of a step to dicide how to use the input runs and connections.-
add_connection
(connection, files, run_id=None)[source]¶ Saves the names in
files
for a newconnection
.
-
all_runs_have_connection
(connection)[source]¶ Returns a logical indication whether all saved runs have the queried
connection
.
-
any_runs_have_connection
(connection)[source]¶ Returns a logical indication whether any saved runs have the queried
connection
.
-
connect
(parent, child, connections=None)[source]¶ Makes connections between parent and child step and returns the utilized parent connections as “parent/name”. The passed connections need to be in the format as in the pipline configuration file. If no connections are passed it connects all equally named connections.
-
connection_exists
(connection)[source]¶ Returns a logical indication whether the requested connection exists.
-
exists_connection_for_run
(connection, run_id=None)[source]¶ Returns a logical indication whether the requested connection exists for the given/current run.
-
get_runs_with_any
(connections, with_empty=True)[source]¶ Returns all run ids with any of the given connections.
-
get_runs_with_connections
(connections, with_empty=True)[source]¶ Returns all run ids that have requested all connections.
-
get_runs_without_any
(connections, with_empty=True)[source]¶ Returns runs that have none of the passed connections.
-
look_for_unique
(connection, include=None)[source]¶ Looks for a unique file in the connection and returns it. E.g., to find a reference assembly among all parent runs. If NO runs come with the connection it returns None and if MORE THAN ONE run comes with the connection an UAPError is raised. The value passed with include is also counted.
-