Source code for misc

from uaperrors import UAPError
import sys
import hashlib
import json
from logging import getLogger
import os
import re
import signal
import yaml
from collections import OrderedDict

# Module-level logger, obtained under the fixed name 'uap_logger'.
logger = getLogger('uap_logger')

# an enum class, yanked from
# http://stackoverflow.com/questions/36932/whats-the-best-way-to-implement-an-enum-in-python


class Enum(set):
    '''
    A minimal enumeration: a set of names whose members can also be read
    as attributes.

    ``Enum(['A', 'B']).A`` evaluates to ``'A'``. The object passed to the
    constructor is stored unchanged in ``self.order``.

    Raises AttributeError when an attribute is requested that is neither a
    regular attribute nor a member of the set.
    '''

    def __init__(self, _list):
        # The set itself is unordered, so keep the caller's sequence around.
        self.order = _list
        super(Enum, self).__init__(_list)

    def __getattr__(self, name):
        # __getattr__ is only consulted after normal attribute lookup has
        # failed, so members are resolved here and everything else is
        # reported as missing, naming the offending attribute.
        if name in self:
            return name
        raise AttributeError(
            "%r is not a member of this %s" % (name, type(self).__name__))
def assign_strings(paths, tags):
    '''
    Assign N strings (path names, for example) to N tags.

    Example:

    - paths = ['RIB0000794-cutadapt-R1.fastq.gz',
      'RIB0000794-cutadapt-R2.fastq.gz']
    - tags = ['R1', 'R2']
    - result = { 'R1': 'RIB0000794-cutadapt-R1.fastq.gz',
      'R2': 'RIB0000794-cutadapt-R2.fastq.gz' }

    The mapping is found by looking for a common head and tail which, when
    chopped off every path, leave exactly the given tags.

    Attention: The number of paths must be equal to the number of tags, a
    1:1 relation is returned, if possible.

    Raises UAPError if the number of paths and tags differ, or if there is
    not exactly one possible mapping.
    '''

    def check_candidate(paths, tags, head, tail):
        # Chop head and tail off every path; the candidate is only valid if
        # every path carries both and the remainders are exactly the tags.
        chopped = []
        for path in paths:
            if not path.startswith(head):
                return None
            if not tail:
                chopped.append((path[len(head):], path))
            else:
                if not path.endswith(tail):
                    return None
                chopped.append((path[len(head):-len(tail)], path))
        chopped.sort()
        if [key for key, _ in chopped] == sorted(tags):
            return {key: path for key, path in chopped}
        return None

    if len(paths) != len(tags):
        raise UAPError("Number of tags must be equal to number of paths")

    # Candidates are keyed by their canonical JSON form so that the same
    # mapping discovered via different tag/path pairs is counted only once.
    results = {}
    for tag in tags:
        for path in paths:
            # visit every occurrence of tag in path
            index = path.find(tag)
            while index >= 0:
                head = path[:index]
                tail = path[index + len(tag):]
                # now try chopping off head and tail from every path
                # and see whether we can unambiguously assign a path
                # to every tag, if yes, we have a result candidate
                candidate = check_candidate(paths, tags, head, tail)
                if candidate:
                    results[json.dumps(candidate, sort_keys=True)] = candidate
                index = path.find(tag, index + 1)

    if len(results) != 1:
        raise UAPError("Unable to find an unambiguous mapping.")
    # dict views cannot be indexed on Python 3, so take the single value
    # through an iterator instead of results.keys()[0].
    return next(iter(results.values()))
def assign_string(s, tags):
    '''
    Return the one tag from tags that occurs as a substring of s.

    Raises UAPError if no tag occurs in s, or as soon as a second tag is
    found to occur in s.
    '''
    found = None
    for candidate in tags:
        if candidate not in s:
            continue
        if found is not None:
            # a second hit makes the assignment ambiguous
            raise UAPError("Could not unambiguously match %s to %s." %
                           (s, tags))
        found = candidate
    if found is None:
        raise UAPError("Could not match %s to %s." % (s, tags))
    return found
def natsorted(l):
    '''
    Return a 'naturally sorted' permutation of l, i.e. runs of the digits
    0-9 inside the strings are compared by numeric value ('file2' sorts
    before 'file10').

    Credits:
    http://www.codinghorror.com/blog/2007/12/sorting-for-humans-natural-sort-order.html
    '''
    digit_runs = re.compile(r'([0-9]+)')

    def alphanum_key(key):
        # Because the pattern has a capturing group, re.split() returns text
        # at even indices and the captured digit runs at odd indices. Going
        # by position rather than str.isdigit() means only the ASCII digit
        # runs are converted, so every key has the same str/int layout and
        # characters such as a superscript two never reach int().
        parts = digit_runs.split(key)
        return [int(part) if i % 2 else part for i, part in enumerate(parts)]

    return sorted(l, key=alphanum_key)
def _sha256_input(s):
    # hashlib only accepts bytes-like objects; text is hashed as UTF-8.
    return s.encode('utf-8') if isinstance(s, str) else s


def str_to_sha256(s):
    '''
    Return the SHA256 digest of s as a hexadecimal string.

    s may be bytes or str; a str is encoded as UTF-8 before hashing.
    '''
    return hashlib.sha256(_sha256_input(s)).hexdigest()


