Source code for fast5_research.util

# gzopen/bzopen are used by file_has_fields below (assumed aliases for the
# standard-library openers; the flattened listing omitted these imports)
from bz2 import open as bzopen
from copy import deepcopy
from gzip import open as gzopen
from itertools import tee
from math import pow, log10
import os
import sys

import numpy as np
import numpy.lib.recfunctions as nprf


def qstring_to_phred(quality):
    """Compute standard phred scores from a quality string."""
    qscores = [ord(q) - 33 for q in quality]
    return qscores


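# Usage sketch (added for illustration; not part of the original module).
# Phred scores are ASCII-33 encoded, so '!' maps to 0 and 'I' to 40:
#
#     >>> qstring_to_phred('!+5?I')
#     [0, 10, 20, 30, 40]

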
def mean_qscore(scores):
    """Returns the phred score corresponding to the mean of the
    probabilities associated with the phred scores provided. Taken from
    chimaera.common.utilities.

    :param scores: Iterable of phred scores.

    :returns: Phred score corresponding to the average error rate, as
        estimated from the input phred scores.
    """
    if len(scores) == 0:
        return 0.0
    sum_prob = 0.0
    for val in scores:
        sum_prob += pow(10, -0.1 * val)
    mean_prob = sum_prob / len(scores)
    return -10.0 * log10(mean_prob)


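# Usage sketch (added for illustration; not part of the original module).
# The mean is taken over error probabilities, not over the scores
# themselves, so a poor score drags the result well below the arithmetic
# mean of the phred values:
#
#     >>> round(mean_qscore([10, 30]), 2)
#     12.97

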
def kmer_overlap_gen(kmers, moves=None):
    """From a list of kmers return the character shifts between them.
    (Movement from i to i+1 entry, e.g. [AATC, ATCG] returns [0, 1]).
    Allowed moves may be specified in the moves argument in order of
    preference. Taken from dragonet.bio.seq_tools.

    :param kmers: sequence of kmer strings.
    :param moves: allowed movements; if None all movements to the length
        of the kmer are allowed.
    """
    first = True
    yield 0
    for last_kmer, this_kmer in window(kmers, 2):
        if first:
            if moves is None:
                l = len(this_kmer)
                moves = range(l + 1)
            first = False
        l = len(this_kmer)
        for j in moves:
            if j < 0:
                if last_kmer[:j] == this_kmer[-j:]:
                    yield j
                    break
            elif j > 0 and j < l:
                if last_kmer[j:l] == this_kmer[0:-j]:
                    yield j
                    break
            elif j == 0:
                if last_kmer == this_kmer:
                    yield 0
                    break
            else:
                yield l
                break


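# Usage sketch (added for illustration; not part of the original module).
# A leading 0 is always emitted, then one move per adjacent kmer pair:
# a repeated kmer is a stay (0), a single-base shift is a step (1):
#
#     >>> list(kmer_overlap_gen(['AATC', 'ATCG', 'ATCG', 'TCGT']))
#     [0, 1, 0, 1]

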
def build_mapping_table(events, ref_seq, post, scale, path, model):
    """Build a mapping table based on output of a dragonet.mapper style
    object. Taken from chimaera.common.utilities.

    :param events: Numpy record array of events. Must contain the mean,
        stdv, start and length fields.
    :param ref_seq: String representation of the reference sequence.
    :param post: Numpy 2D array containing the posteriors (event, state).
    :param scale: Scaling object.
    :param path: Numpy 1D array containing position in reference. May
        contain negative values, which will be interpreted as "bad
        emissions".
    :param model: Model object to use.

    :returns: numpy record array containing summary fields. One record
        per event.

    ==================== =======================================================
    Output Field         Description
    ==================== =======================================================
    *mean*               mean value of event samples (level)
    *scaled_mean*        *mean* scaled to the bare level emission (mean/mode)
    *stdv*               standard deviation of event samples (noise)
    *scaled_stdv*        *stdv* scaled to the bare stdv emission (mode)
    *start*              start time of event /s
    *length*             length of event /s
    *model_level*        modelled event level, i.e. the level emission
                         associated with the kmer *kmer*, scaled to the data
    *model_scaled_level* bare level emission
    *model_sd*           modelled event noise, i.e. the sd emission associated
                         with the kmer *kmer*, scaled to the data
    *model_scaled_sd*    bare noise emission
    *seq_pos*            aligned sequence position, position on Viterbi path
    *p_seq_pos*          posterior probability of states on Viterbi path
    *kmer*               kmer identity of *seq_pos*
    *mp_pos*             aligned sequence position, position with highest
                         posterior
    *p_mp_pos*           posterior probability of most probable states
    *mp_kmer*            kmer identity of *mp_pos*
    *good_emission*      whether or not the HMM has tagged event as fitting
                         the model
    ==================== =======================================================
    """
    kmer_len = len(model['kmer'][0])
    kmer_index = seq_to_kmers(ref_seq, kmer_len)
    label_index = dict((j, i) for i, j in enumerate(model['kmer']))
    kmer_dtype = '|S{}'.format(kmer_len)

    column_names = ['mean', 'scaled_mean', 'stdv', 'scaled_stdv',
                    'start', 'length',
                    'model_level', 'model_scaled_level',
                    'model_sd', 'model_scaled_sd',
                    'p_seq_pos', 'p_mp_pos',
                    'seq_pos', 'mp_pos', 'move', 'good_emission',
                    'kmer', 'mp_kmer']
    column_types = [float] * 12 + [int] * 3 + [bool] + [kmer_dtype] * 2
    table = np.zeros(events.size, dtype=list(zip(column_names, column_types)))

    zero_start = events['start'] - events['start'][0]

    # Sequence position
    seq_pos = np.where(path >= 0, path, np.abs(path) - 1)
    seq_kmer = [kmer_index[x] for x in seq_pos]
    seq_kmer_i = [label_index[i] for i in seq_kmer]
    table['seq_pos'] = seq_pos
    table['kmer'] = seq_kmer
    table['p_seq_pos'] = post[range(post.shape[0]), seq_pos]
    table['move'] = np.ediff1d(seq_pos, to_begin=[0])

    # Highest posterior positions
    mp_pos = np.argmax(post, axis=1)
    table['mp_pos'] = mp_pos
    table['mp_kmer'] = [kmer_index[x] for x in mp_pos]
    table['p_mp_pos'] = post[range(post.shape[0]), table['mp_pos']]

    # The data
    for x in ('mean', 'start', 'length', 'stdv'):
        table[x] = events[x]

    # Scaling data to model
    table['scaled_mean'] = (table['mean'] - scale.shift - scale.drift * zero_start) / scale.scale
    table['scaled_stdv'] = table['stdv'] / scale.scale_sd

    # The model
    table['model_scaled_level'] = model['level_mean'][seq_kmer_i]
    table['model_scaled_sd'] = model['sd_mean'][seq_kmer_i]

    # The model scaled to the data
    table['model_level'] = scale.shift + scale.drift * zero_start + \
        scale.scale * table['model_scaled_level']
    table['model_sd'] = scale.scale_sd * table['model_scaled_sd']

    # Tag ignore and outlier states
    table['good_emission'] = [x >= 0 for x in path]

    return table


def build_mapping_summary_table(mapping_summary):
    """Build a mapping summary table.

    :param mapping_summary: List of curr_map dictionaries.

    :returns: Numpy record array containing summary contents. One record
        per element of mapping_summary.
    """
    # Set memory allocation for variable length strings.
    # This works, but there must be a better way.
    max_len_name = 1
    max_len_direction = 1
    max_len_seq = 1
    for summary_line in mapping_summary:
        len_name = len(summary_line['name'])
        if len_name > max_len_name:
            max_len_name = len_name
        len_direction = len(summary_line['direction'])
        if len_direction > max_len_direction:
            max_len_direction = len_direction
        len_seq = len(summary_line['seq'])
        if len_seq > max_len_seq:
            max_len_seq = len_seq

    column_names = ['name', 'direction', 'is_best', 'score', 'scale',
                    'shift', 'drift', 'scale_sd', 'var_sd', 'var', 'seq']
    column_types = (['|S{}'.format(max_len_name)]
                    + ['|S{}'.format(max_len_direction)]
                    + [bool] + [float] * 7
                    + ['|S{}'.format(max_len_seq)])
    table = np.zeros(len(mapping_summary), dtype=list(zip(column_names, column_types)))
    for table_line, summary_line in zip(table, mapping_summary):
        table_line['name'] = summary_line['name']
        table_line['direction'] = summary_line['direction']
        table_line['score'] = summary_line['score']
        table_line['scale'] = summary_line['scale'].scale
        table_line['shift'] = summary_line['scale'].shift
        table_line['drift'] = summary_line['scale'].drift
        table_line['scale_sd'] = summary_line['scale'].scale_sd
        table_line['var_sd'] = summary_line['scale'].var_sd
        table_line['var'] = summary_line['scale'].var
        table_line['seq'] = summary_line['seq']

    table['is_best'] = False
    is_best = np.argmin([line['score'] for line in mapping_summary])
    table[is_best]['is_best'] = True

    return table


def create_basecall_1d_output(raw_events, scale, path, model, post=None):
    """Create the annotated event table and basecalling summaries similar
    to chimaera.

    :param raw_events: :class:`np.ndarray` with fields `mean`, `stdv`,
        `start`, and `length`.
    :param scale: :class:`dragonet.basecall.scaling.Scaler` object (or
        object with attributes `shift`, `scale`, `drift`, `var`,
        `scale_sd`, and `var_sd`).
    :param path: list containing state indices with respect to `model`.
    :param model: :class:`dragonet.util.model.Model` object.
    :param post: Two-dimensional :class:`np.ndarray` containing posteriors
        (event, state).

    :returns: A tuple of:

        * the annotated input event table,
        * a dict of results.
    """
    events = raw_events.copy()
    model_state = np.array([model[x]['kmer'] for x in path])
    raw_model_level = np.array([model[x]['level_mean'] for x in path])
    move = np.array(list(kmer_overlap_gen(model_state)))
    counts = np.bincount(move)
    stays = counts[0]
    skips = counts[2] if len(counts) > 2 else 0

    # Extend the event table
    read_start = events[0]['start']
    model_level = scale.shift + scale.scale * raw_model_level + \
        scale.drift * (events['start'] - read_start)
    new_columns = ['model_state', 'model_level', 'move']
    column_data = [model_state, model_level, move]
    if post is not None:
        weights = np.sum(post, axis=1)
        new_columns.append('weights')
        column_data.append(weights)
    drop_first = set(new_columns) & set(events.dtype.names)
    events = nprf.drop_fields(events, drop_first)
    table = nprf.append_fields(events, new_columns, data=column_data, asrecarray=True)

    # Compile the results
    results = {
        'num_events': events.size,
        'called_events': events.size,
        'shift': scale.shift,
        'scale': scale.scale,
        'drift': scale.drift,
        'var': scale.var,
        'scale_sd': scale.scale_sd,
        'var_sd': scale.var_sd,
        'num_stays': stays,
        'num_skips': skips,
    }
    return table, results


def create_mapping_output(raw_events, scale, path, model, seq, post=None,
                          n_states=None, is_reverse=False, substates=False):
    """Create the annotated event table and summaries similar to chimaera.

    :param raw_events: :class:`np.ndarray` with fields `mean`, `stdv`,
        `start`, and `length`.
    :param scale: :class:`dragonet.basecall.scaling.Scaler` object (or
        object with attributes `shift`, `scale`, `drift`, `var`,
        `scale_sd`, and `var_sd`).
    :param path: list containing state indices with respect to `model`.
    :param model: :class:`dragonet.util.model.Model` object.
    :param seq: String representation of the reference sequence.
    :param post: Two-dimensional :class:`np.ndarray` containing posteriors
        (event, state).
    :param n_states: number of states; required if `post` is None.
    :param is_reverse: Mapping refers to '-' strand (bool).
    :param substates: Mapping contains substates?

    :returns: A tuple of:

        * the annotated input event table,
        * a dict of results.
    """
    events = raw_events.copy()
    direction = '+' if not is_reverse else '-'

    has_post = True
    # If we don't have a posterior, pass a mock object
    if post is None:
        if n_states is None:
            raise ValueError('n_states is required if post is None.')
        has_post = False
        post = MockZeroArray((len(events), n_states))

    table = build_mapping_table(events, seq, post, scale, path, model)

    # Delete mocked out columns
    if not has_post:
        to_delete = ['p_seq_pos', 'mp_pos', 'mp_kmer', 'p_mp_pos']
        table = nprf.drop_fields(table, to_delete)

    if direction == '-':
        events['seq_pos'] = len(seq) - table['seq_pos']
        ref_start = table['seq_pos'][-1]
        ref_stop = table['seq_pos'][0]
    else:
        ref_start = table['seq_pos'][0]
        ref_stop = table['seq_pos'][-1]

    # Compute movement stats
    _, stays, skips = compute_movement_stats(path)

    results = {
        'direction': direction,
        'reference': seq,
        'ref_start': ref_start,
        'ref_stop': ref_stop,
        'shift': scale.shift,
        'scale': scale.scale,
        'drift': scale.drift,
        'var': scale.var,
        'scale_sd': scale.scale_sd,
        'var_sd': scale.var_sd,
        'num_stays': stays,
        'num_skips': skips,
    }
    return table, results


class MockZeroArray(np.ndarray):

    def __init__(self, shape):
        """Mock enough of the ndarray interface to be passable as a
        posterior matrix to chimaera's build_mapping_table.

        :param shape: tuple, shape of array.
        """
        self.shape = shape

    def argmax(self, axis=0):
        """Fake argmax values of an array."""
        return np.zeros(self.shape[1 - axis], dtype=int)


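# Illustration (added; not part of the original module). The mock only
# needs a shape and argmax to satisfy build_mapping_table; argmax along
# axis=1 yields one (zero) state index per event:
#
#     >>> MockZeroArray((3, 5)).argmax(axis=1)
#     array([0, 0, 0])

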
def validate_event_table(table):
    """Check if an object contains all columns of a basic event array."""
    if not isinstance(table, np.ndarray):
        raise TypeError('Table is not an ndarray.')

    req_fields = ['mean', 'stdv', 'start', 'length']
    if not set(req_fields).issubset(table.dtype.names):
        raise KeyError(
            'Array does not contain required fields for an event array: '
            '{}, got {}.'.format(req_fields, table.dtype.names)
        )


def validate_model_table(table):
    """Check if an object contains all columns of a dragonet Model."""
    if not isinstance(table, np.ndarray):
        raise TypeError('Table is not an ndarray.')

    req_fields = ['kmer', 'level_mean', 'level_stdv', 'sd_mean', 'sd_stdv']
    if not set(req_fields).issubset(table.dtype.names):
        raise KeyError(
            'Object does not contain fields required for Model: '
            '{}, got {}.'.format(req_fields, table.dtype.names)
        )


def validate_scale_object(obj):
    """Check if an object contains all attributes of a dragonet Scaler."""
    req_attributes = ['shift', 'scale', 'drift', 'var', 'scale_sd', 'var_sd']
    msg = 'Object does not contain attributes required for Scaler: {}'.format(req_attributes)
    assert all(hasattr(obj, attr) for attr in req_attributes), msg


def compute_movement_stats(path):
    """Compute movement stats from a mapping state path.

    :param path: :class:`np.ndarray` containing positions in reference.
        Negative values are interpreted as "bad emissions".
    """
    vitstate_indices = np.where(path >= 0, path, np.abs(path) - 1)
    move = np.ediff1d(vitstate_indices, to_begin=0)
    counts = np.bincount(move)
    stays = counts[0]
    skips = counts[2] if len(counts) > 2 else 0
    return move, stays, skips


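# Usage sketch (added for illustration; not part of the original module).
# For path [0, 1, 1, 2, 4] the moves are [0, 1, 0, 1, 2] (ediff1d prepends
# a 0, which counts as a stay), giving two stays and one skip:
#
#     >>> move, stays, skips = compute_movement_stats(np.array([0, 1, 1, 2, 4]))
#     >>> int(stays), int(skips)
#     (2, 1)

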
def seq_to_kmers(seq, length):
    """Turn a string into a list of (overlapping) kmers.

    e.g. perform the transformation:

        'ATATGCG' => ['ATA', 'TAT', 'ATG', 'TGC', 'GCG']

    :param seq: character string.
    :param length: length of kmers in output.

    :returns: A list of overlapping kmers.
    """
    return [seq[x:x + length] for x in range(0, len(seq) - length + 1)]


def window(iterable, size):
    """Create a sliding window over the data from another iterator.

    :param iterable: Iterator.
    :param size: Size of window.

    :returns: a list of tuples, each containing the data in one window
        position.
    """
    assert size > 0, "Window size for iterator should be strictly positive, got {0}".format(size)
    iters = tee(iterable, size)
    for i in range(1, size):
        for each in iters[i:]:
            next(each, None)
    return list(zip(*iters))


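# Usage sketch (added for illustration; not part of the original module):
#
#     >>> window([1, 2, 3, 4], 2)
#     [(1, 2), (2, 3), (3, 4)]

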
def readtsv(fname, fields=None, **kwargs):
    """Read a tsv file into a numpy array with required field checking.

    :param fname: filename to read. If the filename extension is gz or
        bz2, the file is first decompressed.
    :param fields: list of required fields.
    """
    if not file_has_fields(fname, fields):
        raise KeyError('File {} does not contain requested required fields {}'.format(fname, fields))

    for k in ['names', 'delimiter', 'dtype']:
        kwargs.pop(k, None)
    table = np.genfromtxt(fname, names=True, delimiter='\t', dtype=None,
                          encoding='utf8', **kwargs)
    # Numpy trick to force a single element to be an array of one row
    return table.reshape(-1)


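# Usage sketch (added for illustration; not part of the original module).
# A minimal round trip through a temporary tsv file:
#
#     >>> import tempfile
#     >>> with tempfile.NamedTemporaryFile('w', suffix='.tsv', delete=False) as fh:
#     ...     _ = fh.write('name\tscore\nread1\t12.5\n')
#     >>> table = readtsv(fh.name, fields=['score'])
#     >>> float(table['score'][0])
#     12.5
#     >>> os.remove(fh.name)

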
def docstring_parameter(*sub):
    """Allow docstrings to contain parameters."""
    def dec(obj):
        obj.__doc__ = obj.__doc__.format(*sub)
        return obj
    return dec


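# Usage sketch (added for illustration; not part of the original module):
#
#     >>> @docstring_parameter('fast5')
#     ... def describe():
#     ...     """Operates on {} files."""
#     >>> describe.__doc__
#     'Operates on fast5 files.'

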
def med_mad(data, factor=None, axis=None, keepdims=False):
    r"""Compute the Median Absolute Deviation, i.e., the median of the
    absolute deviations from the median, and the median.

    :param data: A :class:`ndarray` object.
    :param factor: Factor to scale MAD by. Default (None) is to be
        consistent with the standard deviation of a normal distribution
        (i.e. mad( N(0, \sigma^2) ) = \sigma).
    :param axis: For multidimensional arrays, which axis to calculate over.
    :param keepdims: If True, axis is kept as dimension of length 1.

    :returns: a tuple containing the median and MAD of the data.
    """
    if factor is None:
        factor = 1.4826
    dmed = np.median(data, axis=axis, keepdims=True)
    dmad = factor * np.median(abs(data - dmed), axis=axis, keepdims=True)
    if axis is None:
        dmed = dmed.flatten()[0]
        dmad = dmad.flatten()[0]
    elif not keepdims:
        dmed = dmed.squeeze(axis)
        dmad = dmad.squeeze(axis)
    return dmed, dmad


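# Usage sketch (added for illustration; not part of the original module).
# The MAD is robust to the outlier, unlike the standard deviation:
#
#     >>> med, mad_val = med_mad(np.array([1.0, 2.0, 3.0, 4.0, 100.0]))
#     >>> float(med), float(mad_val)
#     (3.0, 1.4826)

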
def mad(data, factor=None, axis=None, keepdims=False):
    r"""Compute the Median Absolute Deviation, i.e., the median of the
    absolute deviations from the median, and (by default) adjust by a
    factor for asymptotically normal consistency.

    :param data: A :class:`ndarray` object.
    :param factor: Factor to scale MAD by. Default (None) is to be
        consistent with the standard deviation of a normal distribution
        (i.e. mad( N(0, \sigma^2) ) = \sigma).
    :param axis: For multidimensional arrays, which axis to calculate the
        median over.
    :param keepdims: If True, axis is kept as dimension of length 1.

    :returns: the (scaled) MAD.
    """
    _, dmad = med_mad(data, factor=factor, axis=axis, keepdims=keepdims)
    return dmad


def file_has_fields(fname, fields=None):
    """Check that a tsv file has given fields.

    :param fname: filename to read. If the filename extension is gz or
        bz2, the file is first decompressed.
    :param fields: list of required fields.

    :returns: boolean
    """
    # Allow a quick return
    req_fields = deepcopy(fields)
    if isinstance(req_fields, str):
        req_fields = [fields]
    if req_fields is None or len(req_fields) == 0:
        return True
    req_fields = set(req_fields)

    inspector = open
    ext = os.path.splitext(fname)[1]
    if ext == '.gz':
        inspector = gzopen
    elif ext == '.bz2':
        inspector = bzopen

    has_fields = None
    # 'rt' ensures text mode for the gzip/bz2 openers as well as plain open
    with inspector(fname, 'rt') as fh:
        present_fields = set(fh.readline().rstrip('\n').split('\t'))
        has_fields = req_fields.issubset(present_fields)
    return has_fields


def get_changes(data, ignore_cols=None, use_cols=None):
    """Return only rows of a structured array which are not equal to the
    previous row.

    :param data: Numpy record array.
    :param ignore_cols: iterable of column names to ignore in checking
        for equality between rows.
    :param use_cols: iterable of column names to include in checking for
        equality between rows (only used if ignore_cols is None).

    :returns: Numpy record array.
    """
    cols = list(data.dtype.names)
    if ignore_cols is not None:
        for col in ignore_cols:
            cols.remove(col)
    elif use_cols is not None:
        cols = list(use_cols)

    changed_inds = np.where(data[cols][1:] != data[cols][:-1])[0] + 1
    changed_inds = [0] + [i for i in changed_inds]
    return data[(changed_inds,)]


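# Usage sketch (added for illustration; not part of the original module).
# Consecutive duplicate rows are dropped; the first row is always kept:
#
#     >>> data = np.array([(1, 'a'), (1, 'a'), (2, 'a')],
#     ...                 dtype=[('x', int), ('y', 'U1')])
#     >>> get_changes(data)['x']
#     array([1, 2])

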
def _clean(value):
    """Convert numpy numeric types to their python equivalents."""
    if isinstance(value, np.ndarray):
        if value.dtype.kind == 'S':
            return np.char.decode(value).tolist()
        else:
            return value.tolist()
    elif type(value).__module__ == np.__name__:
        conversion = value.item()
        if sys.version_info.major == 3 and isinstance(conversion, bytes):
            conversion = conversion.decode()
        return conversion
    elif sys.version_info.major == 3 and isinstance(value, bytes):
        return value.decode()
    else:
        return value


def _clean_attrs(attrs):
    return {_clean(k): _clean(v) for k, v in attrs.items()}


def _sanitize_data_for_writing(data):
    if isinstance(data, str):
        return data.encode()
    elif isinstance(data, np.ndarray) and data.dtype.kind == 'U':
        # (the original compared against np.dtype(np.unicode), which is
        # equivalent but relies on an alias removed from recent numpy)
        return data.astype('S')
    elif isinstance(data, np.ndarray) and len(data.dtype) > 1:
        dtypes = list(dtype_descr(data))
        for index, entry in enumerate(dtypes):
            type_check = entry[1]
            if isinstance(type_check, tuple):
                # an enum?
                return data
            if type_check.startswith('<U'):
                # numpy.astype can't handle empty string datafields for some
                # reason, so we'll explicitly state that.
                if len(entry[1]) <= 2 or (len(entry[1]) == 3 and entry[1][2] == '0'):
                    raise TypeError('Empty datafield {} cannot be converted'
                                    ' by np.astype.'.format(entry[0]))
                dtypes[index] = (entry[0], '|S{}'.format(entry[1][2:]))
        return data.astype(dtypes)
    return data


def _sanitize_data_for_reading(data):
    if sys.version_info.major == 3:
        if isinstance(data, bytes):
            return data.decode()
        elif isinstance(data, np.ndarray) and data.dtype.kind == 'S':
            return np.char.decode(data)
        elif isinstance(data, np.ndarray) and len(data.dtype) > 1:
            dtypes = list(dtype_descr(data))
            for index, entry in enumerate(dtypes):
                type_check = entry[1]
                if isinstance(type_check, tuple):
                    # an enum?
                    return data
                if entry[1].startswith('|S'):
                    # numpy.astype can't handle empty datafields for some
                    # reason, so we'll explicitly state that.
                    if len(entry[1]) <= 2 or (len(entry[1]) == 3 and entry[1][2] == '0'):
                        raise TypeError('Empty datafield {} cannot be converted'
                                        ' by np.astype.'.format(entry[0]))
                    dtypes[index] = (entry[0], '<U{}'.format(entry[1][2:]))
                if entry[1].startswith('|u'):
                    # there seems to be a bug in astype
                    dtypes[index] = (entry[0], entry[1].replace('|u', 'u'))
            return data.astype(dtypes)
    return data


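# Usage sketch (added for illustration; not part of the original module).
# _clean converts numpy scalars and arrays to plain python objects, e.g.
# for serialising HDF5 attributes:
#
#     >>> _clean(np.float32(0.5))
#     0.5
#     >>> _clean(np.array([b'abc']))
#     ['abc']

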
def dtype_descr(arr):
    """Get arr.dtype.descr.

    Views of structured arrays in which columns have been re-ordered no
    longer support arr.dtype.descr; see
    https://github.com/numpy/numpy/commit/dd8a2a8e29b0dc85dca4d2964c92df3604acc212
    """
    try:
        return arr.dtype.descr
    except ValueError:
        return tuple([(n, arr.dtype[n].descr[0][1]) for n in arr.dtype.names])


def group_vector(arr):
    """Group a vector by unique values.

    :param arr: input vector to be grouped.

    :returns: a dictionary mapping unique values to arrays of indices
        into the input vector.
    """
    groups, keys_as_int = np.unique(arr, return_inverse=True)
    n_keys = max(keys_as_int)
    indices = [[] for _ in range(n_keys + 1)]
    for i, k in enumerate(keys_as_int):
        indices[k].append(i)
    indices = [np.array(elt) for elt in indices]
    return dict(zip(groups, indices))


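# Usage sketch (added for illustration; not part of the original module):
#
#     >>> groups = group_vector(np.array(['a', 'b', 'a', 'c']))
#     >>> groups['a']
#     array([0, 2])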