Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
sv-conflict-analysis/general_functions.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
134 lines (107 sloc)
3.87 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from pathlib import Path | |
import matplotlib.pyplot as plt | |
from pyfaidx import Fasta | |
import pysam | |
import vcf | |
from conflict_positions import Region | |
def sv_call_to_sequence(ref_seq, ref_start, sv_start, sv_ref, sv_alt): | |
""" | |
Takes an SV call and integrates it into the given reference sequence. | |
Reference and SV offsets are needed, aswell as the reference and alternative. | |
Returns a string with the modified reference sequence. | |
Example: "CAGGAGCGATCTAGGCGGGC", 42, 48, "C", "CTTT" | |
--> "CAGGAGCTTTATCTAGGCGGGC" | |
""" | |
cut_start = sv_start - ref_start | |
# print(f"{cut_start=}") | |
# print(f"{alt=}") | |
# print(f"{ref=}") | |
if sv_ref != (cut_away:=ref_seq[cut_start:cut_start+len(sv_ref)]): | |
if len(sv_ref) > len(cut_away): | |
print(f"Alternative haplotype spans out of padding! {len(sv_ref)=} > {len(cut_away)=}") | |
else: | |
diff_count = sum([x != y for x, y in zip(sv_ref, cut_away)]) | |
print(f"SV ref and reference do not match: {len(sv_ref)=}, {len(cut_away)=}, diff: {diff_count}") | |
return None | |
return ref_seq[:cut_start] + str(sv_alt) + ref_seq[cut_start+len(sv_ref):] | |
def load_sv_conflict(conflict): | |
""" | |
Parses a list (str) of SV ids which is in the shape '["DEL1", "INS2", ...]' | |
into a python list of strings | |
""" | |
return json.loads(conflict.replace('\'', '\"')) | |
def get_sv_type(sv): | |
""" | |
Returns the type of an SV as string | |
E.g. sv = 'svim_DEL_-1150_1461269_1462419_chr8_svim.DEL.22091' --> 'DEL' | |
""" | |
return sv.split("_")[1] | |
def get_sv_info(sv): | |
""" | |
Grab some SV information for printing. | |
""" | |
return f"{sv.ID} {sv.QUAL=} {sv.INFO=} {sv.samples[0].gt_nums}" | |
def get_sv_by_id(vcf_reader, sv_id, region: Region): | |
""" | |
searches for the entry in reader matching sv_id | |
""" | |
chrom, start, end = region.vals() | |
for sv in vcf_reader.fetch(chrom, start-1, end): | |
if sv.ID == sv_id: | |
return sv | |
return None | |
def find_all_svs(vcf_reader: vcf.Reader, region: Region): | |
""" | |
searches for SVs that are in the region, as the conflict might leave out some SVs | |
""" | |
try: | |
res = list(vcf_reader.fetch(*region.vals())) | |
except ValueError: | |
print(f"Error in VCF file {vcf_reader.filename} while fetching position {region}") | |
return [] | |
return res | |
# BAM and Reference specific functions | |
def open_bam(fname: Path, idx_name=None): | |
""" | |
Returns an opened SAM or BAM file, if idx_name is given, use it in the pysam-call. | |
""" | |
read_mode = "rb" if fname.suffix == ".bam" else "r" | |
if idx_name is None: | |
file = pysam.AlignmentFile(fname, read_mode) | |
else: | |
file = pysam.AlignmentFile(fname, read_mode, index_filename=idx_name) | |
return file | |
def open_ref(file: Path) -> Fasta: | |
""" | |
Returns an opened Fasta file. | |
Chromosome names are used as keys. | |
""" | |
return Fasta(str(file), as_raw=True, key_function=lambda x: x.split()[0]) | |
def finish_bam(bam_file: Path) -> Path: | |
""" | |
Sort a BAM file and index it. | |
Returns the path to the sorted BAM. | |
""" | |
sorted_file = Path(str(bam_file).removesuffix(".bam") + "_sorted.bam") | |
pysam.sort(str(bam_file), "-o", str(sorted_file)) | |
pysam.index(str(sorted_file)) | |
return sorted_file.absolute() | |
def sorted_barplot(value_dict, reverse=True): | |
""" | |
Creates a horizontal barplot with the bars sorted by quantity. | |
""" | |
sorted_dict = sorted(value_dict.items(), | |
key=lambda x: x[1], reverse=reverse) | |
sorted_dict = {x[0]: x[1] for x in sorted_dict} | |
plt.barh(list(sorted_dict.keys()), list(sorted_dict.values())) | |
return plt | |
def try_remove(file: Path): | |
""" | |
Try to remove a file. If it doenst exist, return False. | |
""" | |
try: | |
file.unlink() | |
except: | |
return False | |
return True |