Source code for tombo.tombo_helper

from __future__ import division, unicode_literals, absolute_import

from builtins import int, range, dict, map, zip

import os
import io
import re
import sys
import queue
import random

# Future warning from cython in h5py
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import h5py

import numpy as np

from time import sleep
from time import strftime
from operator import itemgetter
from itertools import repeat, islice
from multiprocessing import Process, Queue
from collections import defaultdict, namedtuple

if sys.version_info[0] > 2:
    unicode = str

# import tombo functions
from ._version import TOMBO_VERSION
from ._default_parameters import PHRED_BASE, DNA_SAMP_TYPE, RNA_SAMP_TYPE
from ._c_helper import (
    c_new_mean_stds, c_new_means, c_valid_cpts_w_cap,
from ._c_dynamic_programming import (
    c_banded_traceback, c_adaptive_banded_forward_pass)

# list of classes/functions to include in API
__all__ = [
    'readData', 'intervalData', 'TomboReads',
    'TomboMotif', 'parse_motif_descs',
    'resquiggleParams', 'startClipParams',
    'resquiggleResults', 'alignInfo', 'genomeLocation',
    'sequenceData', 'dpResults', 'scaleValues', 'Fasta', 'seqSampleType',
    'get_seq_sample_type', 'get_raw_read_slot',
    'get_single_slot_read_centric', 'get_multiple_slots_read_centric']


# single base conversion for motifs
    'A':'A', 'C':'C', 'G':'G', 'T':'T', 'B':'[CGT]',
    'D':'[AGT]', 'H':'[ACT]', 'K':'[GT]', 'M':'[AC]',
    'N':'[ACGT]', 'R':'[AG]', 'S':'[CG]', 'V':'[ACG]',
    'W':'[AT]', 'Y':'[CT]'}
INVALID_BASES = re.compile('[^ACGT]')
INVALID_BASE_RUNS = re.compile('[^ACGT]+')

###### Custom TomboError ######

class TomboError(Exception):

###### Cython Wrapper Functions ######

# can't import TomboError in cython so wrap these functions in python
def valid_cpts_w_cap(*args, **kwargs):
        valid_cpts = c_valid_cpts_w_cap(*args, **kwargs)
    except NotImplementedError as e:
        raise TomboError(unicode(e))
    return valid_cpts

def valid_cpts_w_cap_t_test(*args, **kwargs):
        valid_cpts = c_valid_cpts_w_cap_t_test(*args, **kwargs)
    except NotImplementedError as e:
        raise TomboError(unicode(e))
    return valid_cpts

def banded_traceback(*args, **kwargs):
        return c_banded_traceback(*args, **kwargs)
    except NotImplementedError as e:
        raise TomboError(unicode(e))

def adaptive_banded_forward_pass(*args, **kwargs):
        return c_adaptive_banded_forward_pass(*args, **kwargs)
    except NotImplementedError as e:
        raise TomboError(unicode(e))

###### Global Namedtuples ######

