Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
from collections import defaultdict
from pathlib import Path
import subprocess
from typing import List, TextIO
import numpy as np
from vcf import model as vcf_model
from conflict_positions import Region
from general_functions import sv_call_to_sequence
from sequence_collection import NamedSequence, SequenceCollection
def create_alternative_haplotypes(sv_calls: List[vcf_model._Record], ref_seq, ref_start):
"""
Collects alternative haplotypes from a list of sv_calls.
'sv_calls' is a list of fetched SVs from a VCF file.
For each sample in the list, a genotype is constructed.
Then, unique combinations are collected. For each combination of SVs,
the corresponding DNA string is reconstructed using the SVs' information.
A SequenceCollection object including all these haplotypes and representatives
for each haplotype is returned.
"""
hap_coll = SequenceCollection()
genotypes_dict = defaultdict(lambda: list())
combinations_dict = defaultdict(lambda: set()) # use set to automatically handle duplicate samples
for esv in sv_calls:
for sample in esv.samples:
gt = sample.data.GT.replace(".", "0")
genotypes_dict[sample.sample].append(gt)
for sample, gts in genotypes_dict.items():
def get_hap_gt(index):
return "".join([x[index] for x in gts])
hap_gt0, hap_gt1 = get_hap_gt(0), get_hap_gt(2)
combinations_dict[hap_gt0].add(sample)
combinations_dict[hap_gt1].add(sample)
skip_combinations = False
# if len(combinations_dict) > 10:
# print("too many haplotypes, taking only ones with one SV")
# skip_combinations = True
for haplotype, samples in combinations_dict.items():
samples = list(samples)
gts = list(map(int, list(haplotype))) # "01001" --> [0,1,0,0,1]
var_count = sum(map(lambda x: x != 0, gts))
if skip_combinations and var_count > 1 or not var_count:
continue
# make a copy of the reference sequence
new_sequence = ref_seq
# we iterate from the back to avoid that the start indices have to be recalculated
for i in reversed(range(len(gts))):
gt = gts[i]
if not gt:
continue
sv = sv_calls[i]
new_sequence = sv_call_to_sequence(new_sequence, ref_start, sv.start, sv.REF, sv.ALT[gt-1])
if new_sequence is None:
break
else: # only append if all sv calls could be used correctly
if var_count == 1:
sv_idx = int(np.where(np.array(gts)!=0)[0])
gt = gts[sv_idx]
label = f"{sv_calls[sv_idx].ID}:ALT{gt}"
elif len(samples) == 1:
label = f"{var_count}-SV-Combination ({samples[0]})"
else:
label = f"{var_count}-SV-Combination ({samples[0]} and {len(samples)-1} others)"
hap_coll.haplotypes.append(NamedSequence(label, new_sequence, samples[0]))
return hap_coll
def get_closest_alternative(data, cluster_labels, representatives, data_mode="points", n_closest=1):
    """
    Return the representative haplotype closest to any read cluster.

    The data either includes points (2D) from the PCA, or pairwise alignment
    scores of all sequences. 'representatives' is a list of names of the
    alternative haplotypes and references; it is essential for determining
    the position of reads within the data: the reads occupy the first
    n = len(data) - len(representatives) entries, the representatives the
    last ones. The data matrix should be in the shape NxN for pairwise
    scores, where data[:n_reads, :n_reads] are the reads' pairwise scores,
    or of shape Nx2, where data[:n_reads] contains the reads' PCA points.
    Cluster labels < 0 (noise) are ignored.

    Returns the name of the best representative; if n_closest > 1, returns
    a tuple (name, data_indices_of_the_n_closest_representatives) instead.
    """
    def dist(a, b):
        return np.linalg.norm(a - b, 2)

    n = len(data) - len(representatives)
    if data_mode == "points":
        min_dists = []
        c_centers = []
        for c in range(int(max(cluster_labels)) + 1):
            idcs = cluster_labels == c
            # per-coordinate centroid; without axis=0 the mean collapses
            # the 2D points to a single scalar
            c_centers.append(np.mean(data[idcs], axis=0))
        if not c_centers:
            # mirror the fallback of the score branch when clustering
            # produced no clusters at all
            print("No clusters available. Measuring distance to the overall mean.")
            c_centers.append(np.mean(data, axis=0))
        for i, _ in enumerate(representatives):
            repr_dists = [dist(center, data[n + i]) for center in c_centers]
            min_dists.append(min(repr_dists))
        print([f"{name}: {value:.3f}" for (name, value) in zip(representatives, min_dists)])
        closest_repr_idx = int(np.argmin(min_dists))
        if n_closest > 1:
            # smaller distance = closer; previously this path crashed with a
            # NameError because it referenced the score branch's variables
            n_closest = min(n_closest, len(representatives))
            nth_dist = sorted(min_dists)[n_closest - 1]
            idcs = np.where(np.array(min_dists) <= nth_dist)[0]
            return representatives[closest_repr_idx], idcs + n
    else:
        max_scores = []
        cluster_idcs = [cluster_labels == c for c in range(int(max(cluster_labels)) + 1)]
        if not cluster_idcs:
            print("No clusters available. Measuring mean distance to all reads.")
            cluster_idcs = [np.arange(len(data))]
        for i, _ in enumerate(representatives):
            repr_scores = [np.mean(data[n + i, idcs]) for idcs in cluster_idcs]
            # take the max, as these are alignment scores (higher = closer)
            max_scores.append(max(repr_scores))
        print([f"{name}: {value:.3f}" for (name, value) in zip(representatives, max_scores)])
        closest_repr_idx = int(np.argmax(max_scores))
        if n_closest > 1:
            n_closest = min(n_closest, len(representatives))
            nth_score = sorted(max_scores, reverse=True)[n_closest - 1]
            idcs = np.where(np.array(max_scores) >= nth_score)[0]
            return representatives[closest_repr_idx], idcs + n
    return representatives[closest_repr_idx]
def expand_haplotype_to_fasta(sample, ref_file: Path, vcf_file: Path, region: Region, out_file: TextIO, padding=1E6):
    """
    Save a bigger surrounding of the specified haplotype to a fasta file.

    The reference sequence of the padded region is extracted with
    'samtools faidx' and piped into 'bcftools consensus', which integrates
    the sample's SVs into it and writes the consensus to 'out_file'.
    Both subprocesses are waited on, so the fasta is complete on return.
    """
    region = region.with_padding(padding)
    print(f"Saving region={region} of {sample=} to {out_file.name}")
    # bcftools consensus reads the reference sequence from its stdin ...
    bcftools_proc = subprocess.Popen(
        ["bcftools", "consensus", "-s", sample, vcf_file],
        stdin=subprocess.PIPE, stdout=out_file)
    # ... which is fed by samtools faidx extracting the padded region
    samtools_proc = subprocess.Popen(
        ["samtools", "faidx", f"{ref_file}", f"{region}"],
        stdout=bcftools_proc.stdin)
    samtools_proc.wait()
    # close the parent's copy of the pipe so bcftools sees EOF,
    # then wait for it to finish writing the output
    bcftools_proc.stdin.close()
    bcftools_proc.wait()