def str_to_sha256_b62(s):
    '''
    Return the SHA256 digest of s written in base 62 using the alphabet
    0-9, a-z, A-Z.

    s may be bytes or str; a str is encoded as UTF-8 before hashing.
    The digits are emitted least significant first, and a digest value of
    zero yields the empty string.
    '''
    digest = hashlib.sha256(_sha256_input(s)).digest()
    # Read the digest as one big-endian integer. Iterating bytes yields
    # ints on Python 3, so the per-character ord() approach cannot be used.
    number = int.from_bytes(digest, 'big')
    alphabet = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    base = len(alphabet)
    digits = []
    while number > 0:
        number, digit = divmod(number, base)
        digits.append(alphabet[digit])
    return ''.join(digits)
def bytes_to_str(num):
    '''
    Render a byte count as a human-readable string such as "4.7 GB".

    Values below 1024 are printed as a whole number of "bytes"; larger
    values are divided by 1024 per step and printed with one decimal as
    kB, MB, GB or, for everything beyond that, TB.
    '''
    if num < 1024.0:
        return "%d %s" % (num, 'bytes')
    for prefix in ('k', 'M', 'G'):
        num /= 1024.0
        if num < 1024.0:
            return "%1.1f %sB" % (num, prefix)
    # anything that does not fit into GB is reported in TB
    return "%1.1f %sB" % (num / 1024.0, 'T')
# duration_to_str(duration, long=False): starts from str(duration). Unless
# `long` is set, ' days,' / ' day,' are shortened to 'd', and one extra space
# is inserted after the 'd' when the first ':' is not exactly four characters
# behind it. A value containing '.' is cut off one character after the '.'.
# NOTE(review): this definition lost its line breaks and indentation, so it is
# not visible here whether the final "if '.' in value" truncation belongs
# inside or outside "if not long" -- confirm against the original module
# before reformatting this line.
[docs]def duration_to_str(duration, long=False): ''' Minor adjustment for Python's duration to string conversion, removed microsecond accuracy and replaces 'days' with 'd' ''' value = str(duration) if not long: if 'days' in value: value = value.replace(' days,', 'd') if 'day' in value: value = value.replace(' day,', 'd') if 'd' in value and ':' in value and ( value.index(':') - value.index('d')) != 4: value = value[:value.index('d') + 1] + \ ' ' + value[value.index('d') + 1:] if '.' in value: value = value[0:value.index('.') + 2] return value
def append_suffix_to_path(path, suffix):
    '''
    Insert '-' followed by suffix in front of the first dot of the file
    name of path (or at its end if the file name contains no dot), for
    example:

    - path: /home/michael/chocolate-cookies.txt.gz
    - suffix: done-right
    - result: /home/michael/chocolate-cookies-done-right.txt.gz

    The suffix is inserted verbatim; the directory part is left untouched.
    '''
    directory, name = os.path.split(path)
    # partition() yields empty separator and remainder when there is no dot
    stem, dot, remainder = name.partition('.')
    return os.path.join(directory, stem + '-' + suffix + dot + remainder)
def sha256sum_of(file):
    """
    Returns hexdigits of the sha256sum of the passed file.

    The file is opened in binary mode and hashed in 2 MB chunks, so it is
    never read into memory as a whole.

    Raises UAPError if opening or reading the file fails; the original
    exception is attached as the cause. KeyboardInterrupt and SystemExit
    are not converted and propagate unchanged.
    """
    sha256sum = hashlib.sha256()
    try:
        with open(file, 'rb') as f:
            # read() returns b'' at end of file, which ends the iteration
            for buf in iter(lambda: f.read(2 * 1024 * 1024), b''):
                sha256sum.update(buf)
    except Exception as e:
        raise UAPError("Error while calculating SHA256sum "
                       "of %s" % file) from e
    return sha256sum.hexdigest()
def sha_and_file(file):
    '''
    Designed to be run in multiprocessing.Pool().imap.

    Resets the SIGTERM handler of the current process to the default
    action, then returns the tuple (sha256sum of file, file).
    '''
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    digest = sha256sum_of(file)
    return digest, file
class UAPDumper(yaml.Dumper):
    '''
    yaml.Dumper variant that ensures indentation of lists.
    '''

    def increase_indent(self, flow=False, indentless=False):
        # The caller's indentless value is deliberately ignored; False is
        # always forwarded to the base class.
        return super(UAPDumper, self).increase_indent(
            flow=flow, indentless=False)
class literal(str):
    '''
    Plain subclass of str without any behaviour of its own; it only gives
    such strings a distinct type.
    '''
def literal_presenter(dumper, data):
    '''
    Represent data as a YAML string scalar in literal block style ('|').
    '''
    return dumper.represent_scalar(
        'tag:yaml.org,2002:str', data, style='|')


def ordered_dict_presenter(dumper, data):
    '''
    Represent data by passing its items() view to dumper.represent_dict.
    '''
    pairs = data.items()
    return dumper.represent_dict(pairs)


def dict_keys_presenter(dumper, data):
    '''
    Represent data via dumper.represent_list.
    '''
    return dumper.represent_list(data)


# Register the presenters on UAPDumper for the types they handle; the last
# one is keyed on the type of a dict's keys() view.
UAPDumper.add_representer(literal, literal_presenter)
UAPDumper.add_representer(OrderedDict, ordered_dict_presenter)
UAPDumper.add_representer(type({}.keys()), dict_keys_presenter)
class type_tuple(tuple):
    '''
    Plain subclass of tuple without any behaviour of its own; it only
    gives such tuples a distinct type.
    '''
def type_tuple_presenter(dumper, data):
    '''
    Represent data as a YAML string scalar made of the __name__ of each
    element, joined by ', '.
    '''
    names = ', '.join([entry.__name__ for entry in data])
    return dumper.represent_scalar('tag:yaml.org,2002:str', names)


# Register the presenter on UAPDumper for type_tuple instances.
UAPDumper.add_representer(type_tuple, type_tuple_presenter)