Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
import argparse
from pathlib import Path
import tempfile
import time
from Bio.Align.Applications import ClustalwCommandline
import Bio.AlignIO
import numpy as np
import pysam
from conflict_positions import Region
from general_functions import finish_bam, open_bam, open_ref, try_remove
from sequence_collection import NamedSequence, SequenceCollection
alphabet = ['A', 'C', 'G', 'T']
cigar_tags = ['M','I','D','N','S','H','P','=','X']
def make_cigar_tuples(s1, ref):
"""
Generate cigar tuples of a mapping of a sequence to a reference sequence.
Returns a list of tuples, where each tuple consists of the tag number and
the length.
"""
result = []
current_tag = -1
current_length = 1
if len(s1) != len(ref):
print("seqeuence and reference have different length")
for c1, c2 in zip(s1, ref):
new_tag = -1
if c1 == c2:
if c1 in alphabet:
new_tag = '='
elif c1 == '-':
continue
elif c1 == '-':
new_tag = 'D'
elif c2 == '-':
new_tag = 'I'
else:
new_tag = 'X'
if new_tag == current_tag:
current_length += 1
elif current_tag == -1:
current_tag = new_tag
current_length = 1
else:
result.append((current_tag, current_length))
current_tag = new_tag
current_length = 1
result.append((current_tag, current_length))
return [(cigar_tags.index(tag), length) for (tag, length) in result]
def get_read_region(read, region: Region):
"""
Iterates through the aligned positions of a read. Finds the start and
end indices of the read sequence that are closest to the given region argument.
Returns these indices and the indices regarding the reference (as tuple).
"""
start_idx, end_idx = 0, -1
ref_idcs = [0, 0]
ref_pos = read.get_reference_positions(full_length=True)
for i, p in enumerate(ref_pos):
if p is None:
continue
if p <= region.start:
start_idx = i
ref_idcs[0] = p
continue
if start_idx == 0:
start_idx = i
ref_idcs[0] = p
end_idx = i
ref_idcs[1] = p
if p >= region.end:
break
if end_idx == -1:
end_idx = len(ref_pos)
return start_idx, end_idx, ref_idcs
def get_consensus(sequences, cut_gaps=False):
"""
Create a consensus alignment (str) out of aligned seqeuences.
For each alignment column the most abundand character (including gaps) is
chosen. If one wants to have the final alignment without gaps, one has to
use cut_gaps=True.
Returns a consensus sequences as string.
"""
# we translate the characters to 0-4 to be able to use np.bincount
alph = {"-": 0, "A": 1, "C": 2, "G": 3, "T": 4}
fun = np.vectorize(lambda base: alph[base])
arr = np.array([list(x) for x in sequences])
arr = fun(arr)
counts = np.apply_along_axis(lambda x: np.bincount(x, minlength=5), axis=0, arr=arr)
consensus = counts.argmax(axis=0)
# now we need to back translate the argmax's
alph = dict((v,k) for k,v in alph.items())
fun = np.vectorize(lambda base: alph[base])
consensus = fun(consensus).astype('|S1').tobytes().decode('utf-8')
if cut_gaps:
return consensus.replace("-", "")
else:
return consensus
def create_out_bam(out_file: Path, template):
"""
Creates a new BAM file with a given name and another BAM file as template.
"""
return pysam.AlignmentFile(str(out_file), "wb", template=template)
def run_multiple_msas(out_file_all: Path, args):
"""
Run the msa workflow on several regions.
Following args need to be in the args Namespace object:
filter_incomplete (bool)
min_quality (int)
filter_between (pair of int or None)
padding (int)
reverse_alignment (bool)
gap_extension (float)
gap_open (float)
gap_dist (float)
read_file (path to BAM file)
read_index (None or bam.bai file)
reference (path to reference fasta file)
"""
with_ref = not args.reference is None
if with_ref:
reference = open_ref(args.reference)
else:
reference = None
cnt = 0
sf = open_bam(args.read_file, args.read_index)
out_bam = create_out_bam(out_file_all, sf)
with open(args.regions_file, "r") as rs:
for region in rs:
if cnt == args.first_n:
break
msa_workflow(region, sf, out_bam, reference, args)
cnt += 1
sf.close()
out_bam.close()
def msa_workflow(region: Region, reads, out_file, reference, ref_name, args):
"""
Takes reads that fully cover the given region (+padding) and performs an MSA
using the external tool clustalw2. The MSA will also contain the sequence obtained
from the reference genome. Thus, a new BAM file will be generated using the
alignment as mapping to the reference.
Following args need to be in the args Namespace object:
filter_incomplete (bool)
min_quality (int)
filter_between (pair of int or None)
padding (int)
reverse_alignment (bool)
gap_extension (float)
gap_open (float)
gap_dist (float)
Returns a bool confirming the success of the function, the alignment string
of the reference, the read consensus, and the SequenceCollection.
"""
padded_region = region.with_padding(args.padding)
seq_coll, ref_id = gather_sequences(padded_region, reference, ref_name, reads, args)
if len(seq_coll) <= 2:
print("Not enough reads found for that region!")
return False, None, None, seq_coll
temp_name = run_msa(seq_coll, args)
aln_file = temp_name.with_suffix(".aln")
ref_al, cons = rebuild_bam(aln_file, out_file, ref_id,
padded_region.start, args.reverse_alignment, ref_name)
# clean up files that have been produced by clustalw
try_remove(aln_file)
try_remove(temp_name.with_suffix(".dnd"))
return True, ref_al, cons, seq_coll
def gather_sequences(region: Region, reference, ref_name: str, reads, args):
"""
Collect reads that fit the given genome coordinates. If there are more than
40 reads covering the area, stops collecting as MSA runtime grows too much
with more sequences.
Following args need to be in the args Namespace object:
filter_incomplete (bool)
min_quality (int)
filter_between (pair of int or None)
Returns a SequenceCollection object, continaing the fitting reads and the
main reference sequence. Also returns the reference id (needed for generating the new bam file).
"""
ref_id = None
low_quality = 0
not_covering = 0
outside_filter = 0
seq_coll = SequenceCollection()
for read in reads.fetch(*region.vals()):
# TODO only do this when running MSA
if len(seq_coll.reads) >= 40: # use upper limit to cut MSA build time
break
if ref_id is None:
ref_id = read.reference_id
start_idx, end_idx, ref_idcs = get_read_region(read, region)
seq = read.seq[start_idx:end_idx]
if len(seq) == 0:
print(f"Empty sequence: {read.query_name}: start({read.pos}),\
indices: {start_idx}-{end_idx}, length({read.infer_read_length()})")
if args.filter_incomplete and (ref_idcs[0] > region.start or ref_idcs[1] < region.end):
# dont include "incomplete" reads (reads that start or end within the actual region)
# here we use the start and end of the region, not the padded values
not_covering += 1
continue
if read.mapping_quality < args.min_quality:
low_quality += 1
continue
if not args.filter_between is None:
for type_, length in read.cigartuples:
if type_ == 2 and args.filter_between[0] <= length <= args.filter_between[1]:
break
else:
outside_filter += 1
continue # if the loop hasnt been escaped early, there is no deletion of the desired size and we skip the read
id_name = read.query_name
ambiguous_name_idx = 0
while True:
new_name = f"{id_name}_v{ambiguous_name_idx}"
if new_name in seq_coll.get_names("reads"):
ambiguous_name_idx += 1
continue
else:
seq_coll.reads.append(NamedSequence(new_name, seq))
break
if not reference is None:
ref_seq = reference[region.chrom][region.start:region.end]
seq_coll.references.append(NamedSequence(ref_name, ref_seq))
print(f"Read stats: Low qual:{low_quality}, Not covering area:{not_covering},"
f" Outside 'filter_between':{outside_filter}, Reads used:{len(seq_coll.reads)} ")
return seq_coll, ref_id
def run_msa(sequences: SequenceCollection, args) -> Path:
"""
Creates a temporary file with all the given sequences. Then performs an MSA
with clustalw2 (must be in PATH).
Clustalw's output will be written to a file f"{temp.name}.aln".
Following args need to be in the args Namespace object:
reverse_alignment (bool)
gap_extension (float)
gap_open (float)
gap_dist (float)
Returns the Path to the temp file.
"""
with tempfile.NamedTemporaryFile("w") as temp:
tmp_name = temp.name
for named_seq in sequences.get_named_sequences():
temp.write(f"> {named_seq.name}\n")
if args.reverse_alignment:
temp.write(named_seq.sequence[::-1])
else:
temp.write(named_seq.sequence)
temp.write("\n")
temp.seek(0)
msa_cline = ClustalwCommandline(cmd="clustalw2", infile=temp.name)
msa_cline.gapext = args.gap_extension
msa_cline.gapopen = args.gap_open
msa_cline.gapdist = args.gap_dist
# msa_cline = ClustalOmegaCommandline(infile=tmp_name, outfile=tmp_name+".aln", auto=True, threads=8, outfmt="clustal")
print(f"{msa_cline}... ", end=" ", flush=True)
start_time = time.time()
msa_cline()
print(f"finished in {(time.time()-start_time):.3f}s!")
return Path(tmp_name)
def rebuild_bam(aln_file: Path, out_file, ref_id, region_start, reverse_alignment, ref_name):
"""
Generates a new BAM file containing sequences aligned to a reference.
Returns the alignement string of the reference sequence and the consensus
string of the reads.
"""
alignment = Bio.AlignIO.read(str(aln_file), format="clustal")
for al in alignment:
if al.id == ref_name:
ref_al = al.seq
break
else:
print("Reference not found in alignment")
cons_seqs = []
for al in alignment:
if al.id == ref_name:
continue
cons_seqs.append(al.seq)
read = pysam.AlignedSegment()
read.query_name = al.id
read.reference_id = ref_id
read.query_sequence = str(al.seq).replace("-", "")
read.reference_start = region_start
if reverse_alignment:
read.cigar = make_cigar_tuples(al.seq[::-1], ref_al[::-1])
else:
read.cigar = make_cigar_tuples(al.seq, ref_al)
cig_len = read.infer_read_length()
seq_len = len(read.query_sequence)
if cig_len != seq_len:
print(f"Mismatch in cigar length {cig_len} and seq length {seq_len}!!!")
print(f"{read.cigar}, \n{al.seq}, \n{ref_al}")
print(read.query_name)
out_file.write(read)
consensus = get_consensus(cons_seqs)
return ref_al, consensus
def pair(arg):
"""
Parse function for pair arguments.
"""
return [int(x) for x in arg.split(',')]
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("read_file", type=str)
parser.add_argument("regions_file", type=str)
parser.add_argument("--read_index", "-i", type=str, default=None)
parser.add_argument("--padding", "-p", type=int, default=300)
parser.add_argument("--reference", "-r", type=str, default=None)
parser.add_argument("--filter_incomplete", "-f", action='store_true')
parser.add_argument("--gap_extension", "-gx", type=float, default=6.6)
parser.add_argument("--gap_open", "-go", type=float, default=10.0)
parser.add_argument("--gap_dist", "-gd", type=int, default=4)
parser.add_argument("--reverse_alignment", "-ra", action='store_true')
parser.add_argument("--first_n", "-n", type=int, default=1)
parser.add_argument("--filter_between", "-fb", type=pair, default=None)
parser.add_argument("--min_quality", "-m", type=int, default=0)
args = parser.parse_args()
curr_time = time.strftime("%Y_%m_%d-%H_%M_%S")
settings_str = f"go_{args.gap_open}_ge_{args.gap_extension}_gd_{args.gap_dist}"
out_file_all = Path(f"alignments/new_aligned_{curr_time}_{settings_str}.bam")
try:
run_multiple_msas(out_file_all, args)
except KeyboardInterrupt:
print("Program exited prematurely")
finally:
# sort BAM file
finish_bam(out_file_all)