Source code for bwapy.libbwa

import argparse
from collections import namedtuple
import importlib
import imp
import os
import sys

from cffi import FFI
ffi = FFI()

"""High-level interface to bwa (mem) aligner."""


[docs]def get_shared_lib(name): """Cross-platform resolution of shared-object libraries, working around vagueries of setuptools. :param name: name of shared library to find. :returns: FFI shared library object. """ try: # after 'python setup.py install' we should be able to do this lib_file = importlib.import_module(name).__file__ except Exception as e: try: # after 'python setup.py develop' this should work lib_file = imp.find_module(name)[1] except Exception as e: raise ImportError('Cannot locate C library "{}".'.format(name)) else: lib_file = os.path.abspath(lib_file) finally: library = ffi.dlopen(lib_file) return library
libbwa = get_shared_lib('bwalib') ffi.cdef(""" //////////////////////////////// // Alignment hit list structures // typedef struct { int64_t rb, re; // [rb,re): reference sequence in the alignment int qb, qe; // [qb,qe): query sequence in the alignment int rid; // reference seq ID int score; // best local SW score int truesc; // actual score corresponding to the aligned region; possibly smaller than $score int sub; // 2nd best SW score int alt_sc; int csub; // SW score of a tandem hit int sub_n; // approximate number of suboptimal hits int w; // actual band width used in extension int seedcov; // length of regions coverged by seeds int secondary; // index of the parent hit shadowing the current hit; <0 if primary int secondary_all; int seedlen0; // length of the starting seed int n_comp:30, is_alt:2; // number of sub-alignments chained together float frac_rep; uint64_t hash; } mem_alnreg_t; typedef struct { size_t n, m; mem_alnreg_t *a; } mem_alnreg_v; typedef struct { // This struct is only used for the convenience of API. int64_t pos; // forward strand 5'-end mapping position int rid; // reference sequence index in bntseq_t; <0 for unmapped int flag; // extra flag uint32_t is_rev:1, is_alt:1, mapq:8, NM:22; // is_rev: whether on the reverse strand; mapq: mapping quality; NM: edit distance int n_cigar; // number of CIGAR operations uint32_t *cigar; // CIGAR in the BAM encoding: opLen<<4|op; op to integer mapping: MIDSH=>01234 char *XA; // alternative mappings int score, sub, alt_sc; } mem_aln_t; typedef struct { size_t n; mem_aln_t *aln; } mem_aln_v; void free_mem_aln_v (mem_aln_v *alns); /////////////////////// // bwa index structures // typedef uint64_t bwtint_t; typedef struct { bwtint_t primary; // S^{-1}(0), or the primary index of BWT bwtint_t L2[5]; // C(), cumulative count bwtint_t seq_len; // sequence length bwtint_t bwt_size; // size of bwt, about seq_len/4 uint32_t *bwt; // BWT // occurance array, separated to two parts uint32_t cnt_table[256]; // suffix array int sa_intv; bwtint_t n_sa; bwtint_t *sa; } bwt_t; typedef struct { int64_t offset; int32_t len; int32_t n_ambs; uint32_t gi; int32_t is_alt; char *name, *anno; } bntann1_t; typedef struct { int64_t offset; int32_t len; char amb; } bntamb1_t; typedef struct { int64_t l_pac; int32_t n_seqs; uint32_t seed; bntann1_t *anns; // n_seqs elements int32_t n_holes; bntamb1_t *ambs; // n_holes elements FILE *fp_pac; } bntseq_t; typedef struct { bwt_t *bwt; // FM-index bntseq_t *bns; // information on the reference sequences uint8_t *pac; // the actual 2-bit encoded reference sequences with 'N' converted to a random base int is_shm; int64_t l_mem; uint8_t *mem; } bwaidx_t; bwaidx_t *bwa_idx_load_all(const char *hint); void bwa_idx_destroy(bwaidx_t *idx); ///////////////// // Option parsing // typedef struct { int a, b; // match score and mismatch penalty int o_del, e_del; int o_ins, e_ins; int pen_unpaired; // phred-scaled penalty for unpaired reads int pen_clip5,pen_clip3;// clipping penalty. This score is not deducted from the DP score. int w; // band width int zdrop; // Z-dropoff uint64_t max_mem_intv; int T; // output score threshold; only affecting output int flag; // see MEM_F_* macros int min_seed_len; // minimum seed length int min_chain_weight; int max_chain_extend; float split_factor; // split into a seed if MEM is longer than min_seed_len*split_factor int split_width; // split into a seed if its occurence is smaller than this value int max_occ; // skip a seed if its occurence is larger than this value int max_chain_gap; // do not chain seed if it is max_chain_gap-bp away from the closest seed int n_threads; // number of threads int chunk_size; // process chunk_size-bp sequences in a batch float mask_level; // regard a hit as redundant if the overlap with another better hit is over mask_level times the min length of the two hits float drop_ratio; // drop a chain if its seed coverage is below drop_ratio times the seed coverage of a better chain overlapping with the small chain float XA_drop_ratio; // when counting hits for the XA tag, ignore alignments with score < XA_drop_ratio * max_score; only effective for the XA tag float mask_level_redun; float mapQ_coef_len; int mapQ_coef_fac; int max_ins; // when estimating insert size distribution, skip pairs with insert longer than this value int max_matesw; // perform maximally max_matesw rounds of mate-SW for each end int max_XA_hits, max_XA_hits_alt; // if there are max_hits or fewer, output them all int8_t mat[25]; // scoring matrix; mat[0] == 0 if unset } mem_opt_t; mem_opt_t * get_opts(int argc, char *argv[], bwaidx_t * idx); static const char valid_opts[]; /////////////////// // Run an alignment // mem_aln_v *align(mem_opt_t * opt, bwaidx_t * index, char * seq); """) Alignment = namedtuple('Alignment', [ 'rname', 'orient', 'pos', 'mapq', 'cigar', 'NM' ])
[docs]class BwaAligner(object): def __init__(self, index:str, options:str=''): """Interface to bwa mem alignment. :param index: bwa index base path. :param options: alignment options as would be given on the bwa mem command line. """ self.index_base = index.encode() self._cigchar = "MIDSH" # find which options are valid valid_opts = ffi.string(libbwa.valid_opts).decode().replace(':', '') for opt in options.split(): if opt[0] != '-': continue if opt[1] not in valid_opts: raise ValueError( "Option '{}' is not a valid option (allowed: {}).".format( opt, ' '.join(valid_opts) )) # we need to pass the index to the option parsing # TODO: clean up this requirement self.index = libbwa.bwa_idx_load_all(self.index_base) if self.index == ffi.NULL: raise ValueError('Failed to load bwa index.') argv = ['bwapy'] + options.split() argc = len(argv) self.opt = libbwa.get_opts(argc, [ffi.new('char[]', x.encode()) for x in argv], self.index ) if self.opt == ffi.NULL: raise ValueError('Failed to parse options.') def __del__(self): if hasattr(self, 'index'): libbwa.bwa_idx_destroy(self.index) if hasattr(self, 'opts'): cffi.free(self.opts) def _build_alignment(self, aln): cigar = aln.cigar cigar = ''.join( # oplen + op str(cigar[k]>>4) + self._cigchar[cigar[k] & 0xf] for k in range(aln.n_cigar) ) return Alignment( ffi.string(self.index.bns.anns[aln.rid].name).decode(), '+-'[aln.is_rev], aln.pos, aln.mapq, cigar, aln.NM )
[docs] def align_seq(self, seq:str): """Align a sequence to the index. :param seq: base sequence to align :returns: tuple of :class:`Alignment` """ alns = libbwa.align(self.opt, self.index, seq.encode()) if alns == ffi.NULL: alignments = tuple() else: alignments = tuple(self._build_alignment(alns.aln[i]) for i in range(alns.n)) libbwa.free_mem_aln_v(alns) return alignments
[docs]def get_parser(): parser = argparse.ArgumentParser('Align a sequence with bwa mem.') parser.add_argument('index', help='bwa index base path.') parser.add_argument('sequence', nargs='+', help='base sequence') return parser
[docs]def main(): args, opts = get_parser().parse_known_args() options = '' if len(opts) > 0: options = ' '.join(opts) aligner = BwaAligner(args.index, options=options) for i, seq in enumerate(args.sequence, 1): alignments = aligner.align_seq(seq) print('Found {} alignments for input {}.'.format(len(alignments), i)) for aln in alignments: print(' ', aln)