Source code for pore_c.catalogs

from logging import getLogger
from pathlib import Path

import pandas as pd
import yaml
from intake.catalog.local import YAMLFileCatalog
from intake.source.base import DataSource

__all__ = [
    "RawReadCatalog",
    "VirtualDigestCatalog",
    "ReferenceGenomeCatalog",
]

logger = getLogger(__name__)


class basePoreCCatalog(YAMLFileCatalog):
    _suffix_map = {}

    @classmethod
    def generate_paths(cls, prefix):
        res = {}
        exists = []
        for key, val in cls._suffix_map.items():
            res[key] = Path(prefix + val)
            logger.debug(f"Data for {cls.__name__} {key} will be saved to: {val}")
            if res[key].exists():
                exists.append(key)
        if exists:
            for file_id in exists:
                logger.error(f"Output file already exists for {file_id}: {res[file_id]}")
            raise OSError("Please remove output files before continuing")
        return res

    @property
    def user_metadata(self):
        return self.metadata.get("user_metadata", {})

    @property
    def porec_metadata(self):
        md = self.metadata.copy()
        if "user_metadata" in md:
            md.pop("user_metadata")
        return md

    @staticmethod
    def create_catalog_dict(klass, file_paths, metadata, user_metadata, drivers=None):  # noqa: C901
        if drivers is None:
            drivers = {}
        if not metadata:
            metadata = {}
        if user_metadata:
            metadata["user_metadata"] = user_metadata
        catalog_data = {
            "name": klass.name,
            "description": klass.description,
            "driver": f"pore_c.catalogs.{klass.__name__}",
            "metadata": metadata,
            "sources": {},
        }
        driver_lookup = {
            ".csv": "csv",
            ".parquet": "parquet",
            ".fq.gz": "pore_c.datasources.Fastq",
            ".fasta.gz": "pore_c.datasources.IndexedFasta",
            ".fa.gz": "pore_c.datasources.IndexedFasta",
            ".bam": "pore_c.datasources.NameSortedBamSource",
            ".fq": "pore_c.datasources.Fastq",
            ".pairs.gz": "pore_c.datasources.IndexedPairFile",
            ".catalog.yaml": "intake.catalog.local.YAMLFileCatalog",
        }
        catalog_path = file_paths["catalog"]
        for key, val in file_paths.items():
            if key == "catalog":
                pass
            elif isinstance(val, DataSource):
                raise NotImplementedError
                catalog_data["sources"][key] = val._yaml()
            elif isinstance(val, Path):
                driver = drivers.get(key, None)
                if driver is None:
                    for suffix, _driver in driver_lookup.items():
                        if val.name.endswith(suffix):
                            driver = _driver
                            break
                if driver is None:
                    logger.warning(f"No driver found found for {key}: {val}, skipping.")
                    continue
                try:
                    urlpath = "{{ CATALOG_DIR }}/" + str(val.relative_to(catalog_path.parent))
                except ValueError:
                    urlpath = str(val.resolve())
                if driver == "intake.catalog.local.YAMLFileCatalog":
                    path_key = "path"
                else:
                    path_key = "urlpath"
                catalog_data["sources"][key] = {"driver": driver, "args": {path_key: urlpath}}
            else:
                raise ValueError(val)
        return catalog_data


[docs]class RawReadCatalog(basePoreCCatalog): name = "pore_c_raw_reads" description = "A catalog of data associated with raw reads" _suffix_map = { "catalog": ".catalog.yaml", "pass_fastq": ".batch.fq.gz", "fail_fastq": ".fail.fq.gz", "read_metadata": ".read_metadata.parquet", "summary": ".summary.csv", }
[docs] @classmethod def create(cls, file_paths, *args, **kwds): catalog_data = basePoreCCatalog.create_catalog_dict(cls, file_paths, *args, **kwds) catalog_path = file_paths["catalog"] with catalog_path.open("w") as fh: fh.write(yaml.dump(catalog_data, default_flow_style=False, sort_keys=False)) cat = cls(str(catalog_path)) return cat
def __str__(self): return "<RawReadCatalog reads={num_sequences} bases={total_bases:,}>".format( **self.metadata["summary_stats"]["pass"] )
[docs]class VirtualDigestCatalog(basePoreCCatalog): name = "pore_c_virtual_digest" description = "An intake catalog file for a virtual digest of a reference genome" _suffix_map = {"catalog": ".catalog.yaml", "fragments": ".fragments.parquet", "digest_stats": ".digest_stats.csv"}
[docs] @classmethod def create(cls, file_paths, *args, **kwds): catalog_data = basePoreCCatalog.create_catalog_dict(cls, file_paths, *args, **kwds) catalog_path = file_paths["catalog"] with catalog_path.open("w") as fh: fh.write(yaml.dump(catalog_data, default_flow_style=False, sort_keys=False)) cat = cls(str(catalog_path)) return cat
def __str__(self): return "<VirtualDigestCatalog digest_type={} digest_param:{} num_fragments:{} path:{}>".format( self.metadata["digest_type"], self.metadata["digest_param"], self.metadata["num_fragments"], self.path )
[docs]class ReferenceGenomeCatalog(basePoreCCatalog): name = "pore_c_reference_genome" description = "An intake catalog file for a reference genome" _suffix_map = { "catalog": ".catalog.yaml", "fasta": ".fa", "chromsizes": ".chromsizes", "chrom_metadata": ".metadata.csv", }
[docs] @classmethod def create(cls, file_paths, *args, **kwds): catalog_data = basePoreCCatalog.create_catalog_dict( cls, file_paths, *args, **kwds, drivers={"chromsizes": "csv"} ) catalog_path = file_paths["catalog"] with catalog_path.open("w") as fh: fh.write(yaml.dump(catalog_data, default_flow_style=False, sort_keys=False)) cat = cls(str(catalog_path)) return cat
@property def chrom_lengths(self): return self.metadata["chrom_lengths"] @property def chrom_order(self): return list(self.metadata["chrom_lengths"].keys()) @property def chrom_dtype(self): return pd.CategoricalDtype(self.chrom_order, ordered=True) @property def genome_size(self): return sum(self.metadata["chrom_lengths"].values()) @property def genome_id(self): return self.metadata["genome_id"] def __str__(self): return "<ReferenceGenomeCatalog genome_id={} genome_size={:,} num_chroms={} chroms={}..{}>".format( self.genome_id, self.genome_size, len(self.chrom_lengths), ",".join(self.chrom_order[:3]), self.chrom_order[-1], )