Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
from pathlib import Path
import subprocess
import sys
from Bio import Align
from matplotlib import pyplot as plt
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from sequence_collection import NamedSequence, SequenceCollection
def alternative_dist_matrix(seq_coll: SequenceCollection, path: Path):
this_file_directory = Path(__file__).resolve().parent
# create fasta file
fasta_path = path / "reads_and_haplotypes.fasta"
out_file = open(fasta_path, "w")
for named_sequence in seq_coll.get_named_sequences():
out_file.write(f"> {named_sequence.name}\n")
out_file.write(f"{named_sequence.sequence}\n")
out_file.close()
# if cpp program doesnt exist yet, compile it
needle_exe = this_file_directory / "needle.o"
needle_src = this_file_directory / "needleman_wunsch_omp.cpp"
if not needle_exe.exists():
print("Creating needle.o ...")
compile_cmd = ["g++", "--std=c++17", "-O3", "-fopenmp",
needle_src, "-o", needle_exe]
subprocess.run(compile_cmd)
# execute cpp programm
matrix_file_path = path / "distance_matrix.txt"
# run pariwise alignments with 8 threads
calc_cmd = [needle_exe, fasta_path, "2", "-1", "-1", matrix_file_path, "64"]
subprocess.run(calc_cmd)
# read distance matrix
matrix_file = open(matrix_file_path, "r")
dist_mat = []
for line in matrix_file:
dist_mat.append([float(x) for x in line[:-2].split("\t")])
return np.array(dist_mat)
def prepare_dist_matrix(seqs):
n = len(seqs)
dist_mat = np.zeros((n, n))
aligner = Align.PairwiseAligner(mode="global", match_score=2, open_gap_score=-0.5,
extend_gap_score=-0.1, mismatch_score=-0.1)
for i in range(n):
for j in range(i, n):
alignment = aligner.align(seqs[i], seqs[j])[0]
al_len = len(format(alignment).split("\n")[0])
result = alignment.score / al_len
dist_mat[j, i] = dist_mat[i, j] = result
# print(dist_mat.tolist())
return dist_mat
def pca_on_read_distance(seq_coll: SequenceCollection, path: Path, exclude_last_n=None):
dist_mat = alternative_dist_matrix(seq_coll, path)
pca = PCA(n_components=2)
if exclude_last_n is None:
pca.fit(dist_mat)
res = pca.transform(dist_mat)
else:
# only use read distances to perform pca
pca.fit(dist_mat[:-exclude_last_n, :-exclude_last_n])
res = pca.transform(dist_mat[:, :-exclude_last_n])
return res, pca, dist_mat
def plot_pca_result(data, labels=None, pca_object=None):
""" labels are going to be interpreted the following way:
the first n - (len(labels)-1) points are labeled with the first label
then, all the remaining dots get their own label """
x, y = zip(data.T)
x, y = x[0], y[0]
if labels is None or len(labels) == 0:
plt.scatter(x, y)
else:
n = len(x)
read_count = n - (len(labels)-1)
plt.scatter(x[:read_count], y[:read_count])
for i in range(read_count, n):
plt.scatter(x[i], y[i])
plt.legend(labels)
if pca_object is None:
pc_values = ["", ""]
else:
pc_values = list(map(lambda x: f"{x:.3f}", pca_object.explained_variance_ratio_[:2]))
plt.xlabel(f"PC1 {pc_values[0]}")
plt.ylabel(f"PC2 {pc_values[1]}")
def plot_with_clustering(data, cluster_labels, additional_labels=None, pca_object=None):
"""
Plots the reads with cluster information. The additional_labels argument
defines names of single points at the end of the data argument.
Thus, if n=additional_labels, the last n point in data get their own labels.
Returns the legend object, which is needed for correct figure saving
"""
x, y = zip(data.T)
x, y = x[0], y[0]
n = len(cluster_labels)
names = []
cluster_colors = plt.cm.Set3(np.linspace(0, 1, int(cluster_labels.max())+1))
for i, c in enumerate(cluster_colors):
idcs = cluster_labels == i
plt.scatter(x[idcs], y[idcs], color=c, marker="o")
names.append(f"Read Cluster {i}")
if -1. in cluster_labels:
if additional_labels is None:
idcs = cluster_labels == -1.
else:
is_read = np.arange(n) < (n - len(additional_labels))
idcs = np.logical_and(cluster_labels == -1., is_read)
plt.scatter(x[idcs], y[idcs], color="black", marker=".")
names.append("Read outliers")
if not additional_labels is None:
n_add = len(additional_labels)
for i, _ in enumerate(additional_labels):
idx = -n_add+i
plt.scatter(x[idx], y[idx], marker="x")
names += additional_labels
if pca_object is None:
pc_values = ["", ""]
else:
pc_values = list(map(lambda x: f"{x:.3f}", pca_object.explained_variance_ratio_[:2]))
plt.xlabel(f"PC1 {pc_values[0]}")
plt.ylabel(f"PC2 {pc_values[1]}")
return plt.legend(names, bbox_to_anchor=(1,1), loc='upper left')
def cluster_reads(data, eps=0.15, min_samples=5, normalize_scales=True, exclude_last_n=0):
data_norm = data[:-exclude_last_n].copy()
if normalize_scales:
for axis in range(data.shape[1]):
data_norm[:, axis] /= (data_norm[:, axis].max() - data_norm[:, axis].min())
clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(data_norm)
# append as many -1. to the end as were excluded from the clustering
return np.hstack((clustering.labels_, [-1.]*exclude_last_n))
def get_cluster_center_point(points):
""" get only the points of one cluster
returns the index of the point closest to the cluster center """
if len(points) == 0:
return None
cluster_center = np.mean(points, axis=0)
distances = np.sqrt(np.power(cluster_center - points, 2).sum(axis=1))
return distances.argmin()
if __name__ == '__main__':
read_file = open(sys.argv[1], "r")
sequences = [line.rstrip() for line in read_file]
sequence_collection = SequenceCollection()
sequence_collection.reads.extend([NamedSequence(s, f"Sequence {i}") for (i, s) in enumerate(sequences)])
dots, _, _ = pca_on_read_distance(sequence_collection, Path("."))
plot_pca_result(dots)
plt.savefig(sys.argv[2])