Source code for pore_c.analyses.reference

import re
from pathlib import Path
from typing import List, Pattern, Tuple

import numpy as np
import pandas as pd
from pandas import DataFrame

from pore_c.datasources import IndexedFasta
from pore_c.utils import kmg_bases_to_int

from ..config import PQ_ENGINE, PQ_VERSION
from ..model import FragmentRecord, FragmentRecordDf

# complement translation table with support for regex punctuation
COMPLEMENT_TRANS = str.maketrans("ACGTWSMKRYBDHVNacgtwsmkrybdhvn-)(][", "TGCAWSKMYRVHDBNtgcawskmyrvhdbn-()[]")

# translate degenerate bases to regex set
DEGENERATE_TRANS = str.maketrans(
    dict(
        [
            ("N", "[ACGT]"),
            ("V", "[ACG]"),
            ("H", "[ACT]"),
            ("D", "[AGT]"),
            ("B", "[CGT]"),
            ("W", "[AT]"),
            ("S", "[CG]"),
            ("M", "[AC]"),
            ("K", "[GT]"),
            ("R", "[AG]"),
            ("Y", "[CT]"),
        ]
    )
)


[docs]def create_virtual_digest( reference_fasta: Path, digest_type: str, digest_param: str, fragments: Path = None, digest_stats: Path = None ) -> Tuple[FragmentRecordDf, pd.DataFrame]: """Iterate over the sequences in a fasta file and find the match positions for the restriction fragment""" # convert the sequences to a dask bag ff = IndexedFasta(reference_fasta) seq_bag = ff.to_dask() dtype = FragmentRecord.pandas_dtype(overrides={"chrom": pd.CategoricalDtype(ff._chroms, ordered=True)}) frag_df = ( pd.concat( seq_bag.map(lambda x: (x["seqid"], x["seq"], digest_type, digest_param)) .starmap(create_fragment_dataframe) .compute() ) .astype(dict(chrom=dtype["chrom"])) .sort_values(["chrom", "start"]) .assign(fragment_id=lambda x: np.arange(len(x), dtype=int) + 1) .astype(dtype) ) frag_df.to_parquet(str(fragments), engine=PQ_ENGINE, index=False, version=PQ_VERSION) summary_stats_df = ( frag_df.groupby("chrom")["fragment_length"] .agg(["size", "mean", "median", "min", "max"]) .fillna(-1) .astype({"size": int, "min": int, "max": int}) .rename(columns={"size": "num_fragments"}) ) summary_stats_df.to_csv(digest_stats) return frag_df, summary_stats_df
[docs]def revcomp(seq: str) -> str: """Return the reverse complement of a string:""" return seq[::-1].translate(COMPLEMENT_TRANS)
[docs]def replace_degenerate(pattern: str) -> str: """Replace degenerate bases with regex set""" return pattern.translate(DEGENERATE_TRANS)
[docs]def create_regex(pattern: str) -> Pattern: """Takes a raw restriction digest site in the form of a regular expression string and returns a regular expression object consisting of both forward and reverse complement versions of the pattern""" site_raw = pattern.replace("(", " ").replace(")", " ").replace(" ", "").replace("|", " ").split() sites_raw = [] for entry in sites_raw: sites_raw.append(revcomp(entry)) sites_raw += site_raw if len(sites_raw) > 1: fwd_rev_pattern = "(" + "|".join(sorted(list(set(sites_raw)))) + ")" else: fwd_rev_pattern = "(" + sites_raw[0] + ")" ### fwd_rev_pattern = replace_degenerate(fwd_rev_pattern) try: regex = re.compile(fwd_rev_pattern) except Exception as exc: raise ValueError(f"Error compiling regex for pattern {pattern}, redundance form: {fwd_rev_pattern}\n{exc}") return regex
[docs]def find_fragment_intervals(digest_type: str, digest_param: str, seq: str) -> List[int]: """Finds the start positions of all matches of the regex in the sequence""" if digest_type == "regex": regex = create_regex(digest_param) positions = find_site_positions_regex(regex, seq) elif digest_type == "bin": bin_width = kmg_bases_to_int(digest_param) positions = find_site_positions_bins(bin_width, seq) elif digest_type == "enzyme": positions = find_site_positions_biopython(digest_param, seq) intervals = to_intervals(positions, len(seq)) return intervals
[docs]def to_intervals(positions: List[int], chrom_length: int): prefix, suffix = [], [] if (len(positions) == 0) or positions[0] != 0: prefix = [0] if (len(positions) == 0) or positions[-1] != chrom_length: suffix = [chrom_length] endpoints = np.array(prefix + positions + suffix) return {"start": endpoints[:-1], "end": endpoints[1:]}
[docs]def find_site_positions_bins(bin_width, seq: str) -> List[int]: """Mimic a fixed-width sequence digest by returning the positions of fixed-width bin boundaries""" if len(seq) < bin_width: return [] else: positions = list(range(0, len(seq), bin_width)) return positions
[docs]def find_site_positions_regex(regex: Pattern, seq: str) -> List[int]: """Finds the start positions of all matches of the regex in the sequence""" positions = [m.start() for m in regex.finditer(seq.upper())] return positions
[docs]def find_site_positions_biopython(enzyme: str, seq: str) -> List[int]: from Bio import Restriction from Bio.Alphabet.IUPAC import IUPACAmbiguousDNA from Bio.Seq import Seq enz = getattr(Restriction, enzyme, None) if enz is None: raise ValueError(f"Enzyme not found: {enzyme}") s = Seq(seq, IUPACAmbiguousDNA()) positions = [_ - 1 for _ in enz.search(s)] return positions
[docs]def create_fragment_dataframe(seqid: str, seq: str, digest_type: str, digest_param: str) -> DataFrame: """Iterate over the sequences in a fasta file and find the match positions for the restriction fragment""" intervals = ( DataFrame(find_fragment_intervals(digest_type, digest_param, seq)) .assign(chrom=seqid) .eval("fragment_length = end - start") ) return intervals