Source code for tombo.tombo_helper

from __future__ import division, unicode_literals, absolute_import

from builtins import int, range, dict, map, zip

import os
import io
import re
import sys
import queue
import random

# Future warning from cython in h5py
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import h5py

import numpy as np
np.seterr(all='raise')

from time import sleep
from time import strftime
from operator import itemgetter
from itertools import repeat, islice
from multiprocessing import Process, Queue
from collections import defaultdict, namedtuple

if sys.version_info[0] > 2:
    unicode = str

# import tombo functions
from ._version import TOMBO_VERSION
from ._default_parameters import PHRED_BASE, DNA_SAMP_TYPE, RNA_SAMP_TYPE
from ._c_helper import (
    c_new_mean_stds, c_new_means, c_valid_cpts_w_cap,
    c_valid_cpts_w_cap_t_test)
from ._c_dynamic_programming import (
    c_banded_traceback, c_adaptive_banded_forward_pass)


# list of classes/functions to include in API
__all__ = [
    'readData', 'intervalData', 'TomboReads',
    'TomboMotif', 'parse_motif_descs',
    'resquiggleParams', 'startClipParams',
    'resquiggleResults', 'alignInfo', 'genomeLocation',
    'sequenceData', 'dpResults', 'scaleValues', 'Fasta', 'seqSampleType',
    'get_seq_sample_type', 'get_raw_read_slot',
    'get_single_slot_read_centric', 'get_multiple_slots_read_centric']


VERBOSE = True

# single base conversion for motifs
SINGLE_LETTER_CODE = {
    'A':'A', 'C':'C', 'G':'G', 'T':'T', 'B':'[CGT]',
    'D':'[AGT]', 'H':'[ACT]', 'K':'[GT]', 'M':'[AC]',
    'N':'[ACGT]', 'R':'[AG]', 'S':'[CG]', 'V':'[ACG]',
    'W':'[AT]', 'Y':'[CT]'}
INVALID_BASES = re.compile('[^ACGT]')
INVALID_BASE_RUNS = re.compile('[^ACGT]+')
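# For example, the IUPAC motif 'CCWGG' expands via this table to the regular
# expression 'CC[AT]GG' (this is how TomboMotif._parse_motif below builds
# motif patterns):
#
#   ''.join(SINGLE_LETTER_CODE[b] for b in 'CCWGG')  # -> 'CC[AT]GG'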


###############################
###### Custom TomboError ######
###############################

class TomboError(Exception):
    pass


######################################
###### Cython Wrapper Functions ######
######################################

# can't import TomboError in cython so wrap these functions in python
def valid_cpts_w_cap(*args, **kwargs):
    try:
        valid_cpts = c_valid_cpts_w_cap(*args, **kwargs)
    except NotImplementedError as e:
        raise TomboError(unicode(e))
    valid_cpts.sort()
    return valid_cpts

def valid_cpts_w_cap_t_test(*args, **kwargs):
    try:
        valid_cpts = c_valid_cpts_w_cap_t_test(*args, **kwargs)
    except NotImplementedError as e:
        raise TomboError(unicode(e))
    valid_cpts.sort()
    return valid_cpts

def banded_traceback(*args, **kwargs):
    try:
        return c_banded_traceback(*args, **kwargs)
    except NotImplementedError as e:
        raise TomboError(unicode(e))

def adaptive_banded_forward_pass(*args, **kwargs):
    try:
        return c_adaptive_banded_forward_pass(*args, **kwargs)
    except NotImplementedError as e:
        raise TomboError(unicode(e))


################################
###### Global Namedtuples ######
################################

class alignInfo(namedtuple(
        'alignInfo',
        ('ID', 'Subgroup', 'ClipStart', 'ClipEnd',
         'Insertions', 'Deletions', 'Matches', 'Mismatches'))):
    """Information from genomic read alignment

    Args:
        ID (str): read identifier
        Subgroup (str): sub-group location containing read information
            (e.g. 'BaseCalled_template')
        ClipStart (int): number of bases clipped from beginning of basecalls
        ClipEnd (int): number of bases clipped from end of basecalls
        Insertions (int): number of inserted bases in alignment
        Deletions (int): number of deleted bases in alignment
        Matches (int): number of matched bases in alignment
        Mismatches (int): number of mis-matched bases in alignment
    """

# TODO convert rna to rev_sig
class readData(namedtuple('readData', (
        'start', 'end', 'filtered', 'read_start_rel_to_raw', 'strand', 'fn',
        'corr_group', 'rna', 'sig_match_score', 'mean_q_score', 'read_id'))):
    """Nanopore read meta-data

    Example::

        r_data = tombo_helper.readData(
            start=0, end=1000, filtered=False, read_start_rel_to_raw=100,
            strand='+', fn='path/to/read.fast5',
            corr_group='RawGenomeCorrected_000', rna=False,
            sig_match_score=1.0, mean_q_score=10.0)

        # output is a list of readData objects
        reads_index = tombo_helper.TomboReads(['test_data/native_reads',])
        cs_reads = reads_index.get_cs_reads('chr20', '+')

    Args:
        start (int): 0-based start mapped position
        end (int): 0-based open interval end mapped position
        filtered (bool): is this read filtered?
        read_start_rel_to_raw (int): start (in raw signal vector) of
            assigned bases
        strand (str): read mapped strand ('+', '-' or None)
        fn (str): raw read filename
        corr_group (str): --corrected-group slot specified in re-squiggle
            command
        rna (bool): is this read RNA?
        sig_match_score (float): signal matching score. Default: None
        mean_q_score (float): mean basecalling q-score. Default: None
        read_id (str): read identifier. Default: None
    """

# set default values for sig_match_score, mean_q_score and read_id
readData.__new__.__defaults__ = (None, None, None)

class scaleValues(namedtuple(
        'scaleValues',
        ('shift', 'scale', 'lower_lim', 'upper_lim', 'outlier_thresh'))):
    """Signal normalization scaling parameters. For details see
    https://nanoporetech.github.io/tombo/resquiggle.html#signal-normalization

    Args:
        shift (float): shift scaling parameter
        scale (float): scale scaling parameter
        lower_lim (float): lower winsorizing threshold
        upper_lim (float): upper winsorizing threshold
        outlier_thresh (float): outlier threshold used to define
            `lower_lim` and `upper_lim`
    """

class resquiggleParams(namedtuple(
        'resquiggleParams',
        ('match_evalue', 'skip_pen', 'bandwidth', 'max_half_z_score',
         'running_stat_width', 'min_obs_per_base', 'raw_min_obs_per_base',
         'mean_obs_per_event', 'z_shift', 'stay_pen', 'use_t_test_seg',
         'band_bound_thresh', 'start_bw', 'start_save_bw', 'start_n_bases'))):
    """Re-squiggle parameters

    Args:
        match_evalue (float): expected value for matching event to sequence
        skip_pen (float): penalty for skipped sequence position
        bandwidth (int): adaptive bandwidth
        max_half_z_score (float): winsorize half z-scores above this value
        running_stat_width (int): running neighboring window width for
            segmentation scoring
        min_obs_per_base (int): minimum observations per genomic base
        raw_min_obs_per_base (int): minimum raw observations per genomic base
        mean_obs_per_event (int): mean number of raw obs. per event during
            segmentation
        z_shift (float): amount to shift z-scores for DP (derived from
            match_evalue)
        stay_pen (float): stay penalty for DP (derived from match_evalue)
        use_t_test_seg (bool): use t-test segmentation criterion (default:
            raw neighboring window difference)
        band_bound_thresh (int): bandwidth boundary threshold for determining
            if a read potentially left the adaptive bandwidth
        start_bw (int): bandwidth for read start identification
        start_save_bw (int): save bandwidth for read start identification
            (used if start_bw fails)
        start_n_bases (int): number of genomic bases to use for read start
            identification
    """

# set default values for start params
resquiggleParams.__new__.__defaults__ = (None, None, None)


class trimRnaParams(namedtuple(
        'trimRnaParams',
        ('moving_window_size', 'min_running_values', 'thresh_scale',
         'max_raw_obs'))):
    """Parameters to trim RNA adapters.
    """


class stallParams(namedtuple(
        'stallParams',
        ('window_size', 'threshold', 'min_consecutive_obs', 'edge_buffer',
         # percentile stall method params
         'lower_pctl', 'upper_pctl',
         # mean windows stall method params
         'mini_window_size', 'n_windows'))):
    """Parameters to identify RNA stalls
    """

# set method specific params to None
stallParams.__new__.__defaults__ = (None,) * 4

class startClipParams(namedtuple(
        'startClipParams', ('bandwidth', 'num_genome_bases'))):
    """Parameters to identify read start using bases clipped from mapping

    Args:
        bandwidth (int): bandwidth
        num_genome_bases (int): number of genome bases
    """

class resquiggleResults(namedtuple(
        'resquiggleResults',
        ('align_info', 'genome_loc', 'genome_seq', 'mean_q_score',
         'raw_signal', 'channel_info', 'read_start_rel_to_raw', 'segs',
         'scale_values', 'sig_match_score', 'norm_params_changed',
         'start_clip_bases', 'stall_ints'))):
    """Re-squiggle results

    Args:
        align_info (:class:`tombo.tombo_helper.alignInfo`): read alignment
            information
        genome_loc (:class:`tombo.tombo_helper.genomeLocation`): genome
            mapping location
        genome_seq (str): mapped genome sequence
        mean_q_score (float): mean basecalling q-score
        raw_signal (np.array::np.float64): raw signal (i.e. un-segmented;
            signal may be normalized) (optional)
        channel_info (:class:`tombo.tombo_helper.channelInfo`): channel
            information (optional)
        read_start_rel_to_raw (int): read start within raw signal (optional)
        segs (np.array::np.int64): relative raw signal segment positions
            (optional)
        scale_values (:class:`tombo.tombo_helper.scaleValues`): signal
            normalization scale values (optional)
        sig_match_score (float): expected to observed signal score (see
            :func:`tombo.tombo_stats.get_read_seg_score`; optional)
        norm_params_changed (bool): were scale parameters updated (optional)
        start_clip_bases (str): mapping clipped bases from start of read
            (optional)
        stall_ints (list): list of RNA stall locations within raw signal
            (optional)
    """

# set default None values for when just mapping results are included
resquiggleResults.__new__.__defaults__ = (None,) * 9

class dpResults(namedtuple(
        'dpResults',
        ('read_start_rel_to_raw', 'segs', 'ref_means', 'ref_sds',
         'genome_seq'))):
    """Dynamic programming results

    Args:
        read_start_rel_to_raw (int): read start within raw signal
        segs (np.array::np.int64): relative raw signal segment positions
        ref_means (np.array::np.float64): expected signal levels
        ref_sds (np.array::np.float64): expected SD of signal levels
        genome_seq (str): mapped genome sequence
    """

class genomeLocation(namedtuple(
        'genomeLocation', ('Start', 'Strand', 'Chrom'))):
    """Genomic location

    Args:
        Start (int): 0-based genomic start position
        Strand (str): Strand (should be '+' or '-')
        Chrom (str): Chromosome
    """

class sequenceData(namedtuple(
        'sequenceData', ('seq', 'id', 'mean_q_score'))):
    """Read sequence data from FASTQ record

    Args:
        seq (str): read sequence
        id (str): read id (extracted from FASTQ record line)
        mean_q_score (float): mean q-score
    """

class channelInfo(namedtuple(
        'channelInfo',
        ('offset', 'range', 'digitisation', 'number', 'sampling_rate'))):
    """Read channel information

    Args:
        offset (float): offset parameter
        range (float): range parameter
        digitisation (float): digitisation parameter
        number (int): channel number
        sampling_rate (int): number of raw samples per second
    """


class regionStats(namedtuple(
        'regionStats',
        ('reg_frac_standard_base', 'reg_poss', 'chrm', 'strand', 'start',
         'reg_cov', 'ctrl_cov', 'valid_cov'))):
    """Region statistics

    Args:
        reg_frac_standard_base (np.array::np.float64): fraction of standard
            bases
        reg_poss (np.array::np.int64): positions for reported fractions
        chrm (str): chromosome name
        strand (str): strand (should be '+' or '-')
        start (int): 0-based region start
        reg_cov (np.array::np.int64): region read depth
        ctrl_cov (np.array::np.int64): region control sample read depth
        valid_cov (np.array::np.int64): region valid (tested) read depth
    """


class groupStats(namedtuple(
        'groupStats',
        ('reg_stats', 'reg_poss', 'chrm', 'strand', 'start',
         'reg_cov', 'ctrl_cov'))):
    """Group comparison region statistics

    Args:
        reg_stats (np.array::np.float64): statistic for group comparison
        reg_poss (np.array::np.int64): positions for reported fractions
        chrm (str): chromosome name
        strand (str): strand (should be '+' or '-')
        start (int): 0-based region start
        reg_cov (np.array::np.int64): region read depth
        ctrl_cov (np.array::np.int64): region control sample read depth
    """

class seqSampleType(namedtuple(
        'seqSampleType', ('name', 'rev_sig'))):
    """Description of a sequencing sample type

    Args:
        name (str): name of the sequencing sample
        rev_sig (bool): is the raw signal reversed (3' to 5')
    """

######################################
###### Various Helper Functions ######
######################################

def status_message(message, indent=False):
    pre_str = '\t' if indent else ''
    sys.stderr.write(pre_str + strftime('[%H:%M:%S] ') + message + '\n')
    sys.stderr.flush()
    return

def warning_message(message):
    sys.stderr.write(
        '*' * 20 + ' WARNING ' + '*' * 20 + '\n\t' + message + '\n')
    sys.stderr.flush()
    return

def error_message_and_exit(message):
    sys.stderr.write(
        '*' * 20 + ' ERROR ' + '*' * 20 + '\n\t' + message + '\n')
    sys.exit()
    return

def resolve_path(fn_path):
    """Helper function to resolve relative and linked paths that might
    give other packages problems.
    """
    return os.path.realpath(os.path.expanduser(fn_path))

COMP_BASES = dict(zip(map(ord, 'ACGT'), map(ord, 'TGCA')))

def comp_seq(seq):
    """Complement DNA sequence
    """
    return seq.translate(COMP_BASES)

def rev_comp(seq):
    """Reverse complement DNA sequence
    """
    return seq.translate(COMP_BASES)[::-1]

def invalid_seq(seq):
    return bool(INVALID_BASES.search(seq))

U_TO_T = {ord('U'):ord('T')}

def rev_transcribe(seq):
    """Convert U bases to T
    """
    return seq.translate(U_TO_T)

def get_mean_q_score(read_q):
    if sys.version_info[0] > 2:
        return np.mean([q_val - PHRED_BASE
                        for q_val in read_q.encode('ASCII')])
    return np.mean([ord(q_val) - PHRED_BASE
                    for q_val in read_q.encode('ASCII')])

def get_chrm_sizes(reads_index, ctrl_reads_index=None):
    """Get covered chromosome sizes from a set of reads
    """
    strand_chrm_sizes = defaultdict(list)
    for (chrm, strand), cs_read_cov in reads_index:
        try:
            strand_chrm_sizes[chrm].append(max(
                r_data.end for r_data in cs_read_cov))
        except ValueError:
            continue
    if ctrl_reads_index is not None:
        for (chrm, strand), cs_read_cov in ctrl_reads_index:
            try:
                strand_chrm_sizes[chrm].append(max(
                    r_data.end for r_data in cs_read_cov))
            except ValueError:
                continue

    chrm_sizes = {}
    for chrm, strnd_sizes in strand_chrm_sizes.items():
        try:
            chrm_sizes[chrm] = max(strnd_sizes)
        except ValueError:
            continue

    return chrm_sizes

def parse_genome_locations(genome_locs, default_strand=None):
    """Parse genome location strings and convert to 0-based coordinates
    """
    parsed_locs = []
    for chrm_pos_strand in genome_locs:
        # strip off any quotes and return up to the first 3 values
        split_vals = chrm_pos_strand.replace('"', '').replace(
            "'", "").split(':')[:3]
        # default to plus strand if not specified
        if len(split_vals) == 1:
            error_message_and_exit(
                'Invalid genome location provided: ' + chrm_pos_strand +
                '\n\t\tTry adding quotation marks around specified genome ' +
                'locations (especially for sequence identifiers with ' +
                'special characters).')
        elif len(split_vals) == 2:
            parsed_locs.append((
                split_vals[0], int(split_vals[1]) - 1, default_strand))
        else:
            parsed_locs.append((
                split_vals[0], int(split_vals[1]) - 1, split_vals[2]))

    return parsed_locs

def parse_genome_regions(all_regs_text):
    parsed_regs = defaultdict(list)
    include_whole_chrms = set()
    for reg_text in all_regs_text:
        try:
            chrm_reg = reg_text.replace('"', '').replace("'", "").split(':')
            if len(chrm_reg) == 1:
                chrm = chrm_reg[0]
                reg_pos = None
                include_whole_chrms.add(chrm)
            elif len(chrm_reg) == 2:
                chrm, reg_pos = chrm_reg
                reg_pos = list(map(
                    lambda x: int(x.replace(',','')), reg_pos.split('-')))
            else:
                raise TomboError('Invalid region text provided.')
        except:
            error_message_and_exit(
                'Invalid [--include-region] format.')

        parsed_regs[chrm].append(reg_pos)

    parsed_regs = dict(parsed_regs)
    for chrm in include_whole_chrms:
        parsed_regs[chrm] = None

    return parsed_regs

def parse_locs_file(locs_fn):
    """Parse BED files containing genomic locations (assumes single base
    locations, so end coordinate is ignored).
    """
    n_added, n_failed = 0, 0
    raw_locs = defaultdict(set)
    with open(locs_fn) as locs_fp:
        for line in locs_fp:
            try:
                chrm, pos, _, _, _, strand = line.split()[:6]
                # bed specs indicate 0-based start so don't shift here
                pos = int(pos)
                raw_locs[(chrm, strand)].add(pos)
                n_added += 1
            except:
                n_failed += 1
                continue

    if n_failed > 0:
        if n_added > 0:
            warning_message((
                'Some provided locations ({}) were incorrectly formatted. ' +
                'Ensure that provided file ({}) is in BED format.').format(
                    n_failed, locs_fn))
        else:
            warning_message((
                'All provided locations from {} were incorrectly ' +
                'formatted. Ensure that provided file is in BED ' +
                'format.').format(locs_fn))

    return dict((cs, np.array(sorted(cs_poss)))
                for cs, cs_poss in raw_locs.items())

def parse_obs_filter(obs_filter):
    """Parse observations per base formatted filtering
    """
    if len(obs_filter) < 1:
        return None

    # parse obs_filter
    try:
        obs_filter = [list(map(int, pctl_nobs.split(':')))
                      for pctl_nobs in obs_filter]
    except:
        raise TomboError('Invalid format for observation filter')

    if any(pctl < 0 or pctl > 100
           for pctl in map(itemgetter(0), obs_filter)):
        error_message_and_exit('Invalid percentile value.')

    return obs_filter

def get_seq_kmers(seq, kmer_width, rev_strand=False):
    """Extract overlapping k-mers from a sequence

    Args:
        seq (str): genomic sequence to be converted to k-mers
        kmer_width (int): k-mer width
        rev_strand (bool): provided sequence is from reverse strand (so flip
            return order to genome forward direction)
    """
    seq_kmers = [seq[i:i + kmer_width]
                 for i in range(len(seq) - kmer_width + 1)]
    # get stat lookups from seq on native strand then flip if rev_strand
    if rev_strand:
        seq_kmers = seq_kmers[::-1]

    return seq_kmers

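# Usage sketch for the sequence helpers above (values are illustrative):
#
#   comp_seq('ACGT')                         # -> 'TGCA'
#   rev_comp('GATC')                         # -> 'GATC' (palindrome)
#   get_seq_kmers('ACGTA', 3)                # -> ['ACG', 'CGT', 'GTA']
#   parse_genome_locations(['chr1:1000:+'])  # -> [('chr1', 999, '+')]
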
class TomboMotif(object):
    """Description of a sequence motif, including potentially modified
    position

    Attributes:
        raw_motif (str): input raw motif string
        motif_len (int): length of motif
        motif_pat (``re.compile``): python regular expression for this motif
        rev_comp_pat (``re.compile``): python regular expression for reverse
            complement of this motif
        is_palindrome (bool): is the motif palindromic (in the genomic, not
            literary, definition)
        mod_pos (int): modified base position within motif
        mod_base (str): modified base (should generally be A, C, G or T)

    .. automethod:: __init__
    """
    def _parse_motif(self, raw_motif, rev_comp_motif=False):
        conv_motif = ''.join(SINGLE_LETTER_CODE[letter]
                             for letter in raw_motif)
        if rev_comp_motif:
            # reverse complement and then flip any group brackets
            conv_motif = rev_comp(conv_motif).translate({
                ord('['):']', ord(']'):'['})
        return re.compile(conv_motif)

    def _compute_partial_patterns(self):
        """Compute patterns for partial matches that include the mod_pos
        at the start, end or within short sequences.

        Key into _partial_pats with:
            1) whether searching at start, end or within a short sequence
            2) length of the partial pattern

        Values are compiled partial pattern and mod_pos within pattern.
        Short patterns are lists.
        """
        self._partial_pats = {'start':{}, 'end':{}, 'short':{}}
        for offset in range(self.mod_pos - 1):
            self._partial_pats['start'][
                self.motif_len - offset - 1] = (self._parse_motif(
                    self.raw_motif[offset + 1:]), self.mod_pos - offset - 1)
        for offset in range(self.motif_len - self.mod_pos):
            self._partial_pats['end'][
                self.motif_len - offset - 1] = (self._parse_motif(
                    self.raw_motif[:-(offset + 1)]), self.mod_pos)
        for short_len in range(1, self.motif_len):
            self._partial_pats['short'][short_len] = [
                (self._parse_motif(
                    self.raw_motif[offset:offset + short_len]),
                 self.mod_pos - offset)
                for offset in range(
                        max(0, self.mod_pos - short_len),
                        min(self.motif_len - short_len + 1, self.mod_pos))]
        return

    def __init__(self, raw_motif, mod_pos=None):
        """Parse string motif

        Args:
            raw_motif (str): sequence motif. supports IUPAC single letter
                codes (use T for RNA).
            mod_pos (int): 1-based position of modified base within the motif
        """
        # TODO convert mod_pos to 0-based coordinate
        # (1-based is much more error prone)
        invalid_chars = re.findall(
            '[^' + ''.join(SINGLE_LETTER_CODE) + ']', raw_motif)
        if len(invalid_chars) > 0:
            error_message_and_exit(
                'Invalid characters in motif: ' + ', '.join(invalid_chars))

        # basic motif parsing
        self.raw_motif = raw_motif
        self.motif_len = len(raw_motif)
        self.motif_pat = self._parse_motif(raw_motif)
        self.rev_comp_pat = self._parse_motif(raw_motif, True)
        self.is_palindrome = self.motif_pat == self.rev_comp_pat

        # parse modified position from motif if provided
        self.mod_pos = mod_pos
        if mod_pos is None:
            self.mod_base = None
        else:
            assert 0 < mod_pos <= self.motif_len
            self.mod_base = raw_motif[mod_pos - 1]
            if INVALID_BASES.match(self.mod_base):
                warning_message(
                    'Provided modified position is not a single base, ' +
                    'which is likely an error. Specified modified base is ' +
                    'one of: ' +
                    ' '.join(SINGLE_LETTER_CODE[self.mod_base][1:-1]))
            self._compute_partial_patterns()

    def __repr__(self):
        return '\n'.join((
            'Raw Motif:\t' + self.raw_motif,
            'Mod Position:\t' + str(self.mod_pos),
            'Motif Pattern:\t' + str(self.motif_pat),
            'Rev Comp Pattern:\t' + str(self.rev_comp_pat)))

    def matches_seq(self, seq):
        """Does the motif match provided sequence (including mod_pos within
        seq)?

        Including partial matches at beginning and end that include mod_pos.
        """
        # check matches to start of sequence
        for start_len in range(1, min(len(seq) + 1, self.motif_len)):
            try:
                start_pat, start_mod_pos = self._partial_pats[
                    'start'][start_len]
            except KeyError:
                continue
            if start_pat.match(seq[:start_len]):
                return True

        # check central sequence overlaps
        if len(seq) < self.motif_len:
            for short_pat, mod_pos in self._partial_pats['short'][len(seq)]:
                if short_pat.match(seq):
                    return True
        else:
            if self.motif_pat.search(seq):
                return True

        # check end of seq matches
        for end_len in range(1, min(len(seq) + 1, self.motif_len)):
            try:
                end_pat, end_mod_pos = self._partial_pats['end'][end_len]
            except KeyError:
                continue
            if end_pat.match(seq[-end_len:]):
                return True

        return False

    def find_mod_poss(self, seq):
        """Find all mod-base positions within the sequence.

        Including partial matches at beginning and end that include mod_pos.
        """
        seq_mod_poss = set()

        # check matches to start of sequence
        for start_len in range(1, min(len(seq) + 1, self.motif_len)):
            try:
                start_pat, start_mod_pos = self._partial_pats[
                    'start'][start_len]
            except KeyError:
                continue
            if start_pat.match(seq[:start_len]):
                seq_mod_poss.add(start_mod_pos)

        # check central sequence overlaps
        if len(seq) < self.motif_len:
            for short_pat, short_mod_pos in self._partial_pats[
                    'short'][len(seq)]:
                if short_pat.match(seq):
                    seq_mod_poss.add(short_mod_pos)
        else:
            for motif_match in self.motif_pat.finditer(seq):
                seq_mod_poss.add(motif_match.start() + self.mod_pos)

        # check end of seq matches
        for end_len in range(1, min(len(seq) + 1, self.motif_len)):
            try:
                end_pat, end_mod_pos = self._partial_pats['end'][end_len]
            except KeyError:
                continue
            if end_pat.match(seq[-end_len:]):
                seq_mod_poss.add(len(seq) - end_len + end_mod_pos)

        return sorted(seq_mod_poss)

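# A brief TomboMotif usage sketch (dcm motif; outputs follow from the
# definitions above, with returned positions using the motif's 1-based
# mod_pos convention):
#
#   dcm_motif = TomboMotif('CCWGG', 2)
#   dcm_motif.motif_pat.pattern             # -> 'CC[AT]GG'
#   dcm_motif.matches_seq('ACCAGGT')        # -> True
#   dcm_motif.find_mod_poss('CCAGGCCTGG')   # -> [2, 7]
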
def parse_motif_descs(stat_motif_descs):
    """Parse string motif descriptions as defined by
    ``tombo plot roc --motif-descriptions``

    Args:
        stat_motif_descs (str): string motifs description (see
            ``tombo plot roc --motif-descriptions``)

    Returns:
        list of tuples with :class:`tombo.tombo_helper.TomboMotif` and
        motif/modification names
    """
    parsed_motif_descs = []
    try:
        for motif_desc in stat_motif_descs.split('::'):
            raw_motif, mod_pos, mod_name = motif_desc.split(':')
            motif = TomboMotif(raw_motif, int(mod_pos))
            parsed_motif_descs.append((motif, mod_name))
    except:
        error_message_and_exit(
            'Invalid motif descriptions format. Format descriptions as: ' +
            '"motif:mod_pos:name[::motif2:mod_pos2:name2...]".')

    return parsed_motif_descs

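# Usage sketch for parse_motif_descs (the description format matches
# ``tombo plot roc --motif-descriptions``):
#
#   [(m.raw_motif, name) for m, name in parse_motif_descs('CCWGG:2:dcm')]
#   # -> [('CCWGG', 'dcm')]
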
###########################
###### FASTA Parsing ######
###########################

def get_rec_names(fasta_fn):
    with io.open(fasta_fn) as fasta_fp:
        all_rec_ids = [line.replace(">","").split()[0]
                       for line in fasta_fp if line.startswith('>')]

    return all_rec_ids

class Fasta(object):
    """Fasta file sequence format wrapper class. Will load faidx via
    ``pyfaidx`` package if installed, else the fasta will be loaded into
    memory for sequence extraction.

    .. automethod:: __init__
    """
    def _load_in_mem(self):
        genome_index = {}
        curr_id = None
        curr_seq = []
        with io.open(self.fasta_fn) as fasta_fp:
            for line in fasta_fp:
                if line.startswith('>'):
                    if curr_id is not None and len(curr_seq) > 0:
                        genome_index[curr_id] = ''.join(curr_seq)
                    curr_seq = []
                    curr_id = line.replace(">","").split()[0]
                else:
                    curr_seq.append(line.strip())
            # add last record
            if curr_id is not None and len(curr_seq) > 0:
                genome_index[curr_id] = ''.join(curr_seq)

        return genome_index

    def _index_contains_uridines(self, n_chrms=10, n_bases=1000):
        if self.has_pyfaidx:
            # check first N bases of the first M chrms for U characters
            for chrm in islice(self.index.index, n_chrms):
                if re.search('U', self.get_seq(
                        chrm, 1, n_bases, error_end=False)):
                    return True
        else:
            for chrm in islice(self.index.keys(), n_chrms):
                if re.search('U', self.get_seq(
                        chrm, 1, n_bases, error_end=False)):
                    return True
        return False

    def __init__(self, fasta_fn, dry_run=False, force_in_mem=False,
                 assume_dna_base=False):
        """Load a fasta

        Args:
            fasta_fn (str): path to fasta file
            dry_run (bool): when pyfaidx is not installed, don't actually
                read sequence into memory.
            force_in_mem (bool): force genome to be loaded into memory even
                if pyfaidx is installed allowing on-disk access
            assume_dna_base (bool): skip check for DNA or RNA bases
                (default: False)
        """
        self.fasta_fn = resolve_path(fasta_fn)
        self.has_rna_bases = False
        try:
            if force_in_mem:
                raise ImportError
            import pyfaidx
            self.has_pyfaidx = True
            try:
                self.index = pyfaidx.Faidx(self.fasta_fn)
            except UnicodeDecodeError:
                error_message_and_exit(
                    'FASTA file does not appear to be formatted correctly.')
        except ImportError:
            self.has_pyfaidx = False
            if not dry_run:
                self.index = self._load_in_mem()
        if not dry_run:
            self.has_rna_bases = (assume_dna_base or
                                  self._index_contains_uridines())

    def get_seq(self, chrm, start=None, end=None, error_end=True):
        """Extract sequence from a specific genomic region. Note if provided,
        start and end must both be provided or they will be ignored.

        Args:
            chrm (str): chromosome name
            start (int): 0-based start position
            end (int): 0-based open-interval end position
            error_end (bool): raise an error when the region requested
                extends beyond the chromosome (default: True)

        Returns:
            Genomic sequence requested. Sequence is converted to RNA
            sequence if applicable.
        """
        if self.has_pyfaidx:
            if start is None or end is None:
                r_seq = self.index.fetch(
                    chrm, 1, self.index.index[chrm].rlen).seq.upper()
            elif (start < 0 or start > self.index.index[chrm].rlen or (
                    error_end and (
                        end < 0 or end > self.index.index[chrm].rlen))):
                raise TomboError(
                    'Encountered invalid genome sequence request.')
            else:
                r_seq = self.index.fetch(chrm, start + 1, end).seq.upper()
        else:
            if (start is not None and (
                    start < 0 or start > len(self.index[chrm]))) or (
                        error_end and end is not None and
                        (end < 0 or end > len(self.index[chrm]))):
                raise TomboError(
                    'Encountered invalid genome sequence request.')
            r_seq = self.index[chrm][start:end].upper()

        if self.has_rna_bases:
            r_seq = rev_transcribe(r_seq)

        return r_seq

    def iter_chrms(self):
        """Iterate over chromosome names
        """
        if self.has_pyfaidx:
            for chrm in self.index.index:
                yield unicode(chrm)
        else:
            for chrm in self.index:
                yield chrm

    def __contains__(self, chrm):
        if self.has_pyfaidx:
            return chrm in self.index.index
        return chrm in self.index

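# Fasta usage sketch ('genome.fa' is a hypothetical path; on-disk access
# requires the optional pyfaidx package, otherwise the FASTA is loaded into
# memory):
#
#   genome = Fasta('genome.fa')
#   if 'chr20' in genome:
#       reg_seq = genome.get_seq('chr20', 10000, 10100)
#   all_chrms = list(genome.iter_chrms())
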
#############################################
###### Automatic Sample-Type Detection ######
#############################################

def is_read_rna(fast5_data):
    """Determine if a read is RNA or DNA

    Args:
        fast5_data (`h5py.File`)

    Returns:
        Boolean indicating whether the read appears to be RNA

    Note:
        This function uses the read meta-data and so non-standard processing
        pipelines may not produce expected values.
    """
    # check both experiment type and kit slots for "rna"
    exp_type = fast5_data['UniqueGlobalKey/context_tags'].attrs.get(
        'experiment_type')
    try:
        exp_type = exp_type.decode()
        # remove the word internal since it contains rna.
        exp_type = exp_type.replace('internal', '')
    except (AttributeError, TypeError):
        pass
    exp_kit = fast5_data['UniqueGlobalKey/context_tags'].attrs.get(
        'experiment_kit')
    try:
        exp_kit = exp_kit.decode()
        # remove the word internal since it contains rna.
        exp_kit = exp_kit.replace('internal', '')
    except (AttributeError, TypeError):
        pass

    if exp_type is None and exp_kit is None:
        return False

    return (
        (exp_type is not None and re.search('rna', exp_type) is not None) or
        (exp_kit is not None and re.search('rna', exp_kit) is not None))

def is_sample_rna(reads_index=None, fast5_fns=None, n_reads=50):
    """Determine if a set of reads are RNA or DNA from a small sample. Must
    provide either reads_index or fast5_fns.

    Args:
        reads_index (:class:`tombo.tombo_helper.TomboReads`)
        fast5_fns (list): list of fast5 read filenames
        n_reads (int): number of reads to check (default: 50)

    Returns:
        False if any read does not appear to be an RNA read (see
        :func:`tombo.tombo_helper.is_read_rna`) else return True.
    """
    proc_reads = 0
    if reads_index is not None:
        for r_data in reads_index.iter_reads():
            if not r_data.rna:
                return False
            proc_reads += 1
            if proc_reads >= n_reads:
                break
    elif fast5_fns is not None:
        for fast5_fn in fast5_fns:
            try:
                with h5py.File(fast5_fn, 'r') as fast5_data:
                    if not is_read_rna(fast5_data):
                        return False
                proc_reads += 1
            except:
                continue
            if proc_reads >= n_reads:
                break
    else:
        raise TomboError(
            'Must provide either reads_index or fast5_fns to ' +
            'determine if sample is RNA.')

    return True

def get_seq_sample_type(fast5_data=None, reads_index=None, fast5_fns=None,
                        num_reads=50):
    """Get the sequencing sample type from a single read or set of reads

    Args:
        fast5_data (`h5py.File`): open read h5py File object
        reads_index (:class:`tombo.tombo_helper.TomboReads`)
        fast5_fns (list): FAST5 read filenames
        num_reads (int): sample of reads to check for sample type
    """
    if fast5_data is None and reads_index is None and fast5_fns is None:
        raise TomboError('Must provide either fast5_data, reads_index, or ' +
                         'fast5_fns to determine sequencing sample type.')
    if fast5_data is None:
        return (seqSampleType(RNA_SAMP_TYPE, True)
                if is_sample_rna(reads_index, fast5_fns, num_reads) else
                seqSampleType(DNA_SAMP_TYPE, False))
    return (seqSampleType(RNA_SAMP_TYPE, True)
            if is_read_rna(fast5_data) else
            seqSampleType(DNA_SAMP_TYPE, False))

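# Sample-type detection usage sketch (filenames are hypothetical):
#
#   sample_type = get_seq_sample_type(
#       fast5_fns=['reads/read0.fast5', 'reads/read1.fast5'])
#   if sample_type.rev_sig:
#       # RNA raw signal is stored in the 3' to 5' direction
#       pass
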
############################
###### Lock Functions ######
############################

def get_lock_fn(fast5s_dir):
    """Get filename for the lock file to indicate that this directory is
    currently being processed. This file should be saved to be deleted later.
    """
    # if directory comes with trailing slash, remove for processing
    if fast5s_dir.endswith('/'):
        fast5s_dir = fast5s_dir[:-1]
    split_dir = os.path.split(fast5s_dir)
    return os.path.join(split_dir[0], "." + split_dir[1] + '.tombo.lock')

def _is_lock_file(lock_fn):
    lock_bn = os.path.split(lock_fn)[1]
    if (len(lock_bn) == 0 or
        not lock_bn.startswith('.') or
        not lock_bn.endswith('.tombo.lock')):
        return False
    return True


#####################################
###### FAST5 Parsing Functions ######
#####################################

def reads_contain_basecalls(fast5_fns, bc_grp, num_reads):
    test_fns = random.sample(
        fast5_fns, num_reads) if len(fast5_fns) > num_reads else fast5_fns
    for fast5_fn in test_fns:
        try:
            with h5py.File(fast5_fn, 'r') as fast5_data:
                fast5_data['/Analyses/' + bc_grp]
        except:
            continue
        # if the basecall group is accessible for a single file return true
        return True

    # else if all tested reads did not contain the basecall group
    # return False
    return False

def get_files_list(fast5s_dir):
    """Get all fast5 files recursively below this directory
    """
    all_fast5s = []
    # walk through directory structure searching for fast5 files
    for root, _, fns in os.walk(fast5s_dir):
        for fn in fns:
            if not fn.endswith('.fast5'):
                continue
            all_fast5s.append(os.path.join(root, fn))

    return all_fast5s

def clear_tombo_locks(lock_fns):
    """Clear all lock files
    """
    for lock_fn in lock_fns:
        # safeguard against incorrect file passed to this function
        if not _is_lock_file(lock_fn):
            continue
        # try to remove lock files so that if some have been corrupted or
        # otherwise cause an error, at least all those accessible will be
        # cleared
        try:
            os.remove(lock_fn)
        except:
            pass

    return

def get_files_list_and_lock_dirs(fast5s_dir, ignore_locks):
    """Get all fast5 files recursively below this directory and add a Tombo
    lock file to indicate that this directory is currently being re-squiggled
    """
    ignore_locks_mess = (
        'This set of reads is currently being processed by another ' +
        'resquiggle command. Multiple resquiggle commands cannot be ' +
        'run concurrently on a set of reads to avoid corrupting ' +
        'read files. If you are sure this set of reads is not being ' +
        'processed by another command (usually caused by previous ' +
        'unexpected exit) set the --ignore-read-locks flag.')
    all_fast5s = []
    lock_fns = []
    try:
        # walk through directory structure searching for fast5 files
        for root, _, fns in os.walk(fast5s_dir):
            lock_fn = get_lock_fn(root)
            if not ignore_locks and os.path.exists(lock_fn):
                clear_tombo_locks(lock_fns)
                error_message_and_exit(ignore_locks_mess)
            lock_fns.append(lock_fn)
            # create empty file indicating this directory is locked
            open(lock_fn, 'w').close()

            for fn in fns:
                if not fn.endswith('.fast5'):
                    continue
                all_fast5s.append(os.path.join(root, fn))
    except:
        clear_tombo_locks(lock_fns)
        error_message_and_exit(
            'Unexpected error during file enumeration. Check that you ' +
            'have write permission within the specified [fast5_basedir].')

    return all_fast5s, lock_fns

def get_raw_read_slot(fast5_data):
    """Get the raw read slot from this FAST5 read file

    Args:
        fast5_data (`h5py.File`): open FAST5 read file object

    Example::

        all_raw_signal = get_raw_read_slot(fast5_data)['Signal'][:]

    Returns:
        The HDF5 group slot containing the raw signal data.
    """
    try:
        raw_read_slot = next(iter(fast5_data['/Raw/Reads'].values()))
    except KeyError:
        raise TomboError(
            'Raw data is not found in /Raw/Reads/Read_[read#]. Note that ' +
            'Tombo does not support multi-fast5 format.')

    return raw_read_slot

class TomboReads(object):
    """A set of reads with associated meta-data from re-squiggle processing

    .. automethod:: __init__
    """
    def _get_index_fn(self, fast5s_dir):
        """Get the filename for the requested directory and corrected group
        """
        # if directory comes with trailing slash, remove for processing
        if fast5s_dir.endswith('/'):
            fast5s_dir = fast5s_dir[:-1]
        split_dir = os.path.split(fast5s_dir)
        return os.path.join(split_dir[0],
                            "." + split_dir[1] + "." +
                            self.corr_grp + '.tombo.index')

    # index building and writing class functions
    def _prep_for_writing(self, fast5s_dirs):
        assert len(fast5s_dirs) == 1, (
            'Must provide only a single FAST5 base directory when ' +
            'opening for writing.')
        fast5s_dir = fast5s_dirs[0]
        fast5s_dir = (fast5s_dir if fast5s_dir.endswith('/') else
                      fast5s_dir + '/')
        index_fn = self._get_index_fn(fast5s_dir)
        self.fast5s_dirs[fast5s_dir] = index_fn
        if os.path.exists(index_fn):
            os.remove(index_fn)

        # open default dict to fill with readData lists by (chrm, strand)
        self.reads_index = defaultdict(list)

        return

    def add_read_data(self, chrm, strand, read_data):
        """Add read data to the index

        Args:
            chrm (str): chromosome name
            strand (str): strand ('+' or '-')
            read_data (:class:`tombo.tombo_helper.readData`): read
                information
        """
        self.reads_index[(chrm, strand)].append(read_data)
        return

    def replace_index(self, new_reads_index):
        """Replace current reads index

        Args:
            new_reads_index (dict): dictionary with (chrm, strand) pointing
                to lists of :class:`tombo.tombo_helper.readData` objects
        """
        if sum(len(x) for x in new_reads_index.values()) == 0:
            raise TomboError('Cannot replace with an empty index.')
        self.reads_index = new_reads_index
        return

    def write_index_file(self):
        """Write index file

        Note:
            Must be an index from only a single ``fast5s_dir``

            Index filename will be:
            `.[fast5s_dir].[corr_grp].tombo.index`
            Note that this is a hidden file (starts with a ".")
        """
        status_message('Saving Tombo reads index to file.')
        assert len(self.fast5s_dirs) == 1, (
            'Cannot write index for TomboReads created from more than ' +
            'one base directory.')
        basedir, index_fn = next(iter(self.fast5s_dirs.items()))
        try:
            import cPickle as pickle
        except:
            import pickle

        index_data = defaultdict(list)
        for chrm_strand, cs_reads in self:
            for rd in cs_reads:
                # clip the basedir off the FAST5 filename in case later
                # functions are called from another relative path and
                # split corr_grp and bc_subgrp for easier filtering
                index_data[chrm_strand].append((
                    re.sub(basedir, '', rd.fn, 1), rd.start, rd.end,
                    rd.read_start_rel_to_raw,
                    rd.corr_group.split('/')[0],
                    rd.corr_group.split('/')[-1],
                    rd.filtered, rd.rna, rd.sig_match_score,
                    rd.mean_q_score, rd.read_id))

        with io.open(index_fn, 'wb') as index_fp:
            # note protocol 2 for py2/3 compatibility
            pickle.dump(dict(index_data), index_fp, protocol=2)

        return

    # index parsing class functions
    def _parse_fast5s_wo_index(self, wo_index_fast5s_dirs):
        """Parse re-squiggled reads data from a list of fast5 directories
        """
        def get_read_data(read_fn, fast5_data, bc_subgrp):
            read_id = get_raw_read_slot(fast5_data).attrs.get('read_id')
            corr_data = fast5_data[
                '/'.join(('/Analyses', self.corr_grp, bc_subgrp))]
            rna = corr_data.attrs.get('rna')
            rna = False if rna is None else rna

            align_data = dict(corr_data['Alignment'].attrs.items())
            read_start_rel_to_raw = corr_data['Events'].attrs.get(
                'read_start_rel_to_raw')
            chrm = align_data['mapped_chrom']
            strand = align_data['mapped_strand']
            try:
                chrm = chrm.decode()
                strand = strand.decode()
            except:
                pass
            return chrm, strand, readData(
                align_data['mapped_start'], align_data['mapped_end'], False,
                read_start_rel_to_raw, strand, read_fn,
                self.corr_grp + '/' + bc_subgrp, rna, read_id=read_id)

        files = [fn for fast5s_dir in wo_index_fast5s_dirs
                 for fn in get_files_list(fast5s_dir)]
        dir_reads_index = defaultdict(list)
        for read_fn in files:
            try:
                with h5py.File(read_fn, 'r') as fast5_data:
                    i_bc_subgrps = (
                        fast5_data['/Analyses/' + self.corr_grp].keys()
                        if self.bc_subgrps is None else self.bc_subgrps)
                    for bc_subgrp in i_bc_subgrps:
                        chrm, strand, r_data = get_read_data(
                            read_fn, fast5_data, bc_subgrp)
                        dir_reads_index[(chrm, strand)].append(r_data)
            except:
                # ignore errors and process all reads that don't error
                continue

        return dict(dir_reads_index)

    def _load_index_data(self, fast5s_dir):
        try:
            import cPickle as pickle
        except:
            import pickle
        with io.open(self.fast5s_dirs[fast5s_dir], 'rb') as index_fp:
            raw_index_data = pickle.load(index_fp)

        # determine the index format used. Index information has been added
        # over time (around version 1.3 the index records grew by 2
        # elements), so check the record length here.
        try:
            num_index_vals = len(next(iter(raw_index_data.values()))[0])
        except StopIteration:
            raise TomboError('Tombo index file appears to be empty')
        if num_index_vals == 8:
            def convert_r_data(
                    from_base_fn, start, end, rsrtr,
                    c_grp, s_grp, filtered, rna):
                return readData(
                    start, end, filtered, rsrtr, strand,
                    os.path.join(fast5s_dir, from_base_fn),
                    self.corr_grp + '/' + s_grp, rna)
        elif num_index_vals == 10:
            def convert_r_data(
                    from_base_fn, start, end, rsrtr, c_grp, s_grp,
                    filtered, rna, sig_match_score, mean_q_score):
                return readData(
                    start, end, filtered, rsrtr, strand,
                    os.path.join(fast5s_dir, from_base_fn),
                    self.corr_grp + '/' + s_grp, rna,
                    sig_match_score, mean_q_score)
        elif num_index_vals == 11:
            def convert_r_data(
                    from_base_fn, start, end, rsrtr, c_grp, s_grp,
                    filtered, rna, sig_match_score, mean_q_score, read_id):
                return readData(
                    start, end, filtered, rsrtr, strand,
                    os.path.join(fast5s_dir, from_base_fn),
                    self.corr_grp + '/' + s_grp, rna,
                    sig_match_score, mean_q_score, read_id)
        else:
            raise TomboError('Invalid Tombo index file.')

        dir_reads_index = {}
        for (chrm, strand), cs_raw_data in raw_index_data.items():
            cs_data = [convert_r_data(*r_data) for r_data in cs_raw_data]
            # don't add chrm/strand if all reads are filtered
            if len(cs_data) > 0:
                dir_reads_index[(chrm, strand)] = cs_data

        return dir_reads_index

    def _parse_fast5s_w_index(self, fast5s_dir):
        """Use index file to parse information about a set of reads
        """
        try:
            curr_dir_reads_index = self._load_index_data(fast5s_dir)
        except UnicodeDecodeError:
            warning_message(
                'Invalid Tombo index file.\n\t\tThis occurs most often ' +
                'when the re-squiggle command was completed using a Tombo ' +
                'build against a different python version (2 or 3).')
            raise TomboError

        if not self.remove_filtered and self.bc_subgrps is None:
            return curr_dir_reads_index

        filt_dir_reads_index = {}
        for (chrm, strand), cs_raw_data in curr_dir_reads_index.items():
            cs_data = [
                rd for rd in cs_raw_data
                if rd.corr_group.split('/')[0] == self.corr_grp and
                (self.bc_subgrps is None or
                 rd.corr_group.split('/')[-1] in self.bc_subgrps) and
                (not self.remove_filtered or not rd.filtered)]
            # don't add chrm/strand if all reads are filtered
            if len(cs_data) > 0:
                filt_dir_reads_index[(chrm, strand)] = cs_data

        return filt_dir_reads_index

    def _merge_dir_indices(self, w_index_ri, wo_index_ri):
        """Merge coverage from several parsed sets of data
        """
        all_raw_reads_index = w_index_ri + [wo_index_ri,]
        reads_index = defaultdict(list)
        for chrm_strand in set([cs for d_ri in all_raw_reads_index
                                for cs in d_ri]):
            for dir_reads_index in all_raw_reads_index:
                if chrm_strand not in dir_reads_index:
                    continue
                reads_index[chrm_strand].extend(
                    dir_reads_index[chrm_strand])

        return dict(reads_index)

    def _parse_fast5s(self, fast5s_dirs):
        wo_index_dirs = []
        w_index_covs = []
        warn_index = False
        # determine if index exists for each directory and load appropriately
        for fast5s_dir in fast5s_dirs:
            fast5s_dir = (fast5s_dir if fast5s_dir.endswith('/') else
                          fast5s_dir + '/')
            self.fast5s_dirs[fast5s_dir] = self._get_index_fn(fast5s_dir)
            if os.path.exists(self.fast5s_dirs[fast5s_dir]):
                try:
                    w_index_covs.append(
                        self._parse_fast5s_w_index(fast5s_dir))
                except TomboError:
                    warning_message(
                        'Failed to parse tombo index file for ' +
                        fast5s_dir + ' directory. Creating temporary ' +
                        'index from FAST5 files.')
                    wo_index_dirs.append(fast5s_dir)
            else:
                if not warn_index:
                    warning_message(
                        'Tombo index file does not exist for one or more ' +
                        'directories.\n\t\tIf --skip-index was not set ' +
                        'for re-squiggle command, ensure that the ' +
                        'specified directory is the same as for the ' +
                        're-squiggle command.')
                    warn_index = True
                wo_index_dirs.append(fast5s_dir)
        wo_index_cov = self._parse_fast5s_wo_index(wo_index_dirs)
        self.reads_index = self._merge_dir_indices(
            w_index_covs, wo_index_cov)

        return

    def __init__(self, fast5s_basedirs,
                 corrected_group='RawGenomeCorrected_000',
                 basecall_subgroups=None, for_writing=False,
                 remove_filtered=True, sample_name=None):
        """Parse data from a list of re-squiggle fast5 directories

        Args:
            fast5s_basedirs (list): fast5 base directories, which have been
                processed by ``tombo resquiggle``
            corrected_group (str): Analysis slot containing the re-squiggle
                information (optional; default: 'RawGenomeCorrected_000')
            basecall_subgroups (list): basecall subgroups (optional;
                default: process all basecall subgroups)
            for_writing (bool): open TomboReads to write index (optional;
                default: open for reading and parse re-squiggled reads)
            remove_filtered (bool): remove filtered reads as indicated by
                index file (optional; default: True)
            sample_name (str): for verbose output
        """
        if VERBOSE and not for_writing:
            status_mess = (
                'Parsing Tombo index file(s).' if sample_name is None else
                'Parsing ' + sample_name + ' Tombo index file(s).')
            status_message(status_mess)

        assert isinstance(fast5s_basedirs, list), (
            'fast5s_basedirs must be a list.')
        self.fast5s_dirs = {}
        self.corr_grp = corrected_group
        self.bc_subgrps = basecall_subgroups
        self.sample_name = sample_name
        self.remove_filtered = remove_filtered
        self.for_writing = for_writing
        self.coverage = None
        if self.for_writing:
            self._prep_for_writing(fast5s_basedirs)
        else:
            self._parse_fast5s(fast5s_basedirs)

        return

    def _compute_coverage(self):
        if VERBOSE:
            status_message('Calculating read coverage.')
        self.coverage = {}
        for chrm_strand, cs_reads in self:
            if len(cs_reads) == 0:
                continue
            cs_coverage = np.zeros(max(r_data.end for r_data in cs_reads),
                                   dtype=np.int64)
            for r_data in cs_reads:
                cs_coverage[r_data.start:r_data.end] += 1
            self.coverage[chrm_strand] = cs_coverage
        return self

    def _add_coverages(self, ctrl_reads_index):
        merged_cov = {}
        # if self or control reads don't have coverage, compute it
        if self.coverage is None:
            self._compute_coverage()
        if ctrl_reads_index.coverage is None:
            ctrl_reads_index._compute_coverage()
        for chrm_strand, ctrl_cs_cov in ctrl_reads_index.coverage.items():
            if chrm_strand in self.coverage:
                self_cs_cov = self.coverage[chrm_strand]
                # copy longer array and add shorter array coverage
                if self_cs_cov.shape[0] > ctrl_cs_cov.shape[0]:
                    merged_cs_cov = self_cs_cov.copy()
                    merged_cs_cov[:ctrl_cs_cov.shape[0]] += ctrl_cs_cov
                else:
                    merged_cs_cov = ctrl_cs_cov.copy()
                    merged_cs_cov[:self_cs_cov.shape[0]] += self_cs_cov
            else:
                merged_cs_cov = ctrl_cs_cov.copy()
            merged_cov[chrm_strand] = merged_cs_cov

        return merged_cov

    def iter_coverage_regions(self, ctrl_reads_index=None):
        """Get genome coverage for a set of reads

        Args:
            ctrl_reads_index (:class:`tombo.tombo_helper.TomboReads`): a
                second set of tombo reads to add for coverage

        Returns:
            Yields (chrm, strand, cs_cov, cs_cov_starts) indicating read
            coverage levels

            cs_cov and cs_cov_starts are numpy arrays containing the
            coverage level and the 0-based genomic start positions of those
            intervals
        """
        if VERBOSE:
            status_message('Calculating read coverage regions.')
        if self.coverage is None:
            self._compute_coverage()

        coverage = (self.coverage if ctrl_reads_index is None else
                    self._add_coverages(ctrl_reads_index))

        for (chrm, strand), cs_cov in coverage.items():
            cs_cov_starts = np.concatenate([
                [0,], np.where(np.diff(cs_cov))[0] + 1,
                [cs_cov.shape[0],]])
            cs_cov = cs_cov[cs_cov_starts[:-1]]
            yield chrm, strand, cs_cov, cs_cov_starts

        return

    def iter_cov_regs(
            self, cov_thresh, region_size=None, ctrl_reads_index=None):
        """Iterate over regions with coverage greater than or equal to
        cov_thresh.

        If region_size is provided, regions are rounded to the nearest
        region_size windows and only the region start is yielded
        (i.e. (chrm, strand, start)). If not provided,
        (chrm, strand, start, end) is yielded.
        """
        def round_reg_start(x):
            return int(region_size * np.floor(x / float(region_size)))
        def round_reg_end(x):
            return int(region_size * np.ceil(x / float(region_size)))

        for chrm, strand, cov, starts in self.iter_coverage_regions(
                ctrl_reads_index):
            curr_reg_start = -1
            # boundaries alternate between region starts (even indices)
            # and region ends (odd indices)
            valid_cov = np.where(np.diff(np.concatenate([
                [False,], np.greater_equal(cov, cov_thresh),
                [False,]])))[0]
            for cov_start_i, cov_end_i in zip(
                    valid_cov[:-1:2], valid_cov[1::2]):
                cov_start, cov_end = starts[cov_start_i], starts[cov_end_i]
                if region_size is None:
                    yield chrm, strand, cov_start, cov_end
                    continue
                for reg_start in range(round_reg_start(cov_start),
                                       round_reg_end(cov_end), region_size):
                    if reg_start != curr_reg_start:
                        yield chrm, strand, reg_start
                    curr_reg_start = reg_start

        return

    def get_all_cs(self):
        """Get list of all (chromosome, strand) stored in index.
        """
        return list(self.reads_index.keys())

    def is_empty(self):
        """Check whether the index contains no reads
        """
        for cs_reads in self.reads_index.values():
            if len(cs_reads) > 0:
                return False
        return True

    def __contains__(self, chrm_strand):
        return chrm_strand in self.reads_index

    def __iter__(self):
        self._iter = iter(self.reads_index.items())
        return self

    def __next__(self):
        return next(self._iter)

    # for python2 compatibility
    def next(self):
        return self.__next__()

    def iter_reads(self):
        """Iterate over reads stored in the index
        """
        for _, cs_reads in self:
            for rd in cs_reads:
                yield rd
        return

    def get_cs_reads(self, chrm, strand, invalid_return=[]):
        """Extract the list of reads stored in a specified chromosome and
        strand

        Args:
            chrm (str): chromosome name
            strand (str): strand ('+' or '-')
            invalid_return: value to return for invalid (chrm, strand)
        """
        if (chrm, strand) not in self.reads_index:
            return invalid_return
        return self.reads_index[(chrm, strand)]

    def _get_strand_spec_cov(self, chrm, pos, strand, invalid_return):
        if (chrm, strand) not in self.coverage:
            return invalid_return
        if pos >= self.coverage[(chrm, strand)].shape[0]:
            return invalid_return
        return self.coverage[(chrm, strand)][pos]

    def get_coverage(self, chrm, pos, strand=None, invalid_return=0):
        """Get coverage at specified position

        Args:
            chrm (str): chromosome name
            pos (int): 0-based genomic position
            strand (str): interval strand ('+', '-' or None). Default: None
                (max over both strands)
            invalid_return: return value for invalid (bad chrm, strand or
                pos beyond coverage) position. Default: 0
        """
        if self.coverage is None:
            self._compute_coverage()
        if strand is None:
            try:
                return max(
                    self._get_strand_spec_cov(
                        chrm, pos, '+', invalid_return),
                    self._get_strand_spec_cov(
                        chrm, pos, '-', invalid_return))
            # with None invalid_return value could be max over 2 None's
            except TypeError:
                return invalid_return
        return self._get_strand_spec_cov(chrm, pos, strand, invalid_return)

    def get_cs_coverage(self, chrm, strand, invalid_return=None):
        """Extract coverage levels over a specified chromosome and strand

        Args:
            chrm (str): chromosome name
            strand (str): strand ('+' or '-')
            invalid_return: value to return for invalid (chrm, strand)

        Returns:
            numpy array (numpy.int64) containing read coverage levels
        """
        if self.coverage is None:
            self._compute_coverage()
        if (chrm, strand) not in self.coverage:
            return invalid_return
        return self.coverage[(chrm, strand)]

    def iter_cs_coverage(self):
        """Iterate over coverage levels across all stored (chrm, strands)
        """
        if self.coverage is None:
            self._compute_coverage()
        return self.coverage.items()

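# TomboReads usage sketch (the directory is hypothetical and must have been
# processed by ``tombo resquiggle``):
#
#   reads_index = TomboReads(['test_data/native_reads',])
#   for (chrm, strand), cs_reads in reads_index:
#       pass  # cs_reads is a list of readData objects
#   cov_at_pos = reads_index.get_coverage('chr20', 10000, '+')
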
###########################################
###### Events Table Access Functions ######
###########################################

def get_multiple_slots_read_centric(r_data, slot_names, corr_grp=None):
    """Extract multiple read-centric slot_names from this read's Events table

    Args:
        r_data (:class:`tombo.tombo_helper.readData` or an open
            ``h5py.File`` object)
        slot_names (list): slots from which to extract data (valid values:
            norm_mean, norm_stdev, start, length, base)
        corr_grp (str): corrected group slot from which to extract data
            (default: use value from ``r_data`` object; required for
            ``h5py.File``)

    Returns:
        A tuple of numpy arrays specified by the ``slot_names``
    """
    try:
        do_close = False
        if not isinstance(r_data, h5py.File):
            do_close = True
            corr_grp = r_data.corr_group
            r_data = h5py.File(r_data.fn, 'r')
        event_slot_name = '/'.join(('/Analyses', corr_grp, 'Events'))
        # note that it's more efficient to try to access the slot and catch
        # the error than to check whether the slot exists first
        r_event_data = r_data[event_slot_name][:]
        if do_close:
            r_data.close()
    except:
        # probably truncated file or events don't exist
        return [None,] * len(slot_names)

    return [r_event_data[slot_name] for slot_name in slot_names]

def get_single_slot_read_centric(r_data, slot_name, corr_grp=None):
    """Extract read-centric slot_name from this read's Events table

    Args:
        r_data (:class:`tombo.tombo_helper.readData` or an open
            ``h5py.File`` object)
        slot_name (str): slot from which to extract data (valid values:
            norm_mean, norm_stdev, start, length, base)
        corr_grp (str): corrected group slot from which to extract data
            (default: use value from ``r_data`` object; required for
            ``h5py.File``)
    """
    try:
        # if r_data is an open h5py object then don't open the filename
        do_close = False
        if not isinstance(r_data, h5py.File):
            do_close = True
            corr_grp = r_data.corr_group
            r_data = h5py.File(r_data.fn, 'r')
        # note that it's more efficient to try to access the slot and catch
        # the error than to check whether the slot exists first
        r_slot_values = r_data['/'.join((
            '/Analyses', corr_grp, 'Events'))][slot_name]
        if do_close:
            r_data.close()
    except:
        # probably truncated file or events don't exist
        return None

    return r_slot_values

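# Events-table access usage sketch (r_data would be a readData object taken
# from a TomboReads index, as above):
#
#   norm_means = get_single_slot_read_centric(r_data, 'norm_mean')
#   norm_means, starts = get_multiple_slots_read_centric(
#       r_data, ('norm_mean', 'start'))
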
def get_single_slot_genome_centric(r_data, slot_name):
    """Extract genome-centric slot_name from this read's Events table
    """
    r_slot_values = get_single_slot_read_centric(r_data, slot_name)
    if r_slot_values is None:
        return None

    if r_data.strand == '-':
        r_slot_values = r_slot_values[::-1]

    return r_slot_values

def get_mean_slot_genome_centric(cs_reads, chrm_len, slot_name):
    """Get the mean over all reads at each covered genomic location for this
    slot's value
    """
    base_sums = np.zeros(chrm_len)
    base_cov = np.zeros(chrm_len, dtype=np.int64)
    for r_data in cs_reads:
        # extract read means data so data across all chrms is not
        # in RAM at one time
        g_slot_values = get_single_slot_genome_centric(r_data, slot_name)
        if g_slot_values is None:
            continue

        base_sums[r_data.start:
                  r_data.start + len(g_slot_values)] += g_slot_values
        base_cov[r_data.start:r_data.start + len(g_slot_values)] += 1

    return base_sums / base_cov

def iter_mean_slot_values(
        reads_index, chrm_sizes, slot_name, ctrl_reads_index=None):
    """Iterate through chromosomes and strands yielding mean slot values
    over all reads at each covered genomic location.

    Generator yields (chromosome, strand, cs_mean_values,
    cs_ctrl_mean_values) tuples (4 values).
    """
    # ignore divide by zero errors that occur where there is no coverage.
    # Need to correct nan values after subtracting two sets of coverage so
    # leave as nan for now
    old_err_settings = np.seterr(all='ignore')
    for chrm, strand in [(c, s) for c in sorted(chrm_sizes)
                         for s in ('+', '-')]:
        if ctrl_reads_index is None:
            if (chrm, strand) not in reads_index:
                continue
            cs_mean_values = get_mean_slot_genome_centric(
                reads_index.get_cs_reads(chrm, strand),
                chrm_sizes[chrm], slot_name)
            yield chrm, strand, cs_mean_values, None
        else:
            cs_mean_values, ctrl_cs_mean_values = None, None
            if (chrm, strand) in reads_index:
                cs_mean_values = get_mean_slot_genome_centric(
                    reads_index.get_cs_reads(chrm, strand),
                    chrm_sizes[chrm], slot_name)
            if (chrm, strand) in ctrl_reads_index:
                ctrl_cs_mean_values = get_mean_slot_genome_centric(
                    ctrl_reads_index.get_cs_reads(chrm, strand),
                    chrm_sizes[chrm], slot_name)
            if cs_mean_values is None and ctrl_cs_mean_values is None:
                continue
            yield chrm, strand, cs_mean_values, ctrl_cs_mean_values

    _ = np.seterr(**old_err_settings)

    return

def get_largest_signal_differences(
        reads_index, ctrl_reads_index, num_regions, num_bases):
    chrm_sizes = get_chrm_sizes(reads_index, ctrl_reads_index)

    all_largest_diff_poss = []
    for chrm, strand, cs_sig_means, ctrl_cs_sig_means in \
            iter_mean_slot_values(
                reads_index, chrm_sizes, 'norm_mean', ctrl_reads_index):
        if cs_sig_means is None or ctrl_cs_sig_means is None:
            continue
        chrm_diffs = np.nan_to_num(
            np.abs(cs_sig_means - ctrl_cs_sig_means))
        chrm_max_diff_regs = np.argsort(chrm_diffs)[::-1][:num_regions]
        all_largest_diff_poss.extend((
            chrm_diffs[pos], max(pos - int(num_bases / 2.0), 0),
            chrm, strand) for pos in chrm_max_diff_regs)

    return sorted(all_largest_diff_poss, reverse=True)[:num_regions]

def get_signal_differences(reads_index, ctrl_reads_index):
    """Helper function to compute all signal differences
    """
    chrm_sizes = get_chrm_sizes(reads_index, ctrl_reads_index)

    all_diffs = {}
    for chrm, strand, cs_sig_means, ctrl_cs_sig_means in \
            iter_mean_slot_values(
                reads_index, chrm_sizes, 'norm_mean', ctrl_reads_index):
        if cs_sig_means is None or ctrl_cs_sig_means is None:
            continue
        all_diffs[(chrm, strand)] = np.nan_to_num(
            cs_sig_means - ctrl_cs_sig_means)

    return all_diffs


####################################
###### Genomic Interval Class ######
####################################

class intervalData(object):
    """Genome/transcriptome interval information

    Example::

        int_data = tombo_helper.intervalData(
            chrm='chr20', start=10000, end=10100, strand='+')

    Note:
        All intervalData functions return the object in order to allow
        chaining of interval commands. (e.g.
        ``int_data.add_reads(reads_index).add_seq()``)

    .. automethod:: __init__
    """
    def __setattr__(self, name, value):
        if name == 'chrm':
            if not isinstance(value, unicode):
                raise TypeError(name + ' must be a string')
        elif name in ('start', 'end'):
            if not (isinstance(value, int) or
                    isinstance(value, np.integer)):
                raise TypeError(name + ' must be an int')
        elif name == 'strand':
            if value not in (None, '+', '-'):
                raise TypeError('strand must be either None, "+", or "-"')
        elif name in ('reg_id', 'reg_text', 'seq'):
            if value is not None and not isinstance(value, unicode):
                raise TypeError(name + ' must be a string')
        elif name == 'reads':
            if value is not None and not isinstance(value, list):
                raise TypeError('reads must be a list')
        else:
            raise ValueError(
                name + ' is an invalid attribute for intervalData')
        super(intervalData, self).__setattr__(name, value)

    def __init__(self, chrm, start, end, strand=None, reg_id=None,
                 reg_text='', reads=None, seq=None):
        """Initialize a genome/transcriptome interval object

        Args:
            chrm (str): chromosome name.
            start (int): 0-based start position.
            end (int): 1-based (or open interval) end position.
            strand (str): interval strand ('+', '-' or None). Default: None
            reg_id (str): used to keep track of multiple intervals.
                Default: None
            reg_text (str): region description text. Default: ''
            reads (list): list of readData values overlapping region.
                Default: None
            seq (str): region genomic sequence. Default: None
        """
        self.chrm = unicode(chrm)
        self.start = start
        self.end = end
        self.strand = unicode(strand) if strand is not None else None
        self.reg_id = unicode(reg_id) if reg_id is not None else None
        self.reg_text = unicode(reg_text)
        self.reads = reads
        self.seq = seq

    def update(self, **kwargs):
        """Update slots specified in keyword arguments (kwargs)
        """
        for k, v in kwargs.items():
            self.__setattr__(k, v)
        # return self to allow chaining and auto-return
        return self
    def __repr__(self):
        # convert reads and seq if they are too long
        self_vars = vars(self).copy()
        if self_vars['reads'] is not None:
            self_vars['reads'] = '<{:d} reads>'.format(
                len(self_vars['reads']))
        if self_vars['seq'] is not None and len(self_vars['seq']) > 50:
            self_vars['seq'] = '<{:d} bases of sequence>'.format(
                len(self_vars['seq']))
        self_vars['reg_text'] = '"' + self_vars['reg_text'] + '"'
        return '<tombo.tombo_helper.intervalData object> :\n' + '\n'.join(
            "{:>15} : {}".format(k, str(v)) for k, v in self_vars.items())
    def copy(self, include_reads=True):
        """Create a copy of this interval. Useful when adding sets of reads
        from multiple samples to compare.

        Args:
            include_reads (bool): include current reads slot in the new
                object (default: True)
        """
        cp_reads = self.reads if include_reads else None
        return type(self)(self.chrm, self.start, self.end, self.strand,
                          self.reg_id, self.reg_text, cp_reads, self.seq)
    def merge(self, other_reg):
        """Create a copy of this interval with the reads from this interval
        and `other_reg`

        Args:
            other_reg (:class:`tombo.tombo_helper.intervalData`): a second
                region to merge with this interval

        Note:
            Aside from reads, all other attributes will be taken from this
            interval
        """
        merged_reg_data = self.copy()
        self_reads = self.reads if self.reads is not None else []
        other_reads = other_reg.reads if other_reg.reads is not None else []
        return merged_reg_data.update(reads=self_reads + other_reads)
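    # Illustrative sketch (not in the original source): compare two samples
    # over the same interval by annotating copies with each sample's reads
    # and merging. `sample_index` and `ctrl_index` are assumed to be
    # TomboReads instances; coordinates are hypothetical.
    #
    #   int_data = intervalData(chrm='chr20', start=10000, end=10100)
    #   samp_reg = int_data.copy().add_reads(sample_index)
    #   ctrl_reg = int_data.copy().add_reads(ctrl_index)
    #   both_reg = samp_reg.merge(ctrl_reg)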
    def expand_interval(self, expand_width):
        """Expand this interval by the specified amount (affects only the
        start and end attributes; NOT seq or reads)

        Args:
            expand_width (int): amount by which to expand the interval
        """
        self.update(start=self.start - expand_width,
                    end=self.end + expand_width)
        return self
    def add_reads(self, reads_index, require_full_span=False):
        """Add reads overlapping this interval

        Args:
            reads_index (:class:`tombo.tombo_helper.TomboReads`): reads index
            require_full_span (bool): require that reads span the entire
                interval (default: include all reads overlapping the
                interval)
        """
        def get_c_s_data(strand):
            if require_full_span:
                def read_intersects_interval(r_start, r_end):
                    return r_start <= self.start and r_end >= self.end
            else:
                def read_intersects_interval(r_start, r_end):
                    return not (r_start >= self.end or r_end <= self.start)

            # get all reads intersecting the interval
            return [
                r_data for r_data in reads_index.get_cs_reads(
                    self.chrm, strand)
                if read_intersects_interval(r_data.start, r_data.end)]


        # get all reads that overlap this interval
        # note that this includes partial overlaps as these contribute
        # to coverage and other statistics so can't really restrict to
        # full coverage as previous versions of code did
        if self.strand is not None:
            return self.update(reads=get_c_s_data(self.strand))

        # if strand is None, get data from both strands
        return self.update(reads=get_c_s_data('+') + get_c_s_data('-'))
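    # Illustrative sketch (not in the original source): restrict annotation
    # to reads covering the full interval. `reads_index` is assumed to be a
    # TomboReads instance; coordinates are hypothetical.
    #
    #   int_data = intervalData(
    #       chrm='chr20', start=10000, end=10100, strand='+')
    #   int_data.add_reads(reads_index, require_full_span=True)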
    def _update_seq(self, r_data, reg_base_data):
        """Update the sequence for the region based on this read
        """
        read_bases = get_single_slot_read_centric(r_data, 'base')
        if read_bases is None:
            warning_message(
                'Unable to extract data from read. Potentially corrupted '
                'file or invalid Tombo index file for this directory.')
            return reg_base_data, max(0, r_data.start - self.start)
        r_seq = b''.join(read_bases).decode()

        if r_data.strand == '-':
            r_seq = rev_comp(r_seq)

        # if read starts before the interval
        if r_data.start <= self.start:
            r_end_overlap = r_data.end - self.start
            # if read covers the whole interval
            if r_data.end > self.end:
                r_end_clip = r_data.end - self.end
                reg_base_data = r_seq[-r_end_overlap:-r_end_clip]
                return reg_base_data, len(reg_base_data)
            # end of read overlaps beginning of interval
            reg_base_data[:r_end_overlap] = r_seq[-r_end_overlap:]
            return reg_base_data, r_end_overlap
        # read doesn't cover the beginning of region
        if r_data.end > self.end:
            # beginning of read covers to the end of the region
            r_begin_overlap = self.end - r_data.start
            reg_base_data[-r_begin_overlap:] = r_seq[:r_begin_overlap]
            return reg_base_data, len(reg_base_data)
        # first read is completely contained in the interval
        r_len = r_data.end - r_data.start
        r_int_start = r_data.start - self.start
        reg_base_data[r_int_start:r_int_start + r_len] = r_seq
        return reg_base_data, r_int_start + r_len
    def add_seq(self, genome_index=None, error_end=True):
        """Extract the forward strand genomic sequence for an interval from
        reads or genome_index if provided

        Args:
            genome_index (:class:`tombo.tombo_helper.Fasta`): Tombo FASTA
                sequence object
        """
        if genome_index is not None:
            return self.update(seq=genome_index.get_seq(
                self.chrm, self.start, self.end, error_end=error_end))

        # handle case where no read overlaps whole region
        # let each read contribute its sequence and fill the rest
        # with dashes
        reg_base_data = ['-'] * (self.end - self.start)
        if self.reads is None or len(self.reads) == 0:
            return self.update(seq=''.join(reg_base_data))
        # get region sequence by moving through reads that
        # cover the region, but only extract sequence from the
        # (close to) minimal reads
        s_reg_reads = sorted(self.reads, key=lambda r: (r.start, r.end))
        # begin filling sequence with first (by start pos) read
        reg_base_data, curr_cov_pos = self._update_seq(
            s_reg_reads.pop(0), reg_base_data)
        # if there was only one read return now
        if len(s_reg_reads) == 0 or curr_cov_pos >= self.end - self.start:
            return self.update(seq=''.join(reg_base_data))

        # get next read (by start pos)
        curr_read = s_reg_reads.pop(0)
        for next_read in s_reg_reads:
            # once the next read start passes the region covered thus far
            # add the sequence for the saved curr_read to the reg sequence
            if next_read.start >= curr_cov_pos:
                # add read with curr longest end position to the region seq
                reg_base_data, curr_cov_pos = self._update_seq(
                    curr_read, reg_base_data)
                curr_read = next_read
                # if the whole interval is covered return the sequence
                if curr_cov_pos >= self.end - self.start:
                    return self.update(seq=''.join(reg_base_data))
                continue
            if next_read.end > curr_read.end:
                curr_read = next_read

        reg_base_data, _ = self._update_seq(curr_read, reg_base_data)

        return self.update(seq=''.join(reg_base_data))
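    # Illustrative sketch (not in the original source): annotate an interval
    # with reads and derive its sequence, preferring a genome index when one
    # is available. `reads_index` and `genome_index` are assumed to be
    # TomboReads and Fasta instances respectively.
    #
    #   int_data.add_reads(reads_index).add_seq()   # sequence from reads
    #   int_data.add_seq(genome_index)              # sequence from FASTA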
    def get_base_levels(self, read_rows=False, num_reads=None):
        """Extract base levels from this interval.

        Args:
            read_rows (bool): return array with reads as rows (default:
                reads as columns)
            num_reads (int): maximal number of reads to output. randomly
                downsample if more reads are present (default: output all
                reads)

        Return:
            `np.array` containing read mean levels with rows corresponding
            to interval position and columns to individual reads (or the
            reverse if `read_rows` is set)
        """
        def get_read_reg_events(r_data):
            r_means = get_single_slot_genome_centric(r_data, 'norm_mean')
            if r_means is None: return None
            if r_data.start > self.start and r_data.end < self.end:
                # handle reads that are contained in a region
                # create region with nan values
                r_reg_means = np.full(self.end - self.start, np.NAN)
                r_reg_means[r_data.start - self.start:
                            r_data.end - self.start] = r_means
            elif r_data.start > self.start:
                # handle reads that start in middle of region
                start_overlap = self.end - r_data.start
                # create region with nan values
                r_reg_means = np.full(self.end - self.start, np.NAN)
                r_reg_means[-start_overlap:] = r_means[:start_overlap]
            elif r_data.end < self.end:
                # handle reads that end inside region
                end_overlap = r_data.end - self.start
                # create region with nan values
                r_reg_means = np.full(self.end - self.start, np.NAN)
                r_reg_means[:end_overlap] = r_means[-end_overlap:]
            else:
                r_reg_means = r_means[
                    self.start - r_data.start:self.end - r_data.start]

            return r_reg_means


        if self.reads is None or len(self.reads) == 0:
            raise TomboError(
                'Must annotate region with reads ' +
                '(see `TomboInterval.add_reads`) to extract base levels.')
        if num_reads is not None:
            np.random.shuffle(self.reads)
        reg_events = []
        for r_data in self.reads:
            if self.strand is not None and r_data.strand != self.strand:
                continue
            r_means = get_read_reg_events(r_data)
            if r_means is None: continue
            reg_events.append(r_means)
            if num_reads is not None and len(reg_events) >= num_reads:
                break

        if read_rows:
            return np.row_stack(reg_events)
        return np.column_stack(reg_events)
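# Illustrative sketch (not part of the original module): extract per-position
# mean levels across reads for an annotated interval and average them,
# ignoring positions a read does not cover (nan). `reads_index` is assumed to
# be a TomboReads instance; coordinates are hypothetical.
def _example_mean_base_levels(reads_index):
    int_data = intervalData(
        chrm='chr20', start=10000, end=10100, strand='+').add_reads(
            reads_index)
    reg_levels = int_data.get_base_levels()
    # rows are interval positions, columns are reads
    return np.nanmean(reg_levels, axis=1)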
def filter_empty_regions(plot_intervals):
    num_filt = sum(len(p_int.reads) == 0 for p_int in plot_intervals)
    plot_intervals = [p_int for p_int in plot_intervals
                      if len(p_int.reads) > 0]
    if len(plot_intervals) == 0:
        error_message_and_exit('No reads in any selected regions.')
    if VERBOSE and num_filt > 0:
        warning_message('Some selected regions contain no reads.')

    return plot_intervals

def get_unique_intervals(plot_intervals, covered_poss=None, num_regions=None):
    # unique genomic regions filter
    uniq_p_intervals = []
    used_intervals = defaultdict(set)
    for reg_data in plot_intervals:
        # could have significant region immediately next to
        # beginning/end of reads
        interval_poss = list(range(reg_data.start, reg_data.end))
        if reg_data.start not in used_intervals[(
                reg_data.chrm, reg_data.strand)] and (
                    covered_poss is None or all(
                        pos in covered_poss[(reg_data.chrm, reg_data.strand)]
                        for pos in interval_poss)):
            uniq_p_intervals.append(reg_data)
            used_intervals[(reg_data.chrm, reg_data.strand)].update(
                interval_poss)
        if num_regions is not None and len(uniq_p_intervals) >= num_regions:
            break

    return uniq_p_intervals


###########################################
###### Special Data Access Functions ######
###########################################

def get_channel_info(fast5_data):
    """Get channel information for a read
    """
    try:
        fast5_attrs = fast5_data['UniqueGlobalKey/channel_id'].attrs
    except KeyError:
        raise TomboError("No channel_id group in HDF5 file. " +
                         "Probably mux scan HDF5 file.")

    try:
        channel_info = channelInfo(
            fast5_attrs.get('offset'), fast5_attrs.get('range'),
            fast5_attrs.get('digitisation'),
            fast5_attrs.get('channel_number'),
            fast5_attrs.get('sampling_rate').astype(np.int64))
    except KeyError:
        raise TomboError("Channel info parameters not available.")

    return channel_info

def get_raw_signal(r_data, int_start, int_end):
    """Extract raw signal from where this read overlaps a particular genomic
    region
    """
    with h5py.File(r_data.fn, 'r') as fast5_data:
        # retrieve shift and scale computed in correction script
        corr_subgrp = fast5_data['/Analyses/' + r_data.corr_group]
        event_starts = corr_subgrp['Events']['start']
        events_end = event_starts[-1] + corr_subgrp['Events']['length'][-1]
        segs = np.concatenate([event_starts, [events_end,]])

        scale_values = scaleValues(
            corr_subgrp.attrs.get('shift'), corr_subgrp.attrs.get('scale'),
            corr_subgrp.attrs.get('lower_lim'),
            corr_subgrp.attrs.get('upper_lim'),
            corr_subgrp.attrs.get('outlier_threshold'))
        all_sig = get_raw_read_slot(fast5_data)['Signal'][:]

    rsrtr = r_data.read_start_rel_to_raw
    if r_data.rna:
        # reverse raw signal for RNA
        all_sig = all_sig[::-1]

    if r_data.strand == "-":
        segs = (segs[::-1] * -1) + segs[-1]

    if int_start < r_data.start:
        # handle reads that start in the middle of the interval
        start_offset = r_data.start - int_start
        overlap_seg_data = segs[:int_end - r_data.start + 1]
    else:
        start_offset = 0
        skipped_bases = int_start - r_data.start
        overlap_seg_data = segs[
            skipped_bases:int_end - r_data.start + 1]

    # trim and flip raw signal (perform normalization outside of this
    # function in order to avoid circular import between tombo_helper
    # and tombo_stats)
    num_reg_obs = overlap_seg_data[-1] - overlap_seg_data[0]
    if r_data.strand == "+":
        reg_start_rel_raw = rsrtr + overlap_seg_data[0]
        r_sig = all_sig[reg_start_rel_raw:reg_start_rel_raw + num_reg_obs]
    else:
        reg_start_rel_raw = rsrtr + segs[-1] - overlap_seg_data[-1]
        r_sig = all_sig[reg_start_rel_raw:reg_start_rel_raw + num_reg_obs]
        r_sig = r_sig[::-1]

    return r_sig, overlap_seg_data, start_offset, scale_values
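# Illustrative sketch (not part of the original module): deduplicate a list
# of candidate plotting intervals and drop any with no annotated reads.
# `reads_index` is assumed to be a TomboReads instance; the chromosome name
# and coordinates are hypothetical.
def _example_select_regions(reads_index):
    cand_ints = [
        intervalData(
            chrm='chr20', start=s, end=s + 100, strand='+').add_reads(
                reads_index)
        for s in (10000, 10000, 20000)]
    # keep at most 2 unique regions, then require that each contains reads
    uniq_ints = get_unique_intervals(cand_ints, num_regions=2)
    return filter_empty_regions(uniq_ints)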
def parse_read_correction_data(r_data):
    """Parse correction data from an event resquiggled read
    """
    try:
        with h5py.File(r_data.fn, 'r') as fast5_data:
            corr_grp = fast5_data['/Analyses/' + r_data.corr_group]
            events_grp = corr_grp['Events']
            event_starts = events_grp['start']
            events_end = event_starts[-1] + events_grp['length'][-1]
            new_segs = np.concatenate([event_starts, [events_end,]])

            raw_grp = get_raw_read_slot(fast5_data)
            read_id = raw_grp.attrs.get('read_id')
            try:
                read_id = read_id.decode()
            except (AttributeError, TypeError):
                pass
            signal_data = raw_grp['Signal'][:]

            raw_offset = events_grp.attrs.get('read_start_rel_to_raw')
            scale_values = scaleValues(*[
                corr_grp.attrs.get(attr_name) for attr_name in (
                    'shift', 'scale', 'lower_lim', 'upper_lim',
                    'outlier_threshold')])

            old_segs = corr_grp['Alignment/read_segments'][:]
            old_align_vals = list(map(
                lambda x: x.decode(),
                corr_grp['Alignment/read_alignment'][:]))
            new_align_vals = list(map(
                lambda x: x.decode(),
                corr_grp['Alignment/genome_alignment'][:]))
    except:
        return None

    if r_data.rna:
        signal_data = signal_data[::-1]

    return (read_id, signal_data, raw_offset, scale_values, old_segs,
            old_align_vals, new_align_vals, events_end, new_segs)

def get_all_read_data(r_data):
    """Extract most relevant read data from this read
    """
    try:
        with h5py.File(r_data.fn, 'r') as fast5_data:
            # note that it's more efficient to try to access the slot
            # and except the error than to check if the slot exists first
            corr_subgrp = fast5_data['/Analyses/' + r_data.corr_group]
            algn_subgrp = dict(corr_subgrp['Alignment'].attrs.items())
            event_data = corr_subgrp['Events'][:]
            r_attrs = dict(corr_subgrp.attrs.items())
            all_sig = get_raw_read_slot(fast5_data)['Signal'][:]
    except:
        # probably truncated file or Events slot doesn't exist
        return None

    if r_data.rna:
        all_sig = all_sig[::-1]
    r_means = event_data['norm_mean']
    r_seq = b''.join(event_data['base']).decode()

    events_end = event_data[-1]['start'] + event_data[-1]['length']
    segs = np.concatenate(
        [event_data['start'], [events_end,]]).astype(np.int64)

    return (r_means, r_seq, all_sig, segs, r_data.read_start_rel_to_raw,
            r_attrs.get('norm_type'), r_attrs.get('outlier_threshold'),
            genomeLocation(
                algn_subgrp['mapped_start'], algn_subgrp['mapped_strand'],
                algn_subgrp['mapped_chrom']))

def get_reads_events(cs_reads):
    """Extract read base levels split by genomic position

    Args:
        cs_reads (list): a list of reads from a single (chromosome, strand)

    Returns:
        A dictionary with 0-based genomic positions pointing to read
        signal levels
    """
    # note that this function assumes that all reads come from the same
    # chromosome and strand
    cs_base_means = []
    cs_read_start_ends = []
    for r_data in cs_reads:
        # extract read means data so data across all chrms is not
        # in RAM at one time
        read_means = get_single_slot_genome_centric(r_data, 'norm_mean')
        if read_means is None: continue
        # skip reads with mismatching mapping location and signal
        # information
        if read_means.shape[0] != r_data.end - r_data.start: continue
        cs_base_means.append(read_means)
        cs_read_start_ends.append((r_data.start, r_data.end))

    if len(cs_base_means) == 0: return None

    chrm_signal = np.concatenate(cs_base_means)
    chrm_pos = np.concatenate([np.arange(start, end)
                               for start, end in cs_read_start_ends])
    # get order of all bases from position array
    as_chrm_pos = np.argsort(chrm_pos)
    # then sort the signal array by genomic position and
    # split into event means by base
    split_poss = np.where(
        np.concatenate([[0,], np.diff(
            chrm_pos[as_chrm_pos])]) > 0)[0]
    cs_base_events = dict(zip(
        np.unique(chrm_pos[as_chrm_pos]),
        np.split(chrm_signal[as_chrm_pos], split_poss)))

    return cs_base_events


###################################
###### FAST5 Write Functions ######
###################################

def prep_fast5(fast5_fn, corr_grp, overwrite, in_place, bc_grp=None,
               return_fp=False):
    """Prepare a read for re-squiggle processing (This deletes old
    re-squiggle info for this read)
    """
    def try_close_prep_err(fast5_data, err_str):
        try:
            fast5_data.close()
        except:
            pass
        # is_tombo_error = True
        return err_str, fast5_fn, True

    # several checks to prepare the FAST5 file for correction before
    # processing to save compute
    if not in_place:
        return ('Not currently implementing new hdf5 file writing',
                fast5_fn, True)

    # check that the file is writeable before trying to correct
    if not os.access(fast5_fn, os.W_OK):
        return 'FAST5 file is not writable', fast5_fn, True

    try:
        # create group to store data
        fast5_data = h5py.File(fast5_fn, 'r+')
        try:
            analyses_grp = fast5_data['/Analyses']
        except:
            return try_close_prep_err(
                fast5_data,
                'Base calls not found in FAST5 (see `tombo preprocess`)')
        try:
            # check that the requested basecalls group exists
            if bc_grp is not None:
                analyses_grp[bc_grp]
        except:
            return try_close_prep_err(
                fast5_data,
                'Base calls not found in FAST5 (see `tombo preprocess`)')

        try:
            corr_grp_ptr = analyses_grp[corr_grp]
            if not overwrite:
                return try_close_prep_err(
                    fast5_data, "Tombo data exists in [--corrected-group] " +
                    "and [--overwrite] is not set")
            del analyses_grp[corr_grp]
        except:
            # if the corr_grp isn't there we will write it now, but
            # it's more efficient to try than to check if the slot is there
            pass

        corr_grp = analyses_grp.create_group(corr_grp)
        corr_grp.attrs['tombo_version'] = TOMBO_VERSION
        corr_grp.attrs['basecall_group'] = bc_grp
    except:
        return 'Error opening or writing to fast5 file', fast5_fn, True

    if return_fp:
        return fast5_data

    try:
        fast5_data.close()
    except:
        return 'Error closing fast5 file', fast5_fn, True

    return

def write_error_status(fn, corr_grp, bc_subgrp, error_text):
    """Write error message for a read into the FAST5 file
    """
    with h5py.File(fn, 'r+') as fast5_data:
        analysis_grp = fast5_data['/Analyses']
        corr_grp = analysis_grp[corr_grp]
        if bc_subgrp is not None:
            # add subgroup matching subgroup from original basecalls
            corr_subgrp = corr_grp.create_group(bc_subgrp)
            corr_subgrp.attrs['status'] = error_text
        else:
            corr_grp.attrs['status'] = error_text

    return
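# Illustrative sketch (not part of the original module): prepare a FAST5 file
# for re-squiggle output and record a failure status for one of its
# basecalled subgroups. The group names mirror common Tombo defaults but are
# assumptions here, as is the file path passed in.
def _example_prep_and_flag(fast5_fn):
    prep_res = prep_fast5(
        fast5_fn, corr_grp='RawGenomeCorrected_000', overwrite=True,
        in_place=True, bc_grp='Basecall_1D_000')
    if prep_res is not None:
        # prep failed; prep_res is an (error, filename, is_tombo_error) tuple
        return prep_res
    write_error_status(
        fast5_fn, 'RawGenomeCorrected_000', 'BaseCalled_template',
        'Example status text')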
def write_new_fast5_group(
        fast5_data, corr_grp_slot, rsqgl_res, norm_type, compute_sd,
        alignVals=None, old_segs=None, rna=False):
    """Write new fast5 group with re-squiggle data
    """
    try:
        # compute event data before accessing fast5 file
        if compute_sd:
            norm_means, norm_stds = c_new_mean_stds(
                rsqgl_res.raw_signal, rsqgl_res.segs)
        else:
            norm_means = c_new_means(rsqgl_res.raw_signal, rsqgl_res.segs)
            norm_stds = repeat(np.NAN)

        # had to shift to named formats in the numpy array specification due
        # to python2 numpy unicode issues. See discussion here:
        # https://github.com/numpy/numpy/issues/2407
        event_data = np.array(
            list(zip(norm_means, norm_stds,
                     rsqgl_res.segs[:-1], np.diff(rsqgl_res.segs),
                     list(rsqgl_res.genome_seq))),
            dtype=[(str('norm_mean'), 'f8'), (str('norm_stdev'), 'f8'),
                   (str('start'), 'u4'), (str('length'), 'u4'),
                   (str('base'), 'S1')])

        if alignVals is not None:
            r_align_vals, g_align_vals = zip(*alignVals)
            np_read_align = np.chararray(len(alignVals))
            np_read_align[:] = r_align_vals
            np_genome_align = np.chararray(len(alignVals))
            np_genome_align[:] = g_align_vals
    except:
        raise TomboError('Error computing new events')

    do_close = False
    if not isinstance(fast5_data, h5py.File):
        try:
            fast5_data = h5py.File(fast5_data, 'r+')
            do_close = True
        except:
            raise TomboError(
                'Error opening file for new group writing. This should ' +
                'have been caught during the alignment phase. Check that ' +
                'there are no other tombo processes or processes ' +
                'accessing these HDF5 files running simultaneously.')

    try:
        analysis_grp = fast5_data['/Analyses']
        corr_grp = analysis_grp[corr_grp_slot]
        # add subgroup matching subgroup from original basecalls
        corr_subgrp = corr_grp.create_group(rsqgl_res.align_info.Subgroup)
        corr_subgrp.attrs['status'] = 'success'
        # TODO change to rev_sig
        corr_subgrp.attrs['rna'] = rna
        if rsqgl_res.sig_match_score is not None:
            corr_subgrp.attrs[
                'signal_match_score'] = rsqgl_res.sig_match_score
        corr_subgrp.attrs['shift'] = rsqgl_res.scale_values.shift
        corr_subgrp.attrs['scale'] = rsqgl_res.scale_values.scale
        corr_subgrp.attrs['norm_type'] = norm_type
        if rsqgl_res.scale_values.lower_lim is not None:
            corr_subgrp.attrs['lower_lim'] = rsqgl_res.scale_values.lower_lim
        if rsqgl_res.scale_values.upper_lim is not None:
            corr_subgrp.attrs['upper_lim'] = rsqgl_res.scale_values.upper_lim
        if rsqgl_res.scale_values.outlier_thresh is not None:
            corr_subgrp.attrs[
                'outlier_threshold'] = rsqgl_res.scale_values.outlier_thresh

        # store alignment statistics
        corr_alignment = corr_subgrp.create_group('Alignment')
        corr_alignment.attrs['mapped_start'] = rsqgl_res.genome_loc.Start
        corr_alignment.attrs[
            'mapped_end'] = rsqgl_res.genome_loc.Start + len(
                rsqgl_res.segs) - 1
        corr_alignment.attrs['mapped_strand'] = rsqgl_res.genome_loc.Strand
        corr_alignment.attrs['mapped_chrom'] = rsqgl_res.genome_loc.Chrom

        if rsqgl_res.align_info is not None:
            corr_alignment.attrs[
                'clipped_bases_start'] = rsqgl_res.align_info.ClipStart
            corr_alignment.attrs[
                'clipped_bases_end'] = rsqgl_res.align_info.ClipEnd
            corr_alignment.attrs[
                'num_insertions'] = rsqgl_res.align_info.Insertions
            corr_alignment.attrs[
                'num_deletions'] = rsqgl_res.align_info.Deletions
            corr_alignment.attrs[
                'num_matches'] = rsqgl_res.align_info.Matches
            corr_alignment.attrs[
                'num_mismatches'] = rsqgl_res.align_info.Mismatches

        if alignVals is not None:
            corr_alignment.create_dataset(
                'read_alignment', data=np_read_align, compression="gzip")
            corr_alignment.create_dataset(
                'genome_alignment', data=np_genome_align, compression="gzip")
        if old_segs is not None:
            # store old segmentation in order to plot "correction process"
            corr_alignment.create_dataset(
                'read_segments', data=old_segs, compression="gzip")

        # Add Events to data frame with event means, SDs and lengths
        corr_events = corr_subgrp.create_dataset(
            'Events', data=event_data, compression="gzip")
        corr_events.attrs[
            'read_start_rel_to_raw'] = rsqgl_res.read_start_rel_to_raw
    except:
        raise TomboError(
            'Error writing resquiggle information back into fast5 file.')

    if do_close:
        try:
            fast5_data.close()
        except:
            raise TomboError(
                'Error closing fast5 file after writing resquiggle '
                'information.')

    return


if __name__ == '__main__':
    sys.stderr.write('This is a module. See commands with `tombo -h`')
    sys.exit(1)