[docs]class alignInfo(namedtuple( 'alignInfo', ('ID', 'Subgroup', 'ClipStart', 'ClipEnd', 'Insertions', 'Deletions', 'Matches', 'Mismatches'))): """Information from genomic read alignment Args: ID (str): read identifier Subgroup (str): sub-group location containing read information (e.g. 'BaseCalled_template') ClipStart (int): number of bases clipped from beginning of basecalls ClipEnd (int): number of bases clipped from end of basecalls Insertions (int): number of inserted bases in alignment Deletions (int): number of delected bases in alignment Matches (int): number of matched bases in alignment Mismatches (int): number of mis-matched bases in alignment """
# TODO convert rna to rev_sig
[docs]class readData(namedtuple('readData', ( 'start', 'end', 'filtered', 'read_start_rel_to_raw', 'strand', 'fn', 'corr_group', 'rna', 'sig_match_score', 'mean_q_score', 'read_id'))): """Nanopore read meta-data Example:: r_data = tombo_helper.readData( start=0, end=1000, filtered=False, read_start_rel_to_raw=100, strand='+', fn='path/to/read.fast5', corr_group='RawGenomeCorrected_000', rna=False, sig_match_score=1.0, mean_q_score=10.0) # output is a list of readData objects reads_index = tombo_helper.TomboReads(['test_data/native_reads',]) cs_reads = reads_index.get_cs_reads('chr20', '+') Args: start (int): 0-based start mapped position end (int): 0-based open interval end mapped position filtered (bool): is this read filtered? read_start_rel_to_raw (int): start (in raw signal vector) of assigned bases strand (str): read mapped strand ('+', '-' or None) fn (str): raw read filename. corr_group (str): --corrected-group slot specified in re-squiggle command rna (bool): is this read RNA? sig_match_score (float): signal matching score. Default: None mean_q_score (float): mean basecalling q-score. Default: None read_id (str): read identifier Default: None """
# set default values for sig_match_score, q_score and read_id readData.__new__.__defaults__ = (None, None, None)
[docs]class scaleValues(namedtuple( 'scaleValues', ( 'shift', 'scale', 'lower_lim', 'upper_lim', 'outlier_thresh'))): """Signal normaliztion scaling parameters. For details see Args: shift (float): shift scaling parameter scale (float): scale scaling parameter lower_lim (float): lower windsorizing threshold upper_lim (float): upper windsorizing threshold outlier_thresh (float): outlier threshold used to define `lower_lim` and `upper_lim` """
[docs]class resquiggleParams(namedtuple( 'resquiggleParams', ('match_evalue', 'skip_pen', 'bandwidth', 'max_half_z_score', 'running_stat_width', 'min_obs_per_base', 'raw_min_obs_per_base', 'mean_obs_per_event', 'z_shift', 'stay_pen', 'use_t_test_seg', 'band_bound_thresh', 'start_bw', 'start_save_bw', 'start_n_bases'))): """Re-squiggle parameters Args: match_evalue (float): expected value for matching event to sequence skip_pen (float): penalty for skipped sequence position bandwidth (int): adaptive bandwidth max_half_z_score (float): windsorize half z-scores above this value running_stat_width (int): running neighboring window width for segmentation scoring min_obs_per_base (int): minimum observations per genomic base mean_obs_per_event (int): mean number of raw obs. per event during segmentation z_shift (float): amount to shift z-scores for DP (derived from match_evalue) stay_pen (float): stay penalty for DP (derived from match_evalue) use_t_test_seg (bool): use t-test segmentation criterion (default: raw neighboring window difference) band_bound_thresh (int): bandwidth boundary threshold for determining if a read potentially left the adaptive bandwidth start_bw (int): bandwidth for read start identification start_save_bw (int): save bandwidth for read start identification (if start_bw_fails) start_n_bases (int): number of genomic bases to use for read start identification """
# set default values for start params resquiggleParams.__new__.__defaults__ = (None, None, None) class trimRnaParams(namedtuple( 'trimRnaParams', ('moving_window_size', 'min_running_values', 'thresh_scale', 'max_raw_obs'))): """Parameters to trim RNA adapters. """ class stallParams(namedtuple( 'stallParams', ('window_size', 'threshold', 'min_consecutive_obs', 'edge_buffer', # percentile stall method params 'lower_pctl', 'upper_pctl', # mean windows stall method params 'mini_window_size', 'n_windows'))): """Parameters to identify RNA stalls """ # set method specific params to None stallParams.__new__.__defaults__ = (None,) * 4
[docs]class startClipParams(namedtuple( 'startClipParams', ('bandwidth', 'num_genome_bases'))): """Parameters to identify read start using bases clipped from mapping Args: bandwidth (int): bandwidth num_genome_bases (int): number of genome bases """
[docs]class resquiggleResults(namedtuple( 'resquiggleResults', ('align_info', 'genome_loc', 'genome_seq', 'mean_q_score', 'raw_signal', 'channel_info', 'read_start_rel_to_raw', 'segs', 'scale_values', 'sig_match_score', 'norm_params_changed', 'start_clip_bases', 'stall_ints'))): """Re-squiggle results Args: align_info (:class:`tombo.tombo_helper.alignInfo`): read alignment information genome_loc (:class:`tombo.tombo_helper.genomeLocation`): genome mapping location genome_seq (str): mapped genome sequence mean_q_score (float): mean basecalling q-score raw_signal (np.array::np.float64): raw signal (i.e. un-segmented; signal may be normalized) (optional) channel_info (:class:`tombo.tombo_helper.channelInfo`): channel information (optional) read_start_rel_to_raw (int): read start within raw signal (optional) segs (np.array::np.int64): relative raw signal segment positions (optional) scale_values (:class:`tombo.tombo_helper.scaleValues`): signal normalization scale values (optional) sig_match_score (float): expected to observed signal score (see :class:`tombo.tombo_stats.get_read_seg_score`; optional) norm_params_changed (bool): were scale parameters updated (optional) start_clip_bases (str): mapping clipped bases from start of read (optional) stall_ints (list): list of rna stall locations within raw signal (optional) """
# set default None values for when just mapping results are included resquiggleResults.__new__.__defaults__ = (None,) * 9
[docs]class dpResults(namedtuple( 'dpResults', ('read_start_rel_to_raw', 'segs', 'ref_means', 'ref_sds', 'genome_seq'))): """Dynamic programming results Args: read_start_rel_to_raw (int): read start within raw signal segs (np.array::np.int64): relative raw signal segment positions ref_means (np.array::np.float64): expected signal levels ref_sds (np.array::np.float64): expected SD of signal levels genome_seq (str): mapped genome sequence """
[docs]class genomeLocation(namedtuple('genomeLocation', ('Start', 'Strand', 'Chrom'))): """Genomic location Args: Start (int): 0-based genomic start position Strand (str): Strand (should be '+' or '-') Chrom (str): Chromosome """
[docs]class sequenceData(namedtuple('sequenceData', ('seq', 'id', 'mean_q_score'))): """Read sequence data from FASTQ record Args: seq (str): read sequence id (str): read id (extracted from FASTQ record line) mean_q_score (float): mean q-score """
class channelInfo(namedtuple( 'channelInfo', ('offset', 'range', 'digitisation', 'number', 'sampling_rate'))): """Read channel information Args: offset (float): offset parameter range (float): range parameter digitisation (float): digitisation parameter number (int): channel number sampling_rate (int): number of raw samples per second """ class regionStats(namedtuple( 'regionStats', ('reg_frac_standard_base', 'reg_poss', 'chrm', 'strand', 'start', 'reg_cov', 'ctrl_cov', 'valid_cov'))): """Region statistics Args: reg_frac_standard_base (np.array::np.float64): fraction of standard bases reg_poss (np.array::np.int64): positions for reported fractions chrm (str): chromosome name strand (str): strand (should be '+' or '-') start (int): 0-based region start reg_cov (np.array::np.int64): region read depth ctrl_cov (np.array::np.int64): region control sample read depth valid_cov (np.array::np.int64): region valid (tested) read depth """ class groupStats(namedtuple( 'groupStats', ('reg_stats', 'reg_poss', 'chrm', 'strand', 'start', 'reg_cov', 'ctrl_cov'))): """Region statistics Args: reg_stats (np.array::np.float64): statistic for group comparison reg_poss (np.array::np.int64): positions for reported fractions chrm (str): chromosome name strand (str): strand (should be '+' or '-') start (int): 0-based region start reg_cov (np.array::np.int64): region read depth ctrl_cov (np.array::np.int64): region control sample read depth """
[docs]class seqSampleType(namedtuple( 'seqSampleType', ('name', 'rev_sig'))): """Description of a sequencing sample type Args: name (str): name of the sequencing sample rev_sig (bool): Is the raw signal reversed (3' to 5') """
###################################### ###### Various Helper Functions ###### ###################################### def status_message(message, indent=False): pre_str = '\t' if indent else '' sys.stderr.write(pre_str + strftime('[%H:%M:%S] ') + message + '\n') sys.stderr.flush() return def warning_message(message): sys.stderr.write( '*' * 20 + ' WARNING ' + '*' * 20 + '\n\t' + message + '\n') sys.stderr.flush() return def error_message_and_exit(message): sys.stderr.write( '*' * 20 + ' ERROR ' + '*' * 20 + '\n\t' + message + '\n') sys.exit() return def resolve_path(fn_path): """Helper function to resolve relative and linked paths that might give other packages problems. """ return os.path.realpath(os.path.expanduser(fn_path)) COMP_BASES = dict(zip(map(ord, 'ACGT'), map(ord, 'TGCA'))) def comp_seq(seq): """Complement DNA sequence """ return seq.translate(COMP_BASES) def rev_comp(seq): """Reverse complement DNA sequence """ return seq.translate(COMP_BASES)[::-1] def invalid_seq(seq): return bool( U_TO_T = {ord('U'):ord('T')} def rev_transcribe(seq): """Convert U bases to T """ return seq.translate(U_TO_T) def get_mean_q_score(read_q): if sys.version_info[0] > 2: return np.mean([q_val - PHRED_BASE for q_val in read_q.encode('ASCII')]) return np.mean([ord(q_val) - PHRED_BASE for q_val in read_q.encode('ASCII')]) def get_chrm_sizes(reads_index, ctrl_reads_index=None): """Get covered chromosome sizes from a set of reads """ strand_chrm_sizes = defaultdict(list) for (chrm, strand), cs_read_cov in reads_index: try: strand_chrm_sizes[chrm].append(max( r_data.end for r_data in cs_read_cov)) except ValueError: continue if ctrl_reads_index is not None: for (chrm, strand), cs_read_cov in ctrl_reads_index: try: strand_chrm_sizes[chrm].append(max( r_data.end for r_data in cs_read_cov)) except ValueError: continue chrm_sizes = {} for chrm, strnd_sizes in strand_chrm_sizes.items(): try: chrm_sizes[chrm] = max(strnd_sizes) except ValueError: continue return chrm_sizes def parse_genome_locations(genome_locs, default_strand=None): """Parse genome location strings and convert to 0-based coordinates """ parsed_locs = [] for chrm_pos_strand in genome_locs: # strip off any quotes and return up to the first 3 values split_vals = chrm_pos_strand.replace('"', '').replace( "'", "").split(':')[:3] # default to plus strand if not specified if len(split_vals) == 1: error_message_and_exit( 'Invalid genome location provided: ' + chrm_pos_strand + '\n\t\tTry adding quotation marks around specified genome ' + 'locations (especially for sequence identifiers with ' + 'special characters).') elif len(split_vals) == 2: parsed_locs.append(( split_vals[0], int(split_vals[1]) - 1, default_strand)) else: parsed_locs.append(( split_vals[0], int(split_vals[1]) - 1, split_vals[2])) return parsed_locs def parse_genome_regions(all_regs_text): parsed_regs = defaultdict(list) include_whole_chrms = set() for reg_text in all_regs_text: try: chrm_reg = reg_text.replace('"', '').replace("'", "").split(':') if len(chrm_reg) == 1: chrm = chrm_reg[0] reg_pos = None include_whole_chrms.add(chrm) elif len(chrm_reg) == 2: chrm, reg_pos = chrm_reg reg_pos = list(map(lambda x: int(x.replace(',','')), reg_pos.split('-'))) else: raise TomboError('Invalid region text provided.') except: error_message_and_exit( 'Invalid [--include-region] format.') parsed_regs[chrm].append(reg_pos) parsed_regs = dict(parsed_regs) for chrm in include_whole_chrms: parsed_regs[chrm] = None return parsed_regs def parse_locs_file(locs_fn): """Parse BED files containing genomic locations (assumes single base locations, so end coordinate is ignored). """ n_added, n_failed = 0, 0 raw_locs = defaultdict(set) with open(locs_fn) as locs_fp: for line in locs_fp: try: chrm, pos, _, _, _, strand = line.split()[:6] # bed specs indicate 0-based start so don't shift here pos = int(pos) raw_locs[(chrm, strand)].add(pos) n_added += 1 except: n_failed += 1 continue if n_failed > 0: if n_added > 0: warning_message(( 'Some provided locations ({}) were incorrectly formatted. ' + 'Ensure that provided file ({}) is in BED format.').format( n_failed, locs_fn)) else: warning_message(( 'All provided locations from {} were incorrectly formatted. ' + 'Ensure that provided file is in BED format.').format( locs_fn)) return dict((cs, np.array(sorted(cs_poss))) for cs, cs_poss in raw_locs.items()) def parse_obs_filter(obs_filter): """Parse observations per base formatted filtering """ if len(obs_filter) < 1: return None # parse obs_filter try: obs_filter = [list(map(int, pctl_nobs.split(':'))) for pctl_nobs in obs_filter] except: raise TomboError('Invalid format for observation filter') if any(pctl < 0 or pctl > 100 for pctl in map(itemgetter(0), obs_filter)): error_message_and_exit('Invalid percentile value.') return obs_filter def get_seq_kmers(seq, kmer_width, rev_strand=False): """Compute expected signal levels for a sequence from a reference model Args: seq (str): genomic seqeunce to be converted to expected signal levels kmer_width (int): k-mer width rev_strand (bool): provided sequence is from reverse strand (so flip return order to genome forward direction) """ seq_kmers = [seq[i:i + kmer_width] for i in range(len(seq) - kmer_width + 1)] # get stat lookups from seq on native strand then flip if rev_strand if rev_strand: seq_kmers = seq_kmers[::-1] return seq_kmers
[docs]class TomboMotif(object): """Description of a sequence motif, including potentially modified position Attributes: raw_motif (str): input raw motif string motif_len (int): length of motif motif_pat (``re.compile``): python regular expression for this motif rev_comp_pat (``re.compile``): python regular expression for reverse complement of this motif is_palindrome (bool): is the motif palindromic (in the genomic not literary definition) mod_pos (int): modified base position within motif mod_base (str): modified base (should generally be A, C, G or T) .. automethod:: __init__ """ def _parse_motif(self, raw_motif, rev_comp_motif=False): conv_motif = ''.join(SINGLE_LETTER_CODE[letter] for letter in raw_motif) if rev_comp_motif: # reverse complement and then flip any group brackets conv_motif = rev_comp(conv_motif).translate({ ord('['):']', ord(']'):'['}) return re.compile(conv_motif) def _compute_partial_patterns(self): """Compute patterns for partial matches that include the mod_pos at the start, end or within short sequences. Key into _partial_pats with: 1) whether searching at start, end or within a short sequence 2) length of the partial pattern Values are compiled partial pattern and mod_pos within pattern. Short patterns are lists. """ self._partial_pats = {'start':{}, 'end':{}, 'short':{}} for offset in range(self.mod_pos - 1): self._partial_pats['start'][ self.motif_len - offset - 1] = (self._parse_motif( self.raw_motif[offset + 1:]), self.mod_pos - offset - 1) for offset in range(self.motif_len - self.mod_pos): self._partial_pats['end'][ self.motif_len - offset - 1] = (self._parse_motif( self.raw_motif[:-(offset + 1)]), self.mod_pos) for short_len in range(1, self.motif_len): self._partial_pats['short'][short_len] = [ (self._parse_motif(self.raw_motif[offset:offset + short_len]), self.mod_pos - offset) for offset in range( max(0, self.mod_pos - short_len), min(self.motif_len - short_len + 1, self.mod_pos))] return
[docs] def __init__(self, raw_motif, mod_pos=None): """Parse string motif Args: raw_motif (str): sequence motif. supports IUPAC single letter codes (use T for RNA). mod_pos (int): 1-based position of modified base within the motif """ # TODO convert mod_pos to 0-based coordinate # (1-based is much more error prone) invalid_chars = re.findall( '[^' + ''.join(SINGLE_LETTER_CODE) + ']', raw_motif) if len(invalid_chars) > 0: error_message_and_exit( 'Invalid characters in motif: ' + ', '.join(invalid_chars)) # basic motif parsing self.raw_motif = raw_motif self.motif_len = len(raw_motif) self.motif_pat = self._parse_motif(raw_motif) self.rev_comp_pat = self._parse_motif(raw_motif, True) self.is_palindrome = self.motif_pat == self.rev_comp_pat # parse modified position from motif if provided self.mod_pos = mod_pos if mod_pos is None: self.mod_base = None else: assert 0 < mod_pos <= self.motif_len self.mod_base = raw_motif[mod_pos - 1] if INVALID_BASES.match(self.mod_base): warning_message( 'Provided modified position is not a single base, which ' + 'is likely an error. Specified modified base is one of: ' + ' '.join(SINGLE_LETTER_CODE[self.mod_base][1:-1])) self._compute_partial_patterns()
def __repr__(self): return '\n'.join(('Raw Motif:\t' + self.raw_motif, 'Mod Position:\t' + str(self.mod_pos), 'Motif Pattern:\t' + str(self.motif_pat), 'Rev Comp Pattern:\t' + str(self.rev_comp_pat)))
[docs] def matches_seq(self, seq): """Does the motif match provided sequence (including mod_pos within seq)? Including partial matches at beginning and end that include mod_pos. """ # check matches to start of sequence for start_len in range(1, min(len(seq) + 1, self.motif_len)): try: start_pat, start_mod_pos = self._partial_pats[ 'start'][start_len] except KeyError: continue if start_pat.match(seq[:start_len]): return True # check central sequence overlaps if len(seq) < self.motif_len: for short_pat, mod_pos in self._partial_pats['short'][len(seq)]: if short_pat.match(seq): return True else: if return True # check end of seq matches for end_len in range(1, min(len(seq) + 1, self.motif_len)): try: end_pat, end_mod_pos = self._partial_pats['end'][end_len] except KeyError: continue if end_pat.match(seq[-end_len:]): return True return False
[docs] def find_mod_poss(self, seq): """Find all mod-base positions within the sequence. Including partial matches at beginning and end that include mod_pos. """ seq_mod_poss = set() # check matches to start of sequence for start_len in range(1, min(len(seq) + 1, self.motif_len)): try: start_pat, start_mod_pos = self._partial_pats[ 'start'][start_len] except KeyError: continue if start_pat.match(seq[:start_len]): seq_mod_poss.add(start_mod_pos) # check central sequence overlaps if len(seq) < self.motif_len: for short_pat, short_mod_pos in self._partial_pats[ 'short'][len(seq)]: if short_pat.match(seq): seq_mod_poss.add(short_mod_pos) else: for motif_match in self.motif_pat.finditer(seq): seq_mod_poss.add(motif_match.start() + self.mod_pos) # check end of seq matches for end_len in range(1, min(len(seq) + 1, self.motif_len)): try: end_pat, end_mod_pos = self._partial_pats['end'][end_len] except KeyError: continue if end_pat.match(seq[-end_len:]): seq_mod_poss.add(len(seq) - end_len + end_mod_pos) return sorted(seq_mod_poss)
[docs]def parse_motif_descs(stat_motif_descs): """Parse string motif descriptions as defined by ``tombo plot roc --motif-descriptions`` Args: stat_motif_descs (str): string motifs description (see ``tombo plot roc --motif-descriptions``) Returns: list of tuples with :class:`tombo.tombo_helper.TomboMotif` and motif/modification names """ parsed_motif_descs = [] try: for motif_desc in stat_motif_descs.split('::'): raw_motif, mod_pos, mod_name = motif_desc.split(':') motif = TomboMotif(raw_motif, int(mod_pos)) parsed_motif_descs.append((motif, mod_name)) except: error_message_and_exit( 'Invalid motif decriptions format. Format descriptions as: ' + '"motif:mod_pos:name[::motif2:mod_pos2:name2...]".') return parsed_motif_descs
########################### ###### FASTA Parsing ###### ########################### def get_rec_names(fasta_fn): with as fasta_fp: all_rec_ids = [line.replace(">","").split()[0] for line in fasta_fp if line.startswith('>')] return all_rec_ids
[docs]class Fasta(object): """Fasta file sequence format wrapper class. Will load faidx via ``pyfaidx`` package if installed, else the fasta will be loaded into memory for sequence extraction. .. automethod:: __init__ """ def _load_in_mem(self): genome_index = {} curr_id = None curr_seq = [] with as fasta_fp: for line in fasta_fp: if line.startswith('>'): if (curr_id is not None and curr_seq is not []): genome_index[curr_id] = ''.join(curr_seq) curr_seq = [] curr_id = line.replace(">","").split()[0] else: curr_seq.append(line.strip()) # add last record if (curr_id is not None and curr_seq is not []): genome_index[curr_id] = ''.join(curr_seq) return genome_index def _index_contains_uridines(self, n_chrms=10, n_bases=1000): if self.has_pyfaidx: # check first N bases of the first M chrms for U characters for chrm in islice(self.index.index, n_chrms): if'U', self.get_seq( chrm, 1, n_bases, error_end=False)): return True else: for chrm in islice(self.index.keys(), n_chrms): if'U', self.get_seq( chrm, 1, n_bases, error_end=False)): return True return False
[docs] def __init__(self, fasta_fn, dry_run=False, force_in_mem=False, assume_dna_base=False): """Load a fasta Args: fasta_fn (str): path to fasta file dry_run (bool): when pyfaidx is not installed, don't actually read sequence into memory. force_in_mem (bool): force genome to be loaded into memory even if pyfaidx is installed allowing on-disk access assume_dna_bases (bool): skip check for DNA or RNA bases (default: False) """ self.fasta_fn = resolve_path(fasta_fn) self.has_rna_bases = False try: if force_in_mem: raise ImportError import pyfaidx self.has_pyfaidx = True try: self.index = pyfaidx.Faidx(self.fasta_fn) except UnicodeDecodeError: error_message_and_exit( 'FASTA file does not appear to be formatted correctly.') except ImportError: self.has_pyfaidx = False if not dry_run: self.index = self._load_in_mem() if not dry_run: self.has_rna_bases = (assume_dna_base or self._index_contains_uridines())
[docs] def get_seq(self, chrm, start=None, end=None, error_end=True): """Extract sequence from a specific genomic region. Note if provided, start and end must both be provided or they will be ignored. Args: chrm (str): chromosome name start (int): 0-based start position end (int): 0-based open-interval end position error_end (bool): raise an error when the region requested extends beyond the chromosome (default: True) Returns: Genomic sequence requested. Sequence is converted to RNA sequence if applicable. """ if self.has_pyfaidx: if not (start or end): r_seq = self.index.fetch( chrm, 1, self.index.index[chrm].rlen).seq.upper() elif (start < 0 or start > self.index.index[chrm].rlen or ( error_end and ( end < 0 or end > self.index.index[chrm].rlen))): raise TomboError( 'Encountered invalid genome sequence request.') else: r_seq = self.index.fetch(chrm, start + 1, end).seq.upper() else: if (start is not None and ( start < 0 or start > len(self.index[chrm]))) or ( error_end and end is not None and (end < 0 or end > len(self.index[chrm]))): raise TomboError( 'Encountered invalid genome sequence request.') r_seq = self.index[chrm][start:end].upper() if self.has_rna_bases: r_seq = rev_transcribe(r_seq) return r_seq
[docs] def iter_chrms(self): """Iterate over chromosome names """ if self.has_pyfaidx: for chrm in self.index.index: yield unicode(chrm) else: for chrm in self.index: yield chrm
def __contains__(self, chrm): if self.has_pyfaidx: return chrm in self.index.index return chrm in self.index
############################################# ###### Automatic Sample-Type Detection ###### ############################################# def is_read_rna(fast5_data): """Determine if a read is RNA or DNA Args: fast5_data (`h5py.File`) Returns: Boolean, incdicating whether the read appears to be RNA or DNA Note: This function uses the read meta-data and so non-standard processing pipelines may not prodcue expected values. """ # check both experiment type and kit slots for "rna" exp_type = fast5_data['UniqueGlobalKey/context_tags'].attrs.get( 'experiment_type') try: exp_type = exp_type.decode() # remove the word internal since it contains rna. exp_type = exp_type.replace('internal', '') except (AttributeError, TypeError): pass exp_kit = fast5_data['UniqueGlobalKey/context_tags'].attrs.get( 'experiment_kit') try: exp_kit = exp_kit.decode() # remove the word internal since it contains rna. exp_kit = exp_kit.replace('internal', '') except (AttributeError, TypeError): pass if exp_type is None and exp_kit is None: return False return ( (exp_type is not None and'rna', exp_type) is not None) or (exp_kit is not None and'rna', exp_kit) is not None)) def is_sample_rna(reads_index=None, fast5_fns=None, n_reads=50): """Determine if a set of reads are RNA or DNA from a small sample. Must provide either reads_index or fast5_fns. Args: reads_index (:class:`tombo.tombo_helper.TomboReads`) fast5_fns (list): list of fast5 read filename n_reads (int): number of reads to check (default: 50) Returns: False if any read does not appear to be an RNA read (see :class:`tombo.tombo_helper.is_read_rna`) else return True. """ proc_reads = 0 if reads_index is not None: for r_data in reads_index.iter_reads(): if not r_data.rna: return False proc_reads += 1 if proc_reads >= n_reads: break elif fast5_fns is not None: for fast5_fn in fast5_fns: try: with h5py.File(fast5_fn, 'r') as fast5_data: if not is_read_rna(fast5_data): return False proc_reads += 1 except: continue if proc_reads >= n_reads: break else: raise TomboError( 'Must provide either reads_index or fast5_fns to ' + 'determine is sample is RNA.') return True
[docs]def get_seq_sample_type(fast5_data=None, reads_index=None, fast5_fns=None, num_reads=50): """Get the sequencing sample type from a single read or set of reads Args: fast5_data (`h5py.File`): open read h5py File object reads_index (:class:`tombo.tombo_helper.TomboReads`) fast5_fns (list): FAST5 read filenames num_reads (int): sample of reads to check for sample type """ if fast5_data is None and reads_index is None and fast5_fns is None: raise TomboError('Must provide either fast5_data, reads_index, or ' + 'fast5_fns to determine sequencing sample type.') if fast5_data is None: return seqSampleType(RNA_SAMP_TYPE, True) if is_sample_rna( reads_index, fast5_fns, num_reads) else seqSampleType( DNA_SAMP_TYPE, False) return seqSampleType(RNA_SAMP_TYPE, True) if is_read_rna( fast5_data) else seqSampleType(DNA_SAMP_TYPE, False)
############################ ###### Lock Functions ###### ############################ def get_lock_fn(fast5s_dir): """Get filename for the lock file to indicate that this directory is currently being processed. This file should be saved to be deleted later. """ # if directory comes with trailing slash, remove for processing if fast5s_dir.endswith('/'): fast5s_dir = fast5s_dir[:-1] split_dir = os.path.split(fast5s_dir) return os.path.join(split_dir[0], "." + split_dir[1] + '.tombo.lock') def _is_lock_file(lock_fn): lock_bn = os.path.split(lock_fn)[1] if (len(lock_bn) == 0 or not lock_bn.startswith('.') or not lock_bn.endswith('.tombo.lock')): return False return True ##################################### ###### FAST5 Parsing Functions ###### ##################################### def reads_contain_basecalls(fast5_fns, bc_grp, num_reads): test_fns = random.sample( fast5_fns, num_reads) if len(fast5_fns) > num_reads else fast5_fns for fast5_fn in test_fns: try: with h5py.File(fast5_fn, 'r') as fast5_data: fast5_data['/Analyses/' + bc_grp] except: continue # if the basecall group is accessible for a single file return true return True # else if all tested reads did not contain the basecall group return False return False def get_files_list(fast5s_dir): """Get all fast5 files recursively below this directory """ all_fast5s = [] # walk through directory structure searching for fast5 files for root, _, fns in os.walk(fast5s_dir): for fn in fns: if not fn.endswith('.fast5'): continue all_fast5s.append(os.path.join(root, fn)) return all_fast5s def clear_tombo_locks(lock_fns): """Clear all lock files """ for lock_fn in lock_fns: # safegaurd against incorrect file passed to this function if not _is_lock_file(lock_fn): continue # try to remove lock files so that is some have been corrupted or # otherwise cause an error at least all those accessible will be cleared try: os.remove(lock_fn) except: pass return def get_files_list_and_lock_dirs(fast5s_dir, ignore_locks): """Get all fast5 files recursively below this directory and add a Tombo lock file to indicate that this directory is currently being re-squiggled """ ignore_locks_mess = ( 'This set of reads is currently being processed by another ' + 'resquiggle command. Multiple resquiggle commands cannot be ' + 'run concurrently on a set of reads to avoid corrupting ' + 'read files. If you are sure this set of reads is not being ' + 'processed by another command (usually caused by previous ' + 'unexpected exit) set the --ignore-read-locks flag.') all_fast5s = [] lock_fns = [] try: # walk through directory structure searching for fast5 files for root, _, fns in os.walk(fast5s_dir): lock_fn = get_lock_fn(root) if not ignore_locks and os.path.exists(lock_fn): clear_tombo_locks(lock_fns) error_message_and_exit(ignore_locks_mess) lock_fns.append(lock_fn) # create empty file indicating this directory is locked open(lock_fn, 'w').close() for fn in fns: if not fn.endswith('.fast5'): continue all_fast5s.append(os.path.join(root, fn)) except: clear_tombo_locks(lock_fns) error_message_and_exit( 'Unexpected error during file enumeration. Check that you have ' + 'write permission within the specified [fast5_basedir].') return all_fast5s, lock_fns
[docs]def get_raw_read_slot(fast5_data): """Get the raw read slot from this FAST5 read file Args: fast5_data (`h5py.File`): open FAST5 read file object Example:: all_aw_signal = get_raw_read_slot(fast5_data)['Signal'][:] Returns: The HDF5 group slot containing the raw signal data. """ try: raw_read_slot = next(iter(fast5_data['/Raw/Reads'].values())) except KeyError: raise TomboError( 'Raw data is not found in /Raw/Reads/Read_[read#]. Note that ' + 'Tombo does not support multi-fast5 format.') return raw_read_slot
[docs]class TomboReads(object): """A set of reads with associated meta-data from re-squiggle processing .. automethod:: __init__ """ def _get_index_fn(self, fast5s_dir): """Get the filename for the requested directory and corrected group """ # if directory comes with trailing slash, remove for processing if fast5s_dir.endswith('/'): fast5s_dir = fast5s_dir[:-1] split_dir = os.path.split(fast5s_dir) return os.path.join(split_dir[0], "." + split_dir[1] + "." + self.corr_grp + '.tombo.index') # index building and writing class functions def _prep_for_writing(self, fast5s_dirs): assert len(fast5s_dirs) == 1, ( 'Must provide only a single FAST5 base directory when ' + 'openning for writing.') fast5s_dir = fast5s_dirs[0] fast5s_dir = (fast5s_dir if fast5s_dir.endswith('/') else fast5s_dir + '/') index_fn = self._get_index_fn(fast5s_dir) self.fast5s_dirs[fast5s_dir] = index_fn if os.path.exists(index_fn): os.remove(index_fn) # open default dict to fill with readData lists by (chrm, strand) self.reads_index = defaultdict(list) return
[docs] def add_read_data(self, chrm, strand, read_data): """Add read data to the index Args: chrm (str): chromosome name strand (str): strand ('+' or '-') read_data (:class:`tombo.tombo_helper.readData`): read information """ self.reads_index[(chrm, strand)].append(read_data) return
[docs] def replace_index(self, new_reads_index): """Replace current reads index Args: new_reads_index (dict): dictionary with (chrm, strand) pointing to lists of :class:`tombo.tombo_helper.readData` objects """ if sum(len(x) for x in new_reads_index.values()) == 0: raise TomboError('Cannot replace with an empty index.') self.reads_index = new_reads_index return
[docs] def write_index_file(self): """Write index file Note: Must be an index from only a single ``fast5s_dir`` Index filename will be: `.[fast5s_dir].[corr_grp].tombo.index` Note that this is a hidden file (starts with a ".") """ status_message('Saving Tombo reads index to file.') assert len(self.fast5s_dirs) == 1, ( 'Cannot write index for TomboReads created from more than ' + 'one base directory.') basedir, index_fn = next(iter(self.fast5s_dirs.items())) try: import cPickle as pickle except: import pickle index_data = defaultdict(list) for chrm_strand, cs_reads in self: for rd in cs_reads: # clip the basedir off the FAST5 filename in case later # functions are called from another relative path and # split corr_grp and bc_subgrp for easier filtering index_data[chrm_strand].append(( re.sub(basedir, '', rd.fn, 1), rd.start, rd.end, rd.read_start_rel_to_raw, rd.corr_group.split('/')[0], rd.corr_group.split('/')[-1], rd.filtered, rd.rna, rd.sig_match_score, rd.mean_q_score, rd.read_id)) with, 'wb') as index_fp: # note protocol 2 for py2/3 compatibility pickle.dump(dict(index_data), index_fp, protocol=2) return
# Index parsing class functions def _parse_fast5s_wo_index(self, wo_index_fast5s_dirs): """Parse re-squiggled reads data from a list of fast5 directories """ def get_read_data(read_fn, fast5_data, bc_subgrp): read_id = get_raw_read_slot(fast5_data).attrs.get('read_id') corr_data = fast5_data[ '/'.join(('/Analyses', self.corr_grp, bc_subgrp))] rna = corr_data.attrs.get('rna') rna = False if rna is None else rna align_data = dict(corr_data['Alignment'].attrs.items()) read_start_rel_to_raw = corr_data['Events'].attrs.get( 'read_start_rel_to_raw') chrm = align_data['mapped_chrom'] strand = align_data['mapped_strand'] try: chrm = chrm.decode() strand = strand.decode() except: pass return chrm, strand, readData( align_data['mapped_start'], align_data['mapped_end'], False, read_start_rel_to_raw, strand, read_fn, self.corr_grp + '/' + bc_subgrp, rna, read_id=read_id) files = [fn for fast5s_dir in wo_index_fast5s_dirs for fn in get_files_list(fast5s_dir)] dir_reads_index = defaultdict(list) for read_fn in files: try: with h5py.File(read_fn, 'r') as fast5_data: i_bc_subgrps = ( fast5_data['/Analyses/' + self.corr_grp].keys() if self.bc_subgrps is None else self.bc_subgrps) for bc_subgrp in i_bc_subgrps: chrm, strand, r_data = get_read_data( read_fn, fast5_data, bc_subgrp) dir_reads_index[(chrm, strand)].append(r_data) except: # ignore errors and process all reads that don't error continue return dict(dir_reads_index) def _load_index_data(self, fast5s_dir): try: import cPickle as pickle except: import pickle with[fast5s_dir], 'rb') as index_fp: raw_index_data = pickle.load(index_fp) # determine the index type used. Index information was added around # version 1.3 making the index data 2 elements longer. # so check which one is used here. try: num_index_vals = len(next(iter(raw_index_data.values()))[0]) except StopIteration: raise TomboError('Tombo index file appears to be empty') if num_index_vals == 8: def convert_r_data(from_base_fn, start, end, rsrtr, c_grp, s_grp, filtered, rna): return readData(start, end, filtered, rsrtr, strand, os.path.join(fast5s_dir, from_base_fn), self.corr_grp + '/' + s_grp, rna) elif num_index_vals == 10: def convert_r_data( from_base_fn, start, end, rsrtr, c_grp, s_grp, filtered, rna, sig_match_score, mean_q_score): return readData(start, end, filtered, rsrtr, strand, os.path.join(fast5s_dir, from_base_fn), self.corr_grp + '/' + s_grp, rna, sig_match_score, mean_q_score) elif num_index_vals == 11: def convert_r_data( from_base_fn, start, end, rsrtr, c_grp, s_grp, filtered, rna, sig_match_score, mean_q_score, read_id): return readData(start, end, filtered, rsrtr, strand, os.path.join(fast5s_dir, from_base_fn), self.corr_grp + '/' + s_grp, rna, sig_match_score, mean_q_score, read_id) else: raise TomboError('Invalid Tombo index file.') dir_reads_index = {} for (chrm, strand), cs_raw_data in raw_index_data.items(): cs_data = [convert_r_data(*r_data) for r_data in cs_raw_data] # don't add chrm/strand if all reads are filtered if len(cs_data) > 0: dir_reads_index[(chrm, strand)] = cs_data return dir_reads_index def _parse_fast5s_w_index(self, fast5s_dir): """Use index file to parse information about a set of reads """ try: curr_dir_reads_index = self._load_index_data(fast5s_dir) except UnicodeDecodeError: warning_message( 'Invalid Tombo index file.\n\t\tThis occurs most often ' + 'when the re-squiggle command was completed using a Tombo ' + 'build against a different python version (2 or 3).') raise TomboError if not self.remove_filtered and self.bc_subgrps is None: return curr_dir_reads_index filt_dir_reads_index = {} for (chrm, strand), cs_raw_data in curr_dir_reads_index.items(): cs_data = [ rd for rd in cs_raw_data if rd.corr_group.split('/')[0] == self.corr_grp and (self.bc_subgrps is None or rd.corr_group.split('/')[-1] in self.bc_subgrps) and (not self.remove_filtered or not rd.filtered)] # don't add chrm/strand if all reads are filtered if len(cs_data) > 0: filt_dir_reads_index[(chrm, strand)] = cs_data return filt_dir_reads_index def _merge_dir_indices(self, w_index_ri, wo_index_ri): """Merge coverage from serveral parsed sets of data """ all_raw_reads_index = w_index_ri + [wo_index_ri,] reads_index = defaultdict(list) for chrm_strand in set([cs for d_ri in all_raw_reads_index for cs in d_ri]): for dir_reads_index in all_raw_reads_index: if chrm_strand not in dir_reads_index: continue reads_index[chrm_strand].extend(dir_reads_index[chrm_strand]) return dict(reads_index) def _parse_fast5s(self, fast5s_dirs): wo_index_dirs = [] w_index_covs = [] warn_index = False # determine if index exists for each directory and load appropriately for fast5s_dir in fast5s_dirs: fast5s_dir = (fast5s_dir if fast5s_dir.endswith('/') else fast5s_dir + '/') self.fast5s_dirs[fast5s_dir] = self._get_index_fn(fast5s_dir) if os.path.exists(self.fast5s_dirs[fast5s_dir]): try: w_index_covs.append(self._parse_fast5s_w_index(fast5s_dir)) except TomboError: warning_message( 'Failed to parse tombo index file for ' + fast5s_dir + ' directory. Creating temporary index from ' + 'FAST5 files.') wo_index_dirs.append(fast5s_dir) else: if not warn_index: warning_message( 'Tombo index file does not exist for one or more ' + 'directories.\n\t\tIf --skip-index was not set for ' + 're-squiggle command, ensure that the specified ' + 'directory is the same as for the re-squiggle command.') warn_index = True wo_index_dirs.append(fast5s_dir) wo_index_cov = self._parse_fast5s_wo_index(wo_index_dirs) self.reads_index = self._merge_dir_indices(w_index_covs, wo_index_cov) return
[docs] def __init__(self, fast5s_basedirs, corrected_group='RawGenomeCorrected_000', basecall_subgroups=None, for_writing=False, remove_filtered=True, sample_name=None): """Parse data from a list of re-squiggle fast5 directories Args: fast5s_basedirs (list): fast5 base directories, which have been processed by ``tombo resquiggle`` corrected_group (str): Analysis slot containing the re-squiggle information (optional; default: 'RawGenomeCorrected_000') basecall_subgroups (list): basecall subgroups (optional; default: Process all basecall subgroups) for_writing (bool): open TomboReads to write index (optional; default: Open for reading and parse re-squiggled reads) remove_filtered (bool): remove filtered reads as indicated by index file (optional; default: True) sample_name (str): for verbose output """ if VERBOSE and not for_writing: status_mess = ('Parsing Tombo index file(s).' if sample_name is None else 'Parsing ' + sample_name + ' Tombo index file(s).') status_message(status_mess) assert isinstance(fast5s_basedirs, list), ( 'fast5s_basedirs must be a list.') self.fast5s_dirs = {} self.corr_grp = corrected_group self.bc_subgrps = basecall_subgroups self.sample_name = sample_name self.remove_filtered = remove_filtered self.for_writing = for_writing self.coverage = None if self.for_writing: self._prep_for_writing(fast5s_basedirs) else: self._parse_fast5s(fast5s_basedirs) return
def _compute_coverage(self): if VERBOSE: status_message('Calculating read coverage.') self.coverage = {} for chrm_strand, cs_reads in self: if len(cs_reads) == 0: continue cs_coverage = np.zeros(max(r_data.end for r_data in cs_reads), dtype=np.int64) for r_data in cs_reads: cs_coverage[r_data.start:r_data.end] += 1 self.coverage[chrm_strand] = cs_coverage return self def _add_coverages(self, ctrl_reads_index): merged_cov = {} # if self or control reads don't have coverage, compute it if self.coverage is None: self._compute_coverage() if ctrl_reads_index.coverage is None: ctrl_reads_index._compute_coverage() for chrm_strand, ctrl_cs_cov in ctrl_reads_index.coverage.items(): if chrm_strand in self.coverage: self_cs_cov = self.coverage[chrm_strand] # copy longer array and add shorter array coverage if self_cs_cov.shape[0] > ctrl_cs_cov.shape[0]: merged_cs_cov = self_cs_cov.copy() merged_cs_cov[:ctrl_cs_cov.shape[0]] += ctrl_cs_cov else: merged_cs_cov = ctrl_cs_cov.copy() merged_cs_cov[:self_cs_cov.shape[0]] += self_cs_cov else: merged_cs_cov = ctrl_cs_cov.copy() merged_cov[chrm_strand] = merged_cs_cov return merged_cov
[docs] def iter_coverage_regions(self, ctrl_reads_index=None): """Get genome coverage for a set of reads Args: ctrl_reads_index (:class:`tombo.tombo_helper.TomboReads`): a second set of tombo reads to add for coverage Returns: Yeilds (chrm, strand, cs_cov, cs_cov_starts) indicating read coverage levels cs_cov and cs_cov_starts are numpy arrays containing the coverage level and the 0-based genomic start positions of those intervals """ if VERBOSE: status_message('Calculating read coverage regions.') if self.coverage is None: self._compute_coverage() coverage = (self.coverage if ctrl_reads_index is None else self._add_coverages(ctrl_reads_index)) for (chrm, strand), cs_cov in coverage.items(): cs_cov_starts = np.concatenate([ [0,], np.where(np.diff(cs_cov))[0] + 1, [cs_cov.shape[0],]]) cs_cov = cs_cov[cs_cov_starts[:-1]] yield chrm, strand, cs_cov, cs_cov_starts return
[docs] def iter_cov_regs( self, cov_thresh, region_size=None, ctrl_reads_index=None): """Iterate over regions with coverage greater than or equal to cov_thresh. If region_size is provided, regions are rounded to the nearest region_sized windows and only the region start is yielded (e.g. chrm, strand, start). If not provided (chrm, strand, start, end) is yielded. """ def round_reg_start(x): return int(region_size * np.floor(x / float(region_size))) def round_reg_end(x): return int(region_size * np.ceil(x / float(region_size))) for chrm, strand, cov, starts in self.iter_coverage_regions( ctrl_reads_index): curr_reg_start = -1 valid_cov = np.where(np.diff(np.concatenate([ [False,], np.greater_equal(cov, cov_thresh), [False,]])))[0] for cov_start_i, cov_end_i in zip(valid_cov[:-1], valid_cov[1:]): cov_start, cov_end = starts[cov_start_i], starts[cov_end_i] if region_size is None: yield chrm, strand, cov_start, cov_end continue for reg_start in range(round_reg_start(cov_start), round_reg_end(cov_end), region_size): if reg_start != curr_reg_start: yield chrm, strand, reg_start curr_reg_start = reg_start return
[docs] def get_all_cs(self): """Get list of all (chromosome, strand) stored in index. """ return list(self.reads_index.keys())
[docs] def is_empty(self): """Are any reads stored in the index? """ for cs_reads in self.reads_index.values(): if len(cs_reads) > 0: return False return True
def __contains__(self, chrm_strand): return chrm_strand in self.reads_index def __iter__(self): self._iter = iter(self.reads_index.items()) return self def __next__(self): return next(self._iter) # for python2 compatibility
[docs] def next(self): return self.__next__()
[docs] def iter_reads(self): """Iterate over reads stored in the index """ for _, cs_reads in self: for rd in cs_reads: yield rd return
[docs] def get_cs_reads(self, chrm, strand, invalid_return=[]): """Extract the list of reads stored in a specified chromosome and strand Args: chrm (str): chromosome name strand (str): strand ('+' or '-') invalid_return: value to return for invalid (chrm, strand) """ if (chrm, strand) not in self.reads_index: return invalid_return return self.reads_index[(chrm, strand)]
def _get_strand_spec_cov(self, chrm, pos, strand, invalid_return): if (chrm, strand) not in self.coverage: return invalid_return if pos >= self.coverage[(chrm, strand)].shape[0]: return invalid_return return self.coverage[(chrm, strand)][pos]
[docs] def get_coverage(self, chrm, pos, strand=None, invalid_return=0): """Get coverage at specified position Args: chrm (str): chromosome name pos (int): 0-based genomic position strand (str): interval strand ('+', '-' or None). Default: None (max over both strands) invalid__return: return value for invalid (bad chrm, strand or pos beyond coverage) position. Default: 0 """ if self.coverage is None: self._compute_coverage() if strand is None: try: return max( self._get_strand_spec_cov(chrm, pos, '+', invalid_return), self._get_strand_spec_cov(chrm, pos, '-', invalid_return)) # with None invalid_return value could be max over 2 None's except TypeError: return invalid_return return self._get_strand_spec_cov(chrm, pos, strand, invalid_return)
[docs] def get_cs_coverage(self, chrm, strand, invalid_return=None): """Extract coverage levels over a specified chromosome and strand Args: chrm (str): chromosome name strand (str): strand ('+' or '-') invalid_return: value to return for invalid (chrm, strand) Returns: numpy array (numpy.int64) containing read coverage levels """ if self.coverage is None: self._compute_coverage() if (chrm, strand) not in self.coverage: return invald_return return self.coverage[(chrm, strand)]
[docs] def iter_cs_coverage(self): """Iterate over coverage levels across all stored (chrm, strands) """ if self.coverage is None: self._compute_coverage() return self.coverage.items()
########################################### ###### Events Table Access Functions ###### ###########################################
[docs]def get_multiple_slots_read_centric(r_data, slot_names, corr_grp=None): """Extract multiple read-centric slot_names from this read's Events table Args: fast5_data (:class:`tombo.tombo_helper.readData` or an open ``h5py.File`` object) slot_name (str): slot from which to extract data (valid values: norm_mean, norm_stdev, start, length, base) corr_grp (str): corrected group slot from which to extract data (default: use value from ``fast5_data`` object; required for ``h5py.File``) Returns: A tuple of numpy arrays specified by the ``slot_names`` """ try: do_close = False if not isinstance(r_data, h5py.File): do_close = True corr_grp = r_data.corr_group r_data = h5py.File(r_data.fn, 'r') event_slot_name = '/'.join(('/Analyses', corr_grp, 'Events')) # note that it's more efficient to try to access the slot # and except the error that check if the slot exists first r_event_data = r_data[event_slot_name][:] if do_close: r_data.close() except: # probably truncated file or events don't exist return [None,] * len(slot_names) return [r_event_data[slot_name] for slot_name in slot_names]
[docs]def get_single_slot_read_centric(r_data, slot_name, corr_grp=None): """Extract read-centric slot_name from this read's Events table Args: fast5_data (:class:`tombo.tombo_helper.readData` or an open ``h5py.File`` object) slot_name (str): slot from which to extract data (valid values: norm_mean, norm_stdev, start, length, base) corr_grp (str): corrected group slot from which to extract data (default: use value from ``fast5_data`` object; required for ``h5py.File``) """ try: # if r_data is an open h5py object then don't open the filename do_close = False if not isinstance(r_data, h5py.File): do_close = True corr_grp = r_data.corr_group r_data = h5py.File(r_data.fn, 'r') # note that it's more efficient to try to access the slot # and except the error that check if the slot exists first r_slot_values = r_data['/'.join(('/Analyses', corr_grp, 'Events'))][ slot_name] if do_close: r_data.close() except: # probably truncated file or events don't exist return None return r_slot_values
def get_single_slot_genome_centric(r_data, slot_name): """Extract genome-centric slot_name from this read's Events table """ r_slot_values = get_single_slot_read_centric(r_data, slot_name) if r_slot_values is None: return None if r_data.strand == '-': r_slot_values = r_slot_values[::-1] return r_slot_values def get_mean_slot_genome_centric(cs_reads, chrm_len, slot_name): """Get the mean over all reads at each covered genomic location for this slots value """ base_sums = np.zeros(chrm_len) base_cov = np.zeros(chrm_len, dtype=np.int64) for r_data in cs_reads: # extract read means data so data across all chrms is not # in RAM at one time g_slot_values = get_single_slot_genome_centric(r_data, slot_name) if g_slot_values is None: continue base_sums[r_data.start: r_data.start + len(g_slot_values)] += g_slot_values base_cov[r_data.start:r_data.start + len(g_slot_values)] += 1 return base_sums / base_cov def iter_mean_slot_values( reads_index, chrm_sizes, slot_name, ctrl_reads_index=None): """Iterate through chromosomes and strands yielding mean slots values over all reads at each covered genomic location. Generator returns chrmosome, strand, cs_mean_values, cs_ctrl_mean_values tuples (4 return values). """ # ignore divide by zero errors that occur where there is no # coverage. Need to correct nan values after subtracting two sets of # coverage so leave as nan for now old_err_settings = np.seterr(all='ignore') for chrm, strand in [(c, s) for c in sorted(chrm_sizes) for s in ('+', '-')]: if ctrl_reads_index is None: if (chrm, strand) not in reads_index: continue cs_mean_values = get_mean_slot_genome_centric( reads_index.get_cs_reads(chrm, strand), chrm_sizes[chrm], slot_name) yield chrm, strand, cs_mean_values, None else: cs_mean_values, ctrl_cs_mean_values = None, None if (chrm, strand) in reads_index: cs_mean_values = get_mean_slot_genome_centric( reads_index.get_cs_reads(chrm, strand), chrm_sizes[chrm], slot_name) if (chrm, strand) in ctrl_reads_index: ctrl_cs_mean_values = get_mean_slot_genome_centric( ctrl_reads_index.get_cs_reads(chrm, strand), chrm_sizes[chrm], slot_name) if cs_mean_values is None and ctrl_cs_mean_values is None: continue yield chrm, strand, cs_mean_values, ctrl_cs_mean_values _ = np.seterr(**old_err_settings) return def get_largest_signal_differences( reads_index, ctrl_reads_index, num_regions, num_bases): chrm_sizes = get_chrm_sizes(reads_index, ctrl_reads_index) all_largest_diff_poss = [] for chrm, strand, cs_sig_means, ctrl_cs_sig_means in iter_mean_slot_values( reads_index, chrm_sizes, 'norm_mean', ctrl_reads_index): if cs_sig_means is None or ctrl_cs_sig_means is None: continue chrm_diffs = np.nan_to_num(np.abs(cs_sig_means - ctrl_cs_sig_means)) chrm_max_diff_regs = np.argsort(chrm_diffs)[::-1][:num_regions] all_largest_diff_poss.extend(( chrm_diffs[pos], max(pos - int(num_bases / 2.0), 0), chrm, strand) for pos in chrm_max_diff_regs) return sorted(all_largest_diff_poss, reverse=True)[:num_regions] def get_signal_differences(reads_index, ctrl_reads_index): """Helper function to compute all signal differences """ chrm_sizes = get_chrm_sizes(reads_index, ctrl_reads_index) all_diffs = {} for chrm, strand, cs_sig_means, ctrl_cs_sig_means in iter_mean_slot_values( reads_index, chrm_sizes, 'norm_mean', ctrl_reads_index): if cs_sig_means is None or ctrl_cs_sig_means is None: continue all_diffs[(chrm, strand)] = np.nan_to_num( cs_sig_means - ctrl_cs_sig_means) return all_diffs #################################### ###### Genomic Interval Class ###### ####################################
[docs]class intervalData(object): """Genome/transcriptome interval information Example:: int_data = tombo_helper.intervalData( chrm='chr20', start=10000, end=10100, strand='+') Note: All intervalData functions return the object in order to allow chaining of interval commands. (e.g. ``int_data.add_reads(reads_index).add_seq()``) .. automethod:: __init__ """ def __setattr__(self, name, value): if name == 'chrm': if not isinstance(value, unicode): raise TypeError(name + ' must be a string') elif name in ('start', 'end'): if not (isinstance(value, int) or isinstance(value, np.integer)): raise TypeError(name + ' must be an int') elif name == 'strand': if value not in (None, '+', '-'): raise TypeError('strand must be either None, "+", or "-"') elif name in ('reg_id', 'reg_text', 'seq'): if value is not None and not isinstance(value, unicode): raise TypeError(name + ' must be a string') elif name == 'reads': if value is not None and not isinstance(value, list): raise TypeError('reads must be a list') else: raise ValueError(name + ' is an invalid attribute for intervalData') super(intervalData, self).__setattr__(name, value)
[docs] def __init__(self, chrm, start, end, strand=None, reg_id=None, reg_text='', reads=None, seq=None): """Initialize a genome/transcriptome interval object Args: chrm (str): chromosome name. start (int): 0-based start position. end (int): 1-based (or open interval) end position. strand (str): interval strand ('+', '-' or None). Default: None reg_id (str): used to keep track of multiple intervals. Default: None reg_text (str): region description text. Default: '' reads (list): list of readData values overlapping region. Default: None seq (str): region genomic sequence. Default: None """ self.chrm = unicode(chrm) self.start = start self.end = end self.strand = unicode(strand) if strand is not None else None self.reg_id = unicode(reg_id) if reg_id is not None else None self.reg_text = unicode(reg_text) self.reads = reads self.seq = seq
[docs] def update(self, **kwargs): """Update slots specified in keyword arguments (kwargs) """ for k, v in kwargs.items(): self.__setattr__(k, v) # return self to allow chaining and auto-return return self
def __repr__(self): # convert reads and seq if they are too long self_vars = vars(self).copy() if self_vars['reads'] is not None: self_vars['reads'] = '<{:d} reads>'.format(len(self_vars['reads'])) if self_vars['seq'] is not None and len(self_vars['seq']) > 50: self_vars['seq'] = '<{:d} bases of sequence>'.format( len(self_vars['seq'])) self_vars['reg_text'] = '"' + self_vars['reg_text'] + '"' return '<tombo.tombo_helper.intervalData object> :\n' + '\n'.join( "{:>15} : {}".format(k, str(v)) for k, v in self_vars.items())
[docs] def copy(self, include_reads=True): """Create a copy of this interval. Useful when adding sets of reads from multiple samples to compare. Args: include_reads (bool): include current reads slot in the new object (default: True) """ cp_reads = self.reads if include_reads else None return type(self)(self.chrm, self.start, self.end, self.strand, self.reg_id, self.reg_text, cp_reads, self.seq)
[docs] def merge(self, other_reg): """Create a copy of this interval with the reads from this interval and `other_reg` Args: other_reg (:class:`tombo.tombo_helper.intervalData`): a second region to merge with this interval Note: Aside from reads, all other attributes will be taken from this interval """ merged_reg_data = self.copy() self_reads = self.reads if self.reads is not None else [] other_reads = other_reg.reads if other_reg.reads is not None else [] return merged_reg_data.update(reads=self_reads + other_reads)
[docs] def expand_interval(self, expand_width): """Expand this interval by the specified amount (effects only the start and stop attributes; NOT seq or reads) Args: expand_width (int): amount by which to expand the interval """ self.update(start=self.start - expand_width, end=self.end + expand_width) return self
[docs] def add_reads(self, reads_index, require_full_span=False): """Add reads overlapping this interval Args: reads_index (:class:`tombo.tombo_helper.TomboReads`): reads index require_full_span (bool): require that reads span then entire interval (default: include all reads overlapping the interval) """ def get_c_s_data(strand): if require_full_span: def read_intersects_interval(r_start, r_end): return r_start <= self.start and r_end >= self.end else: def read_intersects_interval(r_start, r_end): return not (r_start >= self.end or r_end <= self.start) # get all reads intersecting the interval return [ r_data for r_data in reads_index.get_cs_reads(self.chrm, strand) if read_intersects_interval(r_data.start, r_data.end)] # get all reads that overlap this interval # note that this includes partial overlaps as these contribute # to coverage and other statistics so can't really restrict to # full coverage as previous versions of code did if self.strand is not None: return self.update(reads=get_c_s_data(self.strand)) # if strand is None, get data from both strands return self.update(reads=get_c_s_data('+') + get_c_s_data('-'))
def _update_seq(self, r_data, reg_base_data): """Update the sequence for the region based on this read """ read_bases = get_single_slot_read_centric(r_data, 'base') if read_bases is None: warning_message( 'Unable to extract data from read. Potentially corrupted file ' + 'or invalid Tombo index file for this directory.') return reg_base_data, max(0, r_data.start - self.start) r_seq = b''.join(read_bases).decode() if r_data.strand == '-': r_seq = rev_comp(r_seq) # if read starts before the interval if r_data.start <= self.start: r_end_overlap = r_data.end - self.start # if read covers the whole interval if r_data.end > self.end: r_end_clip = r_data.end - self.end reg_base_data = r_seq[-r_end_overlap:-r_end_clip] return reg_base_data, len(reg_base_data) # end of read overlaps beginning of interval reg_base_data[:r_end_overlap] = r_seq[-r_end_overlap:] return reg_base_data, r_end_overlap # read doesn't cover the beginning of region if r_data.end > self.end: # beginning of read covers to the end of the region r_begin_overlap = self.end - r_data.start reg_base_data[-r_begin_overlap:] = r_seq[:r_begin_overlap] return reg_base_data, len(reg_base_data) # first read is completely contained in the interval r_len = r_data.end - r_data.start r_int_start = r_data.start - self.start reg_base_data[r_int_start:r_int_start + r_len] = r_seq return reg_base_data, r_int_start + r_len
[docs] def add_seq(self, genome_index=None, error_end=True): """Extract the forward strand genomic sequence for an interval from reads or genome_index if provided Args: genome_index (:class:`tombo.tombo_helper.Fasta`) Tombo FASTA sequence object """ if genome_index is not None: return self.update(seq=genome_index.get_seq( self.chrm, self.start, self.end, error_end=error_end)) # handle case where no read overlaps whole region # let each read contibute its sequence and fill the rest # with dashes reg_base_data = ['-'] * (self.end - self.start) if self.reads is None or len(self.reads) == 0: return self.update(seq=''.join(reg_base_data)) # get region sequence by moving through reads that # cover the region, but only extract seqeunce from the # (close to) minimal reads s_reg_reads = sorted(self.reads, key=lambda r: (r.start, r.end)) # begin filling sequence with first (by start pos) read reg_base_data, curr_cov_pos = self._update_seq( s_reg_reads.pop(0), reg_base_data) # if there was only one read return now if len(s_reg_reads) == 0 or curr_cov_pos >= self.end - self.start: return self.update(seq=''.join(reg_base_data)) # get next read (by start pos) curr_read = s_reg_reads.pop(0) for next_read in s_reg_reads: # once the next read start passes the region covered thus far # add the sequence for the saved curr_read to the reg sequence if next_read.start >= curr_cov_pos: # add read with curr longest end position to the region seq reg_base_data, curr_cov_pos = self._update_seq( curr_read, reg_base_data) curr_read = next_read # if the whole interval is covered return the sequence if curr_cov_pos >= self.end - self.start: return self.update(seq=''.join(reg_base_data)) continue if next_read.end > curr_read.end: curr_read = next_read reg_base_data, _ = self._update_seq(curr_read, reg_base_data) return self.update(seq=''.join(reg_base_data))
[docs] def get_base_levels(self, read_rows=False, num_reads=None): """Extract base levels from this interval. Args: read_rows (bool): return array with reads as rows (default: row columns) num_reads (int): maximal number of reads to output. randomly downsample if more reads are present (default: output all reads) Return: `np.array` containing read mean levels with rows corresponding to interval position and columns to individual reads (or reverse if `read_rows`) """ def get_read_reg_events(r_data): r_means = get_single_slot_genome_centric(r_data, 'norm_mean') if r_means is None: return None if r_data.start > self.start and r_data.end < self.end: # handle reads that are contained in a region # create region with nan values r_reg_means = np.full(self.end - self.start, np.NAN) r_reg_means[r_data.start - self.start: r_data.end - self.start] = r_means elif r_data.start > self.start: # handle reads that start in middle of region start_overlap = self.end - r_data.start # create region with nan values r_reg_means = np.full(self.end - self.start, np.NAN) r_reg_means[-start_overlap:] = r_means[:start_overlap] elif r_data.end < self.end: # handle reads that end inside region end_overlap = r_data.end - self.start # create region with nan values r_reg_means = np.full(self.end - self.start, np.NAN) r_reg_means[:end_overlap] = r_means[-end_overlap:] else: r_reg_means = r_means[ self.start - r_data.start:self.end - r_data.start] return r_reg_means if self.reads is None or len(self.reads) == 0: raise TomboError( 'Must annotate region with reads ' + '(see `TomboInterval.add_reads`) to extract base levels.') if num_reads is not None: np.random.shuffle(self.reads) reg_events = [] for r_data in self.reads: if self.strand is not None and r_data.strand != self.strand: continue r_means = get_read_reg_events(r_data) if r_means is None: continue reg_events.append(r_means) if num_reads is not None and len(reg_events) >= num_reads: break if read_rows: return np.row_stack(reg_events) return np.column_stack(reg_events)
def filter_empty_regions(plot_intervals): num_filt = sum(len(p_int.reads) == 0 for p_int in plot_intervals) plot_intervals = [p_int for p_int in plot_intervals if len(p_int.reads) > 0] if len(plot_intervals) == 0: error_message_and_exit('No reads in any selected regions.') if VERBOSE and num_filt > 0: warning_message('Some selected regions contain no reads.') return plot_intervals def get_unique_intervals(plot_intervals, covered_poss=None, num_regions=None): # unique genomic regions filter uniq_p_intervals = [] used_intervals = defaultdict(set) for reg_data in plot_intervals: # could have significant region immediately next to # beginning/end of reads interval_poss = list(range(reg_data.start, reg_data.end)) if reg_data.start not in used_intervals[( reg_data.chrm, reg_data.strand)] and ( covered_poss is None or all( pos in covered_poss[(reg_data.chrm, reg_data.strand)] for pos in interval_poss)): uniq_p_intervals.append(reg_data) used_intervals[(reg_data.chrm, reg_data.strand)].update( interval_poss) if num_regions is not None and len(uniq_p_intervals) >= num_regions: break return uniq_p_intervals ########################################### ###### Special Data Access Functions ###### ########################################### def get_channel_info(fast5_data): """Get channel information for a read """ try: fast5_attrs = fast5_data['UniqueGlobalKey/channel_id'].attrs except KeyError: raise TomboError("No channel_id group in HDF5 file. " + "Probably mux scan HDF5 file.") try: channel_info = channelInfo( fast5_attrs.get('offset'), fast5_attrs.get('range'), fast5_attrs.get('digitisation'), fast5_attrs.get('channel_number'), fast5_attrs.get('sampling_rate').astype(np.int64)) except KeyError: raise TomboError("Channel info parameters not available.") return channel_info def get_raw_signal(r_data, int_start, int_end): """Extract raw signal from where this read overlaps a particular genomic region """ with h5py.File(r_data.fn, 'r') as fast5_data: # retrieve shift and scale computed in correction script corr_subgrp = fast5_data['/Analyses/' + r_data.corr_group] event_starts = corr_subgrp['Events']['start'] events_end = event_starts[-1] + corr_subgrp['Events']['length'][-1] segs = np.concatenate([event_starts, [events_end,]]) scale_values = scaleValues( corr_subgrp.attrs.get('shift'), corr_subgrp.attrs.get('scale'), corr_subgrp.attrs.get('lower_lim'), corr_subgrp.attrs.get('upper_lim'), corr_subgrp.attrs.get('outlier_threshold')) all_sig = get_raw_read_slot(fast5_data)['Signal'][:] rsrtr = r_data.read_start_rel_to_raw if r_data.rna: # reverse raw signal for RNA all_sig = all_sig[::-1] if r_data.strand == "-": segs = (segs[::-1] * -1) + segs[-1] if int_start < r_data.start: # handle reads that start in the middle of the interval start_offset = r_data.start - int_start overlap_seg_data = segs[:int_end - r_data.start + 1] else: start_offset = 0 skipped_bases = int_start - r_data.start overlap_seg_data = segs[ skipped_bases:int_end - r_data.start + 1] # trim and flip raw signal (perform normalization outside of this # function in order to avoid circular import between tombo_helper # and tombo_stats) num_reg_obs = overlap_seg_data[-1] - overlap_seg_data[0] if r_data.strand == "+": reg_start_rel_raw = rsrtr + overlap_seg_data[0] r_sig = all_sig[reg_start_rel_raw:reg_start_rel_raw + num_reg_obs] else: reg_start_rel_raw = rsrtr + segs[-1] - overlap_seg_data[-1] r_sig = all_sig[reg_start_rel_raw:reg_start_rel_raw + num_reg_obs] r_sig = r_sig[::-1] return r_sig, overlap_seg_data, start_offset, scale_values def parse_read_correction_data(r_data): """Parse correction data from an event resquiggled read """ try: with h5py.File(r_data.fn, 'r') as fast5_data: corr_grp = fast5_data['/Analyses/' + r_data.corr_group] events_grp = corr_grp['Events'] event_starts = events_grp['start'] events_end = event_starts[-1] + events_grp['length'][-1] new_segs = np.concatenate([event_starts, [events_end,]]) raw_grp = get_raw_read_slot(fast5_data) read_id = raw_grp.attrs.get('read_id') try: read_id = read_id.decode() except (AttributeError, TypeError): pass signal_data = raw_grp['Signal'][:] raw_offset = events_grp.attrs.get('read_start_rel_to_raw') scale_values = scaleValues(*[ corr_grp.attrs.get(attr_name) for attr_name in ( 'shift', 'scale', 'lower_lim', 'upper_lim', 'outlier_threshold')]) old_segs = corr_grp['Alignment/read_segments'][:] old_align_vals = list(map( lambda x: x.decode(), corr_grp['Alignment/read_alignment'][:])) new_align_vals = list(map( lambda x: x.decode(), corr_grp['Alignment/genome_alignment'][:])) except: return None if r_data.rna: signal_data = signal_data[::-1] return (read_id, signal_data, raw_offset, scale_values, old_segs, old_align_vals, new_align_vals, events_end, new_segs) def get_all_read_data(r_data): """Extract most relevant read data from this read """ try: with h5py.File(r_data.fn, 'r') as fast5_data: # note that it's more efficient to try to access the slot # and except the error that check if the slot exists first corr_subgrp = fast5_data['/Analyses/' + r_data.corr_group] algn_subgrp = dict(corr_subgrp['Alignment'].attrs.items()) event_data = corr_subgrp['Events'][:] r_attrs = dict(corr_subgrp.attrs.items()) all_sig = get_raw_read_slot(fast5_data)['Signal'][:] except: # probably truncated file or Events slot doesn't exist return None if r_data.rna: all_sig = all_sig[::-1] r_means = event_data['norm_mean'] r_seq = b''.join(event_data['base']).decode() events_end = event_data[-1]['start'] + event_data[-1]['length'] segs = np.concatenate([event_data['start'], [events_end,]]).astype(np.int64) return (r_means, r_seq, all_sig, segs, r_data.read_start_rel_to_raw, r_attrs.get('norm_type'), r_attrs.get('outlier_threshold'), genomeLocation( algn_subgrp['mapped_start'], algn_subgrp['mapped_strand'], algn_subgrp['mapped_chrom'])) def get_reads_events(cs_reads): """Extract read base levels split by genomic position Args: cs_reads (list): a list of reads from a single (chromosome, strand) Returns: A dictionary with 0-based genomic positions pointing to read signal levels """ # note that this function assumes that all reads come from the same # chromosome and strand cs_base_means = [] cs_read_start_ends = [] for r_data in cs_reads: # extract read means data so data across all chrms is not # in RAM at one time read_means = get_single_slot_genome_centric(r_data, 'norm_mean') if read_means is None: continue if read_means.shape[0] != r_data.end - r_data.start: continue assert read_means.shape[0] == r_data.end - r_data.start, ( 'Read found with mismatching mapping location and ' + 'signal information.') cs_base_means.append(read_means) cs_read_start_ends.append((r_data.start, r_data.end)) if len(cs_base_means) == 0: return None chrm_signal = np.concatenate(cs_base_means) chrm_pos = np.concatenate([np.arange(start, end) for start, end in cs_read_start_ends]) # get order of all bases from position array as_chrm_pos = np.argsort(chrm_pos) # then sort the signal array by genomic position and # split into event means by base split_poss = np.where( np.concatenate([[0,], np.diff( chrm_pos[as_chrm_pos])]) > 0)[0] cs_base_events = dict(zip( np.unique(chrm_pos[as_chrm_pos]), np.split(chrm_signal[as_chrm_pos], split_poss))) return cs_base_events ################################### ###### FAST5 Write Functions ###### ################################### def prep_fast5(fast5_fn, corr_grp, overwrite, in_place, bc_grp=None, return_fp=False): """Prepare a read for re-squiggle processing (This deletes old re-squiggle info for this read) """ def try_close_prep_err(fast5_data, err_str): try: fast5_data.close() except: pass # is_tombo_error = True return err_str, fast5_fn, True # several checks to prepare the FAST5 file for correction before # processing to save compute if not in_place: return ('Not currently implementing new hdf5 file writing', fast5_fn, True) # check that the file is writeable before trying to correct if not os.access(fast5_fn, os.W_OK): return 'FAST5 file is not writable', fast5_fn, True try: # create group to store data fast5_data = h5py.File(fast5_fn, 'r+') try: analyses_grp = fast5_data['/Analyses'] except: return try_close_prep_err( fast5_data, 'Base calls not found in FAST5 (see `tombo preprocess`)') try: # check that the requested basecalls group exsists if bc_grp is not None: analyses_grp[bc_grp] except: return try_close_prep_err( fast5_data, 'Base calls not found in FAST5 (see `tombo preprocess`)') try: corr_grp_ptr = analyses_grp[corr_grp] if not overwrite: return try_close_prep_err( fast5_data, "Tombo data exists in [--corrected-group] " + "and [--overwrite] is not set") del analyses_grp[corr_grp] except: # if the corr_grp isn't there we will write it now, but # it's more efficient to try than to check if the slot is there pass corr_grp = analyses_grp.create_group(corr_grp) corr_grp.attrs['tombo_version'] = TOMBO_VERSION corr_grp.attrs['basecall_group'] = bc_grp except: return 'Error opening or writing to fast5 file', fast5_fn, True if return_fp: return fast5_data try: fast5_data.close() except: return 'Error closing fast5 file', fast5_fn, True return def write_error_status(fn, corr_grp, bc_subgrp, error_text): """Write error message for a read into the FAST5 file """ with h5py.File(fn, 'r+') as fast5_data: analysis_grp = fast5_data['/Analyses'] corr_grp = analysis_grp[corr_grp] if bc_subgrp is not None: # add subgroup matching subgroup from original basecalls corr_subgrp = corr_grp.create_group(bc_subgrp) corr_subgrp.attrs['status'] = error_text else: corr_grp.attrs['status'] = error_text return def write_new_fast5_group( fast5_data, corr_grp_slot, rsqgl_res, norm_type, compute_sd, alignVals=None, old_segs=None, rna=False): """Write new fast5 group with re-squiggle data """ try: # compute event data before accessing fast5 file if compute_sd: norm_means, norm_stds = c_new_mean_stds( rsqgl_res.raw_signal, rsqgl_res.segs) else: norm_means = c_new_means(rsqgl_res.raw_signal, rsqgl_res.segs) norm_stds = repeat(np.NAN) # had to shift to names formats numpy array specification due to # python2 numpy unicode issues. See discussion here: # event_data = np.array( list(zip(norm_means, norm_stds, rsqgl_res.segs[:-1], np.diff(rsqgl_res.segs), list(rsqgl_res.genome_seq))), dtype=[(str('norm_mean'), 'f8'), (str('norm_stdev'), 'f8'), (str('start'), 'u4'), (str('length'), 'u4'), (str('base'), 'S1')]) if alignVals is not None: r_align_vals, g_align_vals = zip(*alignVals) np_read_align = np.chararray(len(alignVals)) np_read_align[:] = r_align_vals np_genome_align = np.chararray(len(alignVals)) np_genome_align[:] = g_align_vals except: raise TomboError('Error computing new events') do_close = False if not isinstance(fast5_data, h5py.File): try: fast5_data = h5py.File(fast5_data, 'r+') do_close = True except: raise TomboError( 'Error opening file for new group writing. This should ' + 'have been caught during the alignment phase. Check that ' + 'there are no other tombo processes or processes ' + 'accessing these HDF5 files running simultaneously.') try: analysis_grp = fast5_data['/Analyses'] corr_grp = analysis_grp[corr_grp_slot] # add subgroup matching subgroup from original basecalls corr_subgrp = corr_grp.create_group(rsqgl_res.align_info.Subgroup) corr_subgrp.attrs['status'] = 'success' # TODO change to rev_sig corr_subgrp.attrs['rna'] = rna if rsqgl_res.sig_match_score is not None: corr_subgrp.attrs[ 'signal_match_score'] = rsqgl_res.sig_match_score corr_subgrp.attrs['shift'] = rsqgl_res.scale_values.shift corr_subgrp.attrs['scale'] = rsqgl_res.scale_values.scale corr_subgrp.attrs['norm_type'] = norm_type if rsqgl_res.scale_values.lower_lim is not None: corr_subgrp.attrs[ 'lower_lim'] = rsqgl_res.scale_values.lower_lim if rsqgl_res.scale_values.upper_lim is not None: corr_subgrp.attrs[ 'upper_lim'] = rsqgl_res.scale_values.upper_lim if rsqgl_res.scale_values.outlier_thresh is not None: corr_subgrp.attrs[ 'outlier_threshold'] = rsqgl_res.scale_values.outlier_thresh # store alignment statistics corr_alignment = corr_subgrp.create_group('Alignment') corr_alignment.attrs['mapped_start'] = rsqgl_res.genome_loc.Start corr_alignment.attrs[ 'mapped_end'] = rsqgl_res.genome_loc.Start + len(rsqgl_res.segs) - 1 corr_alignment.attrs[ 'mapped_strand'] = rsqgl_res.genome_loc.Strand corr_alignment.attrs['mapped_chrom'] = rsqgl_res.genome_loc.Chrom if rsqgl_res.align_info is not None: corr_alignment.attrs[ 'clipped_bases_start'] = rsqgl_res.align_info.ClipStart corr_alignment.attrs[ 'clipped_bases_end'] = rsqgl_res.align_info.ClipEnd corr_alignment.attrs[ 'num_insertions'] = rsqgl_res.align_info.Insertions corr_alignment.attrs[ 'num_deletions'] = rsqgl_res.align_info.Deletions corr_alignment.attrs[ 'num_matches'] = rsqgl_res.align_info.Matches corr_alignment.attrs[ 'num_mismatches'] = rsqgl_res.align_info.Mismatches if alignVals is not None: corr_alignment.create_dataset( 'read_alignment', data=np_read_align, compression="gzip") corr_alignment.create_dataset( 'genome_alignment', data=np_genome_align, compression="gzip") if old_segs is not None: # store old segmentation in order to plot "correction process" corr_alignment.create_dataset( 'read_segments', data=old_segs, compression="gzip") # Add Events to data frame with event means, SDs and lengths corr_events = corr_subgrp.create_dataset( 'Events', data=event_data, compression="gzip") corr_events.attrs[ 'read_start_rel_to_raw'] = rsqgl_res.read_start_rel_to_raw except: raise TomboError( 'Error writing resquiggle information back into fast5 file.') if do_close: try: fast5_data.close() except: raise TomboError( 'Error closing fast5 file after writing resquiggle information.') return if __name__ == '__main__': sys.stderr.write('This is a module. See commands with `tombo -h`') sys.exit(1)