Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
sv-conflict-analysis/reference_comparison.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
executable file
153 lines (127 sloc)
5.83 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import os | |
from pathlib import Path | |
import subprocess | |
from typing import List | |
from conflict_positions import Region | |
from general_functions import open_bam, open_ref | |
from liftover import lift_hg38_to_t2t_v1_1 | |
from msa_on_conflicts import get_read_region | |
from sequence_collection import NamedSequence | |
def get_positions_in_reference(read, start, end): | |
r_pos = read.get_reference_positions(full_length=True) | |
i = len(r_pos) - 1 | |
while i > 0: | |
if not r_pos[i] is None: | |
res_start = r_pos[i] | |
if i < start: | |
break | |
i -= 1 | |
i = 0 | |
while i < len(r_pos): | |
if not r_pos[i] is None: | |
res_end = r_pos[i] | |
if i > end: | |
break | |
i += 1 | |
return res_start, res_end | |
def ref_comp(args): | |
if args.read_index1 is None: | |
args.read_index1 = f"{args.read_file1}.bai" | |
if args.read_index2 is None: | |
args.read_index2 = f"{args.read_file2}.bai" | |
read_file1 = open_bam(args.read_file1, args.read_index1) | |
read_file2 = open_bam(args.read_file2, args.read_index2) | |
region = Region(args.region.replace(",", "")) | |
padding = args.padding | |
ref1 = open_ref(Path(args.ref1)) | |
ref2 = open_ref(Path(args.ref2)) | |
padded_region = region.with_padding(padding) | |
best_read = (None, 0) | |
for read in read_file1.fetch(*region.with_padding(1).vals()): | |
if not args.read_name is None and not read.query_name.startswith(args.read_name): | |
continue | |
area_start, area_end, ref_idcs = get_read_region(read, padded_region) | |
if ref_idcs[0] > padded_region.start or ref_idcs[1] < padded_region.end: | |
if (length:=ref_idcs[1] - ref_idcs[0]) > best_read[1]: | |
best_read = (read, length) | |
continue | |
best_read = (read, ref_idcs[1] - ref_idcs[0]) | |
break | |
else: | |
print("Read not found or doesnt cover area!") | |
print("Taking the best fitting read...") | |
read = best_read[0] | |
read_id = read.query_name | |
read_str = read.seq[area_start:area_end] | |
ref1_str = ref1[region.chrom][ref_idcs[0]:ref_idcs[1]] | |
ref1_region = Region(region.chrom, ref_idcs[0], ref_idcs[1]) | |
ref2_start, ref2_end, ref2_str = None, None, None | |
ref2_region = lift_hg38_to_t2t_v1_1(ref1_region) | |
if not ref2_region is None: | |
ref2_str = ref2_region.get_reference_seq(ref2) | |
# ref2_read_found = False | |
# if not chrom in ref2.keys(): | |
# print(f"{chrom} not found in ref2!") | |
# else: | |
# for r2_read in read_file2.fetch(chrom): | |
# if r2_read.query_name == read_id: | |
# print("Read found in read_file2!") | |
# ref2_start, ref2_end = get_positions_in_reference(r2_read, area_start, area_end) | |
# ref2_str = ref2[chrom][ref2_start:ref2_end] | |
# ref2_read_found = True | |
# break | |
# we need to change the directory as dotter has a weird bug with axis names | |
# if files are put in as paths | |
old_pwd = os.getcwd() | |
os.chdir(args.destination) | |
with open("/dev/null", "w") as null_fd: | |
out_ref1 = f"ref1_{ref1_region:_}.txt" | |
out_read = f"read_{read_id.replace('/', '_')}_{area_start}_{area_end}.txt" | |
dotter_data1 = f"ref1_vs_read.data" | |
dotter_pdf1 = f"x1.pdf" | |
print(f"{ref1_str}", file=open(out_ref1, "w")) | |
print(f"{read_str}", file=open(out_read, "w")) | |
subprocess.Popen(["dotter", "-b", dotter_data1, out_ref1, out_read, "-e", dotter_pdf1], stdout=null_fd, stderr=null_fd) | |
commands = [f"dotter -l {dotter_data1} {out_ref1} {out_read}"] | |
# if ref2_read_found: | |
if not ref2_region is None: | |
out_ref2 = f"ref2_{ref2_region:_}.txt" | |
dotter_data2 = f"ref1_vs_ref2.data" | |
dotter_data3 = f"ref2_vs_read.data" | |
dotter_pdf2 = f"x2.pdf" | |
dotter_pdf3 = f"x3.pdf" | |
print(f"{ref2_str}", file=open(out_ref2, "w")) | |
subprocess.Popen(["dotter", "-b", dotter_data2, out_ref1, out_ref2, "-e", dotter_pdf2], stdout=null_fd, stderr=null_fd) | |
subprocess.Popen(["dotter", "-b", dotter_data3, out_ref2, out_read, "-e", dotter_pdf3], stdout=null_fd, stderr=null_fd) | |
commands.append(f"dotter -l {dotter_data2} {out_ref1} {out_ref2}") | |
commands.append(f"dotter -l {dotter_data3} {out_ref2} {out_read}") | |
os.chdir(old_pwd) | |
return commands, (ref2_start, ref2_end), ref1_str, ref2_str | |
def ref_comp_simple(seqs: List[NamedSequence], out_dir: Path): | |
""" | |
Create a fasta file from sequences and names in out_dir and run dotter | |
on it comparing all sequences against each other. | |
""" | |
out_fasta = out_dir / f"comparison_candidates.fa" | |
with open(out_fasta, "w") as f: | |
for named_seq in seqs: | |
f.write(f">{named_seq.name}\n{named_seq.sequence}\n") | |
with open("/dev/null", "w") as null_fd: | |
subprocess.run(["dotter", out_fasta, out_fasta, "-e", out_dir / "x_multiple_dotplot.pdf"], stdout=null_fd, stderr=null_fd) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--region", type=str, default=None) | |
parser.add_argument("--padding", "-p", type=int, default=300) | |
parser.add_argument("--read_index1", "-i1", type=str, default=None) | |
parser.add_argument("--read_index2", "-i2", type=str, default=None) | |
parser.add_argument("--read_file1", "-f1", type=str, default=None) | |
parser.add_argument("--read_file2", "-f2", type=str, default=None) | |
parser.add_argument("--ref1", "-r1", type=str, default=None) | |
parser.add_argument("--ref2", "-r2", type=str, default=None) | |
parser.add_argument("--destination", "-d", type=str, default=None) | |
parser.add_argument("--read_name", "-rn", type=str, default=None) | |
args = parser.parse_args() | |
commands, _, _, _ = ref_comp(args) | |
print("Call these to view the dotplots:") | |
print("\n".join(commands)) |