Source code for pore_c.datasources

import dask
from intake.source.base import DataSource, Schema
from pysam import FastaFile, FastxFile


[docs]class Fastq(DataSource): name = "fastq" version = "0.1.0" container = "python" partition_access = False description = "A fastq file" def __init__(self, urlpath, metadata=None): self._urlpath = urlpath super().__init__(metadata=metadata) def _open_dataset(self): return FastxFile(self._urlpath) def _get_schema(self): return Schema(datashape=None, dtype=None, shape=None, npartitions=None, extra_metadata={})
[docs] def read_chunked(self, chunksize=10000): self._load_metadata() from toolz import partition_all yield from partition_all(chunksize, self._open_dataset())
def _close(self): if self._dataset is not None: self._dataset.close()
[docs]class IndexedFasta(DataSource): name = "indexed_bedfile" version = "0.1.0" container = "python" partition_access = True description = "A bgzipped and indexed fasta file" def __init__(self, urlpath, metadata=None): self._urlpath = urlpath self._dataset = None self._dtype = None self._chroms = None super().__init__(metadata=metadata) def _open_dataset(self): self._dataset = FastaFile(self._urlpath) def _get_schema(self): if self._dataset is None: self._open_dataset() self._chroms = list(self._dataset.references) chrom_lengths = [{"chrom": t[0], "length": t[1]} for t in zip(self._dataset.references, self._dataset.lengths)] return Schema( datashape=None, dtype=None, shape=None, npartitions=len(self._chroms), extra_metadata={"chroms": chrom_lengths}, ) def _get_partition(self, i): chrom = self._chroms[i] return [{"seqid": chrom, "seq": self._dataset.fetch(chrom)}]
[docs] def read_chunked(self): self._load_metadata() for i in range(self.npartitions): yield self._get_partition(i)
[docs] def to_dask(self): from dask import bag as db self._load_metadata() return db.from_delayed([dask.delayed(self._get_partition(i)) for i in range(self.npartitions)])
def _close(self): # close any files, sockets, etc if self._dataset is not None: self._dataset.close()