# Source code for pomoxis.common_errors_from_bam

import argparse
import pysam
from collections import Counter
from pomoxis.summary_from_stats import qscore
from pomoxis.stats_from_bam import stats_from_aligned_read


def get_errors(aln, ref_seq=None):
    """Extract per-reference-position differences between a read and the reference.

    Walks the aligned pairs of ``aln`` in reverse so that inserted query bases
    can be accumulated and attached to the next aligned reference position.

    :param aln: `pysam.AlignedSegment`; must be aligned at both ends of its
        window (i.e. the first aligned pair has neither query nor reference
        position equal to ``None``), otherwise a `ValueError` is raised.
    :param ref_seq: optional reference sequence (str) over the alignment
        window; when given, each reference base from the aligned pairs is
        asserted against it, and the collected errors are verified by
        reconstructing the query with :func:`scuff_ref`.
    :returns: dict mapping reference position -> ``(ref base, alt)`` where
        ``alt`` is ``'-'`` for a deletion, a single base for a substitution,
        or a multi-base string when inserted bases follow the position.
        NOTE(review): for deletions the ref base is stored as returned by
        pysam (not upper-cased) — presumably already upper case; confirm.
    :raises ValueError: if the alignment is not trimmed to a common window,
        or the self-check against ``ref_seq`` fails.
    """
    seq = aln.query_sequence
    errors = {}
    insertions = ''
    pairs = aln.get_aligned_pairs(with_seq=True)
    if pairs[0][0] is None or pairs[0][1] is None:
        raise ValueError('It does not look like bam is trimmed to a common alignment window')
    for qp, rp, rb in pairs[::-1]:
        # process pairs in reverse to easily accumulate insertions
        if qp is None:
            # deletion: query has no base at this reference position
            errors[rp] = (rb, '-')
        elif rp is None:
            # insertion: remember the query base; it will be attached to the
            # next (leftward) aligned reference position we encounter
            insertions += seq[qp]
        # if we reach here, qp is not None and rp is not None
        elif len(insertions) > 0:
            # flush accumulated insertions onto this position; this also
            # includes cases where the ref and query don't agree, e.g.
            #   ref   A-TGC
            #   query GTTGC would emit, A -> GT
            # (insertions were collected right-to-left, hence reversed here)
            errors[rp] = (rb.upper(), seq[qp] + insertions[::-1])
            insertions = ''
        elif seq[qp] != rb:
            # mismatch (pysam lower-cases the ref base on mismatch, so this
            # comparison fires for substitutions)
            errors[rp] = (rb.upper(), seq[qp])
        if ref_seq is not None and rp is not None:
            # cross-check pysam's per-pair ref base against the supplied seq
            assert ref_seq[rp] == rb.upper()
    if ref_seq is not None:
        # check we can scuff up reference using errors and get back our query
        scuffed = scuff_ref(ref_seq, errors)
        if not scuffed == aln.query_sequence:
            raise ValueError('Scuffing up reference with errors did not recreate query')
    return errors
def count_errors(errors):
    """Tally deletion, substitution and insertion counts from an error dict.

    :param errors: dict mapping reference position -> ``(ref base, alt)``,
        as produced by :func:`get_errors`; ``alt`` is ``'-'`` for a deletion
        and may carry extra inserted bases after its first character.
    :returns: `collections.Counter` with keys ``'del'``, ``'sub'``, ``'ins'``.
    """
    tally = Counter()
    for _, (ref_base, alt) in errors.items():
        if alt == '-':
            tally['del'] += 1
        elif alt[0] != ref_base:
            tally['sub'] += 1
        # any characters beyond the first are inserted bases
        extra = len(alt) - 1
        if extra > 0:
            tally['ins'] += extra
    return tally
def scuff_ref(ref_seq, errors):
    """Apply an error dict to a reference sequence, recreating the query.

    :param ref_seq: reference sequence (str).
    :param errors: dict mapping reference position -> ``(ref base, alt)``;
        an alt of ``'-'`` deletes the base, anything else replaces it
        verbatim (so multi-character alts implement insertions).
    :returns: the mutated ('scuffed') sequence as a str.
    """
    bases = list(ref_seq)
    for pos, (expected, replacement) in errors.items():
        # sanity check: the error record must agree with the reference
        assert bases[pos] == expected
        bases[pos] = '' if replacement == '-' else replacement
    return ''.join(bases)
