Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
sv-conflict-analysis/analyse.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
executable file
409 lines (347 sloc)
17.1 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" TODO | |
CMD line options | |
IGV session paths | |
""" | |
""" Some rules for references: | |
There is one main reference (eg hg38), and n side references | |
IGV session paths need to be in the right order. If there is no session a ref, | |
this can be marked with a '_' in session path list | |
""" | |
import argparse | |
from pathlib import Path | |
import subprocess | |
import time | |
from typing import List | |
from pyliftover import LiftOver | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import xvfbwrapper | |
from alternative_haplotypes import ( | |
create_alternative_haplotypes, | |
expand_haplotype_to_fasta, | |
get_closest_alternative, | |
) | |
from conflict_positions import ConfMode, get_conflict_position, Region | |
from general_functions import ( | |
find_all_svs, | |
finish_bam, | |
load_sv_conflict, | |
open_bam, | |
open_ref, | |
) | |
from liftover import lift | |
from msa_on_conflicts import create_out_bam, gather_sequences, msa_workflow | |
from pdf_create import merge_pdfs, png_2_pdf | |
from read_cluster import ( | |
cluster_reads, | |
get_cluster_center_point, | |
pca_on_read_distance, | |
plot_with_clustering, | |
) | |
from reference_comparison import ref_comp_simple | |
from sequence_collection import NamedSequence | |
from vcfs_comp import make_reader as make_vcf_reader | |
NULL_PATH = "_" | |
def generate_igv_batch(conflict_dir: Path, regions: List[Region], msa_path: Path, args): | |
""" | |
Creates an IGV batch for each reference, including the BAM files. | |
For the main reference: mapped MSA result | |
""" | |
def load_if_exists(path: Path, batch_file, cmd="load"): | |
if path.name != NULL_PATH: | |
batch_file.write(f"{cmd} {path.absolute()}\n") | |
batch_names = [] | |
for i, ref_name in enumerate(args.reference_names): | |
if regions[i] is None: | |
print(f"Skipping igv batch for {ref_name} due to missing region.") | |
continue | |
batch_file_name = conflict_dir / f"igv_batch_{ref_name}.txt" | |
screenshot_file_name = conflict_dir / f"igv_screenshot_{ref_name}.png" | |
batch_names.append(batch_file_name) | |
with open(batch_file_name, 'w') as f: | |
f.write("new\n") | |
load_if_exists(args.references[i], f, cmd='genome') | |
load_if_exists(args.pacbio_reads[i], f) | |
load_if_exists(args.illumina_reads[i], f) | |
if i == 0: | |
load_if_exists(msa_path, f) | |
f.write(f"goto {regions[i]}\n") | |
f.write(f"snapshot {screenshot_file_name.absolute()}\n") | |
f.write("exit\n") | |
return batch_names | |
def check_and_clean_processes(process_and_times, timeout=10): | |
""" | |
Check if a subprocess is alive longer than the given timeout. | |
If that is the case, the process is terminated with a message. | |
""" | |
now = time.time() | |
for p, t in process_and_times: | |
if p.poll() is None and now - t > timeout: | |
print(f"Terminating process due to timeout ({p})") | |
p.terminate() | |
def path_list(s): | |
""" Parse a string containg a comma-separated list of paths to a list of paths. """ | |
if s[0] == '[': | |
s = s[1:-1] | |
return [Path(p) for p in s.split(",")] | |
def parse_args(): | |
""" Parse arguments from command line for the analyse.py script. """ | |
parser = argparse.ArgumentParser() | |
parser.add_argument("conflicts", type=str, | |
help="Conflict file (json), region file (csv:CHROM,START,END) or region string ('chr1:12-13;...;chr19:31-41')") | |
parser.add_argument("result_dir", type=Path, | |
help="Directory for subfolders for each region.") | |
parser.add_argument("--references", "-r", nargs="+", type=Path, | |
help="Paths to all references fasta files to be used.") | |
parser.add_argument("--reference_names", "-rn", nargs="+", type=str, | |
help="Display names of all given references.") | |
parser.add_argument("--pacbio_reads", "-pb", nargs="+", type=Path, | |
help="Paths to PacBio read data. List should be in references' order. Use '_' to skip a reference.") | |
parser.add_argument("--illumina_reads", "-ill", nargs="+", type=Path, | |
help="Paths to Illumina read data. List should be in references' order. Use '_' to skip a reference.") | |
parser.add_argument("--liftover_paths", "-lop", nargs="+", type=path_list, | |
help="Paths to Illumina read data. List should be in references' order. Use '_' to skip a reference.") | |
parser.add_argument("--additional_haplotypes", "-ah", type=Path, default=None, | |
help="Path to vcf file with extra haplotypes (like HGSVC). Must be created with the main reference.") | |
parser.add_argument("--skip", type=int, default=0, | |
help="Use this to skip the first N conflicts in the conflict file/string.") | |
parser.add_argument("--touch_only", action="store_true", | |
help="Use this to just 'touch' the result directories so that they are in the correct order.") | |
parser.add_argument("--msa", action="store_true", | |
help="Perform MSA for each region. Increases the running time drastically.") | |
args = parser.parse_args() | |
# check conflict input type and perform necessary actions | |
if args.conflicts.endswith(".csv"): | |
args.conflict_mode = ConfMode.CSV | |
elif args.conflicts.endswith(".txt"): | |
with open(args.conflicts) as c_f: | |
start_char = c_f.read()[0] | |
if start_char == "[": | |
args.conflict_mode = ConfMode.JSON | |
else: | |
args.conflict_mode = ConfMode.STRING | |
if args.conflict_mode != ConfMode.STRING: | |
args.conflicts = Path(args.conflicts) | |
return args | |
def check_paths(args: argparse.Namespace) -> bool: | |
""" | |
For all paths given in the args, check if they exist. | |
Exception is the result_dir which will be created on the go. | |
""" | |
if args.conflict_mode != ConfMode.STRING: | |
if not args.conflicts.exists(): | |
return False | |
d = vars(args) # access the Namespace object as dict | |
path_args = ['additional_haplotypes', 'vcf_dir', 'references', 'pacbio_reads', 'illumina_reads'] | |
def check_existance(p_): | |
if p_ is None or p_.name == NULL_PATH: | |
return True | |
if not p_.exists(): | |
print(f"{str(p_)} does not exist!") | |
return False | |
return True | |
for p in path_args: | |
if not p in d: | |
continue | |
entry = d[p] | |
if isinstance(entry, list): | |
for sub_entry in entry: | |
if not check_existance(sub_entry): | |
print(sub_entry) | |
return False | |
elif isinstance(entry, Path): | |
if not check_existance(entry): | |
print(entry) | |
return False | |
for liftover_path in args.liftover_paths: | |
for path in liftover_path: | |
if not check_existance(path): | |
print(path) | |
return False | |
return True | |
if __name__ == '__main__': | |
args = parse_args() | |
if not check_paths(args): | |
print("Exiting due to invalid path") | |
exit(1) | |
args.result_dir.mkdir(parents=True, exist_ok=True) | |
args.result_dir = args.result_dir.resolve() | |
xvfb = xvfbwrapper.Xvfb() | |
xvfb.start() | |
igv_processes = [] | |
finished_conflicts = [] | |
# Prepare the conflict information | |
if args.conflict_mode == ConfMode.STRING: | |
conflicts = args.conflicts.split(";") | |
else: | |
conflicts = open(args.conflicts) | |
# load references | |
main_reference = open_ref(args.references[0]) | |
main_ref_name = args.reference_names[0] | |
alt_references = [open_ref(ref_path) for ref_path in args.references[1:]] | |
alt_ref_names = args.reference_names[1:] | |
try: | |
# read in vcf files before loop as the reading has a big running time | |
if args.additional_haplotypes: | |
haplotype_vcf = make_vcf_reader(args.additional_haplotypes) | |
# Create liftover paths in advance | |
liftover_chain_files = {} | |
for chain_file_route in args.liftover_paths: | |
for chain_file in chain_file_route: | |
if not chain_file in liftover_chain_files: | |
liftover_chain_files[chain_file] = LiftOver(str(chain_file)) | |
# main loop | |
cnt = 0 | |
for conflict_idx, conflict in enumerate(conflicts): | |
if conflict_idx < args.skip: | |
continue | |
# determine conflict region | |
if args.conflict_mode == ConfMode.JSON: | |
svs = load_sv_conflict(conflict) | |
region_main_ref = get_conflict_position(svs) | |
else: | |
region_main_ref = get_conflict_position(conflict, args.conflict_mode) | |
# create result directory for conflict | |
conflict_dir = args.result_dir / f"{region_main_ref:_}" | |
conflict_dir.mkdir(exist_ok=True) | |
finished_conflicts.append(conflict_dir) | |
if args.touch_only: | |
subprocess.run(["touch", conflict_dir]) | |
continue | |
print(f"Analyzing region no. {conflict_idx+1} ({region_main_ref})...") | |
### MSA | |
msa_args = argparse.Namespace(padding = 1000, gap_open = 50.0, | |
gap_extension = 0.1, filter_incomplete = True, gap_dist = 10.0, | |
filter_between = None, reverse_alignment = False, min_quality=20) | |
read_file = open_bam(args.pacbio_reads[0]) | |
padded_region = region_main_ref.with_padding(1000) | |
msa_output_file = Path(NULL_PATH) | |
if args.msa: | |
aln_file = conflict_dir / f"MSA_mapped.bam" | |
out_bam = create_out_bam(aln_file, read_file) | |
# the list of sequences contains all reads used for the MSA and the main reference | |
# sequence. The main reference sequence is at the last position in the list | |
aln_success, ref_al, consensus, seq_coll = msa_workflow(region_main_ref, | |
read_file, out_bam, main_reference, main_ref_name, msa_args) | |
out_bam.close() | |
if aln_success: | |
msa_output_file = finish_bam(aln_file) | |
else: | |
seq_coll, _ = gather_sequences(padded_region, main_reference, | |
main_ref_name, read_file, msa_args) | |
n_reads = seq_coll.n_reads() | |
### LIFTOVER for all references | |
alt_regions = [] | |
for ref_idx, alt_ref in enumerate(alt_ref_names): | |
alt_ref_region = region_main_ref.copy() | |
for chain_file_name in args.liftover_paths[ref_idx]: | |
alt_ref_region = lift(liftover_chain_files[chain_file_name], alt_ref_region) | |
if alt_ref_region is None: | |
break | |
alt_regions.append(alt_ref_region) | |
alt_ref_str = None | |
if not alt_ref_region is None: | |
if len(alt_ref_region) > 10e5: | |
print("Alternative reference region is absurdely large. Liftover probably failed.") | |
else: | |
alt_ref_str = alt_ref_region.get_reference_seq(alt_references[ref_idx]) | |
seq_coll.references.append(NamedSequence(alt_ref_names[ref_idx], alt_ref_str)) | |
else: | |
print(f"Region not found in {alt_ref_names[ref_idx]} reference!") | |
### CREATE ALTERNATIVE HAPLOTYPES | |
if args.additional_haplotypes: | |
extra_svs = find_all_svs(haplotype_vcf, region_main_ref) | |
if extra_svs: | |
hap_coll = create_alternative_haplotypes(extra_svs, | |
seq_coll.references[0].sequence, region_main_ref.start-msa_args.padding) | |
seq_coll += hap_coll | |
else: | |
print("No additional SVs found in surrounding area!") | |
sample_repr = [s.repr_sample for s in seq_coll.get_named_sequences("refs,haps")] | |
### CREATE CLUSTERING OF READS AND SAVE PLOT | |
perform_clustering = len(seq_coll.reads) > 2 | |
if perform_clustering: | |
pca_dots, pca_object, pw_distances = pca_on_read_distance(seq_coll, conflict_dir) | |
cluster_labels = cluster_reads(pca_dots, eps=0.3, min_samples=min(n_reads//3+1, 5), | |
exclude_last_n=seq_coll.n_refs()+seq_coll.n_haps()) | |
if args.additional_haplotypes: | |
# closest_alternative = get_closest_alternative(pca_dots, cluster_labels, sample_repr, data_mode="points") | |
closest_alternative, top_n_idcs = get_closest_alternative(pw_distances, cluster_labels, sample_repr, data_mode="distances", n_closest=10) | |
expanded_haploytpe_fasta = open(conflict_dir / f"closest_haplotype_{closest_alternative}.fa", "w") | |
if closest_alternative in args.reference_names: | |
print(f"The closest haplotype to either cluster is {closest_alternative}.") | |
print("Thus, there will be no reconstructed fasta file for the closest one (avoiding data redundancy).") | |
expanded_haploytpe_fasta.write("NULL\n") | |
else: | |
expand_haplotype_to_fasta(closest_alternative, args.references[0], | |
args.additional_haplotypes, region_main_ref, expanded_haploytpe_fasta, padding=1E6) | |
# sort out far away haplotypes (except references) | |
plot_idcs = np.union1d(np.arange(len(seq_coll)-seq_coll.n_haps()), top_n_idcs) | |
label_idcs = np.union1d(np.arange(seq_coll.n_refs()), top_n_idcs-seq_coll.n_reads()) | |
labels = np.array(seq_coll.get_labels())[label_idcs].tolist() | |
legend = plot_with_clustering(pca_dots[plot_idcs], cluster_labels[plot_idcs], labels, pca_object) | |
else: | |
legend = plot_with_clustering(pca_dots, cluster_labels, seq_coll.get_labels(), pca_object) | |
plt.savefig(conflict_dir / "read_cluster.png", bbox_extra_artists=(legend,), bbox_inches="tight") | |
plt.close() | |
### REFERENCE COMPARISON | |
comparison_seqs = seq_coll.references | |
if perform_clustering: | |
cluster_labels = cluster_labels[:n_reads] | |
pca_dots = pca_dots[:n_reads] | |
for c in range(min(int(cluster_labels.max())+1, 2)): | |
idcs = cluster_labels == c | |
center_idx = get_cluster_center_point(pca_dots[idcs]) | |
center_idx_of_all = np.arange(seq_coll.n_reads())[idcs][center_idx] | |
new_seq = seq_coll.reads[center_idx_of_all] | |
new_seq.name = f"Cluster_{c} ({new_seq.name})" | |
comparison_seqs.append(new_seq) | |
ref_comp_simple(comparison_seqs, conflict_dir) | |
### CREATE A SAMPLOT | |
samplot_reads, samplot_names = [], [] | |
for i, ref in enumerate(args.reference_names): | |
if args.illumina_reads[i].name != NULL_PATH: | |
samplot_names.append(f"{ref} Illumina") | |
samplot_reads.append(args.illumina_reads[i]) | |
if args.pacbio_reads[i].name != NULL_PATH: | |
samplot_names.append(f"{ref} PacBio") | |
samplot_reads.append(args.pacbio_reads[i]) | |
_, pad_start, pad_end = region_main_ref.with_padding(500).vals() | |
samplot_command = ["samplot", "plot", "-n", *samplot_names, | |
"-o", conflict_dir / "samplot.png", "-b", *samplot_reads, | |
"-c", region_main_ref.chrom, "-s", str(pad_start), | |
"-e", str(pad_end)] | |
subprocess.Popen(samplot_command) | |
### PREPARE IGV BATCHES | |
igv_batches = generate_igv_batch(conflict_dir, [region_main_ref] + alt_regions, msa_output_file, args) | |
### RUN IGV COMMANDS | |
for batch_idx, batch_file in enumerate(igv_batches): | |
igv_log = open(conflict_dir / f"igv_log{batch_idx}.txt", "a") | |
cmd = ["igv", f"--batch={batch_file}"] | |
proc = (subprocess.Popen(cmd, stdout=igv_log, stderr=igv_log), time.time()) | |
igv_processes.append(proc) | |
# garbage collect other IGV processes that might be zombies already | |
# check_and_clean_processes(igv_processes, 300) | |
print("") | |
except KeyboardInterrupt: | |
print("Program stopped prematurely.") | |
print(f"{len(igv_processes)=}") | |
finally: | |
print("Waiting for IGV tasks and finishing PDFs.") | |
# as long as igv background processes are running, dont stop the xvfb instance | |
wait_secs = 1 | |
print(f"{len([p for (p, _) in igv_processes if p.poll() is None])} IGV processes still running...") | |
while any([p.poll() is None for (p, _) in igv_processes]): | |
print(f"Waiting {wait_secs} seconds for all igv processes to stop\r", end="") | |
check_and_clean_processes(igv_processes, 300) | |
time.sleep(1) | |
wait_secs += 1 | |
print("") | |
xvfb.stop() | |
time.sleep(20) | |
# merge all result PDFs in their locations | |
for conflict_dir in finished_conflicts: | |
png_2_pdf(conflict_dir) | |
merge_pdfs(conflict_dir) | |
if args.conflict_mode != ConfMode.STRING: | |
conflicts.close() |