Source code for pomoxis.find_indels

import argparse
from concurrent.futures import ProcessPoolExecutor
import functools
import itertools
from math import ceil
import sys

import pysam

parser = argparse.ArgumentParser(
        prog='find_indels',
        description='Parse a bamfile and document indels.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument('bam', type=str, help='Path to bam file.')
parser.add_argument('-m', '--min_indel_length', type=int, default=0,
                    help='Filter out indels shorter than this length.')
parser.add_argument('-a', '--all_alignments', action='store_true',
                    help='Include secondary and supplementary alignments.')
parser.add_argument('-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
                    help='Output indels to file instead of stdout.')
parser.add_argument('-b', '--bed', type=argparse.FileType('w'), default=None,
                    help='Additionaly output a .bed file.')
parser.add_argument('-t', '--threads', type=int, default=1,
                    help='Number of threads for parallel processing.')


def _get_indels(pairs, min_len=0):
    start = 0
    def check_is_indel(p):
         return None in p

    for is_indel, group in itertools.groupby(pairs, key=check_is_indel):
        length = sum(1 for x in group)
        if is_indel and length >= min_len:
            yield start, length
        start += length


[docs]def get_trimmed_pairs(aln): """Trim aligned pairs to the alignment. :param aln: `pysam.AlignedSegment` object :yields pairs: """ def pos_is_none(x): return x[1] is None or x[0] is None for qp, rp in itertools.dropwhile(pos_is_none, aln.get_aligned_pairs()): if (rp == aln.reference_end or qp == aln.query_alignment_end): break yield qp, rp
def _get_read_indels(read, min_indel_length=5): direction = '-' if read.is_reverse else '+' pairs = list(get_trimmed_pairs(read)) indels = [] for start_pair_i, n_pair in _get_indels(pairs, min_indel_length): qp, rp = pairs[start_pair_i] if rp is None: # insertion typ = 'ins' qp_start = qp qp_end = qp + n_pair # insertions are left-aligned, start at previous ref pos rp_start = pairs[start_pair_i - 1][1] # bed intervals are half-open, so exclude last position rp_end = rp_start + 1 else: typ = 'del' rp_start = rp rp_end = rp + n_pair qp_start = pairs[start_pair_i - 1][0] qp_end = qp_start + 1 indel = { 'type': typ, 'ref_name': read.reference_name, 'ref_start': rp_start, 'ref_end': rp_end, 'query_name': read.query_name, 'query_start': qp_start, 'query_end': qp_end, 'indel_len': n_pair, 'direction': direction, } indels.append(indel) return indels def _process_reads(bam_fp, start_stop, all_alignments=False, min_indel_length=None): start, stop = start_stop results = [] with pysam.AlignmentFile(bam_fp, 'rb') as bam: for i, read in enumerate(bam): if i < start: continue elif i == stop: break if read.is_unmapped: continue if not all_alignments and (read.is_secondary or read.is_supplementary): continue results.extend(_get_read_indels(read, min_indel_length=min_indel_length)) return results
[docs]def main(arguments=None): args = parser.parse_args(arguments) if args.threads > 1 and args.bed is not None: pool = ProcessPoolExecutor(args.threads) mapper = pool.map else: pool = None mapper = map args.threads = 1 headers = ['type', 'indel_len', 'ref_name', 'ref_start', 'ref_end', 'query_name', 'query_start', 'query_end', 'direction'] bed_cols = ['ref_name', 'ref_start', 'ref_end'] args.output.write('\t'.join(headers)) args.output.write('\n') # create a slice of reads to process in each thread to avoid looping through # bam n read times and reduce mp overhead with pysam.AlignmentFile(args.bam) as bam: n_reads = bam.count() n_reads_per_proc = ceil(n_reads / args.threads) ranges = [(start, min(start + n_reads_per_proc, n_reads)) for start in range(0, n_reads, n_reads_per_proc)] func = functools.partial(_process_reads, args.bam, all_alignments=args.all_alignments, min_indel_length=args.min_indel_length) for results in mapper(func, ranges): for result in results: out_row = (str(result[x]) for x in headers) args.output.write('\t'.join(out_row) + '\n') if args.bed is not None: out_row = (str(result[x]) for x in bed_cols) args.bed.write('\t'.join(out_row) + '\n') if pool is not None: pool.shutdown(wait=True)
if __name__ == '__main__': main()