from copy import deepcopy
import fnmatch
from glob import iglob
import itertools
import os
import re
import random
import shutil
import subprocess
import sys
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
import h5py
import numpy as np
import numpy.lib.recfunctions as nprf
import progressbar
from fast5_research.util import dtype_descr
from fast5_research.util import mad
from fast5_research.util import docstring_parameter
from fast5_research.util import readtsv
from fast5_research.util import validate_event_table, validate_model_table, validate_scale_object, _clean_attrs
from fast5_research.util import create_basecall_1d_output, create_mapping_output, mean_qscore, qstring_to_phred, _sanitize_data_for_writing, _sanitize_data_for_reading
warnings.simplefilter("always", DeprecationWarning)
class Fast5(h5py.File):
    """Class for grabbing data from single read fast5 files. Many attributes/
    groups are currently assumed to exist (we're concerned mainly with
    reading). Needs some development to be robust and to support writing.
    """
__base_analysis__ = '/Analyses'
__event_detect_name__ = 'EventDetection'
__raw_path__ = '/Raw/Reads'
__raw_name_old__ = 'RawData'
__raw_path_old__ = '{}/{}/'.format(__base_analysis__, __raw_name_old__)
__raw_signal_path_old__ = '{}/Signal'.format(__raw_path_old__)
__raw_meta_path_old__ = '{}/Meta'.format(__raw_path_old__)
__channel_meta_path__ = '/UniqueGlobalKey/channel_id'
__tracking_id_path__ = 'UniqueGlobalKey/tracking_id'
__context_tags_path__ = 'UniqueGlobalKey/context_tags'
__default_event_path__ = 'Reads'
__default_basecall_1d_analysis__ = 'Basecall_1D'
__default_seq_section__ = 'template'
__default_basecall_fastq__ = 'BaseCalled_{}/Fastq'
__default_basecall_1d_events__ = 'BaseCalled_{}/Events'
__default_basecall_1d_model__ = 'BaseCalled_{}/Model'
__default_basecall_1d_summary__ = 'Summary/basecall_1d_{}'
__default_alignment_analysis__ = 'Alignment'
__default_section__ = 'template'
__default_mapping_analysis__ = 'Squiggle_Map'
__default_mapping_events__ = 'SquiggleMapped_{}/Events'
__default_mapping_model__ = 'SquiggleMapped_{}/Model'
__default_mapping_summary__ = 'Summary/squiggle_map_{}'
__default_substep_mapping_analysis__ = 'Substate_Map'
__default_substep_mapping_events__ = '/Events'
__default_basecall_mapping_analysis__ = 'AlignToRef'
__default_basecall_mapping_events__ = 'CurrentSpaceMapped_{}/Events/'
__default_basecall_mapping_model__ = 'CurrentSpaceMapped_{}/Model/'
__default_basecall_mapping_summary__ = '/Summary/current_space_map_{}/' # under AlignToRef analysis
__default_basecall_alignment_summary__ = '/Summary/genome_mapping_{}/' # under Alignment analysis
__default_engine_state_path__ = '/EngineStates/'
__temp_fields__ = ('heatsink', 'asic')
def __init__(self, fname, read='r'):
super(Fast5, self).__init__(fname, read)
# Attach channel_meta as attributes, slightly redundant
for k, v in _clean_attrs(self[self.__channel_meta_path__].attrs).items():
setattr(self, k, v)
# Backward compat.
self.sample_rate = self.sampling_rate
self.filename_short = os.path.splitext(os.path.basename(self.filename))[0]
short_name_match = re.search(re.compile(r'ch\d+_file\d+'), self.filename_short)
self.name_short = self.filename_short
if short_name_match:
self.name_short = short_name_match.group()
    @classmethod
def New(cls, fname, read='w', tracking_id={}, context_tags={}, channel_id={}):
"""Construct a fresh single-read file, with meta data written to
standard locations.
"""
if read not in ('w', 'a'):
raise IOError("New file can only be opened with 'a' or 'w' intent.")
channel_id = cls.convert_channel_id(channel_id)
tracking_id = cls.convert_tracking_id(tracking_id)
# Start a new file, populate it with meta
with h5py.File(fname, read) as h:
h.attrs[_sanitize_data_for_writing('file_version')] = _sanitize_data_for_writing(1.0)
for data, location in zip(
[tracking_id, context_tags],
[cls.__tracking_id_path__, cls.__context_tags_path__]
):
# cjw: no idea why these must be str, just following ossetra
cls._add_attrs_to_fh(h, data, location, convert=str)
# These aren't forced to be str
cls._add_attrs_to_fh(h, channel_id, cls.__channel_meta_path__)
# return instance from new file
return cls(fname, 'a')
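    # A minimal usage sketch for New() (all field values below are
    # illustrative, not from a real run):
    #
    #     channel_id = {
    #         'digitisation': 8192.0, 'offset': 2.0, 'range': 1402.0,
    #         'sampling_rate': 4000.0, 'channel_number': '1',
    #     }
    #     tracking_id = {
    #         'exp_start_time': '1970-01-01T00:00:00Z',
    #         'run_id': '118ea414303a46d892603141b9cbd7b0',
    #         'flow_cell_id': 'FAH12345',
    #     }
    #     with Fast5.New('read.fast5', 'w', tracking_id=tracking_id,
    #                    channel_id=channel_id) as fh:
    #         ...  # write reads, see set_read()/set_raw() below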
def _add_attrs(self, data, location, convert=None):
"""Convenience method for adding attrs to a possibly new group.
:param data: dict of attrs to add
:param location: hdf path
:param convert: function to apply to all dictionary values
"""
        self._add_attrs_to_fh(self, data, location, convert=convert)
@staticmethod
def _add_attrs_to_fh(fh, data, location, convert=None):
"""Implementation of _add_attrs as staticmethod. This allows
functionality to be used in .New() constructor but is otherwise nasty!
"""
if location not in fh:
fh.create_group(location)
attrs = fh[location].attrs
for k, v in data.items():
if convert is not None:
attrs[_sanitize_data_for_writing(k)] = _sanitize_data_for_writing(convert(v))
else:
attrs[_sanitize_data_for_writing(k)] = _sanitize_data_for_writing(v)
    @staticmethod
def convert_channel_id(channel_id):
# channel_id: spec. requires these
req_fields = {
'digitisation': np.dtype('f8'),
'offset': np.dtype('f8'),
'range': np.dtype('f8'),
'sampling_rate': np.dtype('f8'),
'channel_number': str,
}
if not set(req_fields).issubset(set(channel_id.keys())):
raise KeyError(
'channel_id does not contain required fields: {},\ngot {}.'.format(req_fields.keys(), channel_id.keys())
)
channel_id = _type_meta(channel_id, req_fields)
return channel_id
    @staticmethod
def convert_tracking_id(tracking_id):
# tracking_id: spec. says this group should be present as string, says
# nothing about required keys, but certain software require the
# following minimal set
req_fields = {
# guppy
'exp_start_time': np.dtype('U20'), # e.g. '1970-01-01T00:00:00Z'
'run_id': np.dtype('U32'), # '118ea414303a46d892603141b9cbd7b0'
'flow_cell_id': np.dtype('U8'), # 'FAH12345'
# ...add others
}
if not set(req_fields).issubset(set(tracking_id.keys())):
raise KeyError(
'tracking_id does not contain required fields: {},\ngot {}.'.format(req_fields.keys(), tracking_id.keys())
)
tracking_id = _type_meta(tracking_id, req_fields)
return tracking_id
def _add_string_dataset(self, data, location):
        assert isinstance(data, str), 'Need to supply a string'
self.create_dataset(location, data=_sanitize_data_for_writing(data))
def _add_numpy_table(self, data, location):
self.create_dataset(location, data=_sanitize_data_for_writing(data), compression=True)
def _add_event_table(self, data, location):
validate_event_table(data)
self._add_numpy_table(data, location)
def _join_path(self, *args):
return '/'.join(args)
@property
def writable(self):
"""Can we write to the file."""
        return self.mode != 'r'
    def assert_writable(self):
assert self.writable, "File not writable, opened with {}.".format(self.mode)
@property
def channel_meta(self):
"""Channel meta information as python dict"""
return _clean_attrs(self[self.__channel_meta_path__].attrs)
@property
def tracking_id(self):
"""Tracking id meta information as python dict"""
return _clean_attrs(self[self.__tracking_id_path__].attrs)
@property
def context_tags(self):
"""Context tags meta information as python dict"""
return _clean_attrs(self[self.__context_tags_path__].attrs)
@property
def attributes(self):
"""Attributes for a read, assumes one read in file"""
try:
return _clean_attrs(self.get_read(group=True).attrs)
except IndexError:
return _clean_attrs(self.get_read(group=True, raw=True).attrs)
    def summary(self, rename=True, delete=True, scale=True):
"""A read summary, assumes one read in file"""
to_rename = zip(
('start_mux', 'abasic_found', 'duration', 'median_before'),
('mux', 'abasic', 'strand_duration', 'pore_before')
)
to_delete = ('read_number', 'scaling_used')
data = deepcopy(self.attributes)
data['filename'] = os.path.basename(self.filename)
data['run_id'] = self.tracking_id['run_id']
data['channel'] = self.channel_meta['channel_number']
if scale:
data['duration'] /= self.channel_meta['sampling_rate']
data['start_time'] /= self.channel_meta['sampling_rate']
if rename:
for i,j in to_rename:
try:
data[j] = data[i]
del data[i]
except KeyError:
pass
if delete:
for i in to_delete:
try:
del data[i]
except KeyError:
pass
for key in data:
if isinstance(data[key], float):
data[key] = np.around(data[key], 4)
return data
    def strip_analyses(self, keep=('{}_000'.format(__event_detect_name__), __raw_name_old__)):
"""Remove all analyses from file
:param keep: whitelist of analysis groups to keep
"""
analyses = self[self.__base_analysis__]
for name in analyses.keys():
if name not in keep:
del analyses[name]
    def repack(self, pack_opts=''):
"""Run h5repack on the current file. Returns a fresh object."""
path = os.path.abspath(self.filename)
path_tmp = '{}.tmp'.format(path)
mode = self.mode
self.close()
pack_opts = pack_opts.split()
subprocess.call(['h5repack'] + pack_opts + [path, path_tmp])
shutil.move(path_tmp, path)
return Fast5(path, mode)
###
# Extracting read event data
    def get_reads(self, group=False, raw=False, read_numbers=None):
"""Iterator across event data for all reads in file
:param group: return hdf group rather than event data
"""
if not raw:
event_group = self.get_analysis_latest(self.__event_detect_name__)
event_path = self._join_path(event_group, self.__default_event_path__)
reads = self[event_path]
        else:
            try:
                reads = self[self.__raw_path__]
            except KeyError:
                # fall back to old-style raw location, then stop: such
                # files hold at most one read
                yield self.get_raw()[0]
                return
if read_numbers is None:
it = reads.keys()
else:
it = (k for k in reads.keys()
if _clean_attrs(reads[k].attrs)['read_number'] in read_numbers)
if group == 'all':
for read in it:
yield reads[read], read
elif group:
for read in it:
yield reads[read]
else:
for read in it:
if not raw:
yield self._get_read_data(reads[read])
else:
yield self._get_read_data_raw(reads[read])
    def get_read(self, group=False, raw=False, read_number=None):
"""Like get_reads, but only the first read in the file
:param group: return hdf group rather than event/raw data
"""
        if read_number is None:
            gen = self.get_reads(group, raw)
        else:
            gen = self.get_reads(group, raw, read_numbers=[read_number])
        return next(gen)
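    # Reading a single-read file is then a sketch like the following
    # (assuming 'read.fast5' contains an EventDetection analysis and raw
    # data):
    #
    #     with Fast5('read.fast5') as fh:
    #         events = fh.get_read()        # event table, times in seconds
    #         raw = fh.get_read(raw=True)   # raw signal scaled to pA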
def _get_read_data(self, read, indices=None):
"""Private accessor to read event data"""
# We choose the following to always be floats
float_fields = ('start', 'length', 'mean', 'stdv')
events = read['Events']
# We assume that if start is an int or uint the data is in samples
# else it is in seconds already.
needs_scaling = False
if events['start'].dtype.kind in ['i', 'u']:
needs_scaling = True
dtype = np.dtype([(
d[0], 'float') if d[0] in float_fields else d
for d in dtype_descr(events)
])
data = None
with events.astype(dtype):
if indices is None:
data = events[()]
else:
                try:
                    data = events[indices[0]:indices[1]]
                except Exception:
                    raise ValueError(
                        'Cannot retrieve events using {} as indices'.format(indices)
                    )
        # File spec mentions a read.attrs['scaling_used'] attribute,
        # it's not clear what this is. We'll ignore it and hope for
        # the best.
if needs_scaling:
data['start'] /= self.sample_rate
data['length'] /= self.sample_rate
return _sanitize_data_for_reading(data)
def _get_read_data_raw(self, read, indices=None, scale=True):
"""Private accessor to read raw data"""
raw = read['Signal']
dtype = float if scale else int
data = None
with raw.astype(dtype):
if indices is None:
data = raw[()]
else:
                try:
                    data = raw[indices[0]:indices[1]]
                except Exception:
                    raise ValueError(
                        'Cannot retrieve raw data using {} as indices'.format(indices)
                    )
# Scale data to pA
if scale:
meta = self.channel_meta
raw_unit = meta['range'] / meta['digitisation']
data = (data + meta['offset']) * raw_unit
return data
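    # The scaling above is the standard fast5 ADC-to-picoamp conversion:
    #
    #     pA = (adc + offset) * range / digitisation
    #
    # e.g. offset=2.0, range=1402.0, digitisation=8192.0 maps an ADC value
    # of 500 to (500 + 2.0) * 1402.0 / 8192.0 ~= 85.9 pA (values
    # illustrative only).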
@staticmethod
def _convert_meta_times(meta, sample_rate):
# Metadata should be written in samples (int), not seconds (float)
if isinstance(meta['start_time'], float):
meta['start_time'] = int(round(meta['start_time'] * sample_rate))
meta['duration'] = int(round(meta['duration'] * sample_rate))
return meta
    def set_read(self, data, meta):
        """Write event data to file
        :param data: event data
        :param meta: meta data to attach to read; must include 'read_number'
            (the per-channel read counter), which is used to build the path
        """
req_fields = [
'start_time', 'duration', 'read_number',
'start_mux', 'read_id', 'scaling_used', 'median_before'
]
if not set(req_fields).issubset(meta.keys()):
raise KeyError(
'Read meta does not contain required fields: {}, got {}'.format(
req_fields, meta.keys()
)
)
req_fields = ['start', 'length', 'mean', 'stdv']
if not isinstance(data, np.ndarray):
raise TypeError('Data is not ndarray.')
if not set(req_fields).issubset(data.dtype.names):
raise KeyError(
'Read data does not contain required fields: {}, got {}.'.format(
req_fields, data.dtype.names
)
)
event_group = self.get_analysis_new(self.__event_detect_name__)
event_path = self._join_path(event_group, self.__default_event_path__)
path = self._join_path(
event_path, 'Read_{}'.format(meta['read_number'])
)
# Metadata should be written in samples (int), not seconds (float)
meta = self._convert_meta_times(meta, self.sample_rate)
self._add_attrs(meta, path)
uint_fields = ('start', 'length')
dtype = np.dtype([(
d[0], 'uint32') if d[0] in uint_fields else d
for d in dtype_descr(data)
])
# (see _get_read_data()). If the data is not an int or uint
# we assume it is in seconds and scale appropriately
if data['start'].dtype.kind not in ['i', 'u']:
data['start'] *= self.sample_rate
data['length'] *= self.sample_rate
self._add_event_table(
data.astype(dtype), self._join_path(path, 'Events')
)
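    # Sketch of writing an event table (all values illustrative):
    #
    #     import numpy as np
    #     events = np.zeros(100, dtype=[('start', float), ('length', float),
    #                                   ('mean', float), ('stdv', float)])
    #     # ... fill events, with start/length in seconds ...
    #     meta = {'start_time': 0.0, 'duration': 1.0, 'read_number': 1,
    #             'start_mux': 1, 'read_id': 'read_1', 'scaling_used': 1,
    #             'median_before': 100.0}
    #     fh.set_read(events, meta)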
    def get_read_stats(self):
"""Combines stats based on events with output of .summary, assumes a
one read file.
"""
data = deepcopy(self.summary())
read = self.get_read()
n_events = len(read)
q = np.percentile(read['mean'], [10, 50, 90])
data['range_current'] = q[2] - q[0]
data['median_current'] = q[1]
data['num_events'] = n_events
data['median_sd'] = np.median(read['stdv'])
data['median_dwell'] = np.median(read['length'])
data['sd_current'] = np.std(read['mean'])
data['mad_current'] = mad(read['mean'])
data['eps'] = data['num_events'] / data['strand_duration']
return data
###
# Raw Data
    @docstring_parameter(__raw_path_old__)
def get_raw(self, scale=True):
"""Get raw data in file, might not be present.
:param scale: Scale data to pA? (rather than ADC values)
.. warning::
This method is deprecated and should not be used, instead use
.get_read(raw=True) to read both MinKnow conformant files
and previous Tang files.
"""
warnings.warn(
"'Fast5.get_raw()' is deprecated, use 'Fast5.get_read(raw=True)'.",
DeprecationWarning,
stacklevel=2
)
try:
raw = self[self.__raw_signal_path_old__]
meta = _clean_attrs(self[self.__raw_meta_path_old__].attrs)
except KeyError:
raise KeyError('No raw data available.')
raw_data = None
if scale:
raw_data = raw[()].astype('float')
raw_unit = meta['range'] / meta['digitisation']
raw_data = (raw_data + meta['offset']) * raw_unit
else:
raw_data = raw[()]
return raw_data, meta['sample_rate']
    def set_raw(self, raw, meta=None, read_number=None):
        """Set the raw data in file.
        :param raw: raw data to add
        :param meta: meta data dictionary; if not given it will be taken
            from the event detection read (assumes single read per file).
        :param read_number: read number (as usually given in filename and
            contained within HDF paths, viz. Reads/Read_<>/). If not given
            attempts will be made to guess the number (assumes single read
            per file).
        """
# Enforce raw is 16bit int, don't try to second guess if not
if raw.dtype != np.int16:
raise TypeError('Raw data must be of type int16.')
# Attempt to guess read_number and meta from event detection group
if read_number is None:
exception = RuntimeError("'read_number' not given and cannot guess.")
try:
n_reads = sum(1 for _ in self.get_reads())
except IndexError:
# if no events present
raise exception
else:
if n_reads == 1:
read_number = _clean_attrs(self.get_read(group=True).attrs)['read_number']
meta = _clean_attrs(self.get_read(group=True, read_number=read_number).attrs)
else:
raise exception
# Metadata should be written in samples (int), not seconds (float)
meta = self._convert_meta_times(meta, self.sample_rate)
# Ensure meta values are in correct type
meta = self.convert_raw_meta(meta)
# Check meta is same as that for event data, if any
try:
event_meta = _clean_attrs(self.get_read(group=True, read_number=read_number).attrs)
except IndexError:
pass
else:
            if any(meta[k] != event_meta[k] for k in meta.keys()):
raise ValueError(
"Attempted to set raw meta data as {} "
"but event meta is {}".format(meta, event_meta)
)
# Good to go!
read_path = self._join_path(self.__raw_path__, 'Read_{}'.format(read_number))
data_path = self._join_path(read_path, 'Signal')
self._add_attrs(meta, read_path)
self[data_path] = raw
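    # Sketch: with a single event-detection read already present, meta and
    # the read number can be guessed, so adding raw data reduces to:
    #
    #     import numpy as np
    #     adc = np.zeros(4000, dtype=np.int16)  # must be int16 ADC values
    #     fh.set_raw(adc)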
    def set_raw_old(self, raw, meta):
"""Set the raw data in file.
:param raw: raw data to add
:param meta: meta data dictionary
.. warning::
This method does not write raw data conforming to the Fast5
specification. This class will currently still read data
written by this method.
"""
        warnings.warn(
            ".set_raw_old() does not conform to the Fast5 spec. Although this "
            "class will read data written by this method, other tools "
            "may fail to read the resultant file.",
            FutureWarning
        )
# Verify meta conforms to our standard
req_fields = ['range', 'digitisation', 'offset', 'sample_rate']
if not set(req_fields).issubset(set(meta.keys())):
raise KeyError(
'raw meta data dictionary must contain {} fields.'.format(req_fields)
)
if abs(meta['sample_rate'] - self.sample_rate) > 1.0: # is this a sensible error?
raise ValueError(
'Tried to set raw data with sample rate {}, should be {}.'.format(
meta['sample_rate'], self.sample_rate
)
)
self[self.__raw_signal_path_old__] = raw
self._add_attrs(meta, self.__raw_meta_path_old__)
###
# Analysis path resolution
    def get_analysis_latest(self, name):
"""Get group of latest (present) analysis with a given base path.
:param name: Get the (full) path of newest analysis with a given base
name.
"""
try:
return self._join_path(
self.__base_analysis__,
sorted(filter(
lambda x: name in x, self[self.__base_analysis__].keys()
))[-1]
)
except (IndexError, KeyError):
raise IndexError('No analyses with name {} present.'.format(name))
    def get_analysis_new(self, name):
"""Get group path for new analysis with a given base name.
:param name: desired analysis name
"""
# Formatted as 'base/name_000'
try:
latest = self.get_analysis_latest(name)
root, counter = latest.rsplit('_', 1)
counter = int(counter) + 1
except IndexError:
# Nothing present
root = self._join_path(
self.__base_analysis__, name
)
counter = 0
return '{}_{:03d}'.format(root, counter)
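    # Analysis groups are versioned with a three-digit suffix. For example,
    # with /Analyses/Basecall_1D_000 already present:
    #
    #     fh.get_analysis_latest('Basecall_1D')  # '/Analyses/Basecall_1D_000'
    #     fh.get_analysis_new('Basecall_1D')     # '/Analyses/Basecall_1D_001'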
    def get_model(self, section=__default_section__, analysis=__default_mapping_analysis__):
"""Get model used for squiggle mapping"""
base = self.get_analysis_latest(analysis)
model_path = self._join_path(base, self.__default_mapping_model__.format(section))
return self[model_path][()]
    # The remaining methods read and write data in the form that chimaera
    # produces. It is necessarily all a bit nasty, but should provide a more
    # consistent interface to the files. Paths default to those chimaera uses.
###
# Temperature etc.
    @docstring_parameter(__default_engine_state_path__)
def get_engine_state(self, state, time=None):
"""Retrieve engine state from {}, either across the whole read
(default) or at a given time.
:param state: name of engine state
:param time: time (in seconds) at which to retrieve temperature
"""
location = self._join_path(
self.__default_engine_state_path__, state
)
states = self[location][()]
if time is None:
return states
else:
i = np.searchsorted(states['time'], time) - 1
return states[state][i]
    @docstring_parameter(__default_engine_state_path__, __temp_fields__)
def get_temperature(self, time=None, field=__temp_fields__[0]):
"""Retrieve temperature data from {}, either across the whole read
(default) or at a given time.
:param time: time at which to get temperature
:param field: one of {}
"""
if field not in self.__temp_fields__:
raise RuntimeError("'field' argument must be one of {}.".format(self.__temp_fields__))
return self.get_engine_state('minion_{}_temperature'.format(field), time)
    def set_engine_state(self, data):
"""Set the engine state data.
:param data: a 1D-array containing two fields, the first of which
must be named 'time'. The name of the second field will be used
to name the engine state and be used in the dataset path.
"""
fields = data.dtype.names
if fields[0] != 'time':
raise ValueError("First field of engine state data must be 'time'.")
if len(fields) != 2:
raise ValueError("Engine state data must contain exactly two fields.")
state = fields[1]
location = self._join_path(
self.__default_engine_state_path__, state
)
self[location] = data
###
# Template/adapter splitting data
    __default_split_analysis__ = 'Segment_Linear'
__split_summary_location__ = '/Summary/split_adapter'
    @docstring_parameter(__base_analysis__)
def set_split_data(self, data, analysis=__default_split_analysis__):
"""Write a dict containing split point data.
:param data: `dict`-like object containing attrs to add
:param analysis: Base analysis name (under {})
.. warning::
            Currently no check is made for required fields.
"""
location = self._join_path(
self.get_analysis_new(analysis), self.__split_summary_location__
)
self._add_attrs(data, location)
    @docstring_parameter(__base_analysis__)
def get_split_data(self, analysis=__default_split_analysis__):
"""Get signal segmentation data.
:param analysis: Base analysis name (under {})
"""
        def _inner(path=analysis):
            location = self._join_path(
                self.get_analysis_latest(path), 'Summary'
            )
            try:
                # there should be a single group under the above
                grps = list(self[location].keys())
                if len(grps) != 1:
                    raise KeyError('Expected a single group under {}.'.format(location))
            except Exception:
                raise IndexError('Cannot find location containing split point data.')
            location = self._join_path(location, grps[0])
            try:
                return _clean_attrs(self[location].attrs)
            except Exception:
                raise ValueError(
                    'Could not retrieve signal split point data from attributes of {}'.format(location)
                )
        try:
            return _inner(analysis)
        except (IndexError, ValueError):
            # try a fallback location
            return _inner('Adapter_Split')
    @docstring_parameter(__base_analysis__)
def get_section_indices(self, analysis=__default_split_analysis__):
"""Get two tuples indicating the event indices for signal
segmentation boundaries.
:param analysis: Base analysis path (under {})
"""
        # TODO: if the below fails, calculating the values on the fly would be
        # a useful feature. Which brings to mind: could we do such a thing
        # in all cases of missing data? Probably not reasonable.
attrs = self.get_split_data(analysis)
try:
return (
(attrs['start_index_temp'], attrs['end_index_temp']),
(attrs['start_index_comp'], attrs['end_index_comp'])
)
        except KeyError:
raise ValueError('Could not retrieve signal segmentation data.')
    @docstring_parameter(__base_analysis__)
    def get_section_events(self, section, analysis=__default_split_analysis__):
        """Get the event data for a signal section
        :param section: 'template' or 'complement'
        :param analysis: Base analysis path (under {})
        """
indices = self.get_section_indices(analysis)
read = self.get_read(group=True)
events = None
if section == 'template':
events = self._get_read_data(read, indices[0])
elif section == 'complement':
events = self._get_read_data(read, indices[1])
else:
raise ValueError(
'"section" parameter for fetching events must be "template" or "complement".'
)
return events
###
# 1D Basecalling data
    @docstring_parameter(__base_analysis__)
def set_basecall_data(self, events, scale, path, model, seq,
section=__default_section__, name='unknown',
post=None, score=None,
quality_data=None, qstring=None,
analysis=__default_basecall_1d_analysis__):
"""Create an annotated event table and 1D basecalling summary similiar
to chimaera and add them to the fast5 file.
:param events: Numpy record array of events. Must contain the mean,
stdv, start and length fields.
:param scale: Scaling object.
:param path: Viterbi path containing model pointers (1D np.array).
:param model: Model object.
:param seq: Basecalled sequence string for fastq.
:param section: String to use in paths, e.g. 'template'.
:param name: Identifier string for fastq.
:param post: Numpy 2D array containing the posteriors (event, state), used to annotate events.
:param score: Quality value for the whole strand.
:param quality_data: Numpy 2D array containing quality_data, used to annotate events.
:param qstring: Quality string for fastq.
:param analysis: Base analysis name (under {})
"""
# Validate input
self.assert_writable()
validate_event_table(events)
validate_scale_object(scale)
validate_model_table(model)
# Prepare paths
base = self.get_analysis_new(analysis)
fastq_path = self._join_path(base, self.__default_basecall_fastq__.format(section))
events_path = self._join_path(base, self.__default_basecall_1d_events__.format(section))
model_path = self._join_path(base, self.__default_basecall_1d_model__.format(section))
summary_path = self._join_path(base, self.__default_basecall_1d_summary__.format(section))
# Create annotated events and results
        annot_events, results = create_basecall_1d_output(events, scale, path, model, post=post)
if seq is not None:
results['sequence_length'] = len(seq)
if score is not None:
results['strand_score'] = score
# Write fastq
if qstring is None:
qstring = '!' * len(seq)
else:
qscores = qstring_to_phred(qstring)
results['mean_qscore'] = mean_qscore(qscores)
fastq = '@{}\n{}\n+\n{}\n'.format(name, seq, qstring)
self._add_string_dataset(fastq, fastq_path)
# Write summary
self._add_attrs(results, summary_path)
# Write annotated events
read_meta = {
'start_time': annot_events[0]['start'],
'duration': annot_events[-1]['start'] + annot_events[-1]['length'] - annot_events[0]['start']
}
self._add_event_table(annot_events, events_path)
self._add_attrs(read_meta, events_path)
# Write model
scale_meta = {
'shift': scale.shift,
'scale': scale.scale,
'drift': scale.drift,
'var': scale.var,
'scale_sd': scale.scale_sd,
'var_sd': scale.var_sd
}
self._add_numpy_table(model, model_path)
self._add_attrs(scale_meta, model_path)
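    # Sketch of a call (events/scale/path/model stand in for the validated
    # chimaera-style objects described above; the strings are illustrative):
    #
    #     fh.set_basecall_data(events, scale, path, model, seq='ACGT',
    #                          name='read_1', qstring='!!!!',
    #                          section='template')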
    @docstring_parameter(__base_analysis__)
def get_basecall_data(self, section=__default_section__, analysis=__default_basecall_1d_analysis__):
"""Read the annotated basecall_1D events from the fast5 file.
:param section: String to use in paths, e.g. 'template'.
:param analysis: Base analysis name (under {})
"""
base = self.get_analysis_latest(analysis)
events_path = self._join_path(base, self.__default_basecall_1d_events__.format(section))
try:
# use _get_read_data to make sure int fields are converted as needed
return self._get_read_data({'Events': self[events_path]})
        except Exception:
            raise ValueError('Could not retrieve basecall_1D data from {}'.format(events_path))
    @docstring_parameter(__base_analysis__)
def get_alignment_attrs(self, section=__default_section__, analysis=__default_alignment_analysis__):
"""Read the annotated alignment meta data from the fast5 file.
:param section: String to use in paths, e.g. 'template'.
:param analysis: Base analysis name (under {})
"""
attrs = None
base = self.get_analysis_latest(analysis)
attr_path = self._join_path(base,
self.__default_basecall_alignment_summary__.format(section))
try:
attrs = _clean_attrs(self[attr_path].attrs)
        except Exception:
raise ValueError('Could not retrieve alignment attributes from {}'.format(attr_path))
return attrs
###
# Mapping data
    @docstring_parameter(__base_analysis__)
def set_mapping_data(self, events, scale, path, model, seq, ref_name,
section=__default_section__,
post=None, score=None, is_reverse=False,
analysis=__default_mapping_analysis__):
"""Create an annotated event table and mapping summary similiar to
chimaera and add them to the fast5 file.
:param events: :class:`np.ndarray` of events. Must contain mean,
stdv, start and length fields.
:param scale: Scaling object.
:param path: :class:`np.ndarray` containing position in reference.
Negative values will be interpreted as "bad emissions".
:param model: Model object to use.
:param seq: String representation of the reference sequence.
:param section: Section of strand, e.g. 'template'.
        :param ref_name: Reference name.
:param post: Two-dimensional :class:`np.ndarray` containing posteriors.
:param score: Mapping quality score.
:param is_reverse: Mapping refers to '-' strand (bool).
:param analysis: Base analysis name (under {})
"""
# Validate input
self.assert_writable()
validate_event_table(events)
validate_model_table(model)
validate_scale_object(scale)
assert isinstance(seq, str), 'seq needs to be a str'
# Create annotated events and results
n_states = None
if post is None:
n_states = len(seq) - len(model['kmer'][0]) + 1
annot_events, results = create_mapping_output(
events, scale, path, model, seq, post=post, is_reverse=is_reverse, n_states=n_states
)
results['ref_name'] = ref_name
if score is not None:
results['strand_score'] = score
# Create scale meta to be written with model
scale_meta = {
'shift': scale.shift,
'scale': scale.scale,
'drift': scale.drift,
'var': scale.var,
'scale_sd': scale.scale_sd,
'var_sd': scale.var_sd
}
self._write_mapping_data(annot_events, results, model, scale_meta, section, analysis=analysis)
def _write_mapping_data(self, annot_events, results, model, scale_meta, section,
analysis=__default_mapping_analysis__):
# Prepare paths
paths_dict = {
'Squiggle_Map': (self.__default_mapping_analysis__, self.__default_mapping_events__,
self.__default_mapping_model__, self.__default_mapping_summary__),
'AlignToRef': (self.__default_basecall_mapping_analysis__, self.__default_basecall_mapping_events__,
self.__default_basecall_mapping_model__, self.__default_basecall_mapping_summary__)
}
base, event_path, model_path, summary_path = paths_dict[analysis]
base = self.get_analysis_new(analysis)
event_path = self._join_path(base, event_path.format(section))
model_path = self._join_path(base, model_path.format(section))
summary_path = self._join_path(base, summary_path.format(section))
# Write annotated events
read_meta = {
'start_time': annot_events[0]['start'],
'duration': annot_events[-1]['start'] + annot_events[-1]['length'] - annot_events[0]['start']
}
self._add_event_table(annot_events, event_path)
self._add_attrs(read_meta, event_path)
# Write model
self._add_numpy_table(model, model_path)
self._add_attrs(scale_meta, model_path)
# Write summary
self._add_attrs(results, summary_path)
    @docstring_parameter(__base_analysis__)
def get_mapping_data(self, section=__default_section__, analysis=__default_mapping_analysis__, get_model=False):
"""Read the annotated mapping events from the fast5 file.
.. note::
The seq_pos column for the events table returned from basecall_mapping is
adjusted to be the genome position (consistent with squiggle_mapping)
:param section: String to use in paths, e.g. 'template'.
:param analysis: Base analysis name (under {}). For basecall mapping
use analysis = 'AlignToRef'.
"""
events = None
if analysis == self.__default_mapping_analysis__:
# squiggle_mapping
base = self.get_analysis_latest(analysis)
event_path = self._join_path(base, self.__default_mapping_events__.format(section))
try:
# use _get_read_data to make sure int fields are converted as needed
events = self._get_read_data({'Events': self[event_path]})
            except Exception:
raise ValueError('Could not retrieve squiggle_mapping data from {}'.format(event_path))
if get_model:
model_path = self._join_path(base, self.__default_mapping_model__.format(section))
try:
model = self[model_path][()]
                except Exception:
raise ValueError('Could not retrieve squiggle_mapping model from {}'.format(model_path))
attrs = self.get_mapping_attrs(section=section)
elif analysis == self.__default_substep_mapping_analysis__:
# substep mapping
base = self.get_analysis_latest(analysis)
event_path = self._join_path(base, self.__default_substep_mapping_events__.format(section))
try:
# use _get_read_data to make sure int fields are converted as needed
events = self._get_read_data({'Events': self[event_path]})
            except Exception:
                raise ValueError('Could not retrieve substep_mapping data from {}'.format(event_path))
            attrs = None
if get_model:
raise NotImplementedError('Retrieving substep model not implemented.')
else:
# basecall_mapping
base = self.get_analysis_latest(analysis)
event_path = self._join_path(base, self.__default_basecall_mapping_events__.format(section))
try:
# use _get_read_data to make sure int fields are converted as needed
events = self._get_read_data({'Events': self[event_path]})
            except Exception:
raise ValueError('Could not retrieve basecall_mapping data from {}'.format(event_path))
if get_model:
model_path = self._join_path(base, self.__default_basecall_mapping_model__.format(section))
try:
model = self[model_path][()]
                except Exception:
                    raise ValueError('Could not retrieve basecall_mapping model from {}'.format(model_path))
# Modify seq_pos to be the actual genome position (consistent with squiggle_map)
attrs = self.get_mapping_attrs(section=section, analysis=self.__default_alignment_analysis__)
if attrs['direction'] == '+':
events['seq_pos'] = events['seq_pos'] + attrs['ref_start']
else:
events['seq_pos'] = attrs['ref_stop'] - events['seq_pos']
# add transition field
if attrs:
move = np.ediff1d(events['seq_pos'], to_begin=0)
if attrs['direction'] == '-':
move *= -1
if 'move' not in events.dtype.names:
events = nprf.append_fields(events, 'move', move)
else:
events['move'] = move
if get_model:
return events, model
else:
return events
    def get_any_mapping_data(self, section=__default_section__, attrs_only=False, get_model=False):
"""Convenience method for extracting whatever mapping data might be
present, favouring squiggle_mapping output over basecall_mapping.
:param section: (Probably) 'template'
:param attrs_only: Use attrs_only=True to return mapping attributes without events
        :param get_model: Additionally return the model with the events
        :returns: the tuple (events, attrs), or (events, attrs, model) if
            get_model was given, or attrs alone if attrs_only
"""
events = None
attrs = None
try:
if not attrs_only:
events = self.get_mapping_data(section=section, get_model=get_model)
attrs = self.get_mapping_attrs(section=section)
except Exception as e:
try:
if not attrs_only:
events = self.get_mapping_data(section=section,
analysis=self.__default_basecall_mapping_analysis__, get_model=get_model)
attrs = self.get_mapping_attrs(section=section,
analysis=self.__default_alignment_analysis__)
except Exception as e:
raise ValueError(
"Cannot find any mapping data at paths I know about. "
"Consider using get_mapping_data() with analysis argument."
)
if not attrs_only:
if get_model:
return events[0], attrs, events[1]
else:
return events, attrs
else:
return attrs
    @docstring_parameter(__base_analysis__)
def get_mapping_attrs(self, section=__default_section__, analysis=__default_mapping_analysis__):
"""Read the annotated mapping meta data from the fast5 file.
Names which are inconsistent between squiggle_mapping and basecall_mapping are added to
basecall_mapping (thus duplicating the attributes in basecall mapping).
:param section: String to use in paths, e.g. 'template'.
:param analysis: Base analysis name (under {})
For basecall mapping use analysis = 'Alignment'
"""
attrs = None
if analysis == self.__default_mapping_analysis__:
# squiggle_mapping
base = self.get_analysis_latest(analysis)
attr_path = self._join_path(base, self.__default_mapping_summary__.format(section))
try:
attrs = _clean_attrs(self[attr_path].attrs)
            except Exception:
                raise ValueError('Could not retrieve squiggle_mapping meta data from {}'.format(attr_path))
else:
            # basecall_mapping
            # AlignToRef attributes (set AlignToRef first so that Alignment attrs are not overwritten)
base = self.get_analysis_latest(self.__default_basecall_mapping_analysis__)
attr_path = self._join_path(base, self.__default_basecall_mapping_summary__.format(section))
try:
attrs = _clean_attrs(self[attr_path].attrs)
            except Exception:
                raise ValueError('Could not retrieve basecall_mapping meta data from {}'.format(attr_path))
# Rename some of the fields
rename = [
('genome_start', 'ref_start'),
('genome_end', 'ref_stop'),
]
for old, new in rename:
attrs[new] = attrs.pop(old)
# Alignment attributes
base = self.get_analysis_latest(analysis)
attr_path = self._join_path(
base, self.__default_basecall_alignment_summary__.format(section))
try:
genome = _clean_attrs(self[attr_path].attrs)['genome']
            except Exception:
                raise ValueError('Could not retrieve basecall_mapping genome field from {}'.format(attr_path))
            try:
                attrs['reference'] = self.get_reference_fasta(section=section).split('\n')[1]
            except Exception:
                raise ValueError('Could not retrieve basecall_mapping fasta from Alignment analysis')
# Add attributes with keys consistent with Squiggle_map
rc = '_rc'
is_rc = genome.endswith(rc)
attrs['ref_name'] = genome[:-len(rc)] if is_rc else genome
attrs['direction'] = '-' if is_rc else '+'
        # Trim any other fields; the allowed set is that produced by
        # squiggle_mapping. We allow strand_score but do not require it,
        # since our writer does not require it.
required = [
'direction', 'ref_start', 'ref_stop', 'ref_name',
'num_skips', 'num_stays', 'reference'
]
additional = ['strand_score', 'shift', 'scale', 'drift', 'var', 'scale_sd', 'var_sd']
keep = required + additional
assert set(required).issubset(set(attrs)), 'Required mapping attributes not found'
for key in (set(attrs) - set(keep)):
            del attrs[key]
return attrs
###
# Sequence data
    @docstring_parameter(__base_analysis__)
def get_fastq(self, analysis=__default_basecall_1d_analysis__, section=__default_seq_section__, custom=None):
"""Get the fastq (sequence) data.
:param analysis: Base analysis name (under {})
:param section: (Probably) 'template'
:param custom: Custom hdf path overriding all of the above.
"""
err_msg = 'Could not retrieve sequence data from {}'
if custom is not None:
location = custom
else:
location = self._join_path(
self.get_analysis_latest(analysis), self.__default_basecall_fastq__.format(section)
)
try:
fastq = self[location][()]
        except KeyError:
            # If we were given section != 'template' with the default
            # analysis, that combination is more than likely incorrect;
            # try the alternative analysis.
if section != self.__default_seq_section__ and analysis == self.__default_basecall_1d_analysis__:
location = self._join_path(
self.get_analysis_latest(self.__default_basecall_1d_analysis__),
self.__default_basecall_fastq__.format(section)
)
try:
fastq = self[location][()]
                except Exception:
raise ValueError(err_msg.format(location))
else:
raise ValueError(err_msg.format(location))
else:
fastq = _sanitize_data_for_reading(fastq)
return fastq
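    # Example: recovering the sequence from the stored fastq (a sketch;
    # assumes a Basecall_1D analysis is present):
    #
    #     name, seq, _, qstring = fh.get_fastq().strip().split('\n')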
    @docstring_parameter(__base_analysis__)
def get_sam(self, analysis=__default_alignment_analysis__, section=__default_seq_section__, custom=None):
"""Get SAM (alignment) data.
:param analysis: Base analysis name (under {})
:param section: (Probably) 'template'
:param custom: Custom hdf path overriding all of the above.
"""
if custom is not None:
location = custom
else:
location = self._join_path(
self.get_analysis_latest(analysis), 'Aligned_{}'.format(section), 'SAM'
)
try:
return self[location][()]
        except KeyError:
            raise ValueError('Could not retrieve SAM data from {}'.format(location))
    @docstring_parameter(__base_analysis__)
def get_reference_fasta(self, analysis=__default_alignment_analysis__, section=__default_seq_section__, custom=None):
"""Get fasta sequence of known DNA fragment for the read.
:param analysis: Base analysis name (under {})
:param section: (Probably) 'template'
:param custom: Custom hdf path overriding all of the above.
"""
if custom is not None:
location = custom
else:
location = self._join_path(
self.get_analysis_latest(analysis), 'Aligned_{}'.format(section), 'Fasta'
)
try:
sequence = _sanitize_data_for_reading(self[location][()])
        except KeyError:
raise ValueError('Could not retrieve sequence data from {}'.format(location))
return sequence
def recursive_glob(treeroot, pattern):
    # Emulates e.g. glob.glob("**/*.fast5", recursive=True) in python3
for base, dirs, files in os.walk(treeroot):
goodfiles = fnmatch.filter(files, pattern)
for f in goodfiles:
yield os.path.join(base, f)
def iterate_fast5(path='Stream', strand_list=None, paths=False, mode='r',
                  limit=None, shuffle=False, robust=False, progress=False,
                  recursive=False):
"""Iterate over directory of fast5 files, optionally only returning those in list
:param path: Directory in which single read fast5 are located or filename.
    :param strand_list: List of strands; can be a python list or a delimited
        table. If the latter and a filename field is present, this is used
        to locate files. If a file is given and a strand field is present,
        the directory index file is searched for and filenames built from that.
:param paths: Yield file paths instead of fast5 objects.
:param mode: Mode for opening files.
:param limit: Limit number of files to consider.
:param shuffle: Shuffle files to randomize yield of files.
:param robust: Carry on with iterating over FAST5 files after an exception was raised.
:param progress: Display progress bar.
:param recursive: Perform a recursive search for files in subdirectories of `path`.
"""
if strand_list is None:
# Could make glob more specific to filename pattern expected
if os.path.isdir(path):
if recursive:
files = recursive_glob(path, '*.fast5')
else:
files = iglob(os.path.join(path, '*.fast5'))
else:
files = [path]
else:
if isinstance(strand_list, list):
files = (os.path.join(path, x) for x in strand_list)
else:
reads = readtsv(strand_list)
if 'filename' in reads.dtype.names:
# Strand list contains a filename column
files = (os.path.join(path, x) for x in reads['filename'])
else:
raise KeyError("Strand file does not contain required field 'filename'.\n")
    # shuffle means we can't be lazy
    if shuffle and limit is not None:
        files = np.random.choice(list(files), limit, replace=False)
    elif shuffle:
        # random.shuffle works in place, so materialize the iterator first
        files = list(files)
        random.shuffle(files)
elif limit is not None:
try:
files = files[:limit]
except TypeError:
files = itertools.islice(files, limit)
if progress:
bar = progressbar.ProgressBar()
files = bar(files)
for f in files:
if not os.path.exists(f):
sys.stderr.write('File {} does not exist, skipping\n'.format(f))
continue
if not paths:
try:
fh = Fast5(f, read=mode)
except Exception as e:
if robust:
sys.stderr.write("Could not open FAST5 file {}: {}\n".format(f, e))
else:
raise e
else:
yield fh
fh.close()
else:
yield os.path.abspath(f)
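# Typical iterate_fast5() usage (a sketch; 'reads/' is an assumed directory):
#
#     for fh in iterate_fast5('reads/', robust=True, progress=True):
#         print(fh.filename_short)
#
#     paths = list(iterate_fast5('reads/', paths=True, limit=100))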
def _type_meta(meta, types):
"""Convert meta data fields into required types.
:param meta: dict of values
    :param types: dict of types or `np.dtype`
:returns: dict
"""
    converted = dict(meta)
for k, dtype in types.items():
if k in meta:
if isinstance(dtype, np.dtype):
val = np.array([meta[k]], dtype=dtype)[0]
else:
val = dtype(meta[k])
converted[k] = val
return converted
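# e.g. (illustrative): _type_meta({'offset': '2'}, {'offset': np.dtype('f8')})
# returns {'offset': 2.0} with the value held as a numpy float64.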