Source code for py_hla_match.hla

import logging
import re
import threading
from typing import Optional

from pyard.exceptions import InvalidAlleleError

from py_hla_match.singleton import get_ard_instance
from py_hla_match.config import (
    get_config,
    get_config_version
)
from py_hla_match.exceptions import (
    MalformedHLAStringError,
    EmptyHLAStringError, PyardLibraryError
)

logger = logging.getLogger(__name__)

# regex pattern for ARD redux allele string
REDUX_PATTERN = re.compile(
    r"""
    ^ (?:HLA-)?
    (?P<locus>[A-Z0-9]+)
    \*
    (
        (?P<allele_fields>\d{2,4}(?::\d{2,4}){0,3})
        (?P<suffix>[NLSCAQ])?
        (?P<group_code>[GP])?
        $
    |
        (?P<remainder>.*)
        $
    )
    """,
    re.VERBOSE
)


[docs] class HLA: _cache = {} _redux_cache = {} _cache_lock = threading.RLock() _redux_cache_lock = threading.RLock() _config_version: Optional[int] = None __slots__ = ( # original allele string 'allele_string', # parsed locus fields 'locus', 'drb_sub_locus', # parsed allele fields 'allele_group', 'allele', 'synonymous_variant', 'non_coding_variant', # parsed suffix and group code 'suffix', 'group_code', # ARD reduction string 'ard_redux_allele_string', # ARD reduction fields 'ard_redux_allele_group', 'ard_redux_allele', # locked state '_locked', ) def __setattr__(self, name, value): """ Block any attribute mutation once _locked is True. __slots__ prevents creation of *new* attributes; this method prevents changing existing ones. """ if getattr(self, '_locked', False): raise AttributeError( f"{self.__class__.__name__} instances are immutable " f"after initialisation (attempted to set '{name}')." ) super().__setattr__(name, value) def __delattr__(self, name): raise AttributeError( f"{self.__class__.__name__} instances are immutable" ) def __new__(cls, allele_string: str): """Thread-safe caching""" with cls._cache_lock: # Invalidate cache if config version changed current_ver = get_config_version() if cls._config_version != current_ver: cls._cache.clear() cls._redux_cache.clear() cls._config_version = current_ver if allele_string in cls._cache: return cls._cache[allele_string] # Create new instance only if not cached instance = super().__new__(cls) cls._cache[allele_string] = instance return instance def __init__(self, allele_string: str) -> None: # Skip initialization if already initialized (cached object) if hasattr(self, '_locked') and self._locked: return super().__setattr__('allele_string', allele_string) # set None as default for missing slot members for attr in self.__slots__: if attr not in ('allele_string', '_locked'): super().__setattr__(attr, None) self._parse_allele_string() # if well-formed high-res allele use py-ard reduction if self.allele: self._ard_redux() else: logger.warning( f"HLA string '{self.allele_string}' at locus " f"'{self.locus}' is not a specific allele." ) if self.allele_group is not None: logger.warning( f"WARNING: Validity of '{self.allele_group}' not checked." ) super().__setattr__('_locked', True) def _parse_allele_string(self) -> None: """ Parses HLA allele string and populate HLA attributes. """ # validate the allele string match = self._validate_nomenclature() # extract locus self.locus = match.group('locus') # handle DRB3/4/5 region if self.locus in get_config().drb345_sub_loci: # if locus is DRB3/4/5, set the sub-locus self.drb_sub_locus = self.locus self.locus = 'DRB345' allele_fields = match.group('allele_fields') nan_field = match.group('nan') remainder = match.group('remainder') if allele_fields: self.suffix = match.group('suffix') self.group_code = match.group('group_code') if self.group_code == 'G' and allele_fields.count(':') != 2: raise MalformedHLAStringError( f"'{self.allele_string}' – 'G' group must have 3 fields." ) if self.group_code == 'P' and allele_fields.count(':') != 1: raise MalformedHLAStringError( f"'{self.allele_string}' – 'P' group must have 2 fields." ) # extract details from allele fields field_contents = allele_fields.split(':') if len(field_contents) > 0: self.allele_group = field_contents[0] if len(field_contents) > 1: self.allele = field_contents[1] if len(field_contents) > 2: self.synonymous_variant = field_contents[2] if len(field_contents) > 3: self.non_coding_variant = field_contents[3] elif nan_field: logger.info( f"HLA string '{self.allele_string}' at locus " f"'{self.locus}' is undefined: '{nan_field}')." ) elif remainder: raise MalformedHLAStringError( f"Invalid HLA allele string: '{self.allele_string}' " f"contains unparsable content: '{remainder}'" ) else: raise EmptyHLAStringError( f"HLA string '{self.allele_string}' at locus" f" '{self.locus}' is empty." ) def _validate_nomenclature(self) -> re.Match: """ Validate HLA allele string with nomenclature. Tries to extract locus information if HLA allele string is not complete. :raises MalformedHLAStringError: If the allele string is malformed. """ # may wanna check with: # https://raw.githubusercontent.com/ANHIG/IMGTHLA/Latest/wmda/hla_nom.txt # check if we got at least a one-field (valid) allele string # NOTE: we could soften this in the future if any(ch.islower() for ch in self.allele_string if ch.isalpha()): raise MalformedHLAStringError( f"Lower-case letters found in '{self.allele_string}'. " "Allele strings must be upper-case adhering to HLA " "nomenclature." ) # NOTE: we might soften this in the future if allele_string := self.allele_string: if ( allele_string != allele_string.strip() or any(ch.isspace() for ch in allele_string) ): raise MalformedHLAStringError( "Found whitespace or invisible characters in allele " f"string '{allele_string}'." ) match = get_config().nomenclature_pattern.match(self.allele_string) if not match: raise MalformedHLAStringError( f"Invalid HLA allele string: {self.allele_string}" " String must contain valid LOCUS followed by '*'." ) # validate locus locus = match.group('locus') if not self._is_valid_locus(locus): raise MalformedHLAStringError( f"Invalid HLA allele string: '{self.allele_string}' with " f"unknown locus '{locus}'." ) return match def _is_valid_locus(self, locus: str) -> bool: """Locus validation using known loci.""" return locus in get_config().effective_valid_loci def _ard_redux(self): """Thread-safe ARD redux with caching""" redux_type = 'lgx' cache_key = (self.allele_string, redux_type) # Thread-safe cache check with self._redux_cache_lock: if cache_key in self._redux_cache: cached_result = self._redux_cache[cache_key] self.ard_redux_allele_string = cached_result['redux_string'] self.ard_redux_allele_group = cached_result['allele_group'] self.ard_redux_allele = cached_result['allele'] return try: ard = get_ard_instance() redux_string = ard.redux(self.allele_string, redux_type).strip() except InvalidAlleleError as e: # propagate allele specific exception raise e except Exception as e: # catch and re-raise any other (library specific) exceptions raise PyardLibraryError( f"Failed during allele reduction for '{self.allele_string}' " f"with redux_type '{redux_type}'.", details=str(e) ) from e # Problem with pyard seems to be inconsistent behavior of # InvalidAlleleError per redux type, for now imo we need to double # check with 'P' try: ard.redux(self.allele_string, 'P').strip() except InvalidAlleleError as e: # propagate allele specific exception raise e except Exception as e: # catch and re-raise any other (library specific) exceptions raise PyardLibraryError( f"Failed during allele reduction for '{self.allele_string}' " f"with redux_type '{redux_type}'.", details=str(e) ) from e # still we proceed with the 'lgx' redox string because that's the one # most robust on the standard two-field ARD reduction # Parse the result match = REDUX_PATTERN.match(redux_string) if not match: raise MalformedHLAStringError( "py-ard reports unexpected string not matching regex " f"'{redux_string}'." ) allele_group = None allele = None if match: allele_fields = match.group('allele_fields') field_contents = allele_fields.split(':') if len(field_contents) > 0: allele_group = field_contents[0] if len(field_contents) > 1: allele = field_contents[1] # Thread-safe caching with self._redux_cache_lock: # Double-check pattern: another thread might have computed this # while we were working if cache_key in self._redux_cache: cached_result = self._redux_cache[cache_key] self.ard_redux_allele_string = cached_result['redux_string'] self.ard_redux_allele_group = cached_result['allele_group'] self.ard_redux_allele = cached_result['allele'] else: # We're the first to compute this, cache it self.ard_redux_allele_string = redux_string self.ard_redux_allele_group = allele_group self.ard_redux_allele = allele self._redux_cache[cache_key] = { 'redux_string': redux_string, 'allele_group': allele_group, 'allele': allele }
[docs] def has_resolution_level(self) -> int: """ Returns the resolution level based on the number of parsed fields. - 4: Non-coding variant (e.g., A*01:01:01:02) - 3: Synonymous variant (e.g., A*01:01:02) - 2: Specific allele (e.g., A*01:01) - 1: Allele group (e.g., A*01) - 0: Locus only or undefined """ if self.non_coding_variant is not None: return 4 if self.synonymous_variant is not None: return 3 if self.allele is not None: return 2 if self.allele_group is not None: return 1 return 0
def __eq__(self, other): if not isinstance(other, HLA): return NotImplemented return ( self.locus == other.locus and self.drb_sub_locus == other.drb_sub_locus and self.allele_group == other.allele_group and self.allele == other.allele and self.synonymous_variant == other.synonymous_variant and self.non_coding_variant == other.non_coding_variant and self.suffix == other.suffix and self.group_code == other.group_code # TODO: or (self.reduced = other.reduced ) ) def __hash__(self): return hash( ( self.locus, self.drb_sub_locus, self.allele_group, self.allele, self.synonymous_variant, self.non_coding_variant, self.suffix, self.group_code ) ) def __repr__(self): return ( f"HLA(allele_string={repr(self.allele_string)}, " f"locus={repr(self.locus)}, " f"allele_group={repr(self.allele_group)}, " f"allele={repr(self.allele)}, " f"synonymous_variant={repr(self.synonymous_variant)}, " f"non_coding_variant={repr(self.non_coding_variant)}, " f"suffix={repr(self.suffix)}, " f"group_code={repr(self.group_code)}, " f"ard_redux_allele_string={repr(self.ard_redux_allele_string)}, " f"ard_redux_allele_group={repr(self.ard_redux_allele_group)}, " f"ard_redux_allele={repr(self.ard_redux_allele)})" )