def get_qscores(counts, ref_len):
    """Convert error counts over a window into per-category Q scores.

    :param counts: mapping with ``'sub'``, ``'ins'`` and ``'del'`` error
        counts (e.g. the `Counter` from :func:`count_errors`).
    :param ref_len: length of the reference window the counts refer to.
    :returns: dict of Q scores keyed ``'Q(acc)'``, ``'Q(iden)'``,
        ``'Q(ins)'`` and ``'Q(del)'``.
    """
    total_errors = sum(counts.values())
    # identity excludes deleted reference positions from the denominator
    rates = {
        'Q(acc)': total_errors / ref_len,
        'Q(iden)': counts['sub'] / (ref_len - counts['del']),
        'Q(ins)': counts['ins'] / ref_len,
        'Q(del)': counts['del'] / ref_len,
    }
    return {name: qscore(rate) for name, rate in rates.items()}
def main():
    """Entry point: find errors common to multiple assemblies aligned to a ref.

    Reads a bam of assemblies trimmed to a single common alignment window,
    collects the per-position errors of every read, intersects them, writes
    a fasta of the reference mutated with only the shared errors, and a
    tab-separated table of Q scores for each read plus the common errors.
    """
    parser = argparse.ArgumentParser(
        prog='common_errors_from_bam',
        description='Get errors common to multiple assemblies aligned to ref.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('bam',
        help='input bam file containing assemblies trimmed to a common alignment window')
    parser.add_argument('ref_fasta',
        help='reference fasta file of the reference over that alignment window')
    parser.add_argument('-o', '--output_prefix', default='common_errors',
        help='Prefix for outputs.')
    args = parser.parse_args()

    bam = pysam.AlignmentFile(args.bam)
    if len(bam.references) > 1:
        raise ValueError('Bam should have just one reference')
    ref_lengths = dict(zip(bam.references, bam.lengths))
    ref_seq = pysam.FastaFile(args.ref_fasta).fetch(bam.references[0])

    # reads should already be trimmed to a common alignment start and end point
    reads = [r for r in bam]
    ref_end, ref_start = reads[0].reference_end, reads[0].reference_start
    ref_len = ref_end - ref_start
    if not (all([r.reference_end == ref_end for r in reads]) and
            all([r.reference_start == ref_start for r in reads])):
        raise ValueError('Alignments have not been trimmed to a common overlap window, try trim_alignments')

    # get errors in each read
    data = {}  # query name -> errors dict from get_errors
    qscores = []  # (name, qscore dict) rows for the output table
    for aln in reads:
        errors = get_errors(aln, ref_seq)
        counts = count_errors(errors)
        # check we got the same error counts as stats_from_aligned_read
        stats = stats_from_aligned_read(aln, list(ref_lengths.keys()), list(ref_lengths.values()))
        for k in counts.keys():
            if stats[k] != counts[k]:
                msg = "Error counts {} don't match those from the CIGAR str {}."
                raise ValueError(msg.format(counts, {k: stats[k] for k in counts.keys()}))
        qscores.append((aln.query_name, get_qscores(counts, ref_len)))
        data[aln.query_name] = errors

    # get intersection of errors (reference positions at which every read errs)
    names = list(data.keys())
    common_errors = set(data[names[0]].keys())  # set of reference positions
    for name in names[1:]:
        common_errors = common_errors.intersection(set(data[name].keys()))

    remaining_errors = {}
    # loop through common errors, checking ref is the same and retaining the
    # error with the shortest edit distance
    for rp in common_errors:
        ref = data[names[0]][rp][0]
        assert all([d[rp][0] == ref for d in data.values()])  # refs should be same
        alts = [d[rp][1] for d in data.values()]
        if len(set([len(alt) for alt in alts])) > 1:
            # alts differ in length; we should take the best (shortest) one
            alts = sorted(alts, key=lambda x: len(x))
            shortest = alts[0]
            others = alts[1:]
            if shortest == '-' and any([alt[0] == ref for alt in others]):
                # the alt with the insertion contained the ref,
                # and the alt with the deletion did not contain the insertion
                # so two wrongs make a right!
                continue
            else:
                remaining_errors[rp] = (ref, shortest)
        else:
            # same-length alts: pick most common, arbitrary for equal numbers
            remaining_errors[rp] = (ref, Counter(alts).most_common()[0][0])

    # write fasta of ref scuffed with just common errors
    ref_scuffed = scuff_ref(ref_seq, remaining_errors)
    with open('{}.fasta'.format(args.output_prefix), 'w') as fh:
        fh.write('>{}\n'.format(args.output_prefix))
        fh.write(ref_scuffed)

    remaining_counts = count_errors(remaining_errors)
    qscores.append(('common_errors', get_qscores(remaining_counts, ref_len)))

    # print qscores of individual reads and overlapping errors
    cols = ['Q(acc)', 'Q(iden)', 'Q(del)', 'Q(ins)']
    with open('{}.txt'.format(args.output_prefix), 'w') as fh:
        fh.write('\t'.join(['name'] + cols) + '\n')
        for name, d in qscores:
            fh.write('\t'.join([name] + [str(d[c]) for c in cols]) + '\n')
# Script entry point.
if __name__ == '__main__':
    main()