Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
master
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
import sys
import os
import editdistance
# TODO 1- Separate the nested classes, no need for them, which means I need to edit find_bubbles and probably many other
# parts of the code
def reverse_complement(dna):
complement = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}
return ''.join([complement[base] for base in dna[::-1]])
class Node:
def __init__(self, identifier):
self.id = identifier # size is between 28 and 32 bytes
self.seq = ""
self.seq_len = 0 # mostly 28 bytes
self.start = [] # 96 bytes for 4 neighbors
self.end = [] # 96 bytes
self.visited = False # 28 bytes (used for bubble and superbubble detection)
self.which_chain = 0
self.which_sb = 0
self.which_b = 0
self.which_allele = -1 # only for branches in a bubble (0 or 1)
# Function to calculate the size of node recursively
# using sys.getsizeof() function doesn't calculate object's size recursively
def __sizeof__(self):
size = self.id.__sizeof__() + self.seq_len.__sizeof__() + self.visited.__sizeof__()
if len(self.start) == 0:
size += self.start.__sizeof__()
else:
for i in range(len(self.start)):
size += sys.getsizeof(self.start[i])
if len(self.end) == 0:
size += self.end.__sizeof__()
else:
for i in range(len(self.end)):
size += sys.getsizeof(self.end[i])
return size
class Graph:
def __init__(self):
self.nodes = {}
self.bubble_chains = {}
self.superbubbles = {}
self.k = 0
def __len__(self):
return len(self.nodes)
def __str__(self):
return "The graph has %d Nodes and %d " \
"Bubble Chains and %d Superbubbles" % (len(self.nodes), len(self.bubble_chains), len(self.superbubbles))
def __repr__(self):
return "The graph has %d Nodes and %d " \
"Bubble Chains and %d Superbubbles" % (len(self.nodes), len(self.bubble_chains), len(self.superbubbles))
# Find longest chain (bubbles-wise)
def max_chain(self):
maximum = (0, 0)
# for idx, chain in enumerate(self.bubble_chains):
for key, chain in self.bubble_chains.items():
if chain.__len__() > maximum[1]:
maximum = (key, chain.__len__())
return maximum
# Find the longest chain (sequence wise)
def longest_chain(self, k):
maximum = (0, 0)
for key, chain in self.bubble_chains.items():
if chain.chain_length(k) > maximum[1]:
maximum = (key, chain.chain_length(k))
return maximum
# Returns how many nodes the bubble chains has
def nodes_in_chains(self):
total = 0
for chain in self.bubble_chains.values():
total += len(chain.list_chain())
return total
# Returns the total number of bubbles
def all_bubbles(self):
total = 0
for chain in self.bubble_chains.values():
total += chain.__len__()
return total
# Returns how much sequence the chains covered
def chains_coverage(self, k):
total = 0
for chain in self.bubble_chains.values():
total += chain.chain_length(k)
return total
# resets visited status back to False
def nodes_reset(self):
for n in self.nodes.values():
n.visited = False
# Plots the distribution of bubbles in chains
def plots(self, k, output_dir=None):
import matplotlib.pyplot as plt
if output_dir is None:
output_dir = "."
elif not os.path.isdir(output_dir):
os.mkdir(output_dir)
bubble_dist = []
seqlen_dist = []
for chain in self.bubble_chains.values():
bubble_dist.append(chain.__len__())
seqlen_dist.append(chain.chain_length(k))
contigs_dist = []
for node in self.nodes.values():
contigs_dist.append(node.seq_len)
total_number_of_bubbules = 0
for chain in self.bubble_chains.values():
total_number_of_bubbules += chain.__len__()
edit_dist = []
for chain in self.bubble_chains.values():
for bubble in chain.bubbles:
x = bubble.edit_distance()
if x is not None:
edit_dist.append(x)
hamming_bubbles = []
for chain in self.bubble_chains.values():
for bubble in chain.bubbles:
x = bubble.hamming_dist()
if x is not None:
hamming_bubbles.append(x)
bubble_types = []
for chain in self.bubble_chains.values():
for bubble in chain.bubbles:
bubble_types.append(int(bubble.bubble_type))
print("The total number of bubbles is {}".format(total_number_of_bubbules))
print("\n The number of bubbles that have the same branches length are {}".format(len(hamming_bubbles)))
################################################################
plt.hist(edit_dist, bins=len(set(edit_dist))*2)
plt.title("Edit Distance Bubble Distribution", fontsize=12)
plt.xlabel("Edit Distance", fontsize=9)
plt.ylabel("Frequency of bubbles", fontsize=9)
plt.yscale('log', nonposy='clip')
output_file = os.path.join(output_dir, "edit_dist_distribution.png")
plt.savefig(output_file, dpi=200)
plt.clf()
plt.hist(hamming_bubbles, bins=len(set(hamming_bubbles))*2)
plt.title("Hamming Distance Bubble Distribution", fontsize=12)
plt.xlabel("Hamming Distance", fontsize=9)
plt.ylabel("Frequency of bubbles", fontsize=9)
plt.yscale('log', nonposy='clip')
output_file = os.path.join(output_dir, "hamming_dist_distribution.png")
plt.savefig(output_file, dpi=200)
plt.clf()
plt.hist(contigs_dist)
plt.title("Contigs Distribution", fontsize=12)
plt.xlabel("Contig Length", fontsize=9)
plt.ylabel("Frequency of Contig", fontsize=9)
plt.yscale('log', nonposy='clip')
output_file = os.path.join(output_dir, "contigs_distribution.png")
plt.savefig(output_file, dpi=200)
plt.clf()
plt.hist(bubble_dist, bins=max(bubble_dist))
plt.title("Histogram of chain lengths (bubble-wise)", fontsize=12)
plt.xlabel("Number of bubbles", fontsize=9)
plt.ylabel("Frequency of chains", fontsize=9)
plt.yscale('log', nonposy='clip')
output_file = os.path.join(output_dir, "bubble_distribution.png")
plt.savefig(output_file, dpi=200)
plt.clf()
plt.hist(bubble_types, bins=max(bubble_types))
plt.title("Histogram of bubble types", fontsize=12)
plt.xlabel("Number of bubble type", fontsize=9)
plt.ylabel("Frequency of bubble type", fontsize=9)
plt.yscale('log', nonposy='clip')
output_file = os.path.join(output_dir, "bubble_type_distribution.png")
plt.savefig(output_file, dpi=200)
plt.clf()
plt.hist(seqlen_dist)
plt.title("Histogram of chain lengths (sequence-wise)", fontsize=12)
plt.xlabel("Chain sequence length", fontsize=9)
plt.ylabel("Frequency of chains", fontsize=9)
plt.yscale('log', nonposy='clip')
output_file = os.path.join(output_dir, "sequence_distribution.png")
plt.savefig(output_file, dpi=200)
plt.clf()
zipped = list(zip(seqlen_dist, bubble_dist))
zipped.sort(reverse=True)
seqlen_dist = [x[0] for x in zipped]
bubble_dist = [x[1] for x in zipped]
plt.subplot(211)
plt.plot(seqlen_dist)
plt.title("Chains sequence lengths", fontsize=12)
plt.subplot(212)
plt.plot(bubble_dist)
plt.xlabel("Chains bubble lengths", fontsize=12)
output_file = os.path.join(output_dir, "bubbles_and_seq.png")
plt.savefig(output_file, dpi=250)
plt.clf()
# reading the GFA file
def read_gfa(self, gfa_file_path):
"""
:param gfa_file_path: gfa graph file.
:return: Dictionary of node ids and Node objects.
"""
if not os.path.exists(gfa_file_path):
print("the gfa file path you gave does not exists, please try again!")
sys.exit()
edges = []
with open(gfa_file_path, "r") as lines:
for line in lines:
if line.startswith("S"):
line = line.split()
n_id = line[1]
n_len = len(line[2])
self.nodes[int(n_id)] = Node(int(n_id))
self.nodes[int(n_id)].seq = str(line[2])
self.nodes[int(n_id)].seq_len = n_len
elif line.startswith("L"):
edges.append(line)
for e in edges:
line = e.split()
k = int(line[1])
neighbor = int(line[3])
if line[2] == "-":
from_start = True
else:
from_start = False
if line[4] == "-":
to_end = True
else:
to_end = False
if (k not in self.nodes) or (neighbor not in self.nodes):
continue
if from_start is True and to_end is True: # from start to end L x - y -
if (neighbor, 1) not in self.nodes[k].start:
self.nodes[k].start.append((neighbor, 1))
if (k, 0) not in self.nodes[neighbor].end:
self.nodes[neighbor].end.append((k, 0))
elif from_start is True and to_end is False: # from start to start L x - y +
if (neighbor, 0) not in self.nodes[k].start:
self.nodes[k].start.append((neighbor, 0))
if (k, 0) not in self.nodes[neighbor].start:
self.nodes[neighbor].start.append((k, 0))
elif from_start is False and to_end is False: # from end to start L x + y +
if (neighbor, 0) not in self.nodes[k].end:
self.nodes[k].end.append((neighbor, 0))
if (k, 1) not in self.nodes[neighbor].start:
self.nodes[neighbor].start.append((k, 1))
elif from_start is False and to_end is True: # from end to end L x + y -
if (neighbor, 1) not in self.nodes[k].end:
self.nodes[k].end.append((neighbor, 1))
if (k, 1) not in self.nodes[neighbor].end:
self.nodes[neighbor].end.append((k, 1))
# returns the number os singular bubbles
def num_single_bubbles(self):
counter = 0
for chain in self.bubble_chains.values():
if (len(chain.bubbles["bubbles"]) + len(chain.bubbles["superbubbles"])) < 2:
counter += 1
return counter
class Superbubble:
# stores the source, sink and the nodes in between
def __init__(self, source, sink, inside):
self.source = source
self.sink = sink
self.inside = inside
def length(self, k):
length = 0
length += self.source.seq_len + self.sink.seq_len - 2*(k - 2)
for node in self.inside:
length += node.seq_len - k - 1
return length
def list_superbubble(self, no_ends=False):
if not no_ends:
list_of_nodes = [self.sink.id, self.source.id]
for node in self.inside:
list_of_nodes.append(node.id)
else:
list_of_nodes = []
for node in self.inside:
list_of_nodes.append(node.id)
return list_of_nodes
def sb_membership(self, key, graph):
sb_to_remove = set()
for n in self.list_superbubble(no_ends=True):
if graph.nodes[n].which_sb != 0:
sb_to_remove.add(graph.nodes[n].which_sb)
graph.nodes[n].which_sb = key
# todo can't figure out why this is happening
# when I separated the problementic regions and ran the algorithm on them it worked fine
# but when I run it on the whole graph it breaks
for sb in list(sb_to_remove):
try:
del graph.superbubbles[sb]
except KeyError:
# import functions
# import pdb
# pdb.set_trace()
continue
self.source.which_sb = key
self.sink.which_sb = key
# I need to add similar functions here to the ones of a normal bubble for now
# because a chain can have normal bubbles and and superbubbles now
@staticmethod
def equal_branches():
return False
@staticmethod
def hamming_dist():
return None
@staticmethod
def edit_distance():
return None
class BubbleChain:
# stores a list of bubbles
def __init__(self):
# list of bubble objects
self.bubbles = {"bubbles": [], "superbubbles": []}
self.sorted = []
# the end nodes of the chains, two ends
self.ends = []
# how many bubbles in the chain
def __len__(self):
return len(self.bubbles["bubbles"]) + len(self.bubbles["superbubbles"])
def length(self):
return len(self.bubbles["bubbles"]) + len(self.bubbles["superbubbles"])
#########################################################
#########################################################
class Bubble:
# stores bubbles as two tuples, one for the two branches and one for the two ends
# easier for later statistics
def __init__(self, branches, ends, bubble_type):
self.branches = branches
self.ends = ends
self.bubble_type = bubble_type
# check if the length of the branches are equal
def equal_branches(self):
if self.branches[0].seq_len == self.branches[1].seq_len:
return True
return False
# calculate the hamming distance between two equal branches
def hamming_dist(self):
if len(self.branches) != 2:
return None
if not self.equal_branches():
return None
distance = 0
# todo I need to change comparing first 10 letters to compare the actual k-1 to make sure it's correct
# checking if they are in the same direction or I need to take the reverse complement of one
if self.branches[0].seq[0:10] == self.branches[1].seq[0:10]:
for i in range(self.branches[0].seq_len):
if self.branches[0].seq[i] != self.branches[1].seq[i]:
distance += 1
return distance
elif self.branches[0].seq[0:10] == reverse_complement(self.branches[1].seq)[0:10]:
reverse_c = reverse_complement(self.branches[1].seq)
for i in range(self.branches[0].seq_len):
if self.branches[0].seq[i] != reverse_c[i]:
distance += 1
return distance
else:
return None
# I'll use the editdistance library, much faster
def edit_distance(self):
if len(self.branches) != 2:
return None
# checking if they are in the same direction or I need to take the reverse complement of one
if self.branches[0].seq[0:10] == self.branches[1].seq[0:10]:
dist = editdistance.eval(self.branches[0].seq, self.branches[1].seq)
return dist
elif self.branches[0].seq[0:10] == reverse_complement(self.branches[1].seq)[0:10]:
dist = editdistance.eval(self.branches[0].seq, reverse_complement(self.branches[1].seq))
return dist
# length difference between both branches
def branches_dif(self):
return abs(self.branches[0].seq_len - self.branches[1].seq_len)
def list_bubble(self):
return [self.branches + self.ends]
#########################################################
#########################################################
# adds a bubble object to the chain
def add_bubble(self, branches, ends, bubble_type):
# print("Adding a bubble")
self.bubbles["bubbles"].append(self.Bubble(branches, ends, bubble_type))
# returns a list of all the nodes in the bubble chain
def list_chain(self, keep_chain_ends=True):
list_of_nodes = []
for bubble in self.bubbles["bubbles"]:
for n in bubble.branches:
if n.id not in list_of_nodes:
list_of_nodes.append(n.id)
for n in bubble.ends:
if n.id not in list_of_nodes:
list_of_nodes.append(n.id)
for sb in self.bubbles["superbubbles"]:
for n in sb.inside:
if n.id not in list_of_nodes:
list_of_nodes.append(n.id)
if sb.source.id not in list_of_nodes:
list_of_nodes.append(sb.source.id)
if sb.sink.id not in list_of_nodes:
list_of_nodes.append(sb.sink.id)
if not keep_chain_ends:
for n in self.ends:
list_of_nodes.remove(n)
# try:
# list_of_nodes.remove(n)
# except ValueError:
# print(list_of_nodes)
# print(n)
# print(self.bubbles)
# print(self.ends)
# sys.exit()
return list_of_nodes
# sequence length of the bubble chain
def chain_length(self, k):
checked = []
total_length = 0
for bubble in self.bubbles["bubbles"]:
total_length += (bubble.branches[0].seq_len + bubble.branches[1].seq_len) - 2*(k-1)
for n in bubble.ends:
# doing this because bubbles can share ends, so I don't count them twice
if n.id not in checked:
total_length += n.seq_len - k - 1
checked.append(n.id)
for sb in self.bubbles["superbubbles"]:
if sb.source.id not in checked:
total_length += sb.source.seq_len - k-1
if sb.sink.id not in checked:
total_length += sb.sink.seq_len - k-1
for n in sb.inside:
total_length += n.seq_len - k-1
#
# for superbubble in self.bubbles["superbubbles"]:
# for n in superbubble.inside:
# # I need to find a way to check if there are simple bubbles inside the superbubble and remove them
# pass
return total_length
# returns the ends of a bubble in case they were not set before
def find_ends(self):
ends = []
all_ends = []
for bubble in self.bubbles["bubbles"]:
all_ends += bubble.ends
for sb in self.bubbles["superbubbles"]:
all_ends.append(sb.source)
all_ends.append(sb.sink)
for n in all_ends:
if all_ends.count(n) == 1:
ends.append(n.id)
return ends
def sort(self):
# this function should sort the list of bubbles
# for now I need to fill sorted with this
start = self.ends[0]
while len(self.sorted) < self.__len__():
for b in self.bubbles["bubbles"]:
b_ends = [x.id for x in b.ends]
if b in self.sorted:
continue
if start in b_ends:
if b_ends.index(start) == 0:
start = b_ends[1]
else:
start = b_ends[0]
self.sorted.append(b)
for sb in self.bubbles["superbubbles"]:
sb_ends = [sb.source.id, sb.sink.id]
if sb in self.sorted:
continue
if start in sb_ends:
if sb_ends.index(start) is 0:
start = sb_ends[1]
else:
start = sb_ends[0]
self.sorted.append(sb)