This repository has been archived by the owner. It is now read-only.
Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
thesis_tm/find_bubbles.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
176 lines (137 sloc)
6.43 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import Graph | |
import pdb | |
import functions | |
def check_bubble(graph, start_node, direction, bubble_chain):
    """
    Try to detect a simple two-branch bubble opening from one side of a node.

    When a bubble is found it is registered through ``bubble_chain.add_bubble``
    and its two inner nodes are marked as visited.

    :param graph: a Graph object; ``graph.nodes`` maps node ids to node objects
    :param start_node: a tuple of (n_id, direction) for the node the bubble opens from
    :param direction: the adjacency attribute to follow, "start" or "end"
                      (the opposite of where we came from)
    :param bubble_chain: the bubble_chain object that collects the bubbles
    :return: The new start, a (n_id, direction) tuple on the other side of the
             bubble, if a bubble was found, otherwise None
    """
    source_id = start_node[0]
    # ids of the nodes attached to the requested side of the start node
    all_neighbors = [x[0] for x in getattr(graph.nodes[source_id], direction)]

    # A simple bubble has exactly two branches. all_neighbors holds bare ids,
    # so the self-loop test has to compare the id, not the (n_id, direction) tuple.
    if len(all_neighbors) != 2 or source_id in all_neighbors:
        return None

    inner_nodes = [graph.nodes[x] for x in all_neighbors]

    # Deliberately strict: every inner node must touch exactly two distinct
    # neighbors, otherwise tips would be reported as bubbles as well.
    bubble_sides = set()
    for inner in inner_nodes:
        touching = set(inner.start + inner.end)
        if len(touching) != 2:
            return None
        bubble_sides.update(touching)

    # Both inner nodes must share the same two sides, one of which has to be
    # the side we started from; otherwise there is nothing to remove below.
    if len(bubble_sides) != 2 or start_node not in bubble_sides:
        return None

    bubble_sides.remove(start_node)
    # exactly one element is left: the far side of the bubble
    new_start = next(iter(bubble_sides))

    bubble_chain.add_bubble(
        inner_nodes,
        [graph.nodes[new_start[0]], graph.nodes[source_id]],
        len(all_neighbors),
    )
    for inner in inner_nodes:
        inner.visited = True
    return new_start
def traverse_for_bubbles(graph, start_node, bubble_chain):
    """
    Follow a chain of bubbles starting at start_node until no further bubble is found.

    Implemented as a loop rather than one recursive call per bubble, so the
    length of a chain is not bounded by Python's recursion limit.

    :param graph: a Graph object
    :param start_node: a tuple of (n_id, direction) as returned by check_bubble;
                       the second item tells from which side the node was reached
    :param bubble_chain: a BubbleChain object; bubbles found on the way are added to it
                         and the id of the last node reached is appended to its ``ends``
    """
    current = start_node
    while True:
        node_id = current[0]

        # We know which side we arrived from, so only the opposite side is checked:
        # if we came to the node from 0 we look at 1 ("end"), and vice versa.
        if current[1] == 0:
            direction, side = "end", 1
        else:
            direction, side = "start", 0

        node = graph.nodes[node_id]
        if node.visited:
            # already handled earlier; nothing is added to the chain ends here
            return
        node.visited = True

        # check whether the two neighbors have one common neighbor on the other side
        new_start = check_bubble(graph, (node_id, side), direction, bubble_chain)
        if new_start is None:
            # no more bubbles, that means we are at the end of the chain
            bubble_chain.ends.append(node_id)
            return

        current = new_start
def find_bubbles(graph):
    """
    Detect bubble chains in the graph and register them on the graph object.

    Node ids are processed from the longest to the shortest ``seq_len``. For every
    node that is not yet visited a new ``Graph.BubbleChain`` is started and both
    sides of the node are checked for bubbles. Chains that end up with two ends
    are stored in ``graph.bubble_chains`` under a 1-based integer key, and that key
    is written to ``which_chain`` of every node listed by ``list_chain()``.
    Timing information is printed along the way.

    NOTE(review): the block nesting was reconstructed from a listing that had lost
    its indentation; the ``else`` following the ends-repair branch is paired here
    with ``if len(bubble_chain.ends) < 2`` - confirm against the original file.

    :param graph: a Graph object
    """
    start = time.time()
    print("Sorting nodes\n")
    # sorting nodes by size, longest first
    sorted_nodes = sorted(graph.nodes, key=lambda x: graph.nodes[x].seq_len, reverse=True)
    end = time.time()
    print("time for sorting nodes is {} ".format(end - start))

    print("Finding bubbles\n")
    start = time.time()
    counter = 0
    for n_id in sorted_nodes:
        # only nodes that were not yet visited can start a new chain
        if graph.nodes[n_id].visited:
            continue

        # starting a new bubbleChain
        bubble_chain = Graph.BubbleChain()

        # look for a bubble on each side of the node: "start" is side 0, "end" is side 1
        for direction, side in (("start", 0), ("end", 1)):
            start_node = check_bubble(graph, (n_id, side), direction, bubble_chain)
            # if a bubble wasn't found check_bubble returns None
            if start_node is not None:
                traverse_for_bubbles(graph, start_node, bubble_chain)
            else:
                # no bubble on this side, so this node is an end of the chain
                bubble_chain.ends.append(n_id)

        graph.nodes[n_id].visited = True

        if len(bubble_chain) > 0:
            if len(bubble_chain.ends) < 2:
                # todo: ends shouldn't be less than 2 (a chain always has two ends), this
                #  needs further checking. Usually when this happens only one bubble was
                #  reported; check whether the ends of that bubble can be merged with
                #  another chain.
                if len(bubble_chain) >= 1:
                    bubble_chain.ends = bubble_chain.find_ends()
                else:
                    # not reachable while the enclosing check requires len(bubble_chain) > 0
                    counter += 1
                    pdb.set_trace()
                    continue
            else:
                key = len(graph.bubble_chains) + 1
                # First check whether the ends of the chain already have a which_chain,
                # because sometimes two adjacent chains share a node and the whole chain
                # wasn't detected because of a small loop in that node or a small tip
                # coming out of it.
                if len(bubble_chain.ends) < 2:
                    # debugging guard; this branch already requires at least two ends
                    pdb.set_trace()

                # compare by value: "is not 0" tests object identity, which is
                # implementation-dependent for integers
                if (graph.nodes[bubble_chain.ends[0]].which_chain != 0) or \
                        (graph.nodes[bubble_chain.ends[1]].which_chain != 0):
                    # todo merge chains here
                    print("merge here")
                else:
                    for n in bubble_chain.list_chain():
                        graph.nodes[n].which_chain = key
                    graph.bubble_chains[key] = bubble_chain

    print("The bubble chains that had a problem (a bubble but only one end) were {} ".format(counter))
    end = time.time()
    print("time for finding bubbles is {} ".format(end - start))
if __name__ == "__main__":
    # Placeholder entry point: running this module directly only prints a marker string.
    print("what")