Source code for pore_c.analyses.contacts

import logging
import os
import shutil
import subprocess as sp
from collections import defaultdict
from typing import Dict, Optional

import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyranges as pr
from cooler import Cooler, create_cooler
from cooler.util import binnify
from pysam import FastaFile

from pore_c.analyses.reads import read_length_stats
from pore_c.analyses.reference import revcomp
from pore_c.config import PQ_ENGINE, PQ_VERSION, SHORT_RANGE_CUTOFF
from pore_c.model import (
    PoreCConcatemerRecord,
    PoreCConcatemerRecordDf,
    PoreCContactRecordDf,
)

logger = logging.getLogger(__name__)


def export_to_cooler(
    contact_table,
    output_prefix,
    cooler_resolution,
    fragment_table,
    chromsizes,
    query,
    query_columns=None,
    by_haplotype=False,
):
    results = []
    if query_columns:
        columns = query_columns[:]
    else:
        columns = []
    columns.extend(["align1_fragment_id", "align2_fragment_id"])
    if by_haplotype:
        columns.extend(["align1_haplotype", "align2_haplotype"])
    contact_df = dd.read_parquet(contact_table, engine=PQ_ENGINE, version=PQ_VERSION, columns=columns, index=False)
    if query:
        contact_df = contact_df.query(query)

    chrom_dict = pd.read_csv(
        chromsizes, sep="\t", header=None, names=["chrom", "size"], index_col=["chrom"], squeeze=True
    )
    # create even-width bins using cooler
    bins_df = binnify(chrom_dict, cooler_resolution)
    bins_df.index.name = "bin_id"
    # convert to ranges for overlap
    bins = pr.PyRanges(bins_df.reset_index().rename(columns={"start": "Start", "end": "End", "chrom": "Chromosome"}))

    fragment_df = dd.read_parquet(fragment_table, engine=PQ_ENGINE, version=PQ_VERSION).compute()
    midpoint_df = pr.PyRanges(
        fragment_df.reset_index()[["chrom", "start", "end", "fragment_id"]]
        .assign(start=lambda x: ((x.start + x.end) * 0.5).round(0).astype(int))
        .eval("end = start + 1")
        .rename(columns={"chrom": "Chromosome", "start": "Start", "end": "End"})
    )
    # use a pyranges join to assign fragments to bins
    fragment_to_bin = midpoint_df.join(bins, how="left").df[["fragment_id", "bin_id"]]
    fragment_to_bin = fragment_to_bin.set_index("fragment_id").sort_index()  # .astype(np.uint32)
    nulls = fragment_to_bin["bin_id"] == -1
    if nulls.any():
        logger.warning(
            "Some fragments did not overlap bins, removing from analysis:\n{}".format(
                fragment_to_bin[nulls].join(fragment_df)
            )
        )
        fragment_to_bin = fragment_to_bin[~nulls]

    # use a join to assign each end of a contact to a bin
    binned_contacts = (
        contact_df.merge(fragment_to_bin, how="inner", right_index=True, left_on="align1_fragment_id")
        .merge(fragment_to_bin, how="inner", right_index=True, left_on="align2_fragment_id", suffixes=[None, "_2"])
        .rename(columns={"bin_id": "bin1_id", "bin_id_2": "bin2_id"})
    )

    if not by_haplotype:
        cooler_path = output_prefix + ".cool"
        # group size == number of contacts per bin pair
        pixels = binned_contacts.groupby(["bin1_id", "bin2_id"]).size().rename("count").astype(np.int32).reset_index()
        create_cooler(cooler_path, bins_df, pixels, ordered=True, symmetric_upper=True, ensure_sorted=True)
        c = Cooler(cooler_path)
        logger.info(f"Created cooler: {c.info}")
        results.append(cooler_path)
    else:
        tmp_parquet = output_prefix + ".tmp.pq"
        pixels = (
            # create a key to group by haplotype pair; the order of haplotypes doesn't matter
            binned_contacts.assign(
                hap_key=lambda x: x[["align1_haplotype", "align2_haplotype"]].apply(
                    lambda y: "{}_{}".format(*sorted(y)).replace("-1", "nohap"), axis=1, meta="object"
                )
            )
            .groupby(["hap_key", "bin1_id", "bin2_id"])
            .size()
            .rename("count")
            .astype(np.int32)
            .reset_index()
            .astype({"hap_key": "category"})
        )
        # save to a temporary parquet file; this might not be necessary,
        # but we want to avoid the whole contact matrix hitting memory
        pixels.to_parquet(
            tmp_parquet,
            write_metadata_file=True,
            partition_on=["hap_key"],
            write_index=False,
            engine=PQ_ENGINE,
            version=PQ_VERSION,
        )
        pixels = dd.read_parquet(tmp_parquet, engine=PQ_ENGINE, version=PQ_VERSION, columns=["hap_key"], index=False)
        hap_keys = pixels["hap_key"].unique().compute()
        # create a cooler for each haplotype pair
        for hap_key in hap_keys:
            cooler_path = f"{output_prefix}.{hap_key}.cool"
            pixels = dd.read_parquet(
                tmp_parquet,
                filters=[("hap_key", "==", hap_key)],
                index=False,
                engine=PQ_ENGINE,
                version=PQ_VERSION,
                columns=["bin1_id", "bin2_id", "count"],
            )
            create_cooler(cooler_path, bins_df, pixels, ordered=True, symmetric_upper=True, ensure_sorted=True)
            c = Cooler(cooler_path)
            logger.info(f"Created cooler: {c.info}")
            results.append(cooler_path)
        shutil.rmtree(tmp_parquet)
    return results
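
# Usage sketch (hypothetical, not part of the original module): the file names below are
# illustrative placeholders. A single-resolution, non-haplotyped export might look like:
#
#     export_to_cooler(
#         contact_table="run01.contacts.parquet",    # per-contact parquet table
#         output_prefix="run01.matrix",              # writes run01.matrix.cool
#         cooler_resolution=1000,                    # bin size in bp
#         fragment_table="run01.fragments.parquet",  # virtual-digest fragment table
#         chromsizes="ref.chrom.sizes",              # two-column <chrom><TAB><size> file
#         query=None,
#         by_haplotype=False,
#     )
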
def export_to_salsa_bed(contact_table, output_prefix, query, query_columns):
    if query_columns:
        columns = query_columns[:]
    else:
        columns = []
    columns.extend(
        [
            "read_name",
            "align1_chrom",
            "align1_start",
            "align1_end",
            "align1_strand",
            "align1_mapping_quality",
            "align2_chrom",
            "align2_start",
            "align2_end",
            "align2_strand",
            "align2_mapping_quality",
        ]
    )
    contact_df = dd.read_parquet(contact_table, engine=PQ_ENGINE, version=PQ_VERSION, columns=columns, index=False)
    if query:
        contact_df = contact_df.query(query)
    bed_file = output_prefix + ".salsa.bed"
    dtypes = contact_df.head(1).dtypes
    strand_dtype = pd.CategoricalDtype(["+", "-"], ordered=False)
    meta = {
        "chrom": dtypes["align1_chrom"],
        "start": dtypes["align1_start"],
        "end": dtypes["align1_end"],
        "read_pair_id": str,
        "mapping_quality": dtypes["align1_mapping_quality"],
        "strand": strand_dtype,
    }

    def to_salsa_long(df):
        df = df.set_index("read_name")
        contact_idx = df.groupby(level="read_name")["align1_chrom"].transform(lambda x: np.arange(len(x), dtype=int))
        df["align1_contact_idx"] = contact_idx
        df["align2_contact_idx"] = contact_idx
        new_columns = pd.MultiIndex.from_tuples(
            [c.replace("align", "").split("_", 1) for c in df.columns], names=["pair_idx", "value"]
        )
        df.columns = new_columns
        df = df.stack(level="pair_idx").reset_index()
        df["read_pair_id"] = df[["read_name", "contact_idx", "pair_idx"]].apply(
            lambda x: f"read{x.read_name}_{x.contact_idx}/{x.pair_idx}", axis=1
        )
        df["strand"] = df["strand"].replace({True: "+", False: "-"}).astype(strand_dtype)
        return df[["chrom", "start", "end", "read_pair_id", "mapping_quality", "strand"]]

    contact_df.map_partitions(to_salsa_long, meta=meta).to_csv(
        bed_file, single_file=True, sep="\t", header=False, index=False
    )
    return bed_file
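
# Usage sketch (hypothetical file names): writes a BED of pseudo paired-end records,
# one pair per contact, suitable for SALSA-style scaffolding input:
#
#     bed_file = export_to_salsa_bed(
#         "run01.contacts.parquet",  # per-contact parquet table
#         "run01",                   # writes run01.salsa.bed
#         query=None,                # optional pandas query string to filter contacts
#         query_columns=None,
#     )
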
def export_to_pairs(contact_table, output_prefix, chromsizes, query, query_columns):
    if query_columns:
        columns = query_columns[:]
    else:
        columns = []
    columns.extend(
        [
            "read_name",
            "contact_is_direct",
            "contact_read_distance",
            "align1_align_idx",
            "align1_chrom",
            "align1_fragment_start",
            "align1_fragment_end",
            "align1_strand",
            "align1_mapping_quality",
            "align2_align_idx",
            "align2_chrom",
            "align2_fragment_start",
            "align2_fragment_end",
            "align2_strand",
            "align2_mapping_quality",
        ]
    )
    contact_df = dd.read_parquet(contact_table, engine=PQ_ENGINE, version=PQ_VERSION, columns=columns, index=False)
    chrom_dict = pd.read_csv(
        chromsizes, sep="\t", header=None, names=["chrom", "size"], index_col=["chrom"], squeeze=True
    )
    if query:
        contact_df = contact_df.query(query)

    dtypes = contact_df.head(1).dtypes
    strand_dtype = pd.CategoricalDtype(["+", "-"], ordered=False)
    junction_dtype = pd.CategoricalDtype(["DJ", "IJ"], ordered=False)
    meta = {
        "readID": str,
        "chr1": dtypes["align1_chrom"],
        "pos1": dtypes["align1_fragment_start"],
        "chr2": dtypes["align2_chrom"],
        "pos2": dtypes["align2_fragment_start"],
        "strand1": strand_dtype,
        "strand2": strand_dtype,
        "pair_type": junction_dtype,
        "align1_idx": dtypes["align1_align_idx"],
        "align2_idx": dtypes["align2_align_idx"],
        "distance_on_read": dtypes["contact_read_distance"],
    }

    pairs_file = output_prefix + ".pairs"
    # dask to_csv doesn't support append writes, so we concatenate the header after the fact
    header_file = output_prefix + ".pairs.header"
    body_file = output_prefix + ".pairs.body"
    header_lines = ["## pairs format 1.0", "#shape: upper triangle"]
    header_lines.extend(["#chromsize: {} {}".format(*_) for _ in chrom_dict.items()])
    header_lines.append("#columns: {}".format(" ".join(meta.keys())))
    with open(header_file, "w") as output_fh:
        output_fh.write("{}\n".format("\n".join(header_lines)))

    def to_pairs_df(df):
        df["contact_idx"] = df.groupby("read_name")["align1_chrom"].transform(lambda x: np.arange(len(x), dtype=int))
        df = (
            df.reset_index()
            .assign(
                readID=lambda x: x[["read_name", "contact_idx"]].apply(
                    lambda y: f"read{y.read_name}_{y.contact_idx:d}", axis=1
                ),
                pos1=lambda x: np.rint(x.eval("0.5 * (align1_fragment_start + align1_fragment_end)")).astype(int),
                pos2=lambda x: np.rint(x.eval("0.5 * (align2_fragment_start + align2_fragment_end)")).astype(int),
                align1_strand=lambda x: x["align1_strand"].replace({True: "+", False: "-"}).astype(strand_dtype),
                align2_strand=lambda x: x["align2_strand"].replace({True: "+", False: "-"}).astype(strand_dtype),
                pair_type=lambda x: x["contact_is_direct"].replace({True: "DJ", False: "IJ"}).astype(junction_dtype),
            )
            .rename(
                columns=dict(
                    align1_chrom="chr1",
                    align2_chrom="chr2",
                    align1_strand="strand1",
                    align2_strand="strand2",
                    align2_align_idx="align2_idx",
                    align1_align_idx="align1_idx",
                    contact_read_distance="distance_on_read",
                )
            )
        )
        return df[list(meta.keys())]

    contact_df.map_partitions(to_pairs_df, meta=meta).to_csv(
        body_file, mode="w+", single_file=True, sep="\t", header=False, index=False
    )
    sp.check_call(f"cat {header_file} {body_file} > {pairs_file}", shell=True)
    os.unlink(header_file)
    os.unlink(body_file)
    return pairs_file
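
# Usage sketch (hypothetical file names): writes a pairs-format file whose
# "## pairs format 1.0" header is built from the chromsizes file, e.g.:
#
#     pairs_file = export_to_pairs(
#         "run01.contacts.parquet",  # per-contact parquet table
#         "run01",                   # writes run01.pairs
#         "ref.chrom.sizes",         # used for the #chromsize header lines
#         query=None,
#         query_columns=None,
#     )
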
def export_to_paired_end_fastq(
    contact_table, output_prefix, reference_fasta, query, query_columns=None, read_length=50
):
    if query_columns:
        columns = query_columns[:]
    else:
        columns = []
    columns.extend(
        [
            "read_name",
            "align1_fragment_id",
            "align1_chrom",
            "align1_fragment_start",
            "align1_fragment_end",
            "align1_strand",
            "align2_fragment_id",
            "align2_chrom",
            "align2_fragment_start",
            "align2_fragment_end",
            "align2_strand",
        ]
    )
    contact_df = dd.read_parquet(contact_table, engine=PQ_ENGINE, version=PQ_VERSION, columns=columns, index=False)
    if query:
        query += " & (align1_fragment_id != align2_fragment_id) "
    else:
        query = "(align1_fragment_id != align2_fragment_id) "
    contact_df = contact_df.query(query)

    fastq1 = output_prefix + ".1.fastq"
    fastq2 = output_prefix + ".2.fastq"
    ref = FastaFile(reference_fasta)
    fhs = [open(fastq1, "w"), open(fastq2, "w")]
    qual_str = "I" * read_length
    contact_counter = defaultdict(int)
    for partition in range(contact_df.npartitions):
        df = (
            contact_df.get_partition(partition)
            .compute()
            .assign(
                pos1=lambda x: np.rint(x.eval("0.5 * (align1_fragment_start + align1_fragment_end)")).astype(int),
                pos2=lambda x: np.rint(x.eval("0.5 * (align2_fragment_start + align2_fragment_end)")).astype(int),
            )
        )
        df["pos1"].clip(lower=read_length, inplace=True)
        df["pos2"].clip(lower=read_length, inplace=True)
        for idx, row in df.iterrows():
            contact_counter[row.read_name] += 1
            contact_idx = contact_counter[row.read_name]
            read_name = f"{row.read_name}:{contact_idx:04}"
            read1 = ref.fetch(row.align1_chrom, row.pos1 - read_length, row.pos1)
            if row.align1_strand is False:
                read1 = revcomp(read1)
            fhs[0].write(f"@{read_name} 1:N:0:1\n{read1}\n+\n{qual_str}\n")
            read2 = ref.fetch(row.align2_chrom, row.pos2 - read_length, row.pos2)
            if row.align2_strand is False:
                read2 = revcomp(read2)
            fhs[1].write(f"@{read_name} 2:N:0:1\n{read2}\n+\n{qual_str}\n")
    return fastq1, fastq2
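
# Usage sketch (hypothetical file names): synthesizes fixed-length paired-end reads, one
# pair per contact, from the `read_length` bases ending at each fragment midpoint:
#
#     fq1, fq2 = export_to_paired_end_fastq(
#         contact_table="run01.contacts.parquet",
#         output_prefix="run01",          # writes run01.1.fastq and run01.2.fastq
#         reference_fasta="ref.fa",       # indexed reference FASTA (pysam FastaFile)
#         query=None,
#         read_length=50,
#     )
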
def export_to_merged_no_dups(contact_table, output_prefix, reference_fasta, query, query_columns=None, read_length=50):
    if query_columns:
        columns = query_columns[:]
    else:
        columns = []
    columns.extend(
        [
            "read_name",
            "align1_fragment_id",
            "align1_chrom",
            "align1_fragment_start",
            "align1_fragment_end",
            "align1_strand",
            "align2_fragment_id",
            "align2_chrom",
            "align2_fragment_start",
            "align2_fragment_end",
            "align2_strand",
        ]
    )
    contact_df = dd.read_parquet(contact_table, engine=PQ_ENGINE, version=PQ_VERSION, columns=columns, index=False)
    if query:
        query += " & (align1_fragment_id != align2_fragment_id) "
    else:
        query = "(align1_fragment_id != align2_fragment_id) "
    contact_df = contact_df.query(query)

    mnd_file = output_prefix + ".mnd.txt"
    fh = open(mnd_file, "w")
    ref = FastaFile(reference_fasta)
    cigar_str = f"{read_length}M"
    contact_counter = defaultdict(int)
    for partition in range(contact_df.npartitions):
        df = (
            contact_df.get_partition(partition)
            .compute()
            .astype({"align1_strand": "uint8", "align2_strand": "uint8"})
            .assign(
                pos1=lambda x: np.rint(x.eval("0.5 * (align1_fragment_start + align1_fragment_end)")).astype(int),
                pos2=lambda x: np.rint(x.eval("0.5 * (align2_fragment_start + align2_fragment_end)")).astype(int),
            )
        )
        df["pos1"].clip(lower=read_length, inplace=True)
        df["pos2"].clip(lower=read_length, inplace=True)
        for _, row in df.iterrows():
            contact_counter[row.read_name] += 1
            contact_idx = contact_counter[row.read_name]
            read_name = f"{row.read_name}:{contact_idx:04}"
            read1 = ref.fetch(row.align1_chrom, row.pos1 - read_length, row.pos1)
            if row.align1_strand == 1:
                read1 = revcomp(read1)
            read2 = ref.fetch(row.align2_chrom, row.pos2 - read_length, row.pos2)
            if row.align2_strand == 1:
                read2 = revcomp(read2)
            # <str1> <chr1> <pos1> <frag1>
            # <str2> <chr2> <pos2> <frag2>
            # <mapq1> <cigar1> <sequence1>
            # <mapq2> <cigar2> <sequence2>
            # <readname1> <readname2>
            fh.write(
                f"{row.align1_strand} {row.align1_chrom} {row.pos1} {row.align1_fragment_id} "
                f"{row.align2_strand} {row.align2_chrom} {row.pos2} {row.align2_fragment_id} "
                f"60 {cigar_str} {read1} "
                f"60 {cigar_str} {read2} "
                f"{read_name}:1 {read_name}:2\n"
            )
    return mnd_file
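
# Usage sketch (hypothetical file names): writes a Juicer-style "merged_nodups" text file
# with the fields documented in the write call above, one line per contact:
#
#     mnd_file = export_to_merged_no_dups(
#         contact_table="run01.contacts.parquet",
#         output_prefix="run01",          # writes run01.mnd.txt
#         reference_fasta="ref.fa",       # indexed reference FASTA
#         query=None,
#         read_length=50,
#     )
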
def gather_concatemer_stats(contact_df: PoreCContactRecordDf) -> PoreCConcatemerRecordDf:
    contact_df["is_short_range"] = contact_df["contact_fragment_distance"] < SHORT_RANGE_CUTOFF
    by_read = contact_df.groupby("read_name", as_index=True)
    read_stats = by_read[["read_length", "read_idx"]].first()
    num_fragments = (
        by_read[["align1_fragment_id", "align2_fragment_id"]]
        .agg(lambda x: set(x.unique()))
        .apply(lambda x: len(x.align1_fragment_id.union(x.align2_fragment_id)), axis=1)
        .rename("num_fragments")
    )
    num_reads = len(by_read)
    by_read_and_type = contact_df.groupby(["read_name", "contact_is_direct"], as_index=True)

    def flatten_index(df, sep="_"):
        df.columns = [sep.join(t) for t in df.columns.to_flat_index()]
        return df

    contact_counts = (
        contact_df.groupby(["read_name", "contact_is_direct", "contact_is_cis"])
        .size()
        .rename(index={True: "direct", False: "indirect"}, level="contact_is_direct")
        .rename(index={True: "cis", False: "trans"}, level="contact_is_cis")
        .unstack(fill_value=0, level="contact_is_direct")
        .unstack(fill_value=0, level="contact_is_cis")
        .pipe(flatten_index)
        .astype(int)
        .eval("total_cis = direct_cis + indirect_cis")
        .eval("total_trans = direct_trans + indirect_trans")
        .eval("direct = direct_cis + direct_trans")
        .eval("indirect = indirect_cis + indirect_trans")
        .eval("total = total_cis + total_trans")
        .add_suffix("_contacts")
        .eval("read_order = direct_contacts + 1")
    )
    short_range_counts = (
        contact_df.loc[contact_df.contact_is_cis]
        .groupby(["read_name", "contact_is_direct", "is_short_range"], as_index=True)
        .size()
        .rename(index={True: "short_range", False: "long_range"}, level=-1)
        .rename(index={True: "direct", False: "indirect"}, level=-2)
        .unstack(fill_value=0, level=-2)
        .unstack(fill_value=0, level=-1)
        .pipe(flatten_index)
        .eval("total_short_range = direct_short_range + indirect_short_range")
        .eval("total_long_range = direct_long_range + indirect_long_range")
        .add_suffix("_cis_contacts")
    )
    haplotype_stats = by_read["haplotype_pair_type"].value_counts().unstack(fill_value=0)
    drop = []
    for cat in contact_df.haplotype_pair_type.cat.categories:
        if cat == "null" or cat == "trans":
            if cat in haplotype_stats.columns:
                drop.append(cat)
        elif cat not in haplotype_stats.columns:
            haplotype_stats[cat] = 0
    if drop:
        haplotype_stats = haplotype_stats.drop(columns=drop)
    haplotype_stats = haplotype_stats.add_prefix("haplotype_")
    max_distance = (
        by_read_and_type[["contact_genome_distance", "contact_fragment_distance"]]
        .max()
        .unstack(fill_value=0)
        .astype(int)
        .rename(columns={True: "direct", False: "indirect"})
    )
    max_distance.columns = ["max_{1}_{0}".format(*_) for _ in max_distance.columns]
    dtype = PoreCConcatemerRecord.pandas_dtype()
    res = (
        contact_counts.join(haplotype_stats, how="left")
        .join(read_stats, how="left")
        .join(max_distance, how="left")
        .join(short_range_counts, how="left")
        .join(num_fragments, how="left")
        .fillna({c: 0 for c in short_range_counts.columns})
        .reset_index()
        # .astype(dtype)
        [list(dtype.keys())]
    )
    assert len(res) == num_reads
    return res
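
# Usage sketch: `contact_df` is the per-contact table whose columns are referenced above
# (read_name, read_idx, read_length, contact_is_direct, contact_is_cis,
# contact_fragment_distance, contact_genome_distance, haplotype_pair_type, and the
# align1/align2 fragment ids); the result has one row per read. The path is hypothetical:
#
#     contact_df = dd.read_parquet("run01.contacts.parquet", engine=PQ_ENGINE).compute()
#     concatemer_df = gather_concatemer_stats(contact_df)
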
def summarize_concatemer_table(
    concatemer_df: PoreCConcatemerRecordDf, read_summary_table: str, user_metadata: Optional[Dict] = None
) -> pd.DataFrame:
    concatemer_read_length_stats = read_length_stats(concatemer_df["read_length"])
    # read_subset_dtype = pd.CategoricalDtype(["all", "pass", "fail"])
    read_summary_table = (
        pd.read_csv(read_summary_table)
        .query("read_subset == 'all'")
        .assign(index=0)
        # .astype({"read_subset": read_subset_dtype})
        .set_index(["index", "read_subset"])
        .unstack(fill_value=0)
        .rename({"all": "all_reads", "pass": "pass_read_qc", "fail": "fail_read_qc"}, axis=1, level=1)
    )
    # print(read_summary_table)
    if user_metadata:
        user_metadata = pd.Series(user_metadata, name="value").to_frame()
    else:
        user_metadata = None

    contact_counts = (
        concatemer_df.loc[:, lambda x: [c for c in x.columns if c.endswith("_contacts")]]
        .sum()
        .astype(int)
        .rename("count")
        .to_frame()
    )
    contact_counts.index = pd.MultiIndex.from_tuples(
        [c.split("_", 1) for c in contact_counts.index], names=["subset", "value"]
    )
    contact_props = (
        contact_counts.div(contact_counts.xs("contacts", level="value", drop_level=True), level="subset", axis=0)
        .mul(100.0)
        .rename(columns={"count": "perc"})
    )
    contact_section = (
        contact_counts.join(contact_props)
        .rename_axis("variable", axis=1)
        .stack()
        .rename("value")
        .to_frame()
        .reorder_levels([0, 2, 1])
        .sort_index(level=[0, 1], sort_remaining=False)
    )

    read_counts = read_summary_table.xs("num_sequences", axis=1).copy()
    read_data = {"all": read_counts.loc[0, "all_reads"], "concatemer": len(concatemer_df)}
    read_data["fail_filter"] = read_data["all"] - read_data["concatemer"]
    read_data = pd.Series(read_data, name="count").to_frame()
    read_data["perc"] = 100.0 * read_data["count"] / read_data.loc["all", "count"]
    read_data = (
        read_data.rename_axis("variable", axis=1).stack().rename("value").to_frame().reorder_levels([1, 0]).sort_index()
    )

    bases = read_summary_table.xs("total_bases", axis=1).copy()
    base_data = {"all": bases.loc[0, "all_reads"], "concatemer": concatemer_read_length_stats["total_bases"]}
    base_data["fail_filter"] = base_data["all"] - base_data["concatemer"]
    base_data = pd.Series(base_data, name="bases").to_frame()
    base_data["perc"] = 100.0 * base_data["bases"] / base_data.loc["all", "bases"]
    base_data = (
        base_data.rename_axis("variable", axis=1).stack().rename("value").to_frame().reorder_levels([1, 0]).sort_index()
    )

    read_length_data = {
        "all": read_summary_table.at[0, ("N50", "all_reads")],
        "concatemer": concatemer_read_length_stats["N50"],
    }
    read_length_data = (
        pd.Series(read_length_data, name="value")
        .to_frame()
        .rename_axis("subset")
        .assign(variable="N50")
        .set_index("variable", append=True)
        .reorder_levels([1, 0])
        .sort_index(level=0)
    )
    # fragments_per_alignment = concatemer_df.eval("(1.0 * num_fragments) / read_order")
    # print(fragments_per_alignment.describe())
    # fragment_data = {
    #     "mean": fragments_per_alignment.mean(),
    #     "median": fragments_per_alignment.median(),
    # }
    # print(fragment_data)

    length_bins = pd.IntervalIndex.from_breaks([1, 2, 3, 4, 6, 11, 21, 50, int(1e9)])
    length_bin_labels = {}
    for i in length_bins:
        if i.length == 1:
            label = str(i.right)
        elif i.length >= int(1e8):
            label = f"gt_{i.left}"
        else:
            label = f"{i.left + 1}-{i.right}"
        length_bin_labels[i] = label
    read_order_hist = (
        pd.cut(concatemer_df["read_order"], length_bins, labels=length_bin_labels)
        .value_counts()
        .rename("count")
        .sort_index()
    )
    read_order_hist = read_order_hist.to_frame().rename(index=length_bin_labels).rename_axis("concatemer_order")
    read_order_hist["perc"] = read_order_hist.div(read_order_hist["count"].sum(), axis=1) * 100
    read_order_hist = (
        read_order_hist.rename_axis("variable", axis=1)
        .stack()
        .rename("value")
        .to_frame()
        .reorder_levels([1, 0])
        .sort_index(level="variable", sort_remaining=False)
    )

    total_contacts = contact_section.at[("total", "count", "contacts"), "value"]
    density_data = (
        pd.Series(
            {
                "all": 1e9 * total_contacts / base_data.at[("bases", "all"), "value"],
                "concatemer": 1e9 * total_contacts / base_data.at[("bases", "concatemer"), "value"],
            }
        )
        .rename_axis("divisor")
        .rename("value")
        .to_frame()
        .assign(variable="contacts_per_gb")
        .set_index("variable", append=True)
        .reorder_levels([1, 0])
    )

    def normalize_levels(dfs):
        max_levels = max(len(df.index.names) for df in dfs.values())
        level_labels = [f"level_{x}" for x in range(max_levels)]
        res = {}
        for key, val in dfs.items():
            existing_levels = val.index.names
            add_levels = max_levels - len(existing_levels)
            rename_levels = dict(zip(existing_levels, level_labels[add_levels:]))
            for x in range(add_levels):
                val[level_labels[x]] = ""
                val = val.set_index(level_labels[x], append=True)
            new_labels = [rename_levels.get(_, _) for _ in val.index.names]
            val.index.rename(new_labels, inplace=True)
            val = val.reorder_levels(level_labels)
            res[key] = val
        res = pd.concat(res, axis=0, names=["section"])
        idx_names = res.index.names
        res = res.T.convert_dtypes().astype(str).T
        res.index.rename(idx_names, inplace=True)
        return res

    summary_dfs = {
        "user_metadata": user_metadata,
        "reads": read_data,
        "read_length": read_length_data,
        "bases": base_data,
        "contacts": contact_section,
        "density": density_data,
        "concatemer_order": read_order_hist,
    }
    if user_metadata is None:
        summary_dfs.pop("user_metadata")
    long_df = normalize_levels(summary_dfs)
    return long_df