import dask
from intake.source.base import DataSource, Schema
from pysam import FastaFile, FastxFile


class Fastq(DataSource):
    name = "fastq"
    version = "0.1.0"
    container = "python"
    partition_access = False
    description = "A fastq file"

    def __init__(self, urlpath, metadata=None):
        self._urlpath = urlpath
        self._dataset = None  # set once the file is first opened
        super().__init__(metadata=metadata)

    def _open_dataset(self):
        # keep a handle on the instance so _close() can release it
        self._dataset = FastxFile(self._urlpath)
        return self._dataset

    def _get_schema(self):
        # plain sequential text: no dtype, shape or partition count to report
        return Schema(datashape=None, dtype=None, shape=None, npartitions=None, extra_metadata={})

    def read_chunked(self, chunksize=10000):
        self._load_metadata()
        from toolz import partition_all
        # group the record stream into lists of ``chunksize`` records
        yield from partition_all(chunksize, self._open_dataset())

    def _close(self):
        if self._dataset is not None:
            self._dataset.close()
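

# Example usage (a minimal sketch; ``example.fastq`` is a hypothetical path,
# and the record attributes follow pysam's FastxFile record API):
#
#     src = Fastq("example.fastq")
#     for chunk in src.read_chunked(chunksize=1000):
#         for record in chunk:
#             print(record.name, len(record.sequence))
#     src.close()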


class IndexedFasta(DataSource):
    name = "indexed_fasta"
    version = "0.1.0"
    container = "python"
    partition_access = True
    description = "A bgzipped and indexed fasta file"

    def __init__(self, urlpath, metadata=None):
        self._urlpath = urlpath
        self._dataset = None
        self._dtype = None
        self._chroms = None
        super().__init__(metadata=metadata)
    def _open_dataset(self):
        self._dataset = FastaFile(self._urlpath)

    def _get_schema(self):
        if self._dataset is None:
            self._open_dataset()
        self._chroms = list(self._dataset.references)
        chrom_lengths = [
            {"chrom": chrom, "length": length}
            for chrom, length in zip(self._dataset.references, self._dataset.lengths)
        ]
        return Schema(
            datashape=None,
            dtype=None,
            shape=None,
            npartitions=len(self._chroms),
            extra_metadata={"chroms": chrom_lengths},
        )

    def _get_partition(self, i):
        # one partition per reference sequence
        chrom = self._chroms[i]
        return [{"seqid": chrom, "seq": self._dataset.fetch(chrom)}]
    def read_chunked(self):
        self._load_metadata()
        for i in range(self.npartitions):
            yield self._get_partition(i)

    def to_dask(self):
        from dask import bag as db

        self._load_metadata()
        # wrap the method, not its result, so each partition is fetched
        # lazily at compute time rather than eagerly here
        return db.from_delayed(
            [dask.delayed(self._get_partition)(i) for i in range(self.npartitions)]
        )

    def _close(self):
        # close any files, sockets, etc
        if self._dataset is not None:
            self._dataset.close()
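

# Example usage (a minimal sketch; ``ref.fa.gz`` is a hypothetical bgzipped
# FASTA with the .fai/.gzi indexes alongside, as pysam's FastaFile requires,
# and discover() is assumed to populate npartitions and the extra metadata):
#
#     src = IndexedFasta("ref.fa.gz")
#     src.discover()
#     print(src.metadata["chroms"])     # [{"chrom": ..., "length": ...}, ...]
#     first = src.read_partition(0)     # [{"seqid": ..., "seq": ...}]
#     seqs = src.to_dask().compute()    # one bag partition per chromosome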