# Source code for tombo.tombo_stats

from __future__ import division, unicode_literals, absolute_import

from builtins import int, range, dict, map, zip, object

import os
import io
import re
import sys
import queue
import random
import pkg_resources

# Future warning from cython in h5py
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import h5py

import numpy as np
np.seterr(all='raise')

from tqdm import tqdm
from time import sleep
from copy import deepcopy
from operator import itemgetter
from scipy import stats, optimize
from collections import defaultdict
from scipy.spatial.distance import pdist
from multiprocessing import Process, Queue, Pipe
from numpy.lib.recfunctions import append_fields
from scipy.cluster.hierarchy import single, leaves_list
from itertools import repeat, product, count, combinations

if sys.version_info[0] > 2:
    unicode = str

# import tombo functions
from . import tombo_helper as th

from ._c_helper import (
    c_mean_std, c_apply_outlier_thresh, c_new_means, c_calc_llh_ratio,
    c_calc_llh_ratio_const_var, c_calc_scaled_llh_ratio_const_var,
    c_new_mean_stds, c_compute_running_pctl_diffs, c_compute_slopes)

from ._default_parameters import (
    SMALLEST_PVAL, STANDARD_MODELS, ALTERNATE_MODELS,
    MIN_KMER_OBS_TO_EST, ALT_EST_BATCH, MAX_KMER_OBS, NUM_DENS_POINTS,
    LLR_THRESH, SAMP_COMP_THRESH, DE_NOVO_THRESH, KERNEL_DENSITY_RANGE,
    ROC_PLOT_POINTS, NANOPOLISH_CENTRAL_POS, NUM_READS_FOR_SCALE,
    ROBUST_QUANTS, MAX_POINTS_FOR_THEIL_SEN, NUM_READS_TO_ADJUST_MODEL,
    OCLLHR_SCALE, OCLLHR_HEIGHT, OCLLHR_POWER, FM_OFFSET_DEFAULT,
    MOST_SIGNIF_NUM_BATCHES_DEFAULT, DNA_SAMP_TYPE, RNA_SAMP_TYPE,
    MEAN_PRIOR_CONST, SD_PRIOR_CONST, ALGN_PARAMS_TABLE, SEG_PARAMS_TABLE,
    MIN_EVENT_TO_SEQ_RATIO, COLLAPSE_RNA_STALLS, STALL_PARAMS,
    OUTLIER_THRESH,
    RNA_SCALE_NUM_EVENTS, RNA_SCALE_MAX_FRAC_EVENTS, USE_RNA_EVENT_SCALE)
DEFAULT_STALL_PARAMS = th.stallParams(**STALL_PARAMS)

# list of classes/functions to include in API
__all__ = [
    'TomboStats', 'ModelStats', 'LevelStats', 'PerReadStats',
    'TomboModel', 'AltModel', 'normalize_raw_signal', 'compute_base_means',
    'get_read_seg_score', 'calc_kmer_fitted_shift_scale',
    'load_resquiggle_parameters', 'compute_num_events']


VERBOSE = True

_PROFILE_SIGNIF = False
_PROFILE_SIGNIF_STATS_OUT = False
_PROFILE_SIGNIF_PER_READ = False
_PROFILE_EST_REF = False
_PROFILE_CENTER_REF = False
_PROFILE_ALT_EST = False
_PROFILE_MOTIF_ALT_EST = False

_DEBUG_EST_STD = False
_DEBUG_EST_BW = 0.05
_DEBUG_EST_NUM_KMER_SAVE = 500

STAT_BLOCKS_QUEUE_LIMIT = 5

DNA_BASES = ['A','C','G','T']

HALF_NORM_EXPECTED_VAL = stats.halfnorm.expect()
LOG10_E = np.log10(np.exp(1))

STANDARD_MODEL_NAME = 'standard'

SAMP_COMP_TXT = 'sample_compare'
DE_NOVO_TXT = 'de_novo'
ALT_MODEL_TXT = 'model_compare'
PER_READ_STATS = (SAMP_COMP_TXT, DE_NOVO_TXT, ALT_MODEL_TXT)

# TODO only ks-test currently implemented
KS_TEST_TXT = 'ks_test'
U_TEST_TXT = 'u_test'
T_TEST_TXT = 't_test'
KS_STAT_TEST_TXT = 'ks_stat_test'
U_STAT_TEST_TXT = 'u_stat_test'
T_STAT_TEST_TXT = 't_stat_test'
LEVEL_STATS_TXTS = (
    KS_TEST_TXT, U_TEST_TXT, T_TEST_TXT,
    KS_STAT_TEST_TXT, U_STAT_TEST_TXT, T_STAT_TEST_TXT)

ALT_MODEL_SEP_CHAR = '_'

NORM_TYPES = ('none', 'pA', 'pA_raw', 'median', 'robust_median',
              'median_const_scale')

# options specifying testing methods
# assume constant SD in model to save on computation
CONST_SD_MODEL = True

STAT_BLOCKS_H5_NAME = 'Statistic_Blocks'
MOST_SIGNIF_H5_NAME = 'Most_Significant_Stats'
COV_DAMP_COUNTS_H5_NAME = 'Cov_Damp_Counts'
COV_THRESH_H5_NAME = 'Cov_Threshold'

# turned off by default (and not accessible via command line so
# hardcoded for now)
DEFAULT_TRIM_RNA_PARAMS = th.trimRnaParams(
    moving_window_size=50, min_running_values=100,
    thresh_scale=0.7, max_raw_obs=40000)


#############################################
##### Pair-wise Distance and Clustering #####
#############################################

def transform_and_trim_stats(reg_stats, are_pvals, trim_value):
    """Transform p-values to -log10 space and trim statistics to a maximum
    (absolute) value for plotting.
    """
    if are_pvals:
        with np.errstate(divide='ignore'):
            reg_stats = -np.log10(reg_stats)
        nan_r_stats = np.nan_to_num(reg_stats)
        reg_stats[nan_r_stats > trim_value] = trim_value
    else:
        nan_r_stats = np.nan_to_num(reg_stats)
        reg_stats[nan_r_stats > trim_value] = trim_value
        reg_stats[nan_r_stats < -trim_value] = -trim_value
    return reg_stats
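# A minimal usage sketch (not part of the original module): trimming
# -log10 p-values at 10 for plotting. Input values are illustrative only.
#
#   >>> import numpy as np
#   >>> pvals = np.array([1e-20, 1e-5, 0.5])
#   >>> transform_and_trim_stats(pvals, are_pvals=True, trim_value=10)
#   ... # -> approximately [10., 5., 0.301]; the 1e-20 value is trimmed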

def order_reads(reg_stats):
    """Compute order of reads based on log p-values or -log likelihood ratios
    """
    if reg_stats.shape[0] == 1:
        return [0]
    # get pairwise distances between reads
    # nanmean over all-NaN slices emits "Mean of empty slice"
    # RuntimeWarnings, so suppress those
    #   (np.seterr does not cover this specific warning)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        r_dists = pdist(reg_stats, lambda u, v:
                        np.nanmean(np.sqrt(((u-v)**2))))
    r_dists[np.isnan(r_dists)] = np.nanmax(r_dists) + 1
    # then perform single/min linkage clustering and return the leaf order
    return leaves_list(single(r_dists))
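# Hedged sketch of how the leaf order might be used to sort a per-read
# statistics matrix before plotting (rows of `reg_stats` are reads; the
# values here are made up for illustration):
#
#   >>> import numpy as np
#   >>> reg_stats = np.array([[0., 0., 0.], [5., 5., 5.], [0.1, 0., 0.2]])
#   >>> reg_stats[order_reads(reg_stats)]  # similar reads become adjacent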

def sliding_window_dist(sig_diffs1, sig_diffs2, slide_span, num_bases):
    """Compute distance over the minimum over a sliding window
    """
    return np.sqrt(min(np.sum(np.square(
        sig_diffs1[i1:i1+num_bases] - sig_diffs2[i2:i2+num_bases]))
                       for i1 in range((slide_span * 2) + 1)
                       for i2 in range((slide_span * 2) + 1)))
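# Worked example (illustrative, not from the original source): with
# slide_span=1 the distance is minimized over all 3x3 offset pairs of
# num_bases-long windows, so traces that are identical up to a one-position
# shift score zero:
#
#   >>> import numpy as np
#   >>> a = np.array([0., 1., 2., 3., 4.])
#   >>> b = np.array([1., 2., 3., 4., 5.])  # `a` shifted by one position
#   >>> sliding_window_dist(a, b, slide_span=1, num_bases=3)
#   ... # -> 0.0 (a[1:4] exactly matches b[0:3])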

def euclidian_dist(sig_diffs1, sig_diffs2):
    """Compute Euclidean distance
    """
    return np.sqrt(np.sum(np.square(sig_diffs1 - sig_diffs2)))

def get_pairwise_dists(reg_sig_diffs, index_q, dists_q, slide_span=None):
    """Compute pairwise distances between a set of signal shifts
    """
    # slide_span defaults to None, so guard the numeric comparison
    do_slide = slide_span is not None and slide_span > 0
    if do_slide:
        num_bases = reg_sig_diffs[0].shape[0] - (slide_span * 2)
    while True:
        try:
            index = index_q.get(block=False)
        except queue.Empty:
            break

        if do_slide:
            row_dists = np.array(
                [sliding_window_dist(
                    reg_sig_diffs[index], reg_sig_diffs[j],
                    slide_span, num_bases)
                 for j in range(0, index + 1)] +
                [0 for _ in range(index + 1, len(reg_sig_diffs))])
        else:
            row_dists = np.array(
                [euclidian_dist(reg_sig_diffs[index], reg_sig_diffs[j])
                 for j in range(0, index + 1)] +
                [0 for _ in range(index + 1, len(reg_sig_diffs))])
        dists_q.put((index, row_dists))

    return
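# Hedged driver sketch: get_pairwise_dists is written as a multiprocessing
# worker, consuming row indices from index_q and producing (index, row)
# pairs on dists_q. A single-process driver could look like this (the
# final matrix assembly is an assumption for illustration):
#
#   >>> from multiprocessing import Queue
#   >>> from time import sleep
#   >>> import numpy as np
#   >>> reg_sig_diffs = [np.random.randn(10) for _ in range(4)]
#   >>> index_q, dists_q = Queue(), Queue()
#   >>> for i in range(len(reg_sig_diffs)):
#   ...     index_q.put(i)
#   >>> sleep(0.1)  # let the queue feeder thread flush before get(block=False)
#   >>> get_pairwise_dists(reg_sig_diffs, index_q, dists_q)
#   >>> rows = dict(dists_q.get() for _ in range(len(reg_sig_diffs)))
#   >>> dist_mat = np.stack([rows[i] for i in range(len(reg_sig_diffs))])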


##################################
###### Signal Normalization ######
##################################

def compute_base_means(all_raw_signal, base_starts):
    """Efficiently compute new base mean values from raw signal and base
    start positions

    Args:
        all_raw_signal (`np.array`): raw nanopore signal observation values
        base_starts (`np.array::np.int32`): 0-based base start positions
            within raw signal

    Returns:
        `np.array::np.float64` containing base mean levels
    """
    return c_new_means(all_raw_signal.astype(np.float64), base_starts)
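# Usage sketch (assumed data, not from the original source): `c_new_means`
# averages the raw signal over the segments defined by adjacent base start
# positions.
#
#   >>> import numpy as np
#   >>> raw_sig = np.repeat([10.0, 20.0, 30.0], 5)
#   >>> starts = np.array([0, 5, 10], dtype=np.int32)
#   >>> base_means = compute_base_means(raw_sig, starts)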
def get_scale_values_from_events(
        all_raw_signal, valid_cpts, outlier_thresh,
        num_events=None, max_frac_events=None):
    if num_events is not None or max_frac_events is not None:
        # guard the fraction-based computation when max_frac_events is None
        if (num_events is None or (
                max_frac_events is not None and
                valid_cpts.shape[0] * max_frac_events < num_events)):
            num_events = int(valid_cpts.shape[0] * max_frac_events)
        valid_cpts = valid_cpts[:num_events]
    event_means = compute_base_means(all_raw_signal, valid_cpts)
    read_med = np.median(event_means)
    read_mad = np.median(np.abs(event_means - read_med))
    lower_lim = -outlier_thresh
    upper_lim = outlier_thresh

    return th.scaleValues(
        shift=read_med, scale=read_mad, lower_lim=lower_lim,
        upper_lim=upper_lim, outlier_thresh=None)

def trim_rna(
        all_raw_signal, rsqgl_params, trim_rna_params=DEFAULT_TRIM_RNA_PARAMS):
    all_raw_signal = all_raw_signal[:trim_rna_params.max_raw_obs]
    num_events = np.int64(
        all_raw_signal.shape[0] // rsqgl_params.mean_obs_per_event)
    # get stdev over delta-mean events
    valid_cpts = th.valid_cpts_w_cap(
        all_raw_signal.astype(np.float64), rsqgl_params.min_obs_per_base,
        rsqgl_params.running_stat_width, num_events)
    _, window_sds = c_new_mean_stds(
        all_raw_signal.astype(np.float64), valid_cpts)
    # now moving window through this array
    n_windows = (window_sds.size - trim_rna_params.moving_window_size) + 1
    s_bytes = window_sds.strides[0]
    moving_window_sds = np.lib.stride_tricks.as_strided(
        window_sds, shape=(n_windows, trim_rna_params.moving_window_size),
        strides=(s_bytes, s_bytes)).mean(-1)
    thresh = moving_window_sds.mean() * trim_rna_params.thresh_scale
    n_windows = (moving_window_sds.size -
                 trim_rna_params.min_running_values) + 1
    s_bytes = moving_window_sds.strides[0]
    running_mins = np.lib.stride_tricks.as_strided(
        moving_window_sds,
        shape=(n_windows, trim_rna_params.min_running_values),
        strides=(s_bytes, s_bytes)).min(-1)
    try:
        pos_index = next(i for i, v in enumerate(running_mins) if v > thresh)
    except StopIteration:
        return 0
    return valid_cpts[pos_index]

def identify_stalls(all_raw_signal, stall_params, return_metric=False):
    """Identify locations where bases have stalled in the pore. Two methods
    are available depending on the parameters specified in stall_params.
    """
    def compute_running_mean_diffs():
        """Compute the average difference between n_windows neighboring
        window means, each of size mini_window_size.
        """
        moving_average = np.cumsum(all_raw_signal)
        moving_average[stall_params.mini_window_size:] = (
            moving_average[stall_params.mini_window_size:] -
            moving_average[:-stall_params.mini_window_size])
        moving_average = moving_average[
            stall_params.mini_window_size - 1:] / stall_params.mini_window_size

        # extract moving window averages at n_windows offsets
        offsets = [moving_average[
            int(stall_params.mini_window_size * offset):
            int(-stall_params.mini_window_size * (
                stall_params.n_windows - offset - 1))]
                   for offset in range(stall_params.n_windows - 1)] + [
                       moving_average[int(stall_params.mini_window_size * (
                           stall_params.n_windows - 1)):],]
        # compute difference between all pairwise offsets
        diffs = [np.abs(offsets[i] - offsets[j])
                 for i in range(stall_params.n_windows)
                 for j in range(i + 1, stall_params.n_windows)]
        # compute average over offset differences at each valid position
        diff_sums = diffs[0].copy()
        for diff_i in diffs[1:]:
            diff_sums += diff_i
        return diff_sums / len(diffs)

    # if the raw signal is too short to compute stall metrics
    if all_raw_signal.shape[0] < stall_params.window_size:
        if return_metric:
            return [], np.repeat(np.NAN, all_raw_signal.shape[0])
        return []

    # identify potentially stalled signal from either running window means
    # or running percentile difference methods
    stall_metric = np.empty(all_raw_signal.shape, all_raw_signal.dtype)
    stall_metric[:] = np.NAN
    start_offset = int(stall_params.window_size * 0.5)
    end_offset = (all_raw_signal.shape[0] -
                  stall_params.window_size + start_offset + 1)
    if (stall_params.lower_pctl is not None and
        stall_params.upper_pctl is not None):
        stall_metric[start_offset:end_offset] = c_compute_running_pctl_diffs(
            all_raw_signal, np.int64(stall_params.window_size),
            np.float64(stall_params.lower_pctl),
            np.float64(stall_params.upper_pctl))
    elif (stall_params.n_windows is not None and
          stall_params.mini_window_size is not None):
        assert (stall_params.window_size ==
                stall_params.mini_window_size * stall_params.n_windows)
        stall_metric[start_offset:end_offset] = compute_running_mean_diffs()
    else:
        raise th.TomboError(
            'Must provide method specific parameters for stall detection')

    # identify contiguous windows over threshold for minimal stretches
    with np.errstate(invalid='ignore'):
        stall_locs = np.where(np.diff(np.concatenate(
            [[False], stall_metric <= stall_params.threshold])))[0]
        if stall_metric[-1] <= stall_params.threshold:
            stall_locs = np.concatenate([stall_locs, [stall_metric.shape[0]]])
    stall_locs = stall_locs.reshape(-1,2)
    stall_locs = stall_locs[(np.diff(stall_locs) >
                             stall_params.min_consecutive_obs).flatten()]
    if stall_locs.shape[0] == 0:
        if return_metric:
            return [], stall_metric
        return []

    # expand windows out to region that gave result below threshold
    # since windows are centered (minus edge buffer)
    expand_width = (stall_params.window_size // 2) - stall_params.edge_buffer
    if expand_width > 0:
        stall_locs[:,0] -= expand_width
        stall_locs[:,1] += expand_width
        # collapse intervals that now overlap
        merged_stall_locs = []
        prev_int = stall_locs[0]
        for curr_int in stall_locs:
            if curr_int[0] > prev_int[1]:
                # add previous interval to all intervals
                merged_stall_locs.append(prev_int)
                prev_int = curr_int
            else:
                # extend previous interval since these overlap
                prev_int[1] = curr_int[1]
        merged_stall_locs.append(prev_int)
        stall_locs = merged_stall_locs

    if return_metric:
        return stall_locs, stall_metric
    return stall_locs
def calc_kmer_fitted_shift_scale(
        prev_shift, prev_scale, r_event_means, r_model_means,
        r_model_inv_vars=None, method='theil_sen'):
    """Use robust Theil-Sen estimator to compute fitted shift and scale
    parameters based on read sequence

    Args:
        prev_shift (float): previous shift parameter
        prev_scale (float): previous scale parameter
        r_event_means (`np.array::np.float64`): observed base signal levels
        r_model_means (`np.array::np.float64`): expected base signal levels
        r_model_inv_vars (`np.array::np.float64`): expected base signal
            level inverse variances for method of moments (`mom`)
            computation
        method (str): one of `theil_sen`, `robust`, or `mom`

    Returns:
        Sequence-fitted scaling parameters

        1) shift parameter (float)
        2) scale parameter (float)
        3) shift correction factor; multiply by ``prev_scale`` and add to
           ``prev_shift`` to get ``shift`` (float)
        4) scale correction factor; multiply by ``prev_scale`` to get
           ``scale`` (float)
    """
    if method == 'robust':
        def read_lad_objective(x):
            return np.sum(np.abs(((r_event_means - x[0]) / x[1]) -
                                 r_model_means))

        shift_corr_factor, scale_corr_factor = optimize.minimize(
            read_lad_objective, np.array([0,1]), method='nelder-mead',
            options={'xtol': 1e-8}).x
    elif method == 'theil_sen':
        def compute_slopes(r_event_means, r_model_means):
            # despite computing each diff twice this vectorized solution is
            # about 10X faster than a list comprehension approach
            delta_event = r_event_means[:, np.newaxis] - r_event_means
            delta_model = r_model_means[:, np.newaxis] - r_model_means
            return delta_model[delta_event > 0] / delta_event[delta_event > 0]

        n_points = r_model_means.shape[0]
        # potentially sample points for long reads (>1kb)
        if r_model_means.shape[0] > MAX_POINTS_FOR_THEIL_SEN:
            n_points = MAX_POINTS_FOR_THEIL_SEN
            samp_ind = np.random.choice(
                r_model_means.shape[0], n_points, replace=False)
            r_model_means = r_model_means[samp_ind]
            r_event_means = r_event_means[samp_ind]
        # compute Theil-Sen slope estimator
        slope = np.median(c_compute_slopes(r_event_means, r_model_means))
        inter = np.median(r_model_means - (slope * r_event_means))
        if slope == 0:
            raise th.TomboError('Read failed sequence-based signal ' +
                                're-scaling parameter estimation.')

        # convert to shift and scale parameters (e.g. (obs - shift) / scale)
        scale_corr_factor = 1 / slope
        shift_corr_factor = -inter / slope
    elif method == 'mom':
        model_mean_var = r_model_means * r_model_inv_vars
        # prep kmer model coefficient matrix for the k-mers from this read
        model_mean_var_sum = model_mean_var.sum()
        coef_mat = np.array((
            (r_model_inv_vars.sum(), model_mean_var_sum),
            (model_mean_var_sum, (model_mean_var * r_model_means).sum())))

        # prep dependent values from this reads true events
        r_event_var = r_event_means * r_model_inv_vars
        r_event_var_mean = r_event_var * r_model_means
        dep_vect = np.array((r_event_var.sum(), r_event_var_mean.sum()))

        shift_corr_factor, scale_corr_factor = np.linalg.solve(
            coef_mat, dep_vect)
    else:
        th.error_message_and_exit(
            'Invalid k-mer fitted normalization parameter method: ' +
            method + '\n\t\tValid methods are "theil_sen", "robust" ' +
            'and "mom".')

    # apply shift and scale values fitted from kmer conditional model
    shift = prev_shift + (shift_corr_factor * prev_scale)
    scale = prev_scale * scale_corr_factor

    return shift, scale, shift_corr_factor, scale_corr_factor
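# Worked sketch of the Theil-Sen re-scaling on synthetic data: events are
# generated from model levels under a known shift/scale, which the
# returned parameters should (approximately) recover.
#
#   >>> import numpy as np
#   >>> r_model_means = np.random.normal(0, 1, 500)
#   >>> r_event_means = (r_model_means * 2.0) + 5.0  # obs = (lev * 2) + 5
#   >>> shift, scale, _, _ = calc_kmer_fitted_shift_scale(
#   ...     prev_shift=0.0, prev_scale=1.0, r_event_means=r_event_means,
#   ...     r_model_means=r_model_means, method='theil_sen')
#   >>> # shift ~= 5.0 and scale ~= 2.0, so (obs - shift) / scale ~= lev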
def estimate_global_scale(fast5_fns, num_reads=NUM_READS_FOR_SCALE):
    if VERBOSE: th.status_message('Estimating global scale parameter.')
    np.random.shuffle(fast5_fns)
    read_mads = []
    if VERBOSE:
        bar = tqdm(total=num_reads, desc='Total reads processed', smoothing=0)
    for fast5_fn in fast5_fns:
        try:
            with h5py.File(fast5_fn, 'r') as fast5_data:
                all_sig = th.get_raw_read_slot(fast5_data)['Signal'][:]
            shift = np.median(all_sig)
            read_mads.append(np.median(np.abs(all_sig - shift)))
            if VERBOSE: bar.update(1)
        except:
            continue
        if len(read_mads) >= num_reads:
            break

    if VERBOSE: bar.close()
    if len(read_mads) == 0:
        th.error_message_and_exit(
            'No reads contain raw signal for ' +
            'global scale parameter estimation.')
    if len(read_mads) < num_reads:
        th.warning_message(
            'Few reads contain raw signal for global scale parameter ' +
            'estimation. Results may not be optimal.')

    return np.mean(read_mads)
def normalize_raw_signal(
        all_raw_signal, read_start_rel_to_raw=0, read_obs_len=None,
        norm_type='median', outlier_thresh=None, channel_info=None,
        scale_values=None, event_means=None, model_means=None,
        model_inv_vars=None, const_scale=None):
    """Apply scaling and winsorizing parameters to normalize raw signal.

    Args:
        all_raw_signal (`np.array`): raw nanopore signal observation values
        read_start_rel_to_raw (int): amount of signal to trim from beginning
            of the signal (default: 0)
        read_obs_len (int): length of signal to process from
            `read_start_rel_to_raw` (default: full length)
        norm_type (str): normalization type (`median` (default), `none`,
            `pA_raw`, `pA`, `median_const_scale`, `robust_median`; ignored
            if ``scale_values`` is provided)
        outlier_thresh (float): winsorizing threshold (MAD units;
            default: None)
        channel_info (:class:`tombo.tombo_helper.channelInfo`): channel
            information (optional; only for `pA` and `pA_raw`)
        scale_values (:class:`tombo.tombo_helper.scaleValues`): scaling
            values (optional)
        event_means (`np.array`): for `pA` fitted scaling parameters
            (optional)
        model_means (`np.array`): for `pA` fitted scaling parameters
            (optional)
        model_inv_vars (`np.array`): for `pA` fitted scaling parameters
            (optional)
        const_scale (float): global scale parameter (optional)

    Returns:
        Normalized signal and scaling parameters

        1) normalized signal observations (`np.array::np.float64`)
        2) :class:`tombo.tombo_helper.scaleValues`
    """
    if read_obs_len is None:
        read_obs_len = all_raw_signal.shape[0] - read_start_rel_to_raw
    if norm_type not in NORM_TYPES and scale_values is None:
        raise th.TomboError(
            'Normalization type ' + norm_type + ' is not a valid ' +
            'option and shift or scale parameters were not provided.')

    raw_signal = all_raw_signal[read_start_rel_to_raw:
                                read_start_rel_to_raw + read_obs_len]
    if scale_values is None:
        if norm_type == 'none':
            shift, scale = 0, 1
        elif norm_type in ('pA_raw', 'pA'):
            # correct raw signal as described here:
            # https://community.nanoporetech.com
            #     /posts/squiggle-plot-for-raw-data
            shift, scale = (
                -1 * channel_info.offset,
                channel_info.digitisation / channel_info.range)
            if norm_type == 'pA':
                # perform k-mer model fitted correction as in
                # nanocorr/nanopolish/albacore(pre-RNN)
                shift, scale, _, _ = calc_kmer_fitted_shift_scale(
                    shift, scale, event_means, model_means, model_inv_vars,
                    method='mom')
        elif norm_type == 'median':
            shift = np.median(raw_signal)
            scale = np.median(np.abs(raw_signal - shift))
        elif norm_type == 'median_const_scale':
            assert const_scale is not None
            shift = np.median(raw_signal)
            scale = const_scale
        elif norm_type == 'robust_median':
            shift = np.mean(np.percentile(raw_signal, ROBUST_QUANTS))
            scale = np.median(np.abs(raw_signal - shift))
    else:
        shift = scale_values.shift
        scale = scale_values.scale

    norm_signal = (raw_signal - shift) / scale

    # winsorize the raw signal
    lower_lim, upper_lim = None, None
    if outlier_thresh is not None or scale_values is not None:
        if outlier_thresh is not None:
            read_med = np.median(norm_signal)
            read_mad = np.median(np.abs(norm_signal - read_med))
            lower_lim = read_med - (read_mad * outlier_thresh)
            upper_lim = read_med + (read_mad * outlier_thresh)
        else:
            lower_lim = scale_values.lower_lim
            upper_lim = scale_values.upper_lim
        # provided scale values could still contain None winsorizing limits
        if lower_lim is not None and upper_lim is not None:
            norm_signal = c_apply_outlier_thresh(
                norm_signal, lower_lim, upper_lim)

    return norm_signal, th.scaleValues(
        shift, scale, lower_lim, upper_lim, outlier_thresh)
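# Usage sketch (synthetic signal, not from the original source): median
# normalization with winsorizing at 5 MADs, as used throughout this module.
#
#   >>> import numpy as np
#   >>> raw = np.random.normal(400, 20, 5000).astype(np.float64)
#   >>> norm_sig, scale_vals = normalize_raw_signal(
#   ...     raw, norm_type='median', outlier_thresh=5)
#   >>> scale_vals.shift, scale_vals.scale  # read median and MAD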
#############################
##### Tombo Model Class #####
#############################
class TomboModel(object):
    """Load, store and access Tombo model attributes and sequence-based
    expected mean and standard deviation levels (median normalization only).

    .. automethod:: __init__
    """
    def _center_model(self, shift_corr_factor, scale_corr_factor):
        centered_means = {}
        for kmer, k_mean in self.means.items():
            centered_means[kmer] = (
                k_mean * scale_corr_factor) + shift_corr_factor
        self.means = centered_means

        return

    def _make_constant_sd(self):
        med_sd = np.median(list(self.sds.values()))
        self.sds = dict((kmer, med_sd) for kmer in self.sds)
        return
    def write_model(self, ref_fn):
        """Write TomboModel to specified file

        Args:
            ref_fn (str): filename to write TomboModel
        """
        # Explicitly use byte string names for py3 compatibility as well as
        # pickle-ability of numpy arrays for consistency. See discussion here:
        # https://github.com/numpy/numpy/issues/2407
        ref_for_file = np.array(
            [(kmer, self.means[kmer], self.sds[kmer]) for kmer in self.means],
            dtype=[(str('kmer'), 'S' + unicode(self.kmer_width)),
                   (str('mean'), 'f8'), (str('sd'), 'f8')])
        with h5py.File(ref_fn, 'w') as ref_fp:
            ref_fp.create_dataset('model', data=ref_for_file,
                                  compression="gzip")
            ref_fp.attrs['central_pos'] = self.central_pos
            ref_fp.attrs['model_name'] = STANDARD_MODEL_NAME

        return
    def _parse_tombo_model(self):
        """Parse a tombo model file
        """
        try:
            with h5py.File(self.ref_fn, 'r') as ref_fp:
                ref_raw = ref_fp['model'][:]
                central_pos = ref_fp.attrs.get('central_pos')
                model_name = ref_fp.attrs.get('model_name')
                try:
                    model_name = model_name.decode()
                except (AttributeError, TypeError):
                    pass
        except:
            th.error_message_and_exit(
                'Invalid Tombo model file provided: ' + unicode(self.ref_fn))

        if model_name != STANDARD_MODEL_NAME:
            th.error_message_and_exit('Non-standard Tombo model provided.')

        mean_ref = {}
        sd_ref = {}
        for kmer, kmer_mean, kmer_std in ref_raw:
            kmer = kmer.decode()
            mean_ref[kmer] = kmer_mean
            sd_ref[kmer] = kmer_std

        self.means = mean_ref
        self.sds = sd_ref
        self.central_pos = central_pos
        self.name = model_name

        return

    def _parse_text_model(self):
        """Parse a text model file (such as those from nanopolish)
        """
        try:
            mean_ref, sd_ref = {}, {}
            with io.open(self.ref_fn) as fp:
                for line in fp:
                    if line.startswith('#'):
                        continue
                    try:
                        kmer, kmer_mean, kmer_sd = line.split()[:3]
                        kmer_mean, kmer_sd = map(float, (kmer_mean, kmer_sd))
                    except ValueError:
                        # header or other non-kmer field
                        continue
                    mean_ref[kmer] = kmer_mean
                    sd_ref[kmer] = kmer_sd
        except:
            th.error_message_and_exit(
                'Invalid text pA model file provided: ' +
                unicode(self.ref_fn))

        self.means = mean_ref
        self.sds = sd_ref
        self.central_pos = NANOPOLISH_CENTRAL_POS
        self.name = STANDARD_MODEL_NAME

        return

    def _load_model(self, kmer_ref, central_pos):
        mean_ref = {}
        sd_ref = {}
        for kmer, kmer_mean, kmer_std in kmer_ref:
            # reference may or may not be stored as a numpy array
            try:
                kmer = kmer.decode()
            except AttributeError:
                pass
            mean_ref[kmer] = kmer_mean
            sd_ref[kmer] = kmer_std

        self.means = mean_ref
        self.sds = sd_ref
        self.central_pos = central_pos
        self.name = STANDARD_MODEL_NAME

        return

    def _add_invvar(self):
        self.inv_var = {}
        for kmer, stdev in self.sds.items():
            self.inv_var[kmer] = 1 / (stdev * stdev)

        return

    def _get_default_standard_ref(self, reads_index):
        if th.is_sample_rna(reads_index=reads_index):
            if VERBOSE: th.status_message(
                    'Loading default canonical ***** RNA ***** model.')
            std_ref_fn = STANDARD_MODELS[RNA_SAMP_TYPE]
            self.seq_samp_type = th.seqSampleType(RNA_SAMP_TYPE, True)
        else:
            if VERBOSE: th.status_message(
                    'Loading default canonical ***** DNA ***** model.')
            self.seq_samp_type = th.seqSampleType(DNA_SAMP_TYPE, False)
            std_ref_fn = STANDARD_MODELS[DNA_SAMP_TYPE]
        # get full filename path with setuptools
        self.ref_fn = th.resolve_path(pkg_resources.resource_filename(
            'tombo', 'tombo_models/' + std_ref_fn))

        return

    def _get_default_standard_ref_from_files(self, fast5_fns):
        if th.is_sample_rna(fast5_fns=fast5_fns):
            if VERBOSE: th.status_message(
                    'Loading default canonical ***** RNA ***** model.')
            std_ref_fn = STANDARD_MODELS[RNA_SAMP_TYPE]
            self.seq_samp_type = th.seqSampleType(RNA_SAMP_TYPE, True)
        else:
            if VERBOSE: th.status_message(
                    'Loading default canonical ***** DNA ***** model.')
            std_ref_fn = STANDARD_MODELS[DNA_SAMP_TYPE]
            self.seq_samp_type = th.seqSampleType(DNA_SAMP_TYPE, False)
        # get full filename path with setuptools
        self.ref_fn = th.resolve_path(pkg_resources.resource_filename(
            'tombo', 'tombo_models/' + std_ref_fn))

        return
    def __init__(
            self, ref_fn=None, is_text_model=False, kmer_ref=None,
            central_pos=None, seq_samp_type=None, reads_index=None,
            fast5_fns=None, minimal_startup=True):
        """Initialize a Tombo k-mer model object

        Args:
            ref_fn (str): tombo model filename
            is_text_model (bool): `ref_fn` is text (e.g.
                https://github.com/nanoporetech/kmer_models/blob/master/r9.4_180mv_450bps_6mer/template_median68pA.model)
            kmer_ref (list): containing 3-tuples 1) k-mer 2) expected level
                3) level SD
            central_pos (int): base within k-mer to assign signal (only
                applicable when `kmer_ref` is provided)
            seq_samp_type (:class:`tombo.tombo_helper.seqSampleType`):
                sequencing sample type (default: None)
            reads_index (:class:`tombo.tombo_helper.TomboReads`): for
                determining `seq_samp_type`
            fast5_fns (list): fast5 read filenames from which to extract
                read metadata, for determining `seq_samp_type`
            minimal_startup (bool): don't compute inverse variances
                (default True)

        Note:
            Order of priority for initialization when multiple model
            specifications are provided:

            1) `ref_fn`
            2) `kmer_ref` (requires `central_pos`)
            3) `seq_samp_type`
            4) `reads_index`
            5) `fast5_fns`

            Last 3 options load a default model file included with Tombo.
            Last 2 determine the sample type from read metadata.
        """
        if ref_fn is not None:
            self.ref_fn = th.resolve_path(ref_fn)
            if is_text_model:
                self._parse_text_model()
            else:
                self._parse_tombo_model()
            self.seq_samp_type = seq_samp_type
        elif kmer_ref is not None:
            assert central_pos is not None, (
                'central_pos must be provided if TomboModel is loaded ' +
                'with a kmer_ref')
            self._load_model(kmer_ref, central_pos)
            self.seq_samp_type = seq_samp_type
        else:
            if seq_samp_type is not None:
                self.seq_samp_type = seq_samp_type
                self.ref_fn = th.resolve_path(pkg_resources.resource_filename(
                    'tombo', 'tombo_models/' + STANDARD_MODELS[
                        seq_samp_type.name]))
            elif reads_index is not None:
                self._get_default_standard_ref(reads_index)
            elif fast5_fns is not None:
                self._get_default_standard_ref_from_files(fast5_fns)
            else:
                th.error_message_and_exit(
                    'Must provide initialization method for TomboModel.')
            self._parse_tombo_model()

        self.kmer_width = len(next(k for k in self.means))
        self.inv_var = None
        if not minimal_startup:
            self._add_invvar()
    def reverse_sequence_copy(self):
        """Return a copy of model for processing sequence/signal in reverse
        (default models are all saved in genome sequence forward (5p to 3p)
        direction)
        """
        rev_model = deepcopy(self)
        rev_model.central_pos = self.kmer_width - self.central_pos - 1
        rev_model.means = dict((kmer[::-1], kmer_mean)
                               for kmer, kmer_mean in self.means.items())
        rev_model.sds = dict((kmer[::-1], kmer_sds)
                             for kmer, kmer_sds in self.sds.items())
        if self.inv_var is not None:
            rev_model.inv_var = dict(
                (kmer[::-1], kmer_inv_var)
                for kmer, kmer_inv_var in self.inv_var.items())

        return rev_model
    def get_exp_levels_from_seq(self, seq, rev_strand=False):
        """Compute expected signal levels for a sequence

        Args:
            seq (str): genomic sequence to be converted to expected signal
                levels
            rev_strand (bool): provided sequence is from reverse strand (so
                flip return order to genome forward direction)

        Note:
            Returned expected signal levels will be trimmed compared to the
            passed sequence based on the `std_ref.kmer_width` and
            `std_ref.central_pos`.

        Returns:
            Expected signal level references

            1) ref_means (`np.array::np.float64`) expected signal levels
            2) ref_sds (`np.array::np.float64`) expected signal level sds
        """
        seq_kmers = th.get_seq_kmers(seq, self.kmer_width, rev_strand)

        try:
            ref_means = np.array([self.means[kmer] for kmer in seq_kmers])
            ref_sds = np.array([self.sds[kmer] for kmer in seq_kmers])
        except KeyError:
            th.error_message_and_exit(
                'Invalid sequence encountered from genome sequence.')

        return ref_means, ref_sds
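    # Hedged usage sketch: for a loaded k-mer model, a length-L sequence
    # yields L - kmer_width + 1 expected levels (assumes the bundled
    # default DNA model loads successfully):
    #
    #   >>> std_ref = TomboModel(
    #   ...     seq_samp_type=th.seqSampleType(DNA_SAMP_TYPE, False))
    #   >>> means, sds = std_ref.get_exp_levels_from_seq('ACGTACGTACGT')
    #   >>> # means.shape[0] == 12 - std_ref.kmer_width + 1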
    def get_exp_levels_from_kmers(self, seq_kmers):
        """Look up expected signal levels for a list of k-mers

        Args:
            seq_kmers (list): list of k-mers (as returned from
                :class:`tombo.tombo_helper.get_seq_kmers`)

        Returns:
            Expected signal level references

            1) ref_means (`np.array::np.float64`) expected signal levels
            2) ref_sds (`np.array::np.float64`) expected signal level sds
        """
        try:
            ref_means = np.array([self.means[kmer] for kmer in seq_kmers])
            ref_sds = np.array([self.sds[kmer] for kmer in seq_kmers])
        except KeyError:
            th.error_message_and_exit(
                'Invalid sequence encountered from genome sequence.')

        return ref_means, ref_sds
    def get_exp_levels_from_seq_with_gaps(self, reg_seq, rev_strand):
        # loop over regions without valid sequence (non-ACGT)
        reg_ref_means, reg_ref_sds = (
            np.empty(len(reg_seq) - self.kmer_width + 1),
            np.empty(len(reg_seq) - self.kmer_width + 1))
        reg_ref_means[:] = np.NAN
        reg_ref_sds[:] = np.NAN
        prev_ibr_end = 0
        for inv_base_run_m in th.INVALID_BASE_RUNS.finditer(reg_seq):
            ibr_start, ibr_end = inv_base_run_m.start(), inv_base_run_m.end()
            # if valid region is too short continue
            if ibr_start - prev_ibr_end < self.kmer_width:
                prev_ibr_end = ibr_end
                continue
            subreg_ref_means, subreg_ref_sds = self.get_exp_levels_from_seq(
                reg_seq[prev_ibr_end:ibr_start])
            reg_ref_means[prev_ibr_end:
                          ibr_start - self.kmer_width + 1] = subreg_ref_means
            reg_ref_sds[prev_ibr_end:
                        ibr_start - self.kmer_width + 1] = subreg_ref_sds
            prev_ibr_end = ibr_end

        # if there is valid sequence at the end of a region include it here
        if prev_ibr_end <= len(reg_seq) - self.kmer_width:
            subreg_ref_means, subreg_ref_sds = self.get_exp_levels_from_seq(
                reg_seq[prev_ibr_end:])
            reg_ref_means[prev_ibr_end:] = subreg_ref_means
            reg_ref_sds[prev_ibr_end:] = subreg_ref_sds

        if rev_strand:
            reg_ref_means = reg_ref_means[::-1]
            reg_ref_sds = reg_ref_sds[::-1]

        return reg_ref_means, reg_ref_sds
class AltModel(object):
    """Load, store and access alternate-base Tombo model attributes and
    sequence-based expected mean and standard deviation levels (median
    normalization only).

    .. automethod:: __init__
    """
    def write_model(self, ref_fn):
        """Write AltModel to specified file

        Args:
            ref_fn (str): filename to write AltModel
        """
        # Explicitly use byte string names for py3 compatibility as well as
        # pickle-ability of numpy arrays for consistency. See discussion here:
        # https://github.com/numpy/numpy/issues/2407
        ref_for_file = np.array(
            [(kmer, pos, self.means[(kmer, pos)], self.sds[(kmer, pos)])
             for kmer, pos in self.means],
            dtype=[(str('kmer'), 'S' + unicode(self.kmer_width)),
                   (str('pos'), 'u4'), (str('mean'), 'f8'),
                   (str('sd'), 'f8')])
        with h5py.File(ref_fn, 'w') as ref_fp:
            ref_fp.create_dataset(
                'model', data=ref_for_file, compression="gzip")
            ref_fp.attrs['central_pos'] = self.central_pos
            ref_fp.attrs['model_name'] = self.name
            ref_fp.attrs['alt_base'] = self.alt_base
            ref_fp.attrs['motif'] = self.motif.raw_motif
            ref_fp.attrs['mod_pos'] = self.motif.mod_pos

        return
    def _make_constant_sd(self):
        med_sd = np.median(list(self.sds.values()))
        self.sds = dict((kmer, med_sd) for kmer in self.sds)
        return

    def _parse_alt_model(self):
        """Parse an alternate-base Tombo model file
        """
        try:
            with h5py.File(self.ref_fn, 'r') as ref_fp:
                ref_raw = ref_fp['model'][:]
                central_pos = ref_fp.attrs.get('central_pos')
                model_name = ref_fp.attrs.get('model_name')
                try:
                    model_name = model_name.decode()
                except (AttributeError, TypeError):
                    pass
                alt_base = ref_fp.attrs.get('alt_base')
                try:
                    alt_base = alt_base.decode()
                except (AttributeError, TypeError):
                    pass
                raw_motif = ref_fp.attrs.get('motif')
                try:
                    raw_motif = raw_motif.decode()
                except (AttributeError, TypeError):
                    pass
                mod_pos = ref_fp.attrs.get('mod_pos')
        except:
            th.error_message_and_exit(
                'Invalid alternate-base tombo model file provided: ' +
                unicode(self.ref_fn))

        mean_ref = {}
        sd_ref = {}
        for kmer, pos, kmer_mean, kmer_std in ref_raw:
            kmer = kmer.decode()
            mean_ref[(kmer, pos)] = kmer_mean
            sd_ref[(kmer, pos)] = kmer_std

        self.means = mean_ref
        self.sds = sd_ref
        self.central_pos = central_pos
        self.alt_base = alt_base
        self.name = model_name
        self.motif = th.TomboMotif(raw_motif, mod_pos)

        return

    def _load_alt_model(self, alt_ref, central_pos, alt_base, name, motif):
        mean_ref = {}
        sd_ref = {}
        for kmer, pos, kmer_mean, kmer_std in alt_ref:
            # reference may or may not be stored as a numpy array
            try:
                kmer = kmer.decode()
            except AttributeError:
                pass
            mean_ref[(kmer, pos)] = kmer_mean
            sd_ref[(kmer, pos)] = kmer_std

        self.means = mean_ref
        self.sds = sd_ref
        self.central_pos = central_pos
        self.alt_base = alt_base
        self.name = name
        if motif is None:
            self.motif = th.TomboMotif(self.alt_base, 1)
        else:
            assert (isinstance(motif, th.TomboMotif) and
                    motif.mod_pos is not None)
            self.motif = motif

        return

    def _add_invvar(self):
        self.inv_var = {}
        for kmer_pos, stdev in self.sds.items():
            self.inv_var[kmer_pos] = 1 / (stdev * stdev)

        return
    def __init__(
            self, ref_fn=None, kmer_ref=None, central_pos=None,
            alt_base=None, name=None, motif=None, minimal_startup=True):
        """Initialize an alternate-base Tombo k-mer model object

        Args:
            ref_fn (str): tombo model filename
            kmer_ref (list): containing 4-tuples 1) k-mer 2) alternate-base
                position within the k-mer 3) expected level 4) level SD
            central_pos (int): base within k-mer to assign signal (only
                applicable when `kmer_ref` is provided)
            alt_base (str): "swap" base within k-mer (only applicable when
                `kmer_ref` is provided)
            name (str): model name (only applicable when `kmer_ref` is
                provided)
            motif (:class:`tombo.tombo_helper.TomboMotif`): model motif
                (defaults to alt_base motif; only applicable when
                `kmer_ref` is provided)
            minimal_startup (bool): don't compute inverse variances
                (default True)

        Note:
            ``kmer_ref``, ``central_pos``, ``alt_base``, ``name`` and
            ``motif`` are ignored if ``ref_fn`` is provided.
        """
        if ref_fn is not None:
            self.ref_fn = th.resolve_path(ref_fn)
            self._parse_alt_model()
        elif kmer_ref is not None:
            assert central_pos is not None and alt_base is not None, (
                'central_pos and alt_base must be provided if AltModel is ' +
                'loaded with a kmer_ref')
            self._load_alt_model(kmer_ref, central_pos, alt_base, name, motif)
        else:
            th.error_message_and_exit(
                'Must provide initialization method for AltModel.')

        self.kmer_width = len(next(kmer for kmer, pos in self.means))
        self.inv_var = None
        if not minimal_startup:
            self._add_invvar()
    def get_exp_level(self, kmer, pos):
        try:
            return self.means[(kmer, pos)]
        except KeyError:
            return np.NAN

    def get_exp_sd(self, kmer, pos):
        try:
            return self.sds[(kmer, pos)]
        except KeyError:
            return np.NAN
    def get_exp_levels_from_kmers(self, seq_kmers, rev_strand=False):
        """Look up expected alternate-base signal levels across a central
        base. The alternate base to test should be the last base in the
        first k-mer and the first base in the last k-mer.

        Args:
            seq_kmers (list): list of k-mers spanning the central base
                (length equal to the k-mer width)

        Returns:
            Expected signal level references

            1) ref_means (`np.array::np.float64`) expected signal levels
            2) ref_sds (`np.array::np.float64`) expected signal level sds
        """
        pos_range = (range(self.kmer_width)
                     if rev_strand else
                     range(self.kmer_width - 1, -1, -1))
        try:
            ref_means = np.array([
                self.get_exp_level(kmer, pos)
                for kmer, pos in zip(seq_kmers, pos_range)])
            ref_sds = np.array([
                self.get_exp_sd(kmer, pos)
                for kmer, pos in zip(seq_kmers, pos_range)])
        except KeyError:
            th.error_message_and_exit(
                'Invalid sequence encountered from genome sequence.')

        return ref_means, ref_sds
############################
##### Model Estimation #####
############################

def check_valid_alt_models(alt_refs, std_ref):
    """Validate a set of alternative tombo models against the standard model
    """
    for alt_ref in alt_refs.values():
        if (std_ref.central_pos != alt_ref.central_pos or
            std_ref.kmer_width != alt_ref.kmer_width):
            th.warning_message(
                'Standard and ' + alt_ref.ref_fn + ' alternative base ' +
                'models must be estimated using the same k-mer positions.')
            continue
        if not isinstance(alt_ref, AltModel):
            th.warning_message(
                'Alternative model ' + alt_ref.ref_fn + ' appears to be a ' +
                'standard model and will not be processed.')
            continue

    return alt_refs

def _print_alt_models():
    alt_model_types = [tuple(mod_name.split(ALT_MODEL_SEP_CHAR))
                       for mod_name in ALTERNATE_MODELS.keys()]
    alt_seq_samps = ['',] + sorted(set(list(zip(*alt_model_types))[0]))
    alt_mods = list(set(list(zip(*alt_model_types))[1]))
    row_format = "{:<10}" * (len(alt_seq_samps)) + '\n'
    sys.stderr.write(row_format.format(*alt_seq_samps))
    for alt_mod in alt_mods:
        has_mod = [alt_mod,]
        for seq_samp in alt_seq_samps[1:]:
            has_mod.append(' X' if (seq_samp, alt_mod) in alt_model_types
                           else '')
        sys.stderr.write(row_format.format(*has_mod))

    return

def load_default_alt_ref(alt_name, seq_samp_type):
    try:
        alt_model_fn = ALTERNATE_MODELS[
            seq_samp_type.name + ALT_MODEL_SEP_CHAR + alt_name]
    except KeyError:
        alt_model_fn = None
    if alt_model_fn is not None:
        # get full filename path with setuptools
        alt_model_fn = pkg_resources.resource_filename(
            'tombo', 'tombo_models/' + alt_model_fn)
    if alt_model_fn is None or not os.path.isfile(alt_model_fn):
        th.warning_message(
            'Tombo default model for ' + alt_name + ' in ' +
            seq_samp_type.name + ' does not exist.')
        return None

    return AltModel(ref_fn=alt_model_fn)

def load_alt_refs(alt_model_fns, alt_names, reads_index, std_ref,
                  seq_samp_type=None):
    alt_refs = {}
    if alt_model_fns is not None:
        # load alternative models from filenames
        for alt_model_fn in alt_model_fns:
            alt_ref = AltModel(alt_model_fn)
            if alt_ref.name in alt_refs:
                th.warning_message(
                    alt_ref.name + ' alternative model found in more than ' +
                    'one model file. Ignoring: ' + alt_model_fn)
                continue
            alt_refs[alt_ref.name] = alt_ref
    else:
        # load alternative models from internal defaults
        if seq_samp_type is None:
            seq_samp_type = th.get_seq_sample_type(reads_index=reads_index)
        for alt_name in alt_names:
            alt_ref = load_default_alt_ref(alt_name, seq_samp_type)
            if alt_ref is None:
                continue
            alt_refs[alt_name] = alt_ref

    check_valid_alt_models(alt_refs, std_ref)

    return alt_refs

def load_valid_models(
        tb_model_fn, plot_default_stnd, alt_model_fn,
        plot_default_alt, reads_index, ctrl_fast5s_dirs=None):
    # if no model was requested
    if (tb_model_fn is None and not plot_default_stnd and
        alt_model_fn is None and not plot_default_alt):
        return None, None

    std_ref = TomboModel(ref_fn=tb_model_fn, reads_index=reads_index)
    if alt_model_fn is not None:
        alt_ref = AltModel(ref_fn=alt_model_fn)
    elif plot_default_alt is not None:
        seq_samp_type = std_ref.seq_samp_type
        if seq_samp_type is None:
            seq_samp_type = th.get_seq_sample_type(reads_index=reads_index)
        alt_ref = load_default_alt_ref(plot_default_alt, seq_samp_type)
    else:
        alt_ref = None

    if ctrl_fast5s_dirs is not None and tb_model_fn is not None:
        th.warning_message(
            'Both a second set of FAST5s and a tombo model were provided. ' +
            'Plotting two samples along with a model is not currently ' +
            'available. Models requested will be ignored.')

    return std_ref, alt_ref

def calc_med_sd(vals):
    """Helper function to compute median and standard deviation from a
    numpy array
    """
    return np.median(vals), np.std(vals)

def get_region_kmer_levels(
        reg_data, cov_thresh, upstrm_bases, dnstrm_bases, cs_cov_thresh,
        est_mean, region_size, motif=None, valid_poss=None):
    """Compute mean or median and standard deviation for each k-mer
    """
    if cs_cov_thresh is not None:
        # sample reads until requested mean depth of coverage is achieved
        cs_num_bases_thresh = region_size * cs_cov_thresh
        np.random.shuffle(reg_data.reads)
        cumm_num_bases = np.cumsum([
            max(r_data.end, reg_data.end) - min(r_data.start, reg_data.start)
            for r_data in reg_data.reads])
        try:
            cs_num_reads = next((i for i, v in enumerate(cumm_num_bases)
                                 if v >= cs_num_bases_thresh))
            reg_data.update(reads=reg_data.reads[:cs_num_reads])
        except StopIteration:
            # if threshold is not met use all reads from region
            pass

    # TODO convert this to the intervalData method get_base_levels function
    # involves a bit of logical refactoring below (which should be much
    # simpler)
    base_events = th.get_reads_events(reg_data.reads)
    if len(base_events) == 0:
        return

    # get intervals within the region where coverage is high enough
    # for model estimation
    reg_cov = np.array([
        len(base_events[pos]) if pos in base_events else 0
        for pos in range(reg_data.start, reg_data.end)])
    cov_intervals = np.where(np.diff(np.concatenate(
        [[False], reg_cov > cov_thresh])))[0]
    if reg_cov[-1] > cov_thresh:
        # if valid coverage goes until end of coverage, add the last
        # interval endpoint
        cov_intervals = np.concatenate([cov_intervals, [region_size]])
    if cov_intervals.shape[0] <= 1:
        return
    cov_intervals = cov_intervals.reshape(-1,2)

    kmer_width = upstrm_bases + dnstrm_bases + 1
    if motif is None:
        reg_kmer_levels = dict(
            (''.join(kmer), [])
            for kmer in product(DNA_BASES, repeat=kmer_width))
    else:
        reg_kmer_levels = dict(
            ((''.join(kmer), i_offset - 1), [])
            for kmer in product(DNA_BASES, repeat=kmer_width)
            for i_offset in motif.find_mod_poss(''.join(kmer)))

    # upstream and downstream changes the sequence selection
    # depending on the strand
    # TODO this will miss motif hits at region boundaries when the motif is
    # longer than either end of the k-mer relative to the central position
    bb, ab = (upstrm_bases, dnstrm_bases) if reg_data.strand == '+' else \
             (dnstrm_bases, upstrm_bases)
    for cov_start, cov_end in cov_intervals:
        # get region sequence from expanded region to include k-mer lookups
        int_seq = reg_data.copy().update(
            start=reg_data.start + cov_start - bb,
            end=reg_data.start + cov_end + ab).add_seq().seq
        int_len = cov_end - cov_start
        # get valid positions to include in extracted levels
        # for standard extraction fill with None relative position
        if valid_poss is None and motif is None:
            int_poss = zip(range(int_len), repeat(None))
        # for motif/position extraction record relative modified position
        else:
            # get modified base position relative to coverage interval
            if valid_poss is not None:
                if (reg_data.chrm, reg_data.strand) not in valid_poss:
                    continue
                reg_mod_poss = (valid_poss[(reg_data.chrm, reg_data.strand)] -
                                reg_data.start - cov_start)
                reg_mod_poss = reg_mod_poss[
                    np.logical_and(np.greater_equal(reg_mod_poss, 0),
                                   np.less(reg_mod_poss, int_len))]
            else:
                if reg_data.strand == '+':
                    reg_mod_poss = [
                        m.start() + motif.mod_pos - 1 - bb
                        for m in motif.motif_pat.finditer(int_seq)
                        if 0 <= m.start() + motif.mod_pos - 1 - bb < int_len]
                else:
                    reg_mod_poss = [
                        m.start() + motif.motif_len - motif.mod_pos - bb
                        for m in motif.rev_comp_pat.finditer(int_seq)
                        if 0 <= m.start() + motif.motif_len -
                        motif.mod_pos - bb < int_len]
            # record relative mod positions within k-mers
            int_poss = [
                (mod_pos - i_offset + bb,
                 i_offset if reg_data.strand == '+' else
                 kmer_width - i_offset - 1)
                for mod_pos in reg_mod_poss
                for i_offset in range(kmer_width)
                if 0 <= mod_pos - i_offset + bb < int_len]

        for pos, offset in int_poss:
            pos_kmer = int_seq[pos:pos + kmer_width]
            if reg_data.strand == '-':
                pos_kmer = th.rev_comp(pos_kmer)
            #print(pos_kmer, offset,
            #      pos + reg_data.start + cov_start, reg_data.strand,
            #      reg_data.start + cov_start - bb,
            #      reg_data.start + cov_end + ab)
            kmer_key = pos_kmer if offset is None else (pos_kmer, offset)
            try:
                if est_mean:
                    reg_kmer_levels[kmer_key].append(c_mean_std(
                        base_events[pos + reg_data.start + cov_start]))
                else:
                    reg_kmer_levels[kmer_key].append(calc_med_sd(
                        base_events[pos + reg_data.start + cov_start]))
            except KeyError:
                continue

    return reg_kmer_levels

def _est_kmer_model_worker(
        region_q, kmer_level_q, progress_q, reads_index, cov_thresh,
        upstrm_bases, dnstrm_bases, cs_cov_thresh, est_mean, region_size,
        motif, valid_poss):
    while True:
        try:
            chrm, strand, reg_start = region_q.get(block=False)
        except queue.Empty:
            # sometimes throws false empty error with get(block=False)
            sleep(0.1)
            if not region_q.empty():
                continue
            break

        reg_data = th.intervalData(
            chrm=chrm, start=reg_start, end=reg_start + region_size,
            strand=strand).add_reads(reads_index)
        if len(reg_data.reads) == 0:
            progress_q.put(1)
            continue

        reg_kmer_levels = get_region_kmer_levels(
            reg_data, cov_thresh, upstrm_bases, dnstrm_bases,
            cs_cov_thresh, est_mean, region_size, motif, valid_poss)
        if reg_kmer_levels is not None:
            kmer_level_q.put(reg_kmer_levels)
        progress_q.put(1)

    return

if _PROFILE_EST_REF:
    _est_kmer_model_wrapper = _est_kmer_model_worker
    def _est_kmer_model_worker(*args):
        import cProfile
        cProfile.runctx('_est_kmer_model_wrapper(*args)', globals(), locals(),
                        filename='est_kmer_model.prof')
        return

def extract_kmer_levels(
        reads_index, region_size, cov_thresh, upstrm_bases, dnstrm_bases,
        cs_cov_thresh, est_mean=False, num_processes=1,
        motif=None, valid_poss=None):
    region_q = Queue()
    kmer_level_q = Queue()
    progress_q = Queue()
    num_regions = 0
    for chrm, strand, reg_start in reads_index.iter_cov_regs(
            cov_thresh, region_size):
        region_q.put((chrm, strand, reg_start))
        num_regions += 1

    est_args = (
        region_q, kmer_level_q, progress_q, reads_index, cov_thresh,
        upstrm_bases, dnstrm_bases, cs_cov_thresh, est_mean, region_size,
        motif, valid_poss)
    est_ps = []
    for p_id in range(num_processes):
        p = Process(target=_est_kmer_model_worker, args=est_args)
        p.start()
        est_ps.append(p)

    if VERBOSE:
        th.status_message('Extracting average k-mer levels.')
        bar = tqdm(total=num_regions, smoothing=0)
    all_reg_kmer_levels = []
    while any(p.is_alive() for p in est_ps):
        try:
            reg_kmer_levels = kmer_level_q.get(block=False)
            all_reg_kmer_levels.append(reg_kmer_levels)
        except queue.Empty:
            try:
                iter_proc = progress_q.get(block=False)
                if VERBOSE: bar.update(iter_proc)
            except queue.Empty:
                sleep(1)
                continue

    # clear rest of queue
    while not kmer_level_q.empty():
        reg_kmer_levels = kmer_level_q.get(block=False)
        all_reg_kmer_levels.append(reg_kmer_levels)
    if VERBOSE:
        while not progress_q.empty():
            iter_proc = progress_q.get(block=False)
            bar.update(iter_proc)
        bar.close()

    if len(all_reg_kmer_levels) == 0:
        th.error_message_and_exit(
            'No genomic positions contain --minimum-test-reads. Consider ' +
            'setting this option to a lower value.')

    return all_reg_kmer_levels

def tabulate_kmer_levels(all_reg_kmer_levels, min_kmer_obs, motif=None):
    # note: the optional motif argument only restricts which k-mers are
    # considered when reporting the least-observed k-mer below
    if VERBOSE: th.status_message('Tabulating k-mer model statistics.')
    all_kmer_mean_sds = []
    if _DEBUG_EST_STD:
        kmer_dens = []
        save_x = np.linspace(KERNEL_DENSITY_RANGE[0], KERNEL_DENSITY_RANGE[1],
                             _DEBUG_EST_NUM_KMER_SAVE)

    kmer_width = len(next(iter(all_reg_kmer_levels[0].keys())))
    for kmer in product(DNA_BASES, repeat=kmer_width):
        kmer = ''.join(kmer)
        try:
            kmer_levels = np.concatenate([
                reg_kmer_levels[kmer]
                for reg_kmer_levels in all_reg_kmer_levels
                if len(reg_kmer_levels[kmer]) > 0])
        except ValueError:
            th.error_message_and_exit(
                'At least one k-mer is not covered at any positions by ' +
                '--minimum-test-reads.\n\t\tConsider fitting to a smaller ' +
                'k-mer via the --upstream-bases and --downstream-bases, ' +
                'or lowering --minimum-test-reads.\n\t\tNote that this may ' +
                'result in a lower quality model.')
        if kmer_levels.shape[0] < min_kmer_obs:
            min_obs = min(
                sum(len(reg_levs[''.join(kmer)])
                    for reg_levs in all_reg_kmer_levels)
                for kmer in product(DNA_BASES, repeat=kmer_width)
                if motif is None or motif.matches_seq(''.join(kmer)))
            th.error_message_and_exit(
                'At least one k-mer is represented in fewer observations ' +
                'than requested in the provided reads. Consider a shorter ' +
                'k-mer or providing more reads.\n\t' + unicode(min_obs) +
                ' observations found in the least common k-mer.')
        all_kmer_mean_sds.append((kmer, np.median(kmer_levels[:,0]),
                                  np.median(kmer_levels[:,1])))
        if _DEBUG_EST_STD:
            kmer_kde = stats.gaussian_kde(
                kmer_levels[:,0],
                bw_method=_DEBUG_EST_BW / kmer_levels[:,0].std(ddof=1))
            with np.errstate(under='ignore'):
                kmer_dens.append((kmer, kmer_kde.evaluate(save_x)))

    if _DEBUG_EST_STD:
        with io.open('debug_est_standard_ref.density.txt', 'wt') as fp:
            fp.write('Kmer\tSignal\tDensity\n')
            fp.write('\n'.join('\t'.join(map(str, (kmer, x, y)))
                               for kmer, dens_i in kmer_dens
                               for x, y in zip(save_x, dens_i)) + '\n')

    return all_kmer_mean_sds

# methods needed for (re-squiggle) segmentation are also needed here for
# RNA event-based scaling (for model scale centering)
def load_resquiggle_parameters(
        seq_samp_type, sig_aln_params=None, seg_params=None,
        use_save_bandwidth=False):
    """Load parameters for re-squiggle algorithm

    Args:
        seq_samp_type (:class:`tombo.tombo_helper.seqSampleType`):
            sequencing sample type
        sig_aln_params (tuple): signal alignment parameters (optional;
            default: load seq_samp_type defaults)
        seg_params (tuple): segmentation parameters (optional; default:
            load seq_samp_type defaults)
        use_save_bandwidth (bool): load larger "save" bandwidth

    Returns:
        :class:`tombo.tombo_helper.resquiggleParams`
    """
    if sig_aln_params is None:
        (match_evalue, skip_pen, bandwidth, save_bandwidth,
         max_half_z_score, band_bound_thresh, start_bw, start_save_bw,
         start_n_bases) = ALGN_PARAMS_TABLE[seq_samp_type.name]
    else:
        # unpack signal alignment parameters
        (match_evalue, skip_pen, bandwidth, save_bandwidth,
         max_half_z_score, band_bound_thresh, start_bw, start_save_bw,
         start_n_bases) = sig_aln_params
        bandwidth = int(bandwidth)
        save_bandwidth = int(save_bandwidth)
        band_bound_thresh = int(band_bound_thresh)
        start_bw = int(start_bw)
        start_save_bw = int(start_save_bw)
        start_n_bases = int(start_n_bases)

    if use_save_bandwidth:
        bandwidth = save_bandwidth

    if seg_params is None:
        (running_stat_width, min_obs_per_base, raw_min_obs_per_base,
         mean_obs_per_event) = SEG_PARAMS_TABLE[seq_samp_type.name]
    else:
        (running_stat_width, min_obs_per_base, raw_min_obs_per_base,
         mean_obs_per_event) = seg_params

    z_shift, stay_pen = get_dynamic_prog_params(match_evalue)

    rsqgl_params = th.resquiggleParams(
        match_evalue, skip_pen, bandwidth, max_half_z_score,
        running_stat_width, min_obs_per_base, raw_min_obs_per_base,
        mean_obs_per_event, z_shift, stay_pen,
        seq_samp_type.name == RNA_SAMP_TYPE, band_bound_thresh,
        start_bw, start_save_bw, start_n_bases)

    return rsqgl_params
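# Usage sketch: default RNA parameters as loaded for event-based scaling
# below (the resulting field values depend on ALGN_PARAMS_TABLE and
# SEG_PARAMS_TABLE defaults):
#
#   >>> rsqgl_params = load_resquiggle_parameters(
#   ...     th.seqSampleType(RNA_SAMP_TYPE, True))
#   >>> rsqgl_params.mean_obs_per_event  # used to pick the number of events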
def compute_num_events(
        signal_len, seq_len, mean_obs_per_event,
        min_event_to_seq_ratio=MIN_EVENT_TO_SEQ_RATIO):
    """Compute number of events to find for this read

    Args:
        signal_len (int): length of raw signal
        seq_len (int): length of sequence
        mean_obs_per_event (int): mean raw observations per genome base
        min_event_to_seq_ratio (float): minimum event to sequence ratio
            (optional)

    Returns:
        Number of events to find for this read
    """
    return max(signal_len // mean_obs_per_event,
               int(seq_len * min_event_to_seq_ratio))
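# Worked example: with 90,000 raw observations, a 1,000 base sequence,
# ~70 observations per event and a minimum event-to-sequence ratio of 2,
# this returns max(90000 // 70, int(1000 * 2)) = max(1285, 2000) = 2000.
#
#   >>> compute_num_events(90000, 1000, 70, 2)
#   2000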
def remove_stall_cpts(stall_ints, valid_cpts):
    if len(stall_ints) == 0:
        return valid_cpts

    # RNA data contains stall regions that can cause problems for
    # banded dynamic programming so they are removed here
    stall_int_iter = iter(stall_ints)
    curr_stall_int = next(stall_int_iter)
    non_stall_cpts = []
    # loop over valid cpts
    for i, cpt in enumerate(valid_cpts):
        # iterate through stall intervals until the current interval end
        # is greater than the cpt to check against
        while cpt > curr_stall_int[1]:
            try:
                curr_stall_int = next(stall_int_iter)
            except StopIteration:
                break
        if not (curr_stall_int[0] < cpt < curr_stall_int[1]):
            non_stall_cpts.append(i)

    return valid_cpts[non_stall_cpts]

def center_model_to_median_norm(
        reads_index, init_ref, max_reads=NUM_READS_TO_ADJUST_MODEL):
    upstrm_bases = init_ref.central_pos
    dnstrm_bases = init_ref.kmer_width - init_ref.central_pos - 1

    def get_event_scale_values(all_raw_signal, r_len):
        rsqgl_params = load_resquiggle_parameters(
            th.seqSampleType(RNA_SAMP_TYPE, True))
        num_events = compute_num_events(
            all_raw_signal.shape[0], r_len,
            rsqgl_params.mean_obs_per_event, MIN_EVENT_TO_SEQ_RATIO)
        valid_cpts = th.valid_cpts_w_cap_t_test(
            all_raw_signal.astype(np.float64), rsqgl_params.min_obs_per_base,
            rsqgl_params.running_stat_width, num_events)
        if COLLAPSE_RNA_STALLS:
            valid_cpts = remove_stall_cpts(
                identify_stalls(all_raw_signal, DEFAULT_STALL_PARAMS),
                valid_cpts)
        scale_values = get_scale_values_from_events(
            all_raw_signal, valid_cpts, OUTLIER_THRESH,
            num_events=RNA_SCALE_NUM_EVENTS,
            max_frac_events=RNA_SCALE_MAX_FRAC_EVENTS)
        return normalize_raw_signal(all_raw_signal, scale_values=scale_values)

    def get_read_corr_factors(r_data):
        with h5py.File(r_data.fn, 'r') as fast5_data:
            all_raw_signal = th.get_raw_read_slot(fast5_data)['Signal'][:]
            event_starts, r_seq = th.get_multiple_slots_read_centric(
                fast5_data, ('start', 'base'), r_data.corr_group)

        if r_data.rna:
            all_raw_signal = all_raw_signal[::-1]
            if USE_RNA_EVENT_SCALE:
                norm_signal, scale_values = get_event_scale_values(
                    all_raw_signal, r_data.end - r_data.start)
            else:
                # use raw signal median normalization
                norm_signal, scale_values = normalize_raw_signal(
                    all_raw_signal)
        else:
            norm_signal, scale_values = normalize_raw_signal(all_raw_signal)

        event_starts = event_starts.astype(np.int64)
        rsrtr = r_data.read_start_rel_to_raw + event_starts[upstrm_bases]
        # since the last segment end wasn't extracted the events are already
        # clipped by one, so deal with event boundary clipping here
        if dnstrm_bases > 1:
            event_starts = event_starts[upstrm_bases:-(dnstrm_bases - 1)]
        else:
            assert dnstrm_bases == 1, (
                'Must have at least one upstream and downstream base for ' +
                'a Tombo model.')
            event_starts = event_starts[upstrm_bases:]
        # reset event starts to 0
        event_starts -= event_starts[0]
        norm_signal = norm_signal[rsrtr:rsrtr + event_starts[-1]]

        r_seq = b''.join(r_seq).decode()
        r_ref_means, _ = init_ref.get_exp_levels_from_seq(r_seq)

        (_, _, shift_corr_factor,
         scale_corr_factor) = calc_kmer_fitted_shift_scale(
             scale_values.shift, scale_values.scale,
             compute_base_means(norm_signal, event_starts), r_ref_means,
             method='theil_sen')

        return shift_corr_factor, scale_corr_factor

    th.status_message('Centering model to normalized signal')
    all_shift_corr_factors, all_scale_corr_factors = [], []
    all_reads = list(reads_index.iter_reads())
    np.random.shuffle(all_reads)
    for r_data in all_reads:
        try:
            r_shift_corr_factor, r_scale_corr_factor = get_read_corr_factors(
                r_data)
            all_shift_corr_factors.append(r_shift_corr_factor)
            all_scale_corr_factors.append(r_scale_corr_factor)
            if len(all_shift_corr_factors) >= max_reads:
                break
        except:
            continue

    if len(all_shift_corr_factors) < max_reads:
        if len(all_shift_corr_factors) == 0:
            th.error_message_and_exit(
                'No reads successfully processed for sequence-based ' +
                'normalization parameter re-fitting.')
        th.warning_message(
            'Fewer reads successfully processed for sequence-based ' +
            'normalization parameter re-fitting than requested.')

    # compute median shift and scale correction factors
    # the scale parameter median should arguably be taken in log space, but
    # the median is invariant to this monotonic transform so the
    # computation is equivalent
    med_shift_corr_factor = np.median(all_shift_corr_factors)
    med_scale_corr_factor = np.median(all_scale_corr_factors)

    th.status_message('Shift and scale adjustments to match model to ' +
                      'median normalization: ' + str(med_shift_corr_factor) +
                      " " + str(med_scale_corr_factor))
    init_ref._center_model(med_shift_corr_factor, med_scale_corr_factor)

    return init_ref

if _PROFILE_CENTER_REF:
    center_model_to_median_norm_wrapper = center_model_to_median_norm
    def center_model_to_median_norm(*args):
        import cProfile
        cProfile.runctx(
            'center_model_to_median_norm_wrapper(*args)', globals(), locals(),
            filename='center_kmer_model.prof')
        return

def estimate_kmer_model(
        fast5s_dirs, corr_grp, bc_subgrps, cov_thresh, upstrm_bases,
        dnstrm_bases, min_kmer_obs, kmer_specific_sd, cs_cov_thresh,
        est_mean, region_size, num_processes):
    """Estimate a standard tombo k-mer model
    """
    reads_index = th.TomboReads(fast5s_dirs, corr_grp, bc_subgrps)
    all_reg_kmer_levels = extract_kmer_levels(
        reads_index, region_size, cov_thresh, upstrm_bases, dnstrm_bases,
        cs_cov_thresh, est_mean, num_processes)

    all_kmer_mean_sds = tabulate_kmer_levels(all_reg_kmer_levels, min_kmer_obs)

    # adjust model to best match median normalization via Theil-Sen
    # optimizer fit; this will increase the accuracy of median normalized
    # re-squiggle results and should reduce the need for (or number of)
    # iterative re-squiggle runs
    init_ref = TomboModel(kmer_ref=all_kmer_mean_sds, central_pos=upstrm_bases)
    centered_ref = center_model_to_median_norm(reads_index, init_ref)

    if not kmer_specific_sd:
        centered_ref._make_constant_sd()

    return centered_ref


########################################
##### Alternative Model Estimation #####
########################################

def _parse_base_levels_worker(
        reads_q, kmer_level_q, kmer_width, central_pos, completed_kmers):
    dnstrm_bases = kmer_width - central_pos - 1
    proc_kmer_levels = dict(
        (''.join(kmer), [])
        for kmer in product(DNA_BASES, repeat=kmer_width))
    while True:
        try:
            r_fn, corr_slot = reads_q.get(block=False)
        except queue.Empty:
            # sometimes throws false empty exception with get(block=False)
            sleep(0.01)
            if not reads_q.empty():
                continue
            break

        with h5py.File(r_fn, 'r') as fast5_data:
            r_means, r_seq = th.get_multiple_slots_read_centric(
                fast5_data, ['norm_mean', 'base'], corr_slot)
        if r_means is None:
            continue
        r_seq = b''.join(r_seq).decode()

        for kmer, level in zip(
                (r_seq[i:i + kmer_width]
                 for i in range(len(r_seq) - kmer_width + 1)),
                r_means[central_pos:-dnstrm_bases]):
            if kmer in completed_kmers:
                continue
            proc_kmer_levels[kmer].append(level)

    kmer_level_q.put(proc_kmer_levels)

    return

def get_batch_kmer_levels(
        reads_q, kmer_level_q, all_reads, parse_levels_batch_size,
        std_ref, completed_kmers, num_processes):
    no_more_reads = False
    try:
        for _ in range(parse_levels_batch_size):
            r_data = next(all_reads)
            reads_q.put((r_data.fn, r_data.corr_group))
    except StopIteration:
        no_more_reads = True

    base_lev_args = (reads_q, kmer_level_q, std_ref.kmer_width,
                     std_ref.central_pos, completed_kmers)
    base_lev_ps = []
    for p_id in range(num_processes):
        p = Process(target=_parse_base_levels_worker, args=base_lev_args)
        p.start()
        base_lev_ps.append(p)

    batch_kmer_levels = []
    while any(p.is_alive() for p in base_lev_ps):
        try:
            proc_kmer_levels = kmer_level_q.get(block=False)
            batch_kmer_levels.append(proc_kmer_levels)
        except queue.Empty:
            sleep(1)
            continue
    while not kmer_level_q.empty():
        proc_kmer_levels = kmer_level_q.get(block=False)
        batch_kmer_levels.append(proc_kmer_levels)

    return batch_kmer_levels, no_more_reads

def parse_base_levels(
        all_reads, std_ref, parse_levels_batch_size, kmer_obs_thresh,
        max_kmer_obs, min_kmer_obs_to_est, num_processes):
    """Parse base levels and store grouped by k-mer
    """
    reads_q = Queue()
    kmer_level_q = Queue()

    all_kmer_levels = dict(
        (''.join(kmer), [])
        for kmer in product(DNA_BASES, repeat=std_ref.kmer_width))
    # store set of k-mers with enough observations to save on memory
    # footprint while filling more rare k-mers
    completed_kmers = set()
    if VERBOSE:
        all_reads_bar = tqdm(total=len(all_reads), smoothing=0,
                             desc='Number of total reads used', leave=True)
        min_kmer_bar = tqdm(total=kmer_obs_thresh, smoothing=0,
                            desc='K-mer with fewest observations', leave=True)
        curr_min_kmer_count = 0
    all_reads = iter(all_reads)
    n_batches = 0
    while True:
        batch_kmer_levels, no_more_reads = get_batch_kmer_levels(
            reads_q, kmer_level_q, all_reads, parse_levels_batch_size,
            std_ref, completed_kmers, num_processes)

        # only add observations for k-mers that have not been seen enough
        # times; save memory for abundant k-mers
        batch_total_kmers = []
        for kmer in set(all_kmer_levels).difference(completed_kmers):
            all_kmer_levels[kmer].extend((
                kmer_level for proc_kmer_levels in batch_kmer_levels
                for kmer_level in proc_kmer_levels[kmer]))
            batch_total_kmers.append(len(all_kmer_levels[kmer]))
            if batch_total_kmers[-1] > max_kmer_obs:
                completed_kmers.add(kmer)
        if VERBOSE:
            if no_more_reads:
                all_reads_bar.update(all_reads_bar.total - all_reads_bar.n)
            else:
                all_reads_bar.update(parse_levels_batch_size)
            new_min_kmer_count = min(batch_total_kmers)
            min_kmer_bar.update(new_min_kmer_count - curr_min_kmer_count)
            curr_min_kmer_count = new_min_kmer_count
            sleep(0.1)
        else:
            curr_min_kmer_count = min(batch_total_kmers)

        if curr_min_kmer_count > kmer_obs_thresh or no_more_reads:
            break
        n_batches += 1
    if VERBOSE:
        all_reads_bar.close()
        min_kmer_bar.close()

    fewest_kmer_obs = min(len(kmer_levels)
                          for kmer_levels in all_kmer_levels.values())
    if fewest_kmer_obs < kmer_obs_thresh:
        if fewest_kmer_obs < min_kmer_obs_to_est:
            th.error_message_and_exit(
                'Too few minimal k-mer observations to continue to ' +
                'alternative estimation. Minimal k-mer has ' +
                unicode(fewest_kmer_obs) + ' total observations and ' +
                unicode(min_kmer_obs_to_est) +
                ' observations per k-mer are required.')
        th.warning_message(
            'Requested minimal k-mer observations not found in all reads. ' +
            'Continuing to estimation using a k-mer with ' +
            unicode(fewest_kmer_obs) + ' total observations.')

    return all_kmer_levels

def write_kmer_densities_file(dens_fn, kmer_dens, save_x):
    with io.open(dens_fn, 'wt') as fp:
        fp.write('Kmer\tSignal\tDensity\n')
        fp.write('\n'.join('\t'.join(map(str, (kmer, x, y)))
                           for kmer, dens_i in kmer_dens.items()
                           for x, y in zip(save_x, dens_i)) + '\n')

    return

def parse_kmer_densities_file(dens_fn):
    kmer_dens_raw = defaultdict(list)
    with io.open(dens_fn) as dens_fp:
        # read in header
        dens_fp.readline()
        for line in dens_fp:
            kmer, _, dens_i = line.split()
            kmer_dens_raw[kmer].append(float(dens_i))

    kmer_dens = {}
    first_len = None
    for kmer, dens_i in kmer_dens_raw.items():
        if first_len is None:
            first_len = len(dens_i)
        if len(dens_i) != first_len:
            th.error_message_and_exit('Density file is not valid.')
        kmer_dens[kmer] = np.array(dens_i)

    return kmer_dens

def est_kernel_density(
        reads_index, std_ref, kmer_obs_thresh, density_basename,
        save_x, kernel_dens_bw, num_processes, alt_or_stnd_name='alt',
        parse_levels_batch_size=ALT_EST_BATCH, max_kmer_obs=MAX_KMER_OBS,
        min_kmer_obs_to_est=MIN_KMER_OBS_TO_EST):
    all_reads = list(reads_index.iter_reads())
    np.random.shuffle(all_reads)
    base_levels = parse_base_levels(
        all_reads, std_ref, parse_levels_batch_size, kmer_obs_thresh,
        max_kmer_obs, min_kmer_obs_to_est, num_processes)

    if VERBOSE: th.status_message('Fitting kernel densities for k-mer levels.')
    kmer_dens = {}
    for kmer, norm_levels in base_levels.items():
        norm_levels = np.array(norm_levels)
        kmer_kde = stats.gaussian_kde(
            norm_levels,
            bw_method=kernel_dens_bw / norm_levels.std(ddof=1))
        with np.errstate(under='ignore'):
            kmer_dens[kmer] = kmer_kde.evaluate(save_x)

    if density_basename is not None:
        write_kmer_densities_file(
            density_basename + '.'
+ alt_or_stnd_name + '_density.txt', kmer_dens, save_x) return kmer_dens def estimate_kmer_densities( fast5s_dirs, ctrl_fast5s_dirs, corr_grp, bc_subgrps, standard_ref_fn, seq_samp_type, kmer_obs_thresh, density_basename, kernel_dens_bw, save_x, num_processes): reads_index = th.TomboReads(fast5s_dirs, corr_grp, bc_subgrps) ctrl_reads_index = th.TomboReads(ctrl_fast5s_dirs, corr_grp, bc_subgrps) if VERBOSE: th.status_message('Parsing standard model file.') std_ref = TomboModel(ref_fn=standard_ref_fn, seq_samp_type=seq_samp_type, reads_index=reads_index) if VERBOSE: th.status_message('Parsing base levels from alternative reads.') alt_dens = est_kernel_density( reads_index, std_ref, kmer_obs_thresh, density_basename, save_x, kernel_dens_bw, num_processes, 'alternate') if VERBOSE: th.status_message('Parsing base levels from standard reads.') std_dens = est_kernel_density( ctrl_reads_index, std_ref, kmer_obs_thresh, density_basename, save_x, kernel_dens_bw, num_processes, 'control') return alt_dens, std_dens, std_ref def load_kmer_densities( alt_dens_fn, std_dens_fn, fast5s_dirs, corr_grp, bc_subgrps, std_ref_fn, seq_samp_type): if VERBOSE: th.status_message('Parsing standard model file.') reads_index = None if std_ref_fn is None and seq_samp_type is None: if fast5s_dirs is None: th.error_message_and_exit( 'Must provide a FAST5s directory, a canonical model ' + 'file or spcify the biological sample type.') reads_index = th.TomboReads(fast5s_dirs, corr_grp, bc_subgrps) std_ref = TomboModel(ref_fn=std_ref_fn, seq_samp_type=seq_samp_type, reads_index=reads_index) if VERBOSE: th.status_message('Parsing density files.') alt_dens = parse_kmer_densities_file(alt_dens_fn) std_dens = parse_kmer_densities_file(std_dens_fn) num_dens_points = next(v for v in alt_dens.values()).shape[0] if num_dens_points != next(v for v in std_dens.values()).shape[0]: th.error_message_and_exit( 'Alternative and standard density ' + 'estimates do not correspond.') save_x = np.linspace(KERNEL_DENSITY_RANGE[0], KERNEL_DENSITY_RANGE[1], num_dens_points) return alt_dens, std_dens, std_ref, save_x def isolate_alt_density( alt_dens, std_dens, alt_base, alt_frac_pctl, std_ref, save_x): def calc_mean(dens): return np.average(save_x[dens>1e-10], weights=dens[dens>1e-10]) # estimate density shift from k-mers without the alternate base no_alt_std_means, no_alt_mean_diffs = [], [] for kmer in std_dens: if alt_base in kmer: continue no_alt_std_means.append(calc_mean(std_dens[kmer])) kmer_alt_mean = calc_mean(alt_dens[kmer]) no_alt_mean_diffs.append(kmer_alt_mean - no_alt_std_means[-1]) calc_offset = np.poly1d(np.polyfit(no_alt_std_means, no_alt_mean_diffs, 2)) save_x_unit = save_x[1] - save_x[0] shifted_alt_dens = {} for kmer, kmer_alt_dens in alt_dens.items(): est_offset = int(calc_offset(calc_mean(std_dens[kmer])) / save_x_unit) # if std density mean is estimated to be greater if est_offset < 0: # shift alt dens to the right shifted_alt_dens[kmer] = np.concatenate([ [0.0,] * -est_offset, kmer_alt_dens[:est_offset]]) else: # else shift alt dens to the left shifted_alt_dens[kmer] = np.concatenate([ kmer_alt_dens[est_offset:], [0.0,] * est_offset]) def get_peak_frac(kmer_std_dens, kmer_alt_dens): # find highest peak in standard density std_peak = np.argmax(kmer_std_dens) # find closest peak in alternative density alt_local_peaks = np.where(np.concatenate([ [False,], np.logical_and( kmer_alt_dens[1:-1] > kmer_alt_dens[:-2], kmer_alt_dens[1:-1] > kmer_alt_dens[2:]) + [False,]]))[0] matched_alt_peak = alt_local_peaks[ 
np.argmin(abs(alt_local_peaks - std_peak))] return kmer_alt_dens[matched_alt_peak] / kmer_std_dens[std_peak] # estimate the alternative base incorporation rate std_frac = np.percentile([ get_peak_frac(std_dens[kmer], shifted_alt_dens[kmer]) for kmer in std_dens if kmer.count(alt_base) == 1], alt_frac_pctl) if VERBOSE: th.status_message( 'Alternative base incorporation rate estimate: ' + unicode(1 - std_frac)) if std_frac >= 1: th.warning_message( 'Alternative base incorporation rate ' + 'estimate is approximately 0. Consider lowering ' + '--alt-fraction-percentile.') # get mean model SD. most models will be constant, but use mean in case model_sd = np.mean(list(std_ref.sds.values())) # subtract off fraction of standard density from alt density # to estimate mean of isolated alternative distribution alt_ref = [] for kmer, std_level in std_ref.means.items(): if kmer.count(alt_base) == 0: continue # assuming random incorporation the prortion of standard base # observations at this k-mer is the standard fraction raised # to the number of alt_base occurences in the k-mer kmer_std_frac = std_frac**kmer.count(alt_base) with np.errstate(under='ignore'): diff_dens = shifted_alt_dens[kmer] - ( std_dens[kmer] * kmer_std_frac) diff_dens[diff_dens < 0] = 0 alt_level = np.average(save_x, weights=diff_dens) # add alt mean for each alt base position within the kmer for m in re.finditer(alt_base, kmer): alt_ref.append((kmer, m.start(), alt_level, model_sd)) alt_ref = AltModel( kmer_ref=alt_ref, central_pos=std_ref.central_pos, alt_base=alt_base) return alt_ref def estimate_alt_model( fast5s_dirs, ctrl_fast5s_dirs, corr_grp, bc_subgrps, std_ref_fn, seq_samp_type, alt_base, alt_frac_pctl, kmer_obs_thresh, density_basename, kernel_dens_bw, alt_dens_fn, std_dens_fn, num_processes, num_dens_points=NUM_DENS_POINTS): """Estimate an alternative model from a sample with a single, known, randomly-incorporated alternative base """ if alt_dens_fn is None or std_dens_fn is None: save_x = np.linspace(KERNEL_DENSITY_RANGE[0], KERNEL_DENSITY_RANGE[1], num_dens_points) alt_dens, std_dens, std_ref = estimate_kmer_densities( fast5s_dirs, ctrl_fast5s_dirs, corr_grp, bc_subgrps, std_ref_fn, seq_samp_type, kmer_obs_thresh, density_basename, kernel_dens_bw, save_x, num_processes) else: alt_dens, std_dens, std_ref, save_x = load_kmer_densities( alt_dens_fn, std_dens_fn, fast5s_dirs, corr_grp, bc_subgrps, std_ref_fn, seq_samp_type) if VERBOSE: th.status_message('Isolating alternative base distribtuions.') # perform alternative density isolation algorithm alt_ref = isolate_alt_density( alt_dens, std_dens, alt_base, alt_frac_pctl, std_ref, save_x) return alt_ref if _PROFILE_ALT_EST: _est_alt_wrapper = estimate_alt_model def estimate_alt_model(*args): import cProfile cProfile.runctx('_est_alt_wrapper(*args)', globals(), locals(), filename='est_alt_model.prof') return None def tabulate_mod_kmer_levels(all_reg_kmer_levels, min_kmer_obs, motif): if VERBOSE: th.status_message('Tabulating k-mer model statistics.') all_kmer_mean_sds = [] if _DEBUG_EST_STD: kmer_dens = [] save_x = np.linspace(KERNEL_DENSITY_RANGE[0], KERNEL_DENSITY_RANGE[1], _DEBUG_EST_NUM_KMER_SAVE) kmer_width = len(next(iter(all_reg_kmer_levels[0].keys()))[0]) for kmer, offset in [(kmer, offset - 1) for kmer in product(DNA_BASES, repeat=kmer_width) for offset in motif.find_mod_poss(''.join(kmer))]: kmer = ''.join(kmer) try: kmer_levels = np.concatenate([ reg_kmer_levels[(kmer, offset)] for reg_kmer_levels in all_reg_kmer_levels if len(reg_kmer_levels[(kmer, offset)]) > 
0]) except ValueError: th.error_message_and_exit( 'At least one modified k-mer is not covered at any poitions ' + 'by --minimum-test-reads.\n\t\tConsider fitting to a smaller ' + 'k-mer via the --upstream-bases and --downstream-bases, ' + 'or lowering --minimum-test-reads.\n\t\tNote that this may ' + 'result in a lower quality model.') if kmer_levels.shape[0] < min_kmer_obs: min_obs = min( sum(len(reg_levs[(''.join(kmer), i_offset - 1)]) for reg_levs in all_reg_kmer_levels) for kmer in product(DNA_BASES, repeat=kmer_width) for i_offset in motif.find_mod_poss(''.join(kmer))) th.error_message_and_exit( 'K-mers represeneted in fewer observations than ' + 'requested in the provided reads. Consider a shorter ' + 'k-mer or providing more reads.\n\t' + unicode(min_obs) + ' observations found in least common kmer.') all_kmer_mean_sds.append((kmer, offset, np.median(kmer_levels[:,0]), np.median(kmer_levels[:,1]))) if _DEBUG_EST_STD: kmer_kde = stats.gaussian_kde( kmer_levels[:,0], bw_method=_DEBUG_EST_BW / kmer_levels[:,0].std(ddof=1)) with np.errstate(under='ignore'): kmer_dens.append((kmer, offset, kmer_kde.evaluate(save_x))) if _DEBUG_EST_STD: with io.open('debug_est_motif_alt_ref.density.txt', 'wt') as fp: fp.write('Kmer\tOffset\tSignal\tDensity\n') fp.write('\n'.join('\t'.join(map(str, (kmer, offset, x, y))) for kmer, offset, dens_i in kmer_dens for x, y in zip(save_x, dens_i)) + '\n') return all_kmer_mean_sds def estimate_motif_alt_model( fast5s_dirs, corr_grp, bc_subgrps, motif_desc, upstrm_bases, dnstrm_bases, valid_locs_fn, min_kmer_obs, cov_thresh, cs_cov_thresh, region_size, num_processes): """Estimate a motif-centered alternate-base tombo k-mer model """ reads_index = th.TomboReads(fast5s_dirs, corr_grp, bc_subgrps) try: raw_motif, mod_pos = motif_desc.split(":") except: th.error_message_and_exit('Invalid motif decription format.') motif = th.TomboMotif(raw_motif, int(mod_pos)) valid_poss = None if valid_locs_fn is None else th.parse_locs_file( valid_locs_fn) all_reg_kmer_levels = extract_kmer_levels( reads_index, region_size, cov_thresh, upstrm_bases, dnstrm_bases, cs_cov_thresh, False, num_processes, motif, valid_poss) # process motif kmers with relative position stored all_mod_kmer_mean_sds = tabulate_mod_kmer_levels( all_reg_kmer_levels, min_kmer_obs, motif) alt_ref = AltModel( kmer_ref=all_mod_kmer_mean_sds, central_pos=upstrm_bases, alt_base=motif.mod_base, motif=motif) alt_ref._make_constant_sd() return alt_ref if _PROFILE_MOTIF_ALT_EST: _est_motif_alt_wrapper = estimate_motif_alt_model def estimate_motif_alt_model(*args): import cProfile cProfile.runctx('_est_motif_alt_wrapper(*args)', globals(), locals(), filename='est_motif_alt_model.prof') return None #################################### ##### Core Statistical Testing ##### #################################### def p_value_to_z_score(pvalue): """Helper function to convert p-value to z-score """ return -stats.norm.ppf(pvalue) def z_score_to_p_value(zscore): """Helper function to convert z-score to p-value """ return stats.norm.cdf(zscore) def correct_multiple_testing(pvals): """Use FDR Benjamini-Hochberg multiple testing correction """ pvals = np.asarray(pvals) pvals_sortind = np.argsort(pvals) pvals_sorted = pvals[pvals_sortind] sortrevind = pvals_sortind.argsort() nobs = len(pvals) ecdffcator = np.arange(1, nobs + 1) / nobs # ignore underflow values with np.errstate(under='ignore'): pvals_corrected_raw = pvals_sorted / ecdffcator pvals_corrected = np.minimum.accumulate( pvals_corrected_raw[::-1])[::-1] del pvals_corrected_raw 
pvals_corrected[pvals_corrected>1] = 1 return pvals_corrected[sortrevind] def calc_vectorized_fm_pvals(split_pvals, filter_nan=True): """Compute Fisher's Method p-values in a vectorized fashion """ if filter_nan: chi_stats = [np.sum(np.log(base_pvals[~np.isnan(base_pvals)])) * -2 for base_pvals in split_pvals] chi_shapes = [ np.sum(~np.isnan(base_pvals)) * 2 for base_pvals in split_pvals] else: chi_stats = [np.sum(np.log(base_pvals)) * -2 for base_pvals in split_pvals] chi_shapes = [base_pvals.shape[0] * 2 for base_pvals in split_pvals] f_pvals = stats.chi2.sf(chi_stats, chi_shapes) return f_pvals def calc_window_fishers_method(pvals, lag): """Compute Fisher's Method over a moving window across a set of p-values """ assert lag > 0, 'Invalid p-value window provided.' width = (lag * 2) + 1 if pvals.shape[-1] < width: raise th.TomboError("P-values vector too short for Fisher's Method " + "window compuation.") with np.errstate(invalid='ignore'): pvals = np.maximum(pvals, SMALLEST_PVAL) log_sums = np.lib.stride_tricks.as_strided( np.log(pvals), shape=pvals.shape[:-1] + (pvals.shape[-1] - width + 1, width), strides=pvals.strides + (pvals.strides[-1],)).sum(-1) f_pvals = np.empty(pvals.shape) f_pvals[:] = np.NAN with np.errstate(invalid='ignore'): f_pvals[...,lag:-lag] = stats.chi2.sf(log_sums * -2, width * 2) return f_pvals def calc_window_means(stats, lag): """Compute mean over a moving window across a set of statistics """ assert lag > 0, 'Invalid window provided.' width = (lag * 2) + 1 if stats.shape[-1] < width: raise th.TomboError( "Statistics vector too short for window mean compuation.") m_stats = np.empty(stats.shape) m_stats[:] = np.NAN m_stats[...,lag:-lag] = np.mean(np.lib.stride_tricks.as_strided( stats, shape=stats.shape[:-1] + (stats.shape[-1] - width + 1, width), strides=stats.strides + (stats.strides[-1],)), -1) return m_stats def calc_window_z_transform(r_means, ref_means, ref_sds, lag): """Compute Stouffer's Z-transformation across a read """ z_scores = np.abs(r_means - ref_means) / ref_sds width = (lag * 2) + 1 window_z_trans = np.lib.stride_tricks.as_strided( z_scores, shape=(z_scores.shape[0] - (lag * 2), width), strides=z_scores.strides + (z_scores.strides[-1],)).sum(1) / np.sqrt(width) window_z_trans = np.concatenate([np.array([np.NAN] * lag), window_z_trans, np.array([np.NAN] * lag)]) return window_z_trans def calc_mann_whitney_z_score(samp1, samp2): """Compute Mann-Whitney z-scores comparing two samples """ s1_len = samp1.shape[0] s2_len = samp2.shape[0] tot_len = s1_len + s2_len all_vals = np.concatenate([samp1, samp2]) ranks = np.empty(tot_len, int) ranks[all_vals.argsort()] = np.arange(1, tot_len + 1) s1_ranks_sum = ranks[:s1_len].sum() #s2_ranks_sum = ranks[s1_len:].sum() u1 = s1_ranks_sum - (s1_len * (s1_len + 1)) / 2 #u2 = s2_ranks_sum - (s2_len * (s2_len + 1)) / 2 mu = s1_len * s2_len / 2 rhou = np.sqrt(s1_len * s2_len * (s1_len + s2_len + 1) / 12) z = np.abs(u1 - mu) / rhou return z
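For orientation, a minimal sketch of how two of the helpers above compose on a toy p-value vector (the values are made up; ``lag=1`` combines each position with its two neighbors):

    import numpy as np

    # hypothetical per-position p-values from a de novo test
    pvals = np.array([0.001, 0.20, 0.03, 0.8, 0.0004])

    # Benjamini-Hochberg adjusted values, returned in the original order
    bh_pvals = correct_multiple_testing(pvals)

    # Fisher's method over a +/- 1 base window; the first and last `lag`
    # positions are returned as NaN
    win_pvals = calc_window_fishers_method(pvals, lag=1)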
[docs]def get_read_seg_score(r_means, r_ref_means, r_ref_sds):
    """Compute expected to observed signal matching score

    Args:
        r_means (`np.array::np.float64`): observed base signal levels
        r_ref_means (`np.array::np.float64`): expected base signal levels
        r_ref_sds (`np.array::np.float64`): expected base signal level sds

    Returns:
        Mean half z-score for observed versus expected signal levels
    """
    return np.mean(np.abs((r_means - r_ref_means) / r_ref_sds))
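As a quick illustration (toy arrays, not real data), the returned score is simply the mean absolute z-score of observed versus expected levels:

    import numpy as np

    r_means = np.array([0.5, -0.2, 1.1])     # observed normalized levels
    r_ref_means = np.array([0.4, 0.0, 1.0])  # expected levels from a model
    r_ref_sds = np.array([0.2, 0.2, 0.2])    # expected level SDs

    score = get_read_seg_score(r_means, r_ref_means, r_ref_sds)
    # equivalent to np.mean(np.abs((r_means - r_ref_means) / r_ref_sds))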
def score_valid_bases(read_tb, event_means, r_ref_means, r_ref_sds):
    """Compute expected to observed signal matching score for bases not
    deleted in dynamic programming

    Args:
        read_tb (`np.array::np.int32`): event changepoints
        event_means (`np.array::np.float64`): observed base signal levels
        r_ref_means (`np.array::np.float64`): expected base signal levels
        r_ref_sds (`np.array::np.float64`): expected base signal level sds

    Returns:
        Mean half z-score for observed versus expected signal levels
        (for valid bases)
    """
    valid_bases = np.where(np.diff(read_tb) != 0)[0]
    if valid_bases.shape[0] == 0:
        raise th.TomboError('Invalid path through read start')
    valid_ref_means, valid_ref_sds = (
        r_ref_means[valid_bases], r_ref_sds[valid_bases])
    base_means = np.array([
        event_means[b_start:b_end].mean()
        for b_start, b_end in zip(read_tb[:-1], read_tb[1:])
        if b_start != b_end])
    return get_read_seg_score(base_means, valid_ref_means, valid_ref_sds)

def get_dynamic_prog_params(match_evalue):
    """Compute dynamic programming shift parameters from an expected match
    expected value
    """
    z_shift = HALF_NORM_EXPECTED_VAL + match_evalue
    stay_pen = match_evalue
    return z_shift, stay_pen


##########################
##### Statistics I/O #####
##########################

def compute_auc(tp_rate, fp_rate):
    return np.sum(tp_rate[:-1] * (fp_rate[1:] - fp_rate[:-1]))

def compute_mean_avg_precison(tp_rate, precision):
    return np.sum(np.diff(np.concatenate([[0, ], tp_rate, [1, ]])) *
                  np.concatenate([[0, ], precision, [1, ]])[:-1])

def compute_accuracy_rates(stat_has_mod, num_plot_points=ROC_PLOT_POINTS):
    """Given a list or numpy array of true/false values, function returns
    num_plot_point evenly spaced values along the true positive, false
    positive and precision arrays
    """
    tp_cumsum = np.cumsum(stat_has_mod)
    tp_rate = tp_cumsum / tp_cumsum[-1]
    fp_cumsum = np.cumsum(np.logical_not(stat_has_mod))
    fp_rate = fp_cumsum / fp_cumsum[-1]

    precision = tp_cumsum / np.arange(1, len(stat_has_mod) + 1, dtype=float)

    # trim to number of requested points
    tp_rate = tp_rate[np.linspace(0, tp_rate.shape[0] - 1,
                                  num_plot_points).astype(np.int64)]
    fp_rate = fp_rate[np.linspace(0, fp_rate.shape[0] - 1,
                                  num_plot_points).astype(np.int64)]
    precision = precision[np.linspace(
        0, precision.shape[0] - 1, num_plot_points + 1).astype(np.int64)][1:]

    return tp_rate, fp_rate, precision

def _compute_motif_stats(
        stats, motif_descs, genome_index,
        stats_per_block=None, total_stats_limit=None):
    all_motif_stats = dict(
        (mod_name, []) for mod_name in list(zip(*motif_descs))[1])
    before_bases = max((
        motif.mod_pos for motif in list(zip(*motif_descs))[0])) - 1
    after_bases = max((motif.motif_len - motif.mod_pos
                       for motif in list(zip(*motif_descs))[0]))
    total_num_stats = 0
    for chrm, strand, start, end, block_stats in stats:
        if strand == '+':
            seq_start = max(start - before_bases, 0)
            seq_end = end + after_bases
        else:
            seq_start = max(start - after_bases, 0)
            seq_end = end + before_bases
        reg_seq = genome_index.get_seq(
            chrm, seq_start, seq_end, error_end=False)

        # TODO potentially keep all mod sites when they are extremely rare
        # randomly sub-sample per-read stats here
        if (stats_per_block is not None and
            block_stats.shape[0] > stats_per_block):
            block_stats = block_stats[np.random.choice(
                block_stats.shape[0], stats_per_block, replace=False)]
        total_num_stats += block_stats.shape[0]
        for r_pos_stat in block_stats:
            # extract position sequence
            if strand == '+':
                r_pos_seq = reg_seq[
                    r_pos_stat['pos'] - seq_start - before_bases:
                    r_pos_stat['pos'] - seq_start + after_bases + 1]
            else:
                r_pos_seq = th.rev_comp(reg_seq[
                    r_pos_stat['pos'] - seq_start - after_bases:
                    r_pos_stat['pos'] - seq_start + before_bases + 1])

            # add statistic and whether the sequence matches each motif
            for motif, mod_name in motif_descs:
                if r_pos_seq[before_bases] != motif.mod_base: continue
                all_motif_stats[mod_name].append((
                    r_pos_stat[stats._stat_slot],
                    bool(motif.motif_pat.match(
                        r_pos_seq[before_bases - motif.mod_pos + 1:]))))

        if (total_stats_limit is not None and
            total_num_stats >= total_stats_limit):
            break

    return all_motif_stats

def _compute_ground_truth_stats(stats, ground_truth_locs):
    all_stats = []
    mod_locs, unmod_locs, mod_name = ground_truth_locs
    for chrm, strand, start, end, block_stats in stats:
        try:
            cs_mod_locs = mod_locs[(chrm, strand)]
        except KeyError:
            cs_mod_locs = np.array([])
        try:
            cs_unmod_locs = unmod_locs[(chrm, strand)]
        except KeyError:
            cs_unmod_locs = np.array([])
        block_mod_locs = cs_mod_locs[
            np.logical_and(np.greater_equal(cs_mod_locs, start),
                           np.less(cs_mod_locs, end))]
        block_unmod_locs = cs_unmod_locs[
            np.logical_and(np.greater_equal(cs_unmod_locs, start),
                           np.less(cs_unmod_locs, end))]
        block_valid_locs = block_stats[
            np.isin(block_stats['pos'],
                    np.concatenate([block_mod_locs, block_unmod_locs]))]
        all_stats.extend(zip(
            block_valid_locs[stats._stat_slot],
            np.isin(block_valid_locs['pos'], block_mod_locs)))

    return {mod_name: all_stats}

def _compute_ctrl_motif_stats(
        stats, ctrl_stats, motif_descs, genome_index,
        stats_per_block=None, total_stats_limit=None):
    all_motif_stats = dict(
        (mod_name, []) for mod_name in list(zip(*motif_descs))[1])
    before_bases = max((
        motif.mod_pos for motif in list(zip(*motif_descs))[0])) - 1
    after_bases = max((motif.motif_len - motif.mod_pos
                       for motif in list(zip(*motif_descs))[0]))
    total_num_stats = 0
    for chrm, strand, start, end, block_stats in stats:
        if strand == '+':
            seq_start = max(start - before_bases, 0)
            seq_end = end + after_bases
        else:
            seq_start = max(start - after_bases, 0)
            seq_end = end + before_bases
        reg_seq = genome_index.get_seq(
            chrm, seq_start, seq_end, error_end=False)
        ctrl_block_stats = ctrl_stats.get_reg_stats(
            chrm, strand, start, end)
        for motif, mod_name in motif_descs:
            if strand == '+':
                mod_poss = np.array([
                    m.start() + motif.mod_pos - 1
                    for m in motif.motif_pat.finditer(reg_seq)]) + seq_start
            else:
                mod_poss = np.array([
                    m.start() + motif.motif_len - motif.mod_pos
                    for m in motif.rev_comp_pat.finditer(
                        reg_seq)]) + seq_start
            # TODO possibly use stats_per_block here, but motif stats are
            # less common, so probably not as useful here
            for r_pos_stat in block_stats[
                    np.isin(block_stats['pos'], mod_poss)]:
                all_motif_stats[mod_name].append((
                    r_pos_stat[stats._stat_slot], True))
                total_num_stats += 1
            if ctrl_block_stats is not None:
                for r_pos_stat in ctrl_block_stats[
                        np.isin(ctrl_block_stats['pos'], mod_poss)]:
                    all_motif_stats[mod_name].append((
                        r_pos_stat[stats._stat_slot], False))
                    total_num_stats += 1

        if (total_stats_limit is not None and
            total_num_stats >= total_stats_limit):
            break

    return all_motif_stats

def calc_damp_fraction(cov_damp_counts, fracs, valid_cov):
    """Compute dampened fraction of un-modified reads using provided
    modified and un-modified pseudo-counts from cov_damp_counts

    See https://nanoporetech.github.io/tombo/text_output.html?highlight=dampened#text-output-browser-files for more details
    """
    damp_fracs = np.empty(fracs.shape[0])
    damp_fracs[:] = np.nan

    non_mod_counts = np.round(fracs * valid_cov)
    # compute dampened fraction of modified reads by adding pseudo-counts
    # to the modified and un-modified counts (equivalent to a beta prior
    # on the fraction estimation as a binomial variable)
    damp_fracs = (non_mod_counts + cov_damp_counts['unmod']) / (
        valid_cov + sum(list(cov_damp_counts.values())))

    return damp_fracs
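A small worked example of the dampening (the pseudo-counts here are illustrative, not the shipped defaults); note how low-coverage sites are pulled toward the prior much more strongly:

    import numpy as np

    cov_damp_counts = {'unmod': 0.5, 'mod': 2.5}  # illustrative pseudo-counts
    fracs = np.array([1.0, 0.5])                  # raw un-modified fractions
    valid_cov = np.array([2, 100])                # valid coverage per site

    damp_fracs = calc_damp_fraction(cov_damp_counts, fracs, valid_cov)
    # site 0: (2 + 0.5) / (2 + 3) = 0.5       (pulled strongly toward prior)
    # site 1: (50 + 0.5) / (100 + 3) ~= 0.49  (nearly unchanged)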
[docs]class ModelStats(object):
    """Parse and retrieve relevant information from a standard (per-genomic
    base) Tombo statistics file.

    .. automethod:: __init__
    """
    # TODO add attributes
    def _parse_stats(self):
        if self.stats_fn is None or not os.path.isfile(self.stats_fn):
            th.error_message_and_exit(
                'Statistics file not provided or provided file does ' +
                'not exist.')

        self._fp = h5py.File(self.stats_fn, 'r')
        self.stat_type = self._fp.attrs.get('stat_type')
        self.region_size = self._fp.attrs.get('block_size')
        self.stat_blocks = self._fp[STAT_BLOCKS_H5_NAME]
        self.num_blocks = 0
        blocks_index = defaultdict(dict)
        for block_name, block_data in self.stat_blocks.items():
            blocks_index[
                (block_data.attrs.get('chrm'),
                 block_data.attrs.get('strand'))][
                     block_data.attrs.get('start')] = block_name
            self.num_blocks += 1
        self.blocks_index = dict(blocks_index)
        self.cov_thresh = self._fp.attrs.get(COV_THRESH_H5_NAME)

        most_signif_grp = self._fp[MOST_SIGNIF_H5_NAME]
        # read full most significant array into memory
        self.most_signif_stats = most_signif_grp[MOST_SIGNIF_H5_NAME][:]
        self.most_signif_chrm_map = dict(
            (v, k) for k, v in most_signif_grp['chrm_ids'].attrs.items())

        # LevelStats doesn't have damp counts
        try:
            self.cov_damp_counts = dict(self._fp[
                COV_DAMP_COUNTS_H5_NAME].attrs.items())
        except:
            self.cov_damp_counts = None

        return

    def _create_new_stats_file(self):
        # try to remove file for overwriting old results
        try:
            os.remove(self.stats_fn)
        except:
            pass
        # open file for writing
        self._fp = h5py.File(self.stats_fn, 'w')

        # save attributes to file and open stats blocks group
        self._fp.attrs['stat_type'] = self.stat_type
        self._fp.attrs['block_size'] = self.region_size
        self.stat_blocks = self._fp.create_group(STAT_BLOCKS_H5_NAME)

        # save coverage damp counts and threshold attributes
        self._fp.attrs[COV_THRESH_H5_NAME] = self.cov_thresh
        self.cov_damp_counts_grp = self._fp.create_group(
            COV_DAMP_COUNTS_H5_NAME)
        self.cov_damp_counts_grp.attrs[
            'unmod'] = self.cov_damp_counts['unmod']
        self.cov_damp_counts_grp.attrs[
            'mod'] = self.cov_damp_counts['mod']

        # storage for most significant stats
        self.most_signif_sites = self._fp.create_group(MOST_SIGNIF_H5_NAME)
        self.running_most_signif_sites = np.empty(
            shape=(self.num_most_signif,),
            dtype=[(str('damp_frac'), 'f8'), (str('frac'), 'f8'),
                   (str('pos'), 'u4'), (str('cov'), 'u4'),
                   (str('control_cov'), 'u4'), (str('valid_cov'), 'u4'),
                   (str('chrm'), 'u4'), (str('strand'), 'S1')])
        self.running_most_signif_sites[:] = np.NAN
        # store a queue of completed stat batches to be concatenated and
        # stored as a group to avoid too many array copy and sorting ops
        self.queued_stat_batches = []
        # store chromosomes names in dict for storing most signif array
        self.curr_chrm_id = 0
        self.chrm_names = {}
        self.chrm_id_grp = self.most_signif_sites.create_group('chrm_ids')

        self.is_empty = True

        return
[docs]    def __init__(self, stats_fn, stat_type=None, region_size=None,
                 cov_damp_counts=None, cov_thresh=None, num_most_signif=None,
                 most_signif_num_batches=MOST_SIGNIF_NUM_BATCHES_DEFAULT):
        """Parse or open for writing a standard (per-genomic base) Tombo
        statistics file.

        Example::

            stats = tombo_stats.TomboStats('path/to/stats.file')
            for chrm, strand, pos, stat in stats.iter_most_signif_sites():
                # do stuff

        Args:
            stats_fn (str): filename for previously saved tombo stats
            stat_type (str): type of statistic (model_compare, de_novo, or
                sample_compare); only applicable for new file writing
            region_size (int): size of chunked storage blocks; only
                applicable for new file writing
            cov_damp_counts (tuple): pseudo-counts for modified and
                un-modified reads to compute ``damp_frac``
            cov_thresh (int): only sites with coverage greater than or
                equal to this value will be stored
            num_most_signif (int): number of most significant sites to be
                stored for faster access
            most_signif_num_batches (int): number of region batches to
                store before re-computing the most significant array
                (default: 10)

        Warning:

            If all arguments are provided the current file's contents will
            be deleted.

            Intended to open a fresh ``ModelStats`` file for writing.
        """
        self.stats_fn = stats_fn

        if any(arg is None for arg in (
                stat_type, region_size, cov_damp_counts, cov_thresh,
                num_most_signif)):
            self.open_for_writing = False
            # open file for reading
            try:
                self._parse_stats()
            except:
                raise th.TomboError(
                    'Invalid statistics file provided. Try running ' +
                    'tombo/scripts/convert_stats.py if this stats file ' +
                    'was created before Tombo v1.3.1')
        else:
            self.open_for_writing = True
            # set class attributes
            self.stat_type = stat_type
            self.region_size = region_size
            self.curr_block_num = 0
            self.cov_damp_counts = dict(zip(('unmod', 'mod'),
                                            cov_damp_counts))
            self.cov_thresh = cov_thresh
            self.num_most_signif = num_most_signif
            self.most_signif_num_batches = most_signif_num_batches
            # open file for writing
            self._create_new_stats_file()

        if self.stat_type not in PER_READ_STATS:
            if self.stat_type in LEVEL_STATS_TXTS:
                raise th.TomboError(
                    'This appears to be a group-comparison stats file. ' +
                    'Open with tombo_stats.LevelStats.')
            raise th.TomboError(
                'This file is not a valid ModelStats file. `stat_type` ' +
                'listed as "' + self.stat_type + '".')

        self.is_model_stats = True
        self._stat_slot = str('damp_frac')
        self._stat_text = 'Est. Frac. Alternate: {0:.2g}'
        self._stat_transform = lambda pos_stat: 1 - pos_stat[self._stat_slot]

        return
    def _update_most_signif(self):
        tmp_most_signif = np.concatenate(
            [self.running_most_signif_sites, ] + self.queued_stat_batches)
        tmp_most_signif.sort(kind='mergesort', order=self._stat_slot)
        self.running_most_signif_sites = tmp_most_signif[
            :self.num_most_signif]
        self.queued_stat_batches = []
        return

    def _add_to_most_signif(self, reg_stats, chrm, strand):
        if chrm not in self.chrm_names:
            self.chrm_names[chrm] = self.curr_chrm_id
            self.curr_chrm_id += 1

        self.queued_stat_batches.append(append_fields(
            base=reg_stats, names=(str('chrm'), str('strand')),
            data=(list(repeat(self.chrm_names[chrm], reg_stats.shape[0])),
                  list(repeat(strand, reg_stats.shape[0]))),
            dtypes=('u4', 'S1')))
        if len(self.queued_stat_batches) >= self.most_signif_num_batches:
            self._update_most_signif()

        return

    def _write_stat_block(self, reg_stats):
        """Write region statistics block to file.
        """
        try:
            block_data = self.stat_blocks.create_group(
                'Block_' + unicode(self.curr_block_num))
            self.curr_block_num += 1
        except:
            th.warning_message('Statistics file not opened for writing.')
            return

        block_data.attrs['chrm'] = reg_stats.chrm
        block_data.attrs['strand'] = reg_stats.strand
        block_data.attrs['start'] = reg_stats.start

        damp_frac = calc_damp_fraction(
            self.cov_damp_counts, reg_stats.reg_frac_standard_base,
            reg_stats.valid_cov)
        reg_stats_arr = np.array(
            [pos_stats for pos_stats in zip(
                damp_frac, reg_stats.reg_frac_standard_base,
                reg_stats.reg_poss, reg_stats.reg_cov, reg_stats.ctrl_cov,
                reg_stats.valid_cov)
             if not np.isnan(pos_stats[0])],
            dtype=[(str('damp_frac'), 'f8'), (str('frac'), 'f8'),
                   (str('pos'), 'u4'), (str('cov'), 'u4'),
                   (str('control_cov'), 'u4'), (str('valid_cov'), 'u4')])
        block_data.create_dataset(
            'block_stats', data=reg_stats_arr, compression="gzip")
        self._add_to_most_signif(
            reg_stats_arr, reg_stats.chrm, reg_stats.strand)

        #self._fp.flush()
        self.is_empty = False

        return

    def _close_write(self):
        # process any remaining batches
        if len(self.queued_stat_batches) >= 1:
            self._update_most_signif()
        # trim the array if necessary
        if np.isnan(self.running_most_signif_sites[self._stat_slot][-1]):
            # not as many signif sites were stored as requested so trim array
            first_nan = np.where(np.isnan(
                self.running_most_signif_sites[self._stat_slot]))[0][0]
            self.running_most_signif_sites = self.running_most_signif_sites[
                :first_nan, ]
        # add dataset to file
        self.most_signif_sites.create_dataset(
            MOST_SIGNIF_H5_NAME, data=self.running_most_signif_sites,
            compression="gzip")
        # and add chrm ids map to file (stored in reverse order of the
        # useful dict, since ints can't be hdf5 keys)
        for chrm_name, chrm_id in self.chrm_names.items():
            self.chrm_id_grp.attrs[chrm_name] = chrm_id

        return
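The two methods above amortize sorting cost: incoming blocks are queued, and only after ``most_signif_num_batches`` batches are they merged into the fixed-size running array. The same pattern in isolation (a sketch; ``order_field`` stands in for ``self._stat_slot``):

    import numpy as np

    def merge_top_n(running, queued_batches, n, order_field):
        # concatenate queued batches onto the current top-n array, stable
        # sort on the statistic slot and keep only the n smallest entries
        # (NaN fill values sort to the end, as in the running array above)
        merged = np.concatenate([running] + queued_batches)
        merged.sort(kind='mergesort', order=order_field)
        return merged[:n]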
[docs]    def close(self):
        """Close open HDF5 file and write most significant sites if open
        for writing
        """
        if self.open_for_writing:
            self._close_write()
        self._fp.close()
        return
    # Reading functions
    def _get_chrm_name(self, pos_stat):
        return self.most_signif_chrm_map[pos_stat['chrm']]
[docs]    def iter_stat_seqs(self, genome_index, before_bases, after_bases,
                       include_pos=True):
        """Iterate through most significant genomic sites returning the
        genomic sequence surrounding each position.

        Args:
            genome_index (:class:`tombo.tombo_helper.Fasta`): genome index
                object
            before_bases (int): number of sequence bases before positions
                to include
            after_bases (int): number of sequence bases after positions
                to include
            include_pos (bool): yield (pos_seq, chrm, strand, start, end)
                for each site (default: True)
        """
        for pos_stat in self.most_signif_stats:
            chrm, strand, pos = (self._get_chrm_name(pos_stat),
                                 pos_stat['strand'].decode(),
                                 pos_stat['pos'])
            if strand == '+':
                start, end = pos - before_bases, pos + after_bases + 1
                try:
                    stat_seq = genome_index.get_seq(chrm, start, end)
                except th.TomboError:
                    continue
            else:
                start, end = pos - after_bases, pos + before_bases + 1
                try:
                    stat_seq = th.rev_comp(genome_index.get_seq(
                        chrm, start, end))
                except th.TomboError:
                    continue
            if include_pos:
                yield stat_seq, chrm, strand, start, end
            else:
                yield stat_seq

        return
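For example, a read-only sketch that writes the sequence context around each of the most significant sites to a FASTA file (file paths are placeholders):

    from tombo import tombo_helper, tombo_stats

    genome_index = tombo_helper.Fasta('reference.fasta')
    stats = tombo_stats.TomboStats('sample.tombo.stats')
    with open('signif_contexts.fasta', 'w') as fasta_fp:
        for seq, chrm, strand, start, end in stats.iter_stat_seqs(
                genome_index, before_bases=5, after_bases=5):
            fasta_fp.write('>{}:{}:{}\n{}\n'.format(chrm, start, strand, seq))
    stats.close()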
[docs]    def iter_most_signif_sites(self):
        """Iterate through statistics table yielding (chrm, strand, pos,
        stat).
        """
        for pos_stat in self.most_signif_stats:
            yield (
                self._get_chrm_name(pos_stat), pos_stat['strand'].decode(),
                pos_stat['pos'], self._stat_transform(pos_stat))

        return
[docs]    def get_most_signif_regions(self, num_bases, num_regions,
                                unique_pos=True, prepend_loc_to_text=False):
        """Select regions centered on locations with the largest fraction
        of modified bases

        Args:
            num_bases (int): number of bases to output
            num_regions (int): number of regions to output
            unique_pos (bool): get only unique positions (optional; default
                True); intervals may overlap, but each identified
                significant position is outside other intervals
            prepend_loc_to_text (bool): prepend the most significant
                location to the region text (can be off for intervals near
                the start/end of sequence records)

        Returns:
            A list of :class:`tombo.tombo_helper.intervalData` objects
        """
        selected_regs = []
        used_intervals = defaultdict(set)
        for i, pos_stat in enumerate(self.most_signif_stats):
            int_start = max(0, pos_stat['pos'] - int(num_bases / 2.0))
            chrm = self._get_chrm_name(pos_stat)
            strand = pos_stat['strand'].decode()
            if (not unique_pos or
                pos_stat['pos'] not in used_intervals[(chrm, strand)]):
                used_intervals[(chrm, strand)].update(
                    range(int_start, int_start + num_bases))
                int_text = self._stat_text.format(
                    self._stat_transform(pos_stat))
                if prepend_loc_to_text:
                    int_text = '{0}:{1:d}:{2}'.format(
                        chrm, pos_stat['pos'] + 1, strand) + " " + int_text
                selected_regs.append(th.intervalData(
                    chrm=chrm, start=int_start, end=int_start + num_bases,
                    strand=strand, reg_id='{:03d}'.format(i),
                    reg_text=int_text))
                if len(selected_regs) >= num_regions:
                    break

        if len(selected_regs) == 0:
            th.error_message_and_exit(
                'No locations identified. Most likely an empty ' +
                'statistics file.')
        if len(selected_regs) < num_regions:
            th.warning_message(
                'Fewer unique significant locations more than ' +
                '[--num-bases]/2 apart were identified than requested. ' +
                'Continuing with ' + str(len(selected_regs)) +
                ' unique locations. Raise --num-most-significant-stored ' +
                'in order to see more most significant stats.')

        return selected_regs
[docs]    def compute_motif_stats(
            self, motif_descs, genome_index,
            stats_per_block=None, total_stats_limit=None):
        """Compute lists of statistic values and whether this site
        represents a match to the provided motifs

        Args:
            motif_descs (list; see
                :class:`tombo.tombo_helper.parse_motif_descs`): containing
                tuples with :class:`tombo.tombo_helper.TomboMotif` and
                motif/modification names
            genome_index (:class:`tombo.tombo_helper.Fasta`): genome index
            stats_per_block (int): statistics to include in calculations
                per-block (`--multiprocess-region-size`)
            total_stats_limit (int): maximum total statistics to include
                in computation (Default: include all stats)

        Returns:
            Dictionary with (key) motif/modification name and (value) list
            of tuples containing statistic value and boolean motif match
        """
        return _compute_motif_stats(
            self, motif_descs, genome_index,
            stats_per_block=stats_per_block,
            total_stats_limit=total_stats_limit)
[docs]    def compute_ground_truth_stats(self, ground_truth_locs):
        """Compute lists of statistic values and ground truth modification
        status (boolean)

        Args:
            ground_truth_locs: tuple containing 1) modified locations
                (from :class:`tombo.tombo_helper.parse_locs_file`),
                2) unmodified locations and 3) the modification name.

        Returns:
            Dictionary with (key) motif/modification name and (value) list
            of tuples containing statistic value and boolean ground truth
            modification status
        """
        return _compute_ground_truth_stats(self, ground_truth_locs)
[docs]    def compute_ctrl_motif_stats(
            self, ctrl_stats, motif_descs, genome_index,
            stats_per_block=None, total_stats_limit=None):
        """Compute lists of statistic values and whether each site comes
        from the native or the control sample, restricted to sites matching
        the provided motifs

        Args:
            ctrl_stats (:class:`tombo.tombo_stats.ModelStats`): control
                statistics
            motif_descs (list; see
                :class:`tombo.tombo_helper.parse_motif_descs`): containing
                tuples with :class:`tombo.tombo_helper.TomboMotif` and
                motif/modification names
            genome_index (:class:`tombo.tombo_helper.Fasta`): genome index
            stats_per_block (int): statistics to include in calculations
                per-block (`--multiprocess-region-size`)
            total_stats_limit (int): maximum total statistics to include
                in computation (Default: include all stats)

        Returns:
            Dictionary with (key) motif/modification name and (value) list
            of tuples containing statistic value and boolean native (True)
            or control (False) sample status
        """
        return _compute_ctrl_motif_stats(
            self, ctrl_stats, motif_descs, genome_index,
            stats_per_block=stats_per_block,
            total_stats_limit=total_stats_limit)
    def __iter__(self):
        """Iterator over all statistics blocks, yielding chrm, strand,
        start, end, block_stats
        """
        self.iter_all_cs = iter(sorted(self.blocks_index))
        try:
            self.iter_curr_cs = next(self.iter_all_cs)
        except StopIteration:
            self.iter_curr_cs = None
            self.iter_curr_cs_blocks = iter([])
        else:
            self.iter_curr_cs_blocks = iter(
                self.blocks_index[self.iter_curr_cs].items())
        return self

    def __next__(self):
        try:
            next_start, next_block_name = next(self.iter_curr_cs_blocks)
        except StopIteration:
            # move to next chromosome and strand;
            # this will raise a second StopIteration
            # when the end of the blocks is hit
            self.iter_curr_cs = next(self.iter_all_cs)
            self.iter_curr_cs_blocks = iter(sorted(
                self.blocks_index[self.iter_curr_cs].items()))
            next_start, next_block_name = next(self.iter_curr_cs_blocks)

        chrm, strand = self.iter_curr_cs
        return (chrm, strand, next_start, next_start + self.region_size,
                self.stat_blocks[next_block_name]['block_stats'][:])

    # for python2 compatibility
[docs]    def next(self):
        """Return next statistics block from file including (chrm, strand,
        block start, block end and statistics table
        ``numpy structured array``)
        """
        return self.__next__()
[docs]    def get_pos_stat(self, chrm, strand, pos, missing_value=None):
        """Extract statistic value from the requested genomic position.
        """
        try:
            pos_block_start = np.floor_divide(
                pos, self.region_size) * self.region_size
            # TODO: blocks may have missing data (consider full sized blocks
            # for random disk access to single or range of elements)
            #block_pos = np.remainder(pos, self.region_size)
            block_name = self.blocks_index[(chrm, strand)][pos_block_start]
            block_data = self.stat_blocks[block_name]['block_stats'][:]
            pos_index = np.where(block_data['pos'] == pos)[0]
            if len(pos_index) != 1:
                raise KeyError
            pos_stat = self._stat_transform(block_data[pos_index[0]])
        except KeyError:
            pos_stat = missing_value

        return pos_stat
[docs]    def get_reg_stats(self, chrm, strand, start, end):
        if (chrm, strand) not in self.blocks_index:
            return
        reg_stats = []
        for reg_start, block_name in sorted(
                self.blocks_index[(chrm, strand)].items()):
            # if this is an overlapping block
            if reg_start < end and reg_start + self.region_size > start:
                # extract stats overlapping this region
                reg_stats_block = self.stat_blocks[
                    block_name]['block_stats'][:]
                reg_stats.append(reg_stats_block[
                    np.logical_and(
                        np.greater_equal(reg_stats_block['pos'], start),
                        np.less(reg_stats_block['pos'], end))])
        if len(reg_stats) == 0:
            return
        elif len(reg_stats) == 1:
            return reg_stats[0]
        return np.vstack(reg_stats)
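Continuing the read-only sketch above, raw per-block tables can also be pulled for an arbitrary region (the coordinates and filename are illustrative; the slots shown are those written for ModelStats files):

    from tombo import tombo_stats

    stats = tombo_stats.TomboStats('sample.tombo.stats')
    reg_stats = stats.get_reg_stats('chr20', '+', 10000, 10100)
    if reg_stats is not None:
        # structured array with damp_frac, frac, pos, cov, control_cov
        # and valid_cov slots
        print(reg_stats['pos'], reg_stats['damp_frac'])
    stats.close()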
[docs]class LevelStats(ModelStats):
    def _create_new_stats_file(self):
        # try to remove file for overwriting old results
        try:
            os.remove(self.stats_fn)
        except:
            pass
        # open file for writing
        self._fp = h5py.File(self.stats_fn, 'w')

        # save attributes to file and open stats blocks group
        self._fp.attrs['stat_type'] = self.stat_type
        self._fp.attrs['block_size'] = self.region_size
        self.stat_blocks = self._fp.create_group(STAT_BLOCKS_H5_NAME)

        # save coverage threshold attribute
        self._fp.attrs[COV_THRESH_H5_NAME] = self.cov_thresh

        # storage for most significant stats
        self.most_signif_sites = self._fp.create_group(MOST_SIGNIF_H5_NAME)
        self.running_most_signif_sites = np.empty(
            shape=(self.num_most_signif,),
            dtype=[(str('stat'), 'f8'), (str('pos'), 'u4'),
                   (str('cov'), 'u4'), (str('control_cov'), 'u4'),
                   (str('chrm'), 'u4'), (str('strand'), 'S1')])
        self.running_most_signif_sites[:] = np.NAN
        # store a queue of completed stat batches to be concatenated and
        # stored as a group to avoid too many array copy and sorting ops
        self.queued_stat_batches = []
        # store chromosomes names in dict for storing most signif array
        self.curr_chrm_id = 0
        self.chrm_names = {}
        self.chrm_id_grp = self.most_signif_sites.create_group('chrm_ids')

        self.is_empty = True

        return

    def __init__(self, stats_fn, stat_type=None, region_size=None,
                 cov_thresh=None, num_most_signif=None,
                 most_signif_num_batches=MOST_SIGNIF_NUM_BATCHES_DEFAULT):
        """Parse or open for writing a group-comparison (per-genomic base)
        Tombo statistics file.

        Example::

            stats = tombo_stats.TomboStats('path/to/stats.file')
            for chrm, strand, pos, nl_pval in stats.iter_most_signif_sites():
                # do stuff

        Args:
            stats_fn (str): filename for previously saved tombo stats
            stat_type (str): type of statistic (ks, ttest, utest); only
                applicable for new file writing
            region_size (int): size of chunked storage blocks; only
                applicable for new file writing
            cov_thresh (int): only sites with coverage greater than or
                equal to this value will be stored
            num_most_signif (int): number of most significant sites to be
                stored for faster access
            most_signif_num_batches (int): number of region batches to
                store before re-computing the most significant array
                (default: 10)

        Warning:

            If all arguments are provided the current file's contents will
            be deleted.

            Intended to open a fresh ``LevelStats`` file for writing.
        """
        self.stats_fn = stats_fn

        if any(arg is None for arg in (
                stat_type, region_size, cov_thresh, num_most_signif)):
            self.open_for_writing = False
            # open file for reading
            try:
                self._parse_stats()
            except:
                raise th.TomboError(
                    'Invalid statistics file provided. Try running ' +
                    'tombo/scripts/convert_stats.py if this stats file ' +
                    'was created before Tombo v1.3.1')
        else:
            self.open_for_writing = True
            # set class attributes
            self.stat_type = stat_type
            self.region_size = region_size
            self.curr_block_num = 0
            self.cov_thresh = cov_thresh
            self.num_most_signif = num_most_signif
            self.most_signif_num_batches = most_signif_num_batches
            # open file for writing
            self._create_new_stats_file()

        if self.stat_type not in LEVEL_STATS_TXTS:
            if self.stat_type in PER_READ_STATS:
                raise th.TomboError(
                    'This appears to be a model-based comparison stats ' +
                    'file. Open with tombo_stats.ModelStats.')
            raise th.TomboError(
                'This file is not a valid LevelStats file. `stat_type` ' +
                'listed as "' + self.stat_type + '".')

        # set values to access statistics consistently
        self.is_model_stats = False
        self._stat_slot = str('stat')
        if self.stat_type in (KS_TEST_TXT, U_TEST_TXT, T_TEST_TXT):
            self._stat_text = '-log10(p-value): {0:.2g}'
            def neg_log10(pos_stat):
                slot_stat = pos_stat[self._stat_slot]
                with np.errstate(divide='ignore'):
                    r_stat = -np.log10(slot_stat)
                return r_stat
            self._stat_transform = neg_log10
        elif self.stat_type == KS_STAT_TEST_TXT:
            self._stat_text = 'D Statistic: {0:.2g}'
            self._stat_transform = lambda pos_stat: (
                1 - pos_stat[self._stat_slot])
        elif self.stat_type == U_STAT_TEST_TXT:
            self._stat_text = 'Common Language Marginal Effect: {0:.2g}'
            self._stat_transform = lambda pos_stat: -pos_stat[
                self._stat_slot]
        elif self.stat_type == T_STAT_TEST_TXT:
            self._stat_text = "Cohen's D: {0:.2g}"
            self._stat_transform = lambda pos_stat: -pos_stat[
                self._stat_slot]
        else:
            raise th.TomboError('Unknown statistic type.')

        return

    def _write_stat_block(self, grp_stats):
        """Write region group statistics block to file.
        """
        try:
            block_data = self.stat_blocks.create_group(
                'Block_' + unicode(self.curr_block_num))
            self.curr_block_num += 1
        except:
            th.warning_message('Statistics file not opened for writing.')
            return

        block_data.attrs['chrm'] = grp_stats.chrm
        block_data.attrs['strand'] = grp_stats.strand
        block_data.attrs['start'] = grp_stats.start

        grp_stats_arr = np.array(
            [pos_stats for pos_stats in zip(
                grp_stats.reg_stats, grp_stats.reg_poss,
                grp_stats.reg_cov, grp_stats.ctrl_cov)
             if not np.isnan(pos_stats[0])],
            dtype=[(str('stat'), 'f8'), (str('pos'), 'u4'),
                   (str('cov'), 'u4'), (str('control_cov'), 'u4')])
        block_data.create_dataset(
            'block_stats', data=grp_stats_arr, compression="gzip")
        self._add_to_most_signif(
            grp_stats_arr, grp_stats.chrm, grp_stats.strand)

        self.is_empty = False

        return
[docs]def TomboStats(stat_fn):
    """Load per-reference location Tombo statistics. Safe method to load
    statistics for read-only purposes.

    Returns:
        Either :class:`tombo.tombo_stats.ModelStats` or
        :class:`tombo.tombo_stats.LevelStats` depending on the type of
        Tombo statistics file provided.
    """
    try:
        stats = ModelStats(stat_fn)
    except th.TomboError:
        stats = LevelStats(stat_fn)
    return stats
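A minimal read-only loading sketch (filename is a placeholder); the returned object exposes the common reading interface regardless of which concrete class is constructed:

    from tombo import tombo_stats

    stats = tombo_stats.TomboStats('sample.tombo.stats')
    for chrm, strand, pos, stat in stats.iter_most_signif_sites():
        # stat is already transformed for display (e.g. estimated
        # alternate fraction for ModelStats files)
        pass
    stats.close()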
[docs]class PerReadStats(object):
    """Store and access per-read modified base testing statistics

    .. automethod:: __init__
    """
    # TODO add attributes
    def _parse_per_read_stats(self):
        self._fp = h5py.File(self.per_read_stats_fn, 'r')
        self.stat_type = self._fp.attrs.get('stat_type')
        self.region_size = self._fp.attrs.get('block_size')
        self.per_read_blocks = self._fp[STAT_BLOCKS_H5_NAME]
        self.num_blocks = 0
        blocks_index = defaultdict(dict)
        for block_name, block_data in self.per_read_blocks.items():
            blocks_index[
                (block_data.attrs.get('chrm'),
                 block_data.attrs.get('strand'))][
                     block_data.attrs.get('start')] = block_name
            self.num_blocks += 1
        self.blocks_index = dict(blocks_index)
        return

    def _create_new_per_read_stats_file(self):
        # try to remove file for overwriting old results
        try:
            os.remove(self.per_read_stats_fn)
        except:
            pass
        # open file for writing
        self._fp = h5py.File(self.per_read_stats_fn, 'w')

        # save attributes to file and open stats blocks group
        self.curr_block_num = 0
        self._fp.attrs['stat_type'] = self.stat_type
        self._fp.attrs['block_size'] = self.region_size
        self.per_read_blocks = self._fp.create_group(STAT_BLOCKS_H5_NAME)

        return
[docs]    def __init__(self, per_read_stats_fn, stat_type=None,
                 region_size=None):
        """Open per-read statistics file.

        Examples::

            per_read_stats = tombo_stats.PerReadStats(
                'path/to/sample.tombo.per_read_stats')
            int_data = tombo_helper.intervalData(
                chrm='chr20', start=10000, end=10100, strand='+')
            reg_per_read_stats = per_read_stats.get_region_per_read_stats(
                int_data, num_reads=10)

        Args:
            per_read_stats_fn (str): filename containing (or to write)
                per-read Tombo statistics
            stat_type (str): type of statistic (model_compare, de_novo, or
                sample_compare); only applicable for new file writing
            region_size (int): size of chunked storage blocks; only
                applicable for new file writing

        Warning:

            If ``stat_type`` and ``region_size`` are provided the current
            file's contents will be deleted.

            Intended to open a fresh ``PerReadStats`` file for writing.
        """
        self.per_read_stats_fn = per_read_stats_fn

        if stat_type is None or region_size is None:
            # open file for reading
            try:
                self._parse_per_read_stats()
            except:
                th.error_message_and_exit(
                    'Non-existent or invalid per-read statistics file ' +
                    'provided.')
        else:
            # set class attributes
            self.stat_type = stat_type
            self.region_size = region_size
            self._create_new_per_read_stats_file()

        self.are_pvals = self.stat_type != ALT_MODEL_TXT
        self._stat_slot = str('stat')
        self._stat_text = '-log10(p-value): {0:.2g}'
        def neg_log10(pos_stat):
            slot_stat = pos_stat[self._stat_slot]
            with np.errstate(divide='ignore'):
                r_stat = -np.log10(slot_stat)
            return r_stat
        self._stat_transform = neg_log10

        return
    def _write_per_read_block(
            self, per_read_block, read_id_lookup, chrm, strand, start):
        """Write region statistics block to file.
        """
        try:
            block_data = self.per_read_blocks.create_group(
                'Block_' + unicode(self.curr_block_num))
            self.curr_block_num += 1
        except:
            th.warning_message(
                'Per-read statistics file not opened for writing.')
            return

        block_data.attrs['chrm'] = chrm
        block_data.attrs['strand'] = strand
        block_data.attrs['start'] = start
        block_data.create_dataset(
            'block_stats', data=per_read_block, compression="gzip")
        # add lookup dict for read_id slot stored in table to save space
        # and avoid memory leak due to vlen slots in h5py datasets
        dt = h5py.special_dtype(vlen=str)
        read_ids = np.array(list(read_id_lookup.keys()), dtype=dt)
        read_ids_ds = block_data.create_dataset(
            'read_ids', read_ids.shape, dtype=dt, compression="gzip")
        read_ids_ds[...] = read_ids
        block_data.create_dataset(
            'read_id_vals', data=np.array(list(read_id_lookup.values())),
            compression="gzip")

        self._fp.flush()
        return
[docs]    def get_region_per_read_stats(self, interval_data, num_reads=None):
        """Extract per-read statistics over the specified interval.

        Args:
            interval_data (:class:`tombo.tombo_helper.intervalData`):
                genomic interval
            num_reads (int): randomly select this many reads (default:
                include all reads)

        Returns:
            `np.array` structured array containing ``pos``, ``stat`` and
            ``read_id`` for per-read stats over requested interval
        """
        try:
            cs_blocks = self.blocks_index[(
                interval_data.chrm, interval_data.strand)]
        except KeyError:
            return

        int_block_stats = []
        for block_start, block_name in cs_blocks.items():
            if (interval_data.end < block_start or
                interval_data.start > block_start + self.region_size):
                continue
            # extract stats from FAST5
            block_stats = self.per_read_blocks[block_name]['block_stats'][:]
            reg_poss = block_stats['pos']
            reg_read_stats = block_stats[self._stat_slot]
            # extract and convert read_ids back into strings
            if 'read_id_vals' in self.per_read_blocks[block_name]:
                block_read_id_lookup = dict([
                    (read_id_val, read_id)
                    for read_id, read_id_val in zip(
                        self.per_read_blocks[block_name]['read_ids'].value,
                        self.per_read_blocks[
                            block_name]['read_id_vals'].value)])
            else:
                # read_ids previously stored (inefficiently) as attributes
                # so parse read_ids attributes for backwards compatibility
                block_read_id_lookup = dict([
                    (read_id_val, read_id)
                    for read_id, read_id_val in self.per_read_blocks[
                        block_name]['read_ids'].attrs.items()])
            reg_read_ids = [
                block_read_id_lookup[r_id]
                for r_id in block_stats['read_id']]
            int_block_stats.append(np.array(
                list(zip(reg_poss, reg_read_stats, reg_read_ids)),
                dtype=[(str('pos'), 'u4'), (str('stat'), 'f8'),
                       (str('read_id'), object)]))

        if len(int_block_stats) == 0:
            return

        if len(int_block_stats) == 1:
            int_block_stats = int_block_stats[0]
        else:
            int_block_stats = np.concatenate(int_block_stats)

        all_int_stats = int_block_stats[
            (int_block_stats['pos'] >= interval_data.start) &
            (int_block_stats['pos'] < interval_data.end)]

        read_ids = set(all_int_stats['read_id'])
        if num_reads is not None and num_reads < len(read_ids):
            int_plot_reads = set(random.sample(read_ids, num_reads))
            all_int_stats = all_int_stats[
                np.array([read_id in int_plot_reads
                          for read_id in all_int_stats['read_id']])]

        return all_int_stats
[docs]    def compute_motif_stats(
            self, motif_descs, genome_index,
            stats_per_block=None, total_stats_limit=None):
        """Compute lists of statistic values and whether this site
        represents a match to the provided motifs

        Args:
            motif_descs (list; see
                :class:`tombo.tombo_helper.parse_motif_descs`): containing
                tuples with :class:`tombo.tombo_helper.TomboMotif` and
                motif/modification names
            genome_index (:class:`tombo.tombo_helper.Fasta`): genome index
            stats_per_block (int): statistics to include in calculations
                per-block (`--multiprocess-region-size`)
            total_stats_limit (int): maximum total statistics to include
                in computation (Default: include all stats)

        Returns:
            Dictionary with (key) motif/modification name and (value) list
            of tuples containing statistic value and boolean motif match
        """
        return _compute_motif_stats(
            self, motif_descs, genome_index,
            stats_per_block=stats_per_block,
            total_stats_limit=total_stats_limit)
[docs]    def compute_ground_truth_stats(self, ground_truth_locs):
        """Compute lists of statistic values and ground truth modification
        status (boolean)

        Args:
            ground_truth_locs: tuple containing 1) modified locations
                (from :class:`tombo.tombo_helper.parse_locs_file`),
                2) unmodified locations and 3) the modification name.

        Returns:
            Dictionary with (key) motif/modification name and (value) list
            of tuples containing statistic value and boolean ground truth
            modification status
        """
        return _compute_ground_truth_stats(self, ground_truth_locs)
[docs]    def compute_ctrl_motif_stats(
            self, pr_ctrl_stats, motif_descs, genome_index,
            stats_per_block=None, total_stats_limit=None):
        """Compute lists of statistic values and whether each site comes
        from the native or the control sample, restricted to sites matching
        the provided motifs

        Args:
            pr_ctrl_stats (:class:`tombo.tombo_stats.PerReadStats`):
                control per-read statistics
            motif_descs (list; see
                :class:`tombo.tombo_helper.parse_motif_descs`): containing
                tuples with :class:`tombo.tombo_helper.TomboMotif` and
                motif/modification names
            genome_index (:class:`tombo.tombo_helper.Fasta`): genome index
            stats_per_block (int): statistics to include in calculations
                per-block (`--multiprocess-region-size`)
            total_stats_limit (int): maximum total statistics to include
                in computation (Default: include all stats)

        Returns:
            Dictionary with (key) motif/modification name and (value) list
            of tuples containing statistic value and boolean native (True)
            or control (False) sample status
        """
        return _compute_ctrl_motif_stats(
            self, pr_ctrl_stats, motif_descs, genome_index,
            stats_per_block=stats_per_block,
            total_stats_limit=total_stats_limit)
    def __iter__(self):
        """Iterator over all statistics blocks, yielding chrm, strand,
        start, end, block_stats
        """
        self.iter_all_cs = iter(list(self.blocks_index))
        self.iter_curr_cs = next(self.iter_all_cs)
        self.iter_curr_cs_blocks = iter(
            self.blocks_index[self.iter_curr_cs].items())
        return self

    def __next__(self):
        try:
            next_start, next_block_name = next(self.iter_curr_cs_blocks)
        except StopIteration:
            # move to next chromosome and strand;
            # this will raise a second StopIteration
            # when the end of the blocks is hit
            self.iter_curr_cs = next(self.iter_all_cs)
            self.iter_curr_cs_blocks = iter(sorted(
                self.blocks_index[self.iter_curr_cs].items()))
            next_start, next_block_name = next(self.iter_curr_cs_blocks)

        chrm, strand = self.iter_curr_cs
        return (chrm, strand, next_start, next_start + self.region_size,
                self.per_read_blocks[next_block_name]['block_stats'][:])

    # for python2 compatibility
    def next(self):
        """Return next per-read statistics block from file including
        (chrm, strand, block start, block end and per-read statistics
        table ``numpy structured array``)
        """
        return self.__next__()
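    # Usage sketch (illustrative; filename hypothetical): PerReadStats
    # objects are iterable, yielding one statistics block at a time.
    #
    #   from tombo import tombo_stats
    #   pr_stats = tombo_stats.PerReadStats('sample.tombo.per_read_stats')
    #   for chrm, strand, start, end, block_stats in pr_stats:
    #       print(chrm, strand, start, end, block_stats.shape[0])
    #   pr_stats.close()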
    def close(self):
        """Close HDF5 file
        """
        self._fp.close()
        return
    def get_reg_stats(self, chrm, strand, start, end):
        if (chrm, strand) not in self.blocks_index:
            return
        reg_stats = []
        for reg_start, block_name in sorted(
                self.blocks_index[(chrm, strand)].items()):
            # if this is an overlapping block
            if reg_start < end and reg_start + self.region_size > start:
                # extract stats overlapping this region
                reg_stats_block = self.per_read_blocks[
                    block_name]['block_stats'][:]
                reg_stats.append(reg_stats_block[
                    np.logical_and(
                        np.greater_equal(reg_stats_block['pos'], start),
                        np.less(reg_stats_block['pos'], end))])
        if len(reg_stats) == 0:
            return
        elif len(reg_stats) == 1:
            return reg_stats[0]
        # concatenate 1-D structured arrays (np.vstack would add an axis and
        # return a shape inconsistent with the single-block case above)
        return np.concatenate(reg_stats)
################################
##### Base-by-base Testing #####
################################

def compute_posterior_samp_dists(
        ctrl_means, ctrl_sds, ctrl_cov, ctrl_reg_data, std_ref,
        prior_weights, min_test_reads, fm_offset):
    dnstrm_bases = std_ref.kmer_width - std_ref.central_pos - 1
    gnm_begin_lag = (
        std_ref.central_pos if ctrl_reg_data.strand == '+' else dnstrm_bases)
    gnm_end_lag = (
        dnstrm_bases if ctrl_reg_data.strand == '+' else std_ref.central_pos)
    reg_seq = ctrl_reg_data.copy().update(
        start=ctrl_reg_data.start - gnm_begin_lag - fm_offset,
        end=ctrl_reg_data.end + gnm_end_lag + fm_offset).add_seq().seq
    if ctrl_reg_data.strand == '-':
        reg_seq = th.rev_comp(reg_seq)
    reg_ref_means, reg_ref_sds = std_ref.get_exp_levels_from_seq_with_gaps(
        reg_seq, ctrl_reg_data.strand == '-')

    # compute vectorized weighted means for new mean and sd estimates
    post_ref_means = ((
        (prior_weights[0] * reg_ref_means) + (ctrl_cov * ctrl_means)) /
                      (prior_weights[0] + ctrl_cov))
    post_ref_sds = ((
        (prior_weights[1] * reg_ref_sds) + (ctrl_cov * ctrl_sds)) /
                    (prior_weights[1] + ctrl_cov))

    # This bit should work, but the SD estimates seem to be incorrect
    # and the computation is likely far too much for likely very similar
    # results from a weighted average with the prior SD.
    """
    # compute posterior sds; see example formula here:
    # https://www.statlect.com/fundamentals-of-statistics/\
    #     normal-distribution-Bayesian-estimation
    # optimizations to matrix ops applied here
    def compute_mean_diff_factor(event_means, ref_mean):
        valid_indices = ~np.isnan(event_means)
        num_valid_indices = sum(valid_indices)
        if num_valid_indices < min_test_reads:
            return np.NAN
        n = float(num_valid_indices + prior_weights[0])
        c1 = (n - 1) / n
        c2 = -1 / n
        mean_diffs = event_means[valid_indices] - ref_mean
        mds_sum = mean_diffs.sum()
        return sum(((i_md * c1) + (mds_sum - i_md) * c2) * i_md
                   for i, i_md in enumerate(mean_diffs))
    mean_diff_factors = np.array([
        compute_mean_diff_factor(b_events, ref_mean)
        for b_events, ref_mean in zip(ctrl_base_events, reg_ref_means)])
    post_ref_sds = np.sqrt((
        mean_diff_factors + (prior_weights[1] * np.square(reg_ref_sds))) / (
            ctrl_cov + prior_weights[1]))
    """

    return post_ref_means, post_ref_sds

def get_reads_ref(
        reg_data, min_test_reads, fm_offset, std_ref=None,
        prior_weights=None, est_mean=False):
    """Get mean and standard deviation of levels from a sample across the
    genome
    """
    central_func = np.mean if est_mean else np.median
    reg_size = reg_data.end - reg_data.start + (fm_offset * 2)
    level_means, level_sds = np.empty(reg_size), np.empty(reg_size)
    level_means[:] = np.NAN
    level_sds[:] = np.NAN
    # expand region to include fm_offset
    bases_levels = reg_data.copy().update(
        start=reg_data.start - fm_offset,
        end=reg_data.end + fm_offset).get_base_levels()

    valid_indices = np.logical_not(np.isnan(bases_levels))
    cov = valid_indices.sum(axis=1)
    cov_regs = np.where(np.diff(np.concatenate([
        [False,], np.greater_equal(cov, min_test_reads), [False,]])))[0]
    if len(cov_regs) == 0:
        return level_means, level_sds, {}

    for cov_start, cov_end in zip(cov_regs[:-1:2], cov_regs[1::2]):
        level_means[cov_start:cov_end] = np.array([
            central_func(b_levels[valid_indices[cov_start + i]])
            for i, b_levels in enumerate(bases_levels[cov_start:cov_end])])
        level_sds[cov_start:cov_end] = np.array([
            np.std(b_levels[valid_indices[cov_start + i]])
            for i, b_levels in enumerate(bases_levels[cov_start:cov_end])])
    if std_ref is not None:
        if prior_weights is None:
            prior_weights = (MEAN_PRIOR_CONST, SD_PRIOR_CONST)
        level_means, level_sds = compute_posterior_samp_dists(
            level_means, level_sds, cov, reg_data, std_ref,
            prior_weights, min_test_reads, fm_offset)

    # convert coverage to a dict for later lookup
    cov = dict(zip(range(reg_data.start - fm_offset,
                         reg_data.end + fm_offset), cov))

    # mask regions with 0 estimated SD (likely very low coverage)
    zero_sd_sites = level_sds == 0
    level_means[zero_sd_sites] = np.NAN
    level_sds[zero_sd_sites] = np.NAN

    return level_means, level_sds, cov
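# Worked example (illustrative, values hypothetical): the posterior levels
# above are coverage-weighted averages between the model prior and the
# observed control sample,
#     post = (w * prior + cov * obs) / (w + cov)
# so low-coverage sites shrink toward the canonical model.
#
#   import numpy as np
#   w, prior = 5.0, 0.0
#   cov, obs = np.array([2.0, 20.0]), np.array([1.0, 1.0])
#   post = (w * prior + cov * obs) / (w + cov)   # -> [0.286, 0.8]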
def compute_sample_compare_read_stats(
        r_data, ctrl_means, ctrl_sds, fm_offset=FM_OFFSET_DEFAULT,
        reg_data=None):
    """Compute significance statistics using comparison of two sequencing
    samples method for a single read within a specified genomic region.

    Args:
        r_data (:class:`tombo.tombo_helper.readData`): read data
        ctrl_means (`np.array::np.float64`): mean level values from control
            set of reads
        ctrl_sds (`np.array::np.float64`): level SD values from control set
            of reads
        fm_offset (int): Fisher's Method offset for computing locally
            combined p-values (optional; default: 1)
        reg_data (:class:`tombo.tombo_helper.intervalData`): region to test
            (default: test whole read)

    Returns:
        Read testing results, positions tested and the read_id

        1) r_pvals (`np.array::np.float64`): p-values for testing over
           specified region
        2) r_poss (`np.array::np.int64`): genomic positions for returned
           p-values
        3) read_id (str): read identifier
    """
    reg_start = reg_data.start if reg_data is not None else r_data.start
    reg_size = (reg_data.end - reg_data.start if reg_data is not None
                else r_data.end - r_data.start)

    def comp_clip_and_flip():
        with h5py.File(r_data.fn, 'r') as fast5_data:
            r_means = th.get_single_slot_read_centric(
                fast5_data, 'norm_mean', r_data.corr_group)
            read_id = th.get_raw_read_slot(fast5_data).attrs.get('read_id')
        if r_means is None:
            raise th.TomboError(
                'Read does not contain re-squiggled level means.')

        read_start, read_end = r_data.start, r_data.end
        if read_start + fm_offset < reg_start:
            num_start_clip = reg_start - (read_start + fm_offset)
            read_start = reg_start - fm_offset
            if r_data.strand == '+':
                r_means = r_means[num_start_clip:]
            else:
                r_means = r_means[:-num_start_clip]
        if read_end - fm_offset > reg_start + reg_size:
            num_end_clip = (read_end - fm_offset) - (reg_start + reg_size)
            read_end = reg_start + reg_size + fm_offset
            if r_data.strand == '+':
                r_means = r_means[:-num_end_clip]
            else:
                r_means = r_means[num_end_clip:]
        # flip means to match genomic positions
        if r_data.strand == '-':
            r_means = r_means[::-1]

        return r_means, read_start, read_end, read_id

    def get_read_comp_z_score(r_means, read_start, read_end):
        r_z_scores = np.abs(
            r_means - ctrl_means[read_start - reg_start + fm_offset:
                                 read_end - reg_start + fm_offset]) / ctrl_sds[
                                     read_start - reg_start + fm_offset:
                                     read_end - reg_start + fm_offset]
        return r_z_scores

    def get_pvals(r_z_scores):
        # mask out nan z-scores for efficient CDF computation
        r_poss = np.where(np.logical_not(np.isnan(r_z_scores)))[0]
        r_pvals = np.empty(r_z_scores.shape)
        r_pvals[:] = np.NAN
        valid_r_pvals = stats.norm.cdf(-r_z_scores[r_poss]) * 2.0
        r_pvals[r_poss] = valid_r_pvals
        return r_pvals, r_poss

    r_means, read_start, read_end, read_id = comp_clip_and_flip()
    r_z_scores = get_read_comp_z_score(r_means, read_start, read_end)
    if np.sum(np.logical_not(np.isnan(r_z_scores))) == 0:
        raise th.TomboError('No valid z-scores in read.')
    r_pvals, r_poss = get_pvals(r_z_scores)
    if fm_offset > 0:
        r_pvals = calc_window_fishers_method(r_pvals, fm_offset)
        r_poss = np.where(np.logical_not(np.isnan(r_pvals)))[0]
        r_pvals = r_pvals[r_poss]
    else:
        r_pvals = r_pvals[r_poss]
    r_poss += read_start

    return {SAMP_COMP_TXT: r_pvals}, {SAMP_COMP_TXT: r_poss}, read_id
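# Numerical sketch (illustrative): each per-read p-value above is the
# two-sided normal tail probability of the absolute z-score,
#     p = 2 * Phi(-|z|)
#
#   import numpy as np
#   from scipy import stats
#   z = np.array([0.5, 2.0, 3.5])
#   p = stats.norm.cdf(-z) * 2.0   # -> [0.617, 0.0455, 0.000465]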
def compute_de_novo_read_stats(
        r_data, std_ref, fm_offset=FM_OFFSET_DEFAULT, reg_data=None):
    """Compute significance statistics using de novo comparison to a
    canonical model method for a single read within a specified genomic
    region.

    Args:
        r_data (:class:`tombo.tombo_helper.readData`): read data
        std_ref (:class:`tombo.tombo_stats.TomboModel`): canonical expected
            signal level model
        fm_offset (int): Fisher's Method offset for computing locally
            combined p-values (optional; default: 1)
        reg_data (:class:`tombo.tombo_helper.intervalData`): region to test
            (default: test whole read)

    Returns:
        Read testing results, positions tested and the read_id

        1) r_pvals (`np.array::np.float64`): p-values for testing over
           specified region
        2) r_poss (`np.array::np.int64`): genomic positions for returned
           p-values
        3) read_id (str): read identifier
    """
    reg_start = reg_data.start if reg_data is not None else r_data.start
    reg_size = (reg_data.end - reg_data.start if reg_data is not None
                else r_data.end - r_data.start)

    dnstrm_bases = std_ref.kmer_width - std_ref.central_pos - 1
    gnm_begin_lag = (std_ref.central_pos if r_data.strand == '+'
                     else dnstrm_bases)
    gnm_end_lag = (dnstrm_bases if r_data.strand == '+'
                   else std_ref.central_pos)

    def de_novo_clip_and_flip():
        with h5py.File(r_data.fn, 'r') as fast5_data:
            r_means, r_seq = th.get_multiple_slots_read_centric(
                fast5_data, ['norm_mean', 'base'], r_data.corr_group)
            read_id = th.get_raw_read_slot(fast5_data).attrs.get('read_id')
        if r_means is None or r_seq is None:
            raise th.TomboError(
                'Read does not contain valid re-squiggled data.')
        r_seq = b''.join(r_seq).decode()

        read_start, read_end = r_data.start, r_data.end
        # clip read if it extends outside the current genomic region, so
        # stats are only computed within this region
        if read_start + gnm_begin_lag + fm_offset < reg_start:
            num_start_clip = reg_start - (
                read_start + gnm_begin_lag + fm_offset)
            read_start = reg_start - gnm_begin_lag - fm_offset
            if r_data.strand == '+':
                r_means = r_means[num_start_clip:]
                r_seq = r_seq[num_start_clip:]
            else:
                r_means = r_means[:-num_start_clip]
                r_seq = r_seq[:-num_start_clip]
        if read_end - gnm_end_lag - fm_offset > reg_start + reg_size:
            num_end_clip = (read_end - gnm_end_lag - fm_offset) - (
                reg_start + reg_size)
            read_end = reg_start + reg_size + gnm_end_lag + fm_offset
            if r_data.strand == '+':
                r_means = r_means[:-num_end_clip]
                r_seq = r_seq[:-num_end_clip]
            else:
                r_means = r_means[num_end_clip:]
                r_seq = r_seq[num_end_clip:]

        # if this read does not cover enough of this region for stat
        # computation raise an error to be handled below
        if len(r_seq) < std_ref.kmer_width:
            raise th.TomboError(
                'Read does not contain information in this region.')

        r_ref_means, r_ref_sds = std_ref.get_exp_levels_from_seq(
            r_seq, r_data.strand == '-')

        if r_data.strand == '-':
            # reverse means to match genomic order
            r_means = r_means[::-1]
        # clip means that don't have testing model data
        # note that this is still extended by fm_offset
        r_means = r_means[gnm_begin_lag:-gnm_end_lag]
        read_start += gnm_begin_lag
        read_end -= gnm_end_lag

        return (r_means, r_ref_means, r_ref_sds, read_start, read_end,
                read_id)

    (r_means, r_ref_means, r_ref_sds, read_start, read_end,
     read_id) = de_novo_clip_and_flip()
    z_scores = np.abs(r_means - r_ref_means) / r_ref_sds
    r_pvals = stats.norm.cdf(-z_scores) * 2.0
    if fm_offset > 0:
        r_pvals = calc_window_fishers_method(r_pvals, fm_offset)

    # ignore errors in max over NAN values if fisher's method was used
    with np.errstate(invalid='ignore'):
        r_pvals = np.maximum(r_pvals, SMALLEST_PVAL)
    r_poss = np.arange(read_start, read_end)

    return {DE_NOVO_TXT: r_pvals}, {DE_NOVO_TXT: r_poss}, read_id
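# Sketch (illustrative): calc_window_fishers_method (defined earlier in this
# module) combines p-values over a window of 2 * fm_offset + 1 positions via
# Fisher's method; for k independent p-values, -2 * sum(log(p)) follows a
# chi-squared distribution with 2k degrees of freedom under the null.
#
#   import numpy as np
#   from scipy import stats
#   window_pvals = np.array([0.04, 0.2, 0.01])
#   chi_stat = -2 * np.log(window_pvals).sum()
#   comb_pval = stats.chi2.sf(chi_stat, 2 * window_pvals.size)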
def calc_llh_ratio(
        reg_means, reg_ref_means, reg_ref_vars, reg_alt_means, reg_alt_vars):
    """Compute log likelihood ratio. This is about 10X slower than the cython
    version in tombo._c_helper, but has been kept for debugging purposes.
    """
    # compute log likelihood ratio
    # positive value means standard base fits data better
    # negative value means alternative base fits data better
    return (np.sum(np.square(reg_means - reg_alt_means) / reg_alt_vars) +
            np.sum(np.log(reg_alt_vars))) - (
                np.sum(np.square(reg_means - reg_ref_means) / reg_ref_vars) +
                np.sum(np.log(reg_ref_vars)))
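# Sanity-check sketch (illustrative): calc_llh_ratio equals twice the
# difference in Gaussian log-likelihoods (the 2*pi constants cancel), so
# positive values favor the canonical model.
#
#   import numpy as np
#   from scipy import stats
#   x = np.array([0.1, -0.2, 0.05])
#   ref_m, alt_m, var = np.zeros(3), np.full(3, 1.0), np.ones(3)
#   llhr = calc_llh_ratio(x, ref_m, var, alt_m, var)
#   ref_ll = stats.norm.logpdf(x, ref_m, np.sqrt(var)).sum()
#   alt_ll = stats.norm.logpdf(x, alt_m, np.sqrt(var)).sum()
#   assert np.isclose(llhr, 2 * (ref_ll - alt_ll))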
def trim_seq_and_means(
        seq, means, r_start, reg_start, reg_end, strand, kmer_width,
        central_pos, max_motif_bb, max_motif_ab):
    """Return trimmed arrays to the genomic region specified.

    Args:
        seq: read genomic sequence (read-centric for rev strand)
        means: read base level means
        r_start: genomic start position
        reg_start: genomic region start
        reg_end: genomic region end
        strand: read mapped strand
        kmer_width: standard and alt k-mer width
        central_pos: central position within standard k-mer
        max_motif_bb: maximum (over alt models) bases required upstream for
            motif search
        max_motif_ab: maximum (over alt models) bases required downstream
            for motif search

    Arrays are:
        1) read centric k-mers (for expected level lookup)
        2) read-centric k-mer model-able means
        3) genome-centric alt test-able start
        4) motif search seq
    """
    r_end = r_start + means.shape[0]
    # save un-clipped motif search seq
    motif_search_seq = seq
    # clip read if it extends outside the current genomic region, so
    # stats are only computed within this region
    num_start_clip, num_end_clip = 0, 0
    if r_start + kmer_width - 1 < reg_start:
        if strand == '+':
            num_start_clip = reg_start - (r_start + kmer_width - 1)
        else:
            num_end_clip = reg_start - (r_start + kmer_width - 1)
        r_start = reg_start - (kmer_width - 1)
    if r_end - kmer_width + 1 > reg_end:
        if strand == '+':
            num_end_clip = r_end - kmer_width + 1 - reg_end
        else:
            num_start_clip = r_end - kmer_width + 1 - reg_end

    # clip sequence to that required for expected level lookups
    seq = seq[num_start_clip:]
    if num_end_clip > 0:
        seq = seq[:-num_end_clip]
    # clip extra bits from means to test-able means
    means = means[num_start_clip + central_pos:]
    means = means[:-(num_end_clip + kmer_width - central_pos - 1)]

    # if this read does not cover enough of this region for alternate stat
    # computation raise an error to be handled below
    if means.shape[0] < kmer_width:
        raise th.TomboError('Read sequence too short in this region.')

    kmers = th.get_seq_kmers(seq, kmer_width)
    if len(kmers) != means.shape[0]:
        raise th.TomboError('Mismatching k-mer and mean levels.')
    # shift r_start to first alt test-able position
    r_start += kmer_width - 1

    # clip and/or pad sequence for motif searching
    if num_start_clip + kmer_width - 1 - max_motif_bb >= 0:
        motif_search_seq = motif_search_seq[
            num_start_clip + kmer_width - 1 - max_motif_bb:]
    else:
        motif_search_seq = 'N' * -(
            num_start_clip + kmer_width - 1 - max_motif_bb) + motif_search_seq
    # use a strict inequality to avoid an empty string from a [:-0] slice
    if num_end_clip + kmer_width - 1 - max_motif_ab > 0:
        motif_search_seq = motif_search_seq[
            :-(num_end_clip + kmer_width - 1 - max_motif_ab)]
    else:
        motif_search_seq = motif_search_seq + 'N' * -(
            num_end_clip + kmer_width - 1 - max_motif_ab)
    if (len(motif_search_seq) - max_motif_bb - max_motif_ab !=
        means.shape[0] - ((kmer_width - 1) * 2)):
        raise th.TomboError('Motif search sequence not correct length.')

    return kmers, means, r_start, motif_search_seq
def compute_alt_model_read_stats(
        r_data, std_ref, alt_refs, use_standard_llhr=False, reg_data=None):
    """Compute significance statistics using comparison of read signal to
    canonical and alternative models method for a single read within a
    specified genomic region.

    Args:
        r_data (:class:`tombo.tombo_helper.readData`): read data
        std_ref (:class:`tombo.tombo_stats.TomboModel`): canonical expected
            signal level model
        alt_refs (list of :class:`tombo.tombo_stats.AltModel`): alternative
            expected signal level models
        use_standard_llhr (bool): compute standard likelihood ratio; for
            details see
            https://nanoporetech.github.io/tombo/modified_base_detection.html#alternative-model-method
            (optional; default: False)
        reg_data (:class:`tombo.tombo_helper.intervalData`): region to test
            (default: test whole read)

    Returns:
        Read testing results, positions tested and the read_id

        1) r_llhrs (`np.array::np.float64`): log-likelihood ratios (or
           pseudo-llhrs) for testing over specified region
        2) r_poss (`np.array::np.int64`): genomic positions for returned
           p-values
        3) read_id (str): read identifier
    """
    reg_start = reg_data.start if reg_data is not None else r_data.start
    reg_end = reg_data.end if reg_data is not None else r_data.end

    # number of sequence positions to keep in order to look up both motif
    # hits and expected levels
    max_motif_bb = max([
        alt_ref.motif.mod_pos - 1 for _, alt_ref in alt_refs])
    max_motif_ab = max([
        alt_ref.motif.motif_len - alt_ref.motif.mod_pos
        for _, alt_ref in alt_refs])

    def alt_clip_and_flip():
        """Get read information in order to test requested region of this
        read base means, seq, k-mers, genomic start and read id
        """
        with h5py.File(r_data.fn, 'r') as fast5_data:
            r_means, r_seq = th.get_multiple_slots_read_centric(
                fast5_data, ['norm_mean', 'base'], r_data.corr_group)
            read_id = th.get_raw_read_slot(fast5_data).attrs.get('read_id')
        if r_means is None or r_seq is None:
            raise th.TomboError(
                'Read does not contain valid re-squiggled data.')
        r_seq = b''.join(r_seq).decode()
        r_kmers, r_means, r_start, motif_search_seq = trim_seq_and_means(
            r_seq, r_means, r_data.start, reg_start, reg_end, r_data.strand,
            std_ref.kmer_width, std_ref.central_pos, max_motif_bb,
            max_motif_ab)
        return r_kmers, r_means, motif_search_seq, r_start, read_id

    r_kmers, r_means, motif_search_seq, r_start, read_id = alt_clip_and_flip()
    testable_len = r_means.shape[0] - std_ref.kmer_width + 1
    r_ref_means, r_ref_sds = std_ref.get_exp_levels_from_kmers(r_kmers)
    r_ref_vars = np.square(r_ref_sds)

    all_poss = {}
    all_llhrs = {}
    for alt_name, alt_ref in alt_refs:
        alt_base_poss = []
        log_lh_ratios = []
        # note search space is clipped since all k-mers covering the position
        # of interest must be valid
        alt_i_motif_search_seq = motif_search_seq[
            max_motif_bb - (alt_ref.motif.mod_pos - 1):]
        if max_motif_ab - (
                alt_ref.motif.motif_len - alt_ref.motif.mod_pos) > 0:
            alt_i_motif_search_seq = alt_i_motif_search_seq[
                :-(max_motif_ab - (
                    alt_ref.motif.motif_len - alt_ref.motif.mod_pos))]
        for alt_m in alt_ref.motif.motif_pat.finditer(alt_i_motif_search_seq):
            alt_pos = alt_m.start()
            if r_data.strand == '+':
                alt_base_poss.append(r_start + alt_pos)
            else:
                alt_base_poss.append(r_start + testable_len - alt_pos - 1)
            r_pos_alt_means, r_pos_alt_sds = alt_ref.get_exp_levels_from_kmers(
                r_kmers[alt_pos:alt_pos + alt_ref.kmer_width])
            pos_args = [r_means[alt_pos:alt_pos + std_ref.kmer_width],
                        r_ref_means[alt_pos:alt_pos + std_ref.kmer_width],
                        r_pos_alt_means]
            if CONST_SD_MODEL:
                const_var = r_ref_vars[alt_pos]
                if use_standard_llhr:
                    pos_lh_ratio = c_calc_llh_ratio_const_var(
                        *(pos_args + [const_var]))
                else:
                    pos_lh_ratio = c_calc_scaled_llh_ratio_const_var(
                        *(pos_args + [const_var, OCLLHR_SCALE,
                                      OCLLHR_HEIGHT, OCLLHR_POWER]))
            else:
                if use_standard_llhr:
                    pos_lh_ratio = c_calc_llh_ratio(
                        *(pos_args + [
                            r_ref_vars[alt_pos:alt_pos + std_ref.kmer_width],
                            np.square(r_pos_alt_sds)]))
                else:
                    raise th.TomboError(
                        'Variable SD scaled likelihood ratio not ' +
                        'implemented.')
            log_lh_ratios.append(pos_lh_ratio)

        all_llhrs[alt_name] = np.array(log_lh_ratios)
        all_poss[alt_name] = np.array(alt_base_poss)

    return all_llhrs, all_poss, read_id

def apply_per_read_thresh(
        reg_base_stats, single_read_thresh, lower_thresh, stat_type,
        stat_locs, ctrl_cov=None):
    reg_cov = np.array([base_stats.shape[0]
                        for base_stats in reg_base_stats])

    if lower_thresh is not None:
        # filter base statistics that fall between the upper and lower
        # stat threshold for the log likelihood statistic
        reg_base_stats = [
            base_stats[np.logical_or(base_stats <= lower_thresh,
                                     base_stats >= single_read_thresh)]
            for base_stats in reg_base_stats]
        valid_cov = np.array([base_stats.shape[0]
                              for base_stats in reg_base_stats])
    elif stat_type == ALT_MODEL_TXT:
        # filter base statistics that fall between the upper and lower
        # stat threshold for the log likelihood statistic
        reg_base_stats = [base_stats[np.abs(base_stats) >= single_read_thresh]
                          for base_stats in reg_base_stats]
        valid_cov = np.array([base_stats.shape[0]
                              for base_stats in reg_base_stats])
    else:
        valid_cov = reg_cov

    if stat_type == SAMP_COMP_TXT:
        ctrl_cov = [ctrl_cov[pos] if pos in ctrl_cov else 0
                    for pos in stat_locs]
    else:
        # convert to list since python2 repeat objects can't be pickled
        ctrl_cov = list(repeat(0, stat_locs.shape[0]))

    reg_frac_std_base = np.array([
        np.greater_equal(base_stats, single_read_thresh).sum() /
        base_stats.shape[0] if base_stats.shape[0] > 0 else np.NAN
        for base_stats in reg_base_stats])

    return reg_frac_std_base, reg_cov, ctrl_cov, valid_cov
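# Worked example (illustrative): with LLR-style stats where positive values
# favor the canonical model and upper/lower thresholds of +2.0/-2.0, the
# per-base stats [-3.1, -0.5, 2.5, 4.0] keep [-3.1, 2.5, 4.0] (valid
# coverage 3 of raw coverage 4) and the canonical fraction is the proportion
# at or above the upper threshold: 2 / 3.
#
#   import numpy as np
#   base_stats = np.array([-3.1, -0.5, 2.5, 4.0])
#   kept = base_stats[(base_stats <= -2.0) | (base_stats >= 2.0)]
#   frac_std = (kept >= 2.0).sum() / kept.shape[0]   # -> 0.667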
def collate_reg_stats(
        stats, stat_locs, read_ids, per_read_q, reg_data,
        single_read_thresh, lower_thresh, stat_type, stat_name, ctrl_cov):
    stats = np.concatenate(stats)
    stat_locs = np.concatenate(stat_locs)
    # remove nans possibly introduced by fisher's method calculations
    valid_poss = ~np.isnan(stats)
    stat_locs = stat_locs[valid_poss]
    stats = stats[valid_poss]
    assert stat_locs.shape[0] == stats.shape[0], '\t'.join(map(str, (
        stat_locs.shape[0], stats.shape[0])))

    if per_read_q is not None:
        valid_read_ids = [
            rep_r_id for rep_r_id, is_valid in zip(
                [rep_r_id for r_id, r_len in read_ids
                 for rep_r_id in repeat(r_id.decode(), r_len)],
                valid_poss) if is_valid]
        read_id_lookup = dict((
            (read_id, read_id_val)
            for read_id_val, read_id in enumerate(set(valid_read_ids))))
        conv_read_ids = np.array([
            read_id_lookup[r_id] for r_id in valid_read_ids])
        assert conv_read_ids.shape[0] == stat_locs.shape[0]
        per_read_block = np.array(
            list(zip(stat_locs, stats, conv_read_ids)),
            dtype=[(str('pos'), 'u4'), (str('stat'), 'f8'),
                   (str('read_id'), 'u4')])
        per_read_q.put((
            stat_name, (per_read_block, read_id_lookup, reg_data.chrm,
                        reg_data.strand, reg_data.start)))

    # get order of all bases from position array
    as_stat_locs = np.argsort(stat_locs)
    # sort all positions from all reads
    stat_locs = stat_locs[as_stat_locs]
    # get unique tested genomic positions across all reads
    us_stat_locs = np.unique(stat_locs)

    if stat_locs.shape[0] == 0:
        raise th.TomboError('No valid positions in this region.')

    # then sort the stats array by genomic position and
    # split into stats by genomic base position
    reg_base_stats = np.split(
        stats[as_stat_locs],
        np.where(np.concatenate([[0,], np.diff(stat_locs)]) > 0)[0])

    reg_frac_std_base, reg_cov, ctrl_cov, valid_cov = apply_per_read_thresh(
        reg_base_stats, single_read_thresh, lower_thresh, stat_type,
        stat_locs, ctrl_cov)

    return th.regionStats(
        reg_frac_std_base, us_stat_locs, reg_data.chrm, reg_data.strand,
        reg_data.start, reg_cov, ctrl_cov, valid_cov)

def compute_reg_stats(
        reg_data, fm_offset, min_test_reads,
        single_read_thresh, lower_thresh, ctrl_reg_data, std_ref,
        alt_refs, use_standard_llhr, per_read_q, stat_type, prior_weights):
    if stat_type == SAMP_COMP_TXT:
        ctrl_means, ctrl_sds, ctrl_cov = get_reads_ref(
            ctrl_reg_data, min_test_reads, fm_offset, std_ref, prior_weights)
    else:
        # TODO get region sequence and expected levels/sds here
        # instead of for each read
        # after that add per-read stat computation to API
        ctrl_cov = None

    # store multiple alt references lists (default to stat_type for de novo
    # or sample comp testing)
    stat_names = [stat_type,] if stat_type != ALT_MODEL_TXT else list(
        zip(*alt_refs))[0]
    reg_read_stats, stat_locs, reg_ids = [
        dict((stat_name, []) for stat_name in stat_names)
        for _ in range(3)]
    for r_data in reg_data.reads:
        try:
            if stat_type == SAMP_COMP_TXT:
                r_stats, r_poss, read_id = compute_sample_compare_read_stats(
                    r_data, ctrl_means, ctrl_sds, fm_offset, reg_data)
            elif stat_type == DE_NOVO_TXT:
                r_stats, r_poss, read_id = compute_de_novo_read_stats(
                    r_data, std_ref, fm_offset, reg_data)
            else:
                r_stats, r_poss, read_id = compute_alt_model_read_stats(
                    r_data, std_ref, alt_refs, use_standard_llhr, reg_data)
        except th.TomboError:
            continue
        if r_stats is None:
            continue
        for stat_name, stat_r_stats in r_stats.items():
            reg_read_stats[stat_name].append(stat_r_stats)
            reg_ids[stat_name].append((read_id, stat_r_stats.shape[0]))
            stat_locs[stat_name].append(r_poss[stat_name])

    if sum(len(stat_reg_read_stats)
           for stat_reg_read_stats in reg_read_stats.values()) == 0:
        raise th.TomboError('Reads contain no statistics in this region.')

    reg_stats = [
        (stat_name, collate_reg_stats(
            stat_name_stats, stat_locs[stat_name], reg_ids[stat_name],
            per_read_q, reg_data, single_read_thresh, lower_thresh,
            stat_type, stat_name, ctrl_cov))
        for stat_name, stat_name_stats in reg_read_stats.items()]

    return reg_stats

###################################
########## Level Testing ##########
###################################

def compute_ks_tests(samp_base_levels, ctrl_base_levels, return_stat):
    def compute_pos_ks_test(pos_samp_levels, pos_ctrl_levels):
        """Compute effect size statistic or p-value of two-sample
        Kolmogorov-Smirnov test

        Using definition from
        https://github.com/scipy/scipy/blob/v0.14.0/scipy/stats/stats.py#L3886
        """
        samp_n, ctrl_n = pos_samp_levels.shape[0], pos_ctrl_levels.shape[0]
        pos_all_levels = np.concatenate([pos_samp_levels, pos_ctrl_levels])
        samp_cdf = np.searchsorted(
            pos_samp_levels, pos_all_levels, side='right') / samp_n
        ctrl_cdf = np.searchsorted(
            pos_ctrl_levels, pos_all_levels, side='right') / ctrl_n
        d = np.max(np.absolute(samp_cdf - ctrl_cdf))
        if return_stat:
            # subtract 1 so most significant are smallest values
            return 1 - d
        en = np.sqrt(samp_n * ctrl_n / float(samp_n + ctrl_n))
        return stats.distributions.kstwobign.sf((en + 0.12 + 0.11 / en) * d)

    samp_valid_indices = np.logical_not(np.isnan(samp_base_levels))
    ctrl_valid_indices = np.logical_not(np.isnan(ctrl_base_levels))
    return np.array([
        compute_pos_ks_test(
            np.sort(pos_samp_levels[samp_valid_indices[i]]),
            np.sort(pos_ctrl_levels[ctrl_valid_indices[i]]))
        for i, (pos_samp_levels, pos_ctrl_levels) in enumerate(
                zip(samp_base_levels, ctrl_base_levels))])
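# Cross-check sketch (illustrative): the searchsorted-based D statistic
# above matches scipy's two-sample KS statistic.
#
#   import numpy as np
#   from scipy import stats
#   rng = np.random.RandomState(0)
#   a, b = np.sort(rng.normal(0, 1, 30)), np.sort(rng.normal(0.5, 1, 40))
#   all_v = np.concatenate([a, b])
#   d = np.max(np.abs(
#       np.searchsorted(a, all_v, side='right') / a.size -
#       np.searchsorted(b, all_v, side='right') / b.size))
#   assert np.isclose(d, stats.ks_2samp(a, b)[0])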
def compute_u_tests(samp_base_levels, ctrl_base_levels, return_stat):
    def compute_pos_u_test(pos_samp_levels, pos_ctrl_levels):
        """Compute effect size statistic or p-value of two-sample u-test
        """
        samp_n, ctrl_n = pos_samp_levels.shape[0], pos_ctrl_levels.shape[0]
        tot_comps = samp_n * ctrl_n
        pos_all_levels = np.concatenate([pos_samp_levels, pos_ctrl_levels])

        ranks = np.empty(samp_n + ctrl_n, int)
        ranks[pos_all_levels.argsort()] = np.arange(1, samp_n + ctrl_n + 1)
        samp_ranks_sum = ranks[:samp_n].sum()
        #ctrl_ranks_sum = ranks[samp_n:].sum()

        u1 = samp_ranks_sum - (samp_n * (samp_n + 1)) / 2
        #u2 = ctrl_ranks_sum - (ctrl_n * (ctrl_n + 1)) / 2
        u2 = tot_comps - u1
        u = min(u1, u2)
        mu = tot_comps / 2
        if return_stat:
            # this will be negative (flip sign for stat transform)
            return (u - mu) / mu

        rhou = np.sqrt(tot_comps * (tot_comps + 1) / 12)
        z = (u - mu) / rhou
        return stats.norm.cdf(z) * 2.0

    samp_valid_indices = np.logical_not(np.isnan(samp_base_levels))
    ctrl_valid_indices = np.logical_not(np.isnan(ctrl_base_levels))
    return np.array([
        compute_pos_u_test(
            np.sort(pos_samp_levels[samp_valid_indices[i]]),
            np.sort(pos_ctrl_levels[ctrl_valid_indices[i]]))
        for i, (pos_samp_levels, pos_ctrl_levels) in enumerate(
                zip(samp_base_levels, ctrl_base_levels))])

def compute_t_tests(samp_base_levels, ctrl_base_levels, return_stat):
    def compute_pos_t_test(pos_samp_levels, pos_ctrl_levels):
        """Compute effect size statistic or p-value of two-sample t-test
        """
        samp_n, ctrl_n = pos_samp_levels.shape[0], pos_ctrl_levels.shape[0]
        samp_mean, samp_sd = c_mean_std(pos_samp_levels)
        ctrl_mean, ctrl_sd = c_mean_std(pos_ctrl_levels)
        if return_stat:
            # this will be negative (flip sign for stat transform)
            return -np.abs(samp_mean - ctrl_mean) / np.sqrt(
                ((samp_sd ** 2) + (ctrl_sd ** 2)) / 2)

        sp = np.sqrt((((samp_n - 1) * (samp_sd ** 2)) +
                      ((ctrl_n - 1) * (ctrl_sd ** 2))) /
                     (samp_n + ctrl_n - 2))
        t = -np.abs(samp_mean - ctrl_mean) / (
            sp * np.sqrt((1 / samp_n) + (1 / ctrl_n)))
        # t dist with samp_n + ctrl_n - 2 d.o.f.
        return stats.t.cdf(t, samp_n + ctrl_n - 2) * 2.0

    samp_valid_indices = np.logical_not(np.isnan(samp_base_levels))
    ctrl_valid_indices = np.logical_not(np.isnan(ctrl_base_levels))
    return np.array([
        compute_pos_t_test(
            np.sort(pos_samp_levels[samp_valid_indices[i]]),
            np.sort(pos_ctrl_levels[ctrl_valid_indices[i]]))
        for i, (pos_samp_levels, pos_ctrl_levels) in enumerate(
                zip(samp_base_levels, ctrl_base_levels))])
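# Cross-check sketch (illustrative): the pooled-variance p-value above
# corresponds to the classic equal-variance two-sample t-test; scipy's
# ttest_ind p-value likewise satisfies p = 2 * t.cdf(-|t|, n1 + n2 - 2).
#
#   import numpy as np
#   from scipy import stats
#   rng = np.random.RandomState(1)
#   a, b = rng.normal(0, 1, 30), rng.normal(0.7, 1, 30)
#   t_stat, p_val = stats.ttest_ind(a, b, equal_var=True)
#   assert np.isclose(
#       p_val, stats.t.cdf(-abs(t_stat), a.size + b.size - 2) * 2)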
def compute_group_reg_stats(
        reg_data, ctrl_reg_data, fm_offset, min_test_reads, stat_type):
    samp_base_levels = reg_data.copy().update(
        start=reg_data.start - fm_offset,
        end=reg_data.end + fm_offset).get_base_levels()
    ctrl_base_levels = ctrl_reg_data.copy().update(
        start=ctrl_reg_data.start - fm_offset,
        end=ctrl_reg_data.end + fm_offset).get_base_levels()

    # get regions with coverage greater than min_test_reads
    samp_cov = np.logical_not(np.isnan(samp_base_levels)).sum(axis=1)
    ctrl_cov = np.logical_not(np.isnan(ctrl_base_levels)).sum(axis=1)
    cov_regs = np.where(np.diff(np.concatenate([[False,], np.logical_and(
        np.greater_equal(samp_cov, min_test_reads),
        np.greater_equal(ctrl_cov, min_test_reads)), [False,]])))[0]
    if len(cov_regs) == 0:
        return []

    reg_stats, reg_poss, reg_cov, reg_ctrl_cov = [], [], [], []
    for cov_start, cov_end in zip(cov_regs[:-1:2], cov_regs[1::2]):
        if cov_end - cov_start < (fm_offset * 2) + 1:
            continue
        if stat_type in (KS_TEST_TXT, KS_STAT_TEST_TXT):
            cov_reg_stats = compute_ks_tests(
                samp_base_levels[cov_start:cov_end],
                ctrl_base_levels[cov_start:cov_end],
                stat_type == KS_STAT_TEST_TXT)
        elif stat_type in (U_TEST_TXT, U_STAT_TEST_TXT):
            cov_reg_stats = compute_u_tests(
                samp_base_levels[cov_start:cov_end],
                ctrl_base_levels[cov_start:cov_end],
                stat_type == U_STAT_TEST_TXT)
        elif stat_type in (T_TEST_TXT, T_STAT_TEST_TXT):
            cov_reg_stats = compute_t_tests(
                samp_base_levels[cov_start:cov_end],
                ctrl_base_levels[cov_start:cov_end],
                stat_type == T_STAT_TEST_TXT)
        else:
            raise NotImplementedError('Unrecognized test type.')

        if fm_offset > 0:
            if stat_type in (KS_TEST_TXT, U_TEST_TXT, T_TEST_TXT):
                cov_reg_stats = calc_window_fishers_method(
                    cov_reg_stats, fm_offset)
            else:
                cov_reg_stats = calc_window_means(cov_reg_stats, fm_offset)
        reg_stats.append(cov_reg_stats)
        reg_poss.append(np.arange(reg_data.start - fm_offset + cov_start,
                                  reg_data.start - fm_offset + cov_end))
        reg_cov.append(samp_cov[cov_start:cov_end])
        reg_ctrl_cov.append(ctrl_cov[cov_start:cov_end])

    # if the valid coverage only intersects the end of the region,
    # there may be no valid stats from this region.
    if len(reg_stats) == 0:
        return []

    return [(stat_type, th.groupStats(
        np.concatenate(reg_stats), np.concatenate(reg_poss),
        reg_data.chrm, reg_data.strand, reg_data.start,
        np.concatenate(reg_cov), np.concatenate(reg_ctrl_cov))),]
##############################################
########## Testing Multi-processing ##########
##############################################

def _test_signif_worker(
        region_q, stats_q, progress_q, per_read_q, reads_index, fm_offset,
        min_test_reads, single_read_thresh, lower_thresh, ctrl_reads_index,
        std_ref, alt_refs, use_standard_llhr, stat_type, prior_weights):
    ctrl_reg_data = None
    while True:
        try:
            reg_data = region_q.get(block=False)
        except queue.Empty:
            # sometimes throws false empty error with get(block=False)
            sleep(0.01)
            if not region_q.empty():
                continue
            break

        if ctrl_reads_index is not None:
            ctrl_reg_data = reg_data.copy().add_reads(ctrl_reads_index)
        reg_data.add_reads(reads_index)
        if len(reg_data.reads) == 0:
            progress_q.put(1)
            continue

        try:
            if stat_type in (ALT_MODEL_TXT, DE_NOVO_TXT, SAMP_COMP_TXT):
                stat_type_reg_stats = compute_reg_stats(
                    reg_data, fm_offset, min_test_reads, single_read_thresh,
                    lower_thresh, ctrl_reg_data, std_ref, alt_refs,
                    use_standard_llhr, per_read_q, stat_type, prior_weights)
            else:
                stat_type_reg_stats = compute_group_reg_stats(
                    reg_data, ctrl_reg_data, fm_offset, min_test_reads,
                    stat_type)
        except th.TomboError:
            progress_q.put(1)
            continue
        for stat_type_i_reg_stats in stat_type_reg_stats:
            stats_q.put(stat_type_i_reg_stats)
        progress_q.put(1)

    return

if _PROFILE_SIGNIF:
    _test_signif_wrapper = _test_signif_worker
    def _test_signif_worker(*args):
        import cProfile
        cProfile.runctx('_test_signif_wrapper(*args)', globals(), locals(),
                        filename='test_signif.prof')
        return
def _get_stats_queue(
        stats_q, stats_conn, min_test_reads, stats_file_bn,
        stat_type, alt_names, reg_size, cov_damp_counts, num_most_signif):
    # multiple files with alt_names
    all_stats = {}
    if stat_type == ALT_MODEL_TXT:
        for alt_name in alt_names:
            all_stats[alt_name] = ModelStats(
                stats_file_bn + '.' + alt_name + '.tombo.stats',
                stat_type=stat_type, region_size=reg_size,
                cov_damp_counts=cov_damp_counts, cov_thresh=min_test_reads,
                num_most_signif=num_most_signif)
    elif stat_type in (DE_NOVO_TXT, SAMP_COMP_TXT):
        all_stats[stat_type] = ModelStats(
            stats_file_bn + '.tombo.stats',
            stat_type=stat_type, region_size=reg_size,
            cov_damp_counts=cov_damp_counts, cov_thresh=min_test_reads,
            num_most_signif=num_most_signif)
    else:
        all_stats[stat_type] = LevelStats(
            stats_file_bn + '.tombo.stats',
            stat_type=stat_type, region_size=reg_size,
            cov_thresh=min_test_reads, num_most_signif=num_most_signif)

    while True:
        try:
            stat_name, reg_stats = stats_q.get(block=False)
            all_stats[stat_name]._write_stat_block(reg_stats)
        except queue.Empty:
            # wait for main process to send indicator that all regions
            # have been processed
            if stats_conn.poll():
                sleep(0.1)
                break
            sleep(0.1)
            continue

    # Clear leftover values from queues
    while not stats_q.empty():
        stat_name, reg_stats = stats_q.get(block=False)
        all_stats[stat_name]._write_stat_block(reg_stats)
    for stat_name_all_stats in all_stats.values():
        stat_name_all_stats.close()
    stats_conn.send(True)

    return

if _PROFILE_SIGNIF_STATS_OUT:
    _get_stats_queue_wrapper = _get_stats_queue
    def _get_stats_queue(*args):
        import cProfile
        cProfile.runctx('_get_stats_queue_wrapper(*args)',
                        globals(), locals(),
                        filename='test_signif_stats_out.prof')
        return

def _get_per_read_queue(
        per_read_q, per_read_conn, per_read_bn, stat_type, alt_names,
        region_size):
    per_read_stats = {}
    if stat_type == ALT_MODEL_TXT:
        for alt_name in alt_names:
            per_read_stats[alt_name] = PerReadStats(
                per_read_bn + '.' + alt_name + '.tombo.per_read_stats',
                stat_type, region_size)
    else:
        per_read_stats[stat_type] = PerReadStats(
            per_read_bn + '.tombo.per_read_stats', stat_type, region_size)

    while True:
        try:
            stat_name, per_read_block = per_read_q.get(block=False)
            per_read_stats[stat_name]._write_per_read_block(*per_read_block)
            del per_read_block
        except queue.Empty:
            if per_read_conn.poll():
                sleep(0.1)
                break
            sleep(0.1)
            continue

    # Clear leftover values from queues
    while not per_read_q.empty():
        stat_name, per_read_block = per_read_q.get(block=False)
        per_read_stats[stat_name]._write_per_read_block(*per_read_block)
        del per_read_block
    for stat_name_per_read_stats in per_read_stats.values():
        stat_name_per_read_stats.close()

    # indicate that the process has closed
    per_read_conn.send(True)

    return

if _PROFILE_SIGNIF_PER_READ:
    _get_per_read_queue_wrapper = _get_per_read_queue
    def _get_per_read_queue(*args):
        import cProfile
        cProfile.runctx(
            '_get_per_read_queue_wrapper(*args)', globals(), locals(),
            filename='test_signif_per_read.prof')
        return

def _get_progress_queue(progress_q, prog_conn, num_regions):
    th.status_message(
        'Performing modified base detection across genomic regions.')
    bar = tqdm(total=num_regions, smoothing=0)

    tot_num_rec_proc = 0
    while True:
        try:
            iter_val = progress_q.get(block=False)
            tot_num_rec_proc += iter_val
            bar.update(iter_val)
        except queue.Empty:
            if prog_conn.poll():
                break
            sleep(0.1)
            continue

    bar.close()
    prog_conn.send(tot_num_rec_proc)

    return
def test_significance(
        reads_index, stat_type, stats_file_bn, region_size, num_processes,
        min_test_reads, num_most_signif, per_read_bn=None,
        single_read_thresh=None, lower_thresh=None, cov_damp_counts=None,
        fm_offset=None, ctrl_reads_index=None, std_ref=None, alt_refs=None,
        use_standard_llhr=False, prior_weights=None):
    """Test for significantly shifted signal in multiprocessed batches
    """
    region_q = Queue()
    stats_q = Queue(STAT_BLOCKS_QUEUE_LIMIT)
    progress_q = Queue()
    per_read_q = Queue(STAT_BLOCKS_QUEUE_LIMIT) \
                 if per_read_bn else None
    # split chromosomes into separate regions to process independently
    num_regions = 0
    # TODO add min coverage when adding ctrl coverage (instead of adding
    # as is done currently)
    for chrm, strand, reg_start in reads_index.iter_cov_regs(
            1, region_size, ctrl_reads_index):
        region_q.put(th.intervalData(
            chrm=chrm, start=reg_start, end=reg_start + region_size,
            strand=strand))
        num_regions += 1
    # wait for queue items to register in queue and avoid queue appearing
    # empty
    sleep(0.1)

    test_args = (
        region_q, stats_q, progress_q, per_read_q, reads_index, fm_offset,
        min_test_reads, single_read_thresh, lower_thresh, ctrl_reads_index,
        std_ref, alt_refs, use_standard_llhr, stat_type, prior_weights)
    test_ps = []
    for p_id in range(num_processes):
        p = Process(target=_test_signif_worker, args=test_args)
        p.start()
        test_ps.append(p)

    # start queue getter processes
    if VERBOSE:
        main_prog_conn, prog_conn = Pipe()
        prog_p = Process(target=_get_progress_queue,
                         args=(progress_q, prog_conn, num_regions))
        prog_p.daemon = True
        prog_p.start()

    # main region stats queue getter
    alt_names = None if alt_refs is None else list(zip(*alt_refs))[0]
    main_stats_conn, stats_conn = Pipe()
    stats_p = Process(target=_get_stats_queue, args=(
        stats_q, stats_conn, min_test_reads, stats_file_bn,
        stat_type, alt_names, region_size, cov_damp_counts, num_most_signif))
    stats_p.daemon = True
    stats_p.start()

    # per-read stats queue getter
    if per_read_bn is not None:
        main_per_read_conn, per_read_conn = Pipe()
        per_read_p = Process(
            target=_get_per_read_queue,
            args=(per_read_q, per_read_conn, per_read_bn, stat_type,
                  alt_names, region_size))
        per_read_p.daemon = True
        per_read_p.start()

    # wait for test processes to finish
    for test_p in test_ps:
        test_p.join()

    # in a very unlikely case the progress queue could die while the
    # main process remains active and thus we would have a deadlock here
    if VERBOSE and prog_p.is_alive():
        # send signal to getter queue to finish and return results
        main_prog_conn.send(True)
        # returns total number of processed reads if that is needed
        main_prog_conn.recv()
    if per_read_bn is not None:
        main_per_read_conn.send(True)
        main_per_read_conn.recv()
    main_stats_conn.send(True)
    main_stats_conn.recv()

    return

################################################
########## Aggregate Multi-processing ##########
################################################

def _write_stats(
        stats_q, stats_fn, stat_type, region_size, cov_damp_counts,
        min_test_reads, num_most_signif, num_blocks, num_processes):
    all_stats = ModelStats(
        stats_fn, stat_type=stat_type, region_size=region_size,
        cov_damp_counts=cov_damp_counts, cov_thresh=min_test_reads,
        num_most_signif=num_most_signif)
    if VERBOSE:
        bar = tqdm(total=num_blocks, smoothing=0)
    num_agg_ps_finished = 0
    while True:
        try:
            agg_stats = stats_q.get(block=False)
            if agg_stats is None:
                num_agg_ps_finished += 1
                if num_agg_ps_finished >= num_processes:
                    break
                continue
            if VERBOSE:
                bar.update(1)
            ((reg_frac_std_base, reg_cov, ctrl_cov, valid_cov),
             chrm, strand, start, us_reg_poss) = agg_stats
            all_stats._write_stat_block(
                th.regionStats(reg_frac_std_base, us_reg_poss, chrm, strand,
                               start, reg_cov, ctrl_cov, valid_cov))
        except queue.Empty:
            sleep(0.1)

    if VERBOSE:
        bar.close()
    all_stats.close()
    if all_stats.is_empty:
        th.error_message_and_exit(
            'No genomic positions contain --minimum-test-reads.')

    return
def _agg_stats_worker(
        pr_stats_q, stats_q, stat_type, single_read_thresh, lower_thresh):
    while True:
        try:
            block_pr_stats = pr_stats_q.get(block=False)
            # None value indicates that per-read blocks have been exhausted
            if block_pr_stats is None:
                stats_q.put(None)
                break
            chrm, strand, start, end, block_stats = block_pr_stats
            block_stats.sort(order=str('pos'))
            reg_poss = block_stats['pos']
            us_reg_poss = np.unique(reg_poss)
            reg_base_stats = np.split(
                block_stats['stat'],
                np.where(np.concatenate(
                    [[0,], np.diff(reg_poss)]) > 0)[0])
            reg_stats = apply_per_read_thresh(
                reg_base_stats, single_read_thresh, lower_thresh,
                stat_type, reg_poss)
            stats_q.put((reg_stats, chrm, strand, start, us_reg_poss))
        except queue.Empty:
            sleep(0.1)

    return

def _load_stats_batches(pr_stats_fn, pr_stats_q, num_processes):
    pr_stats = PerReadStats(pr_stats_fn)
    for pr_block in pr_stats:
        pr_stats_q.put(pr_block)
    for _ in range(num_processes):
        pr_stats_q.put(None)
    return

def aggregate_per_read_stats(
        pr_stats_fn, single_read_thresh, lower_thresh, stats_fn,
        cov_damp_counts, min_test_reads, num_most_signif, num_processes):
    if VERBOSE:
        th.status_message('Loading and aggregating per-read statistics.')

    # pre-load per-read stats queue
    pr_stats = PerReadStats(pr_stats_fn)
    stat_type, num_blocks, region_size = (
        pr_stats.stat_type, pr_stats.num_blocks, pr_stats.region_size)
    pr_stats.close()

    pr_stats_q = Queue(STAT_BLOCKS_QUEUE_LIMIT)
    stats_q = Queue(STAT_BLOCKS_QUEUE_LIMIT)

    load_stats_p = Process(target=_load_stats_batches, args=(
        pr_stats_fn, pr_stats_q, num_processes))
    load_stats_p.daemon = True
    load_stats_p.start()

    agg_stats_ps = []
    for p_id in range(num_processes):
        agg_p = Process(target=_agg_stats_worker, args=(
            pr_stats_q, stats_q, stat_type, single_read_thresh,
            lower_thresh))
        agg_p.daemon = True
        agg_p.start()
        agg_stats_ps.append(agg_p)

    write_stats_p = Process(target=_write_stats, args=(
        stats_q, stats_fn, stat_type, region_size, cov_damp_counts,
        min_test_reads, num_most_signif, num_blocks, num_processes))
    write_stats_p.daemon = True
    write_stats_p.start()

    # wait for processes to complete
    load_stats_p.join()
    for agg_p in agg_stats_ps:
        agg_p.join()
    write_stats_p.join()

    return

##########################
##### Main Functions #####
##########################

def _est_ref_main(args):
    global VERBOSE
    VERBOSE = not args.quiet
    th.VERBOSE = VERBOSE

    if min(args.upstream_bases, args.downstream_bases) == 0:
        th.error_message_and_exit(
            'Context upstream and downstream must be greater ' +
            'than 0 for model estimation.')

    std_ref = estimate_kmer_model(
        args.fast5_basedirs, args.corrected_group, args.basecall_subgroups,
        args.minimum_test_reads, args.upstream_bases, args.downstream_bases,
        args.minimum_kmer_observations, args.kmer_specific_sd,
        args.coverage_threshold, args.estimate_mean,
        args.multiprocess_region_size, args.processes)
    std_ref.write_model(args.tombo_model_filename)

    return

def _est_alt_ref_main(args):
    global VERBOSE
    VERBOSE = not args.quiet
    th.VERBOSE = VERBOSE

    alt_ref = estimate_alt_model(
        args.fast5_basedirs, args.control_fast5_basedirs,
        args.corrected_group, args.basecall_subgroups,
        args.tombo_model_filename, args.seq_sample_type,
        args.alternate_model_base, args.alt_fraction_percentile,
        args.minimum_kmer_observations, args.save_density_basename,
        args.kernel_density_bandwidth, args.alternate_density_filename,
        args.control_density_filename, args.processes)
    # returns None when profiling method
    if alt_ref is None:
        return
    alt_ref.name = args.alternate_model_name
    alt_ref.write_model(args.alternate_model_filename)

    return
def _est_motif_alt_ref_main(args):
    global VERBOSE
    VERBOSE = not args.quiet
    th.VERBOSE = VERBOSE

    alt_ref = estimate_motif_alt_model(
        args.fast5_basedirs, args.corrected_group, args.basecall_subgroups,
        args.motif_description, args.upstream_bases, args.downstream_bases,
        args.valid_locations_filename, args.minimum_kmer_observations,
        args.minimum_test_reads, args.coverage_threshold,
        args.multiprocess_region_size, args.processes)
    alt_ref.name = args.alternate_model_name
    alt_ref.write_model(args.alternate_model_filename)

    return

def _estimate_scale_main(args):
    global VERBOSE
    VERBOSE = not args.quiet
    th.VERBOSE = VERBOSE

    if VERBOSE:
        th.status_message('Getting files list.')
    try:
        if not os.path.isdir(args.fast5s_basedir):
            th.error_message_and_exit(
                'Provided [fast5-basedir] is not a directory.')
        fast5s_basedir = (
            args.fast5s_basedir if args.fast5s_basedir.endswith('/') else
            args.fast5s_basedir + '/')
        fast5_fns = th.get_files_list(fast5s_basedir)
    except OSError:
        th.error_message_and_exit(
            'Reads base directory, a sub-directory or an old (hidden) ' +
            'index file does not appear to be accessible. Check ' +
            'directory permissions.')
    if len(fast5_fns) < 1:
        th.error_message_and_exit(
            'No files identified in the specified ' +
            'directory or within immediate subdirectories.')

    th.status_message('Global scaling estimate: ' +
                      unicode(estimate_global_scale(fast5_fns)))

    return

def _de_novo_main(
        args, lower_thresh, single_read_thresh, seq_samp_type, reads_index):
    if seq_samp_type is None:
        seq_samp_type = th.get_seq_sample_type(reads_index=reads_index)
    std_ref = TomboModel(
        ref_fn=args.tombo_model_filename, seq_samp_type=seq_samp_type,
        reads_index=reads_index)

    stat_type = DE_NOVO_TXT
    lower_thresh, single_read_thresh = (
        (lower_thresh, single_read_thresh) if single_read_thresh is not None
        else DE_NOVO_THRESH[seq_samp_type.name])
    if VERBOSE:
        th.status_message(
            'Performing de novo model testing against canonical model.')
    test_significance(
        reads_index, stat_type, args.statistics_file_basename,
        args.multiprocess_region_size, args.processes,
        args.minimum_test_reads, args.num_most_significant_stored,
        args.per_read_statistics_basename, single_read_thresh, lower_thresh,
        args.coverage_dampen_counts, fm_offset=args.fishers_method_context,
        std_ref=std_ref)

    return

def _alt_main(
        args, lower_thresh, single_read_thresh, seq_samp_type, reads_index):
    if seq_samp_type is None:
        seq_samp_type = th.get_seq_sample_type(reads_index=reads_index)
    std_ref = TomboModel(
        ref_fn=args.tombo_model_filename, seq_samp_type=seq_samp_type,
        reads_index=reads_index)

    stat_type = ALT_MODEL_TXT
    lower_thresh, single_read_thresh = (
        (lower_thresh, single_read_thresh) if single_read_thresh is not None
        else LLR_THRESH[seq_samp_type.name])
    if VERBOSE:
        th.status_message('Performing alternative model testing.')
    alt_refs = load_alt_refs(
        args.alternate_model_filenames, args.alternate_bases,
        reads_index, std_ref, seq_samp_type)
    if len(alt_refs) == 0:
        th.error_message_and_exit(
            'No alternative models successfully loaded.')

    if VERBOSE:
        th.status_message('Performing specific alternate base(s) testing.')
    test_significance(
        reads_index, stat_type, args.statistics_file_basename,
        args.multiprocess_region_size, args.processes,
        args.minimum_test_reads, args.num_most_significant_stored,
        args.per_read_statistics_basename, single_read_thresh, lower_thresh,
        args.coverage_dampen_counts, std_ref=std_ref,
        alt_refs=list(alt_refs.items()),
        use_standard_llhr=args.standard_log_likelihood_ratio)

    return
def _model_samp_comp_main(
        args, lower_thresh, single_read_thresh, seq_samp_type, reads_index):
    stat_type = SAMP_COMP_TXT
    if single_read_thresh is None:
        if seq_samp_type is None:
            seq_samp_type = th.get_seq_sample_type(reads_index=reads_index)
        lower_thresh, single_read_thresh = SAMP_COMP_THRESH[
            seq_samp_type.name]
    if VERBOSE:
        th.status_message(
            'Performing two-sample comparison significance testing.')
    ctrl_reads_index = th.TomboReads(
        args.control_fast5_basedirs, args.corrected_group,
        args.basecall_subgroups)
    # load expected levels ref for posterior computation
    std_ref = None if args.sample_only_estimates else TomboModel(
        ref_fn=args.tombo_model_filename, seq_samp_type=seq_samp_type,
        reads_index=reads_index)
    test_significance(
        reads_index, stat_type, args.statistics_file_basename,
        args.multiprocess_region_size, args.processes,
        args.minimum_test_reads, args.num_most_significant_stored,
        args.per_read_statistics_basename, single_read_thresh, lower_thresh,
        args.coverage_dampen_counts, fm_offset=args.fishers_method_context,
        ctrl_reads_index=ctrl_reads_index, std_ref=std_ref,
        prior_weights=args.model_prior_weights)

    return

def _level_samp_comp_main(args, reads_index):
    if args.statistic_type == 'ks':
        stat_type = KS_TEST_TXT if args.store_p_value else KS_STAT_TEST_TXT
    elif args.statistic_type == 'u':
        stat_type = U_TEST_TXT if args.store_p_value else U_STAT_TEST_TXT
    elif args.statistic_type == 't':
        stat_type = T_TEST_TXT if args.store_p_value else T_STAT_TEST_TXT
    else:
        raise th.TomboError('Invalid statistic type.')
    if VERBOSE:
        th.status_message(
            'Performing two-sample group comparison significance testing.')
    ctrl_reads_index = th.TomboReads(
        args.alternate_fast5_basedirs, args.corrected_group,
        args.basecall_subgroups)
    test_significance(
        reads_index, stat_type, args.statistics_file_basename,
        args.multiprocess_region_size, args.processes,
        args.minimum_test_reads, args.num_most_significant_stored,
        fm_offset=args.fishers_method_context,
        ctrl_reads_index=ctrl_reads_index)

    return
def _test_shifts_main(args):
    global VERBOSE
    VERBOSE = not args.quiet
    th.VERBOSE = VERBOSE

    # Check extra requirements for alternative model testing
    if args.action_command == 'alternative_model':
        if args.print_available_models:
            _print_alt_models()
            sys.exit()
        if args.fast5_basedirs is None or \
           args.statistics_file_basename is None:
            th.error_message_and_exit(
                'Must provide both a set of FAST5 read files ' +
                '(--fast5-basedirs) and an output file basename ' +
                '(--statistics-file-basename).')
        if (args.alternate_model_filenames is None and
            args.alternate_bases is None):
            th.error_message_and_exit(
                'Must provide an alternative model against which to ' +
                'test.\n\tRun with --print-available-models option to see ' +
                'possible values for the --alternate-bases option.')

    if 'single_read_threshold' in args:
        if args.single_read_threshold is None:
            lower_thresh = None
            single_read_thresh = None
        elif len(args.single_read_threshold) == 1:
            single_read_thresh = args.single_read_threshold[0]
            lower_thresh = None
        else:
            if len(args.single_read_threshold) > 2:
                th.warning_message(
                    'Only 1 or 2 values may be passed as single-read ' +
                    'thresholds. Only using the first 2 options provided.')
            lower_thresh = args.single_read_threshold[0]
            single_read_thresh = args.single_read_threshold[1]

    # sample compare does not have seq_sample_type in the namespace
    try:
        if args.seq_sample_type is None:
            seq_samp_type = None
        else:
            seq_samp_type = th.seqSampleType(DNA_SAMP_TYPE, False) \
                            if args.seq_sample_type == DNA_SAMP_TYPE else \
                            th.seqSampleType(RNA_SAMP_TYPE, True)
    except AttributeError:
        seq_samp_type = None

    reads_index = th.TomboReads(
        args.fast5_basedirs, args.corrected_group, args.basecall_subgroups)

    if args.action_command == 'de_novo':
        _de_novo_main(
            args, lower_thresh, single_read_thresh, seq_samp_type,
            reads_index)
    elif args.action_command == 'alternative_model':
        _alt_main(
            args, lower_thresh, single_read_thresh, seq_samp_type,
            reads_index)
    elif args.action_command == 'model_sample_compare':
        _model_samp_comp_main(
            args, lower_thresh, single_read_thresh, seq_samp_type,
            reads_index)
    elif args.action_command == 'level_sample_compare':
        _level_samp_comp_main(args, reads_index)
    else:
        th.error_message_and_exit(
            'Invalid Tombo detect_modifications command.')

    return

def _aggregate_per_read_main(args):
    global VERBOSE
    VERBOSE = not args.quiet
    th.VERBOSE = VERBOSE

    if len(args.single_read_threshold) == 1:
        lower_thresh = None
        single_read_thresh = args.single_read_threshold[0]
    else:
        if len(args.single_read_threshold) > 2:
            th.warning_message(
                'Only 1 or 2 values may be passed as single-read ' +
                'thresholds. Only using the first 2 options provided.')
        lower_thresh = args.single_read_threshold[0]
        single_read_thresh = args.single_read_threshold[1]

    aggregate_per_read_stats(
        args.per_read_statistics_filename, single_read_thresh, lower_thresh,
        args.statistics_filename, args.coverage_dampen_counts,
        args.minimum_test_reads, args.num_most_significant_stored,
        args.processes)

    return

if __name__ == '__main__':
    sys.stderr.write('This is a module. See commands with `tombo -h`\n')
    sys.exit(1)