This repository has been archived by the owner. It is now read-only.
Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
thesis_tm/functions.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
639 lines (530 sloc)
22.5 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import vg_pb2 | |
import stream | |
from Node import Node | |
from Graph import BubbleChain, Superbubble | |
import pdb | |
def reverse_complement(dna): | |
complement = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'} | |
return ''.join([complement[base] for base in dna[::-1]]) | |
# def merge_dict(dict1, dict2): | |
# """ | |
# :param dict1: First dictionary | |
# :param dict2: Second dictionary | |
# :return: merged dictionaries, if keys are the same, values are merged | |
# """ | |
# # print(type(dict1)) | |
# # print(dict1.keys()) | |
# # print(type(dict2)) | |
# # print(dict2.keys()) | |
# for x in dict1: | |
# if x in dict2: | |
# # print(dict2[x]) | |
# # print(dict1[x]) | |
# dict2[x] = list(set(dict2[x] + dict1[x])) | |
# else: | |
# dict2[x] = dict1[x] | |
# | |
# return dict2 | |
# Breadth first search function | |
def bfs(nodes, start_node, size): | |
""" | |
:param nodes: is the dictionary of nodes objects. | |
:param start_node: The start node of BFS search. | |
:param size: How many nodes in the path around start_node to return. | |
:return: | |
""" | |
# Fast implementation of list queue FIFO | |
class Queue: | |
"""A sample implementation of a First-In-First-Out | |
data structure.""" | |
def __init__(self): | |
self.in_stack = [] | |
self.out_stack = [] | |
def append(self, obj): | |
self.in_stack.append(obj) | |
def pop(self): | |
if not self.out_stack: | |
while self.in_stack: | |
self.out_stack.append(self.in_stack.pop()) | |
return self.out_stack.pop() | |
def __len__(self): | |
return len(self.in_stack) | |
queue = Queue() | |
path = set() | |
queue.append(start_node) | |
path.add(start_node) | |
# a list of all adjacent nodes to the start_node | |
neighbors = list( | |
set( | |
[x[0] for x in nodes[start_node].start] # just the node id, no direction | |
+ | |
[x[0] for x in nodes[start_node].end] # just the node id, no direction | |
) | |
) | |
if len(neighbors) == 0: | |
return list(path) | |
while (len(path) < size) or (len(queue) > 0): | |
if not queue: | |
return list(path) | |
start = queue.pop() | |
if start not in path: | |
path.add(start) | |
# a list of all adjacent nodes to the start_node | |
neighbors = list( | |
set( | |
[x[0] for x in nodes[start].start] # just the node id, no direction | |
+ | |
[x[0] for x in nodes[start].end] # just the node id, no direction | |
) | |
) | |
for i in neighbors: | |
if i not in path: | |
queue.append(i) | |
path.add(i) | |
return list(path) | |
# def return_edges(node, k): | |
# """ | |
# :param node: One node object. | |
# :param k: overlap between two nodes | |
# :return: list of GFA links between this nodes and its neighbors. | |
# """ | |
# edges = [] | |
# overlap = str(k-1) + "M\n" | |
# for n in node.start: | |
# if n[1] == 0: | |
# edge = str("\t".join(("L", str(node.id), "-", str(n[0]), "+", overlap))) | |
# edges.append(edge) | |
# else: | |
# edge = str("\t".join(("L", str(node.id), "-", str(n[0]), "-", overlap))) | |
# edges.append(edge) | |
# | |
# for n in node.end: | |
# if n[1] == 0: | |
# edge = str("\t".join(("L", str(node.id), "+", str(n[0]), "+", overlap))) | |
# edges.append(edge) | |
# else: | |
# edge = str("\t".join(("L", str(node.id), "+", str(n[0]), "-", overlap))) | |
# edges.append(edge) | |
# | |
# return edges | |
def write_gfa(nodes, k, list_of_nodes=None, ignore_nodes=None, output_file="output_file.gfa", | |
append=False, modified=False): | |
""" | |
:param nodes: Dictionary of nodes object. | |
:param list_of_nodes: A list of node ids of the path or nodes we want to generate a GFA file for. | |
:param k: overlap between two nodes. | |
:param ignore_nodes: nodes that I don't need to write their edges out | |
:param output_file: path to output file | |
:param append: if I want to append to a file instead of rewriting it | |
:param modified: To write my modified GFA file format instead of the standard GFA | |
:return: writes a gfa file | |
""" | |
if ignore_nodes is None: | |
ignore_nodes = set() | |
if list_of_nodes is None: | |
list_of_nodes = list(nodes.keys()) | |
# if os.path.exists(output_file): | |
# print("File already exists, do you want to overwrite. y/n: ") | |
# choice = input().lower() | |
# if choice == "y": | |
# output_file = output_file | |
# elif choice == "n": | |
# sys.exit() | |
# else: | |
# print("invalid choice") | |
# sys.exit() | |
if append is False: | |
f = open(output_file, "w+") | |
else: | |
if os.path.exists(output_file): | |
f = open(output_file, "a") | |
else: | |
print("the output file {} does not exist".format(output_file)) | |
return None | |
for n1 in list_of_nodes: | |
# writing nodes in gfa file | |
if modified: | |
node = nodes[n1] | |
# todo for now I don't care to which sb the node belongs, I just care about simple bubbles for phasing | |
specification = str(":".join((str(node.which_chain), str(0), | |
str(node.which_b), str(node.which_allele)))) | |
if nodes[n1].seq == "": | |
line = str("\t".join(("S", str(n1), str("A" * nodes[n1].seq_len), specification))) | |
else: | |
line = str("\t".join(("S", str(n1), nodes[n1].seq, specification))) | |
f.write(line + "\n") | |
else: | |
if nodes[n1].seq == "": | |
line = str("\t".join(("S", str(n1), str("A" * nodes[n1].seq_len)))) | |
else: | |
line = str("\t".join(("S", str(n1), nodes[n1].seq))) | |
f.write(line + "\n") | |
# getting the edges | |
edges = [] | |
overlap = str(int(k) - 1) + "M\n" | |
if n1 in ignore_nodes: | |
continue | |
for n in nodes[n1].start: | |
# I think the ignore nodes should be here | |
# because I want to ignore the end nodes of a bubble branch | |
# I only need the node, not it's edges | |
# Or maybe when I separate the branches, remove the edges to the rest of the graph | |
# checking if the node was removed from graph.nodes at some point then I don't write that edge | |
# also checking if I need to ignore the edges here | |
if n[0] in nodes: # and (n[0] not in ignore_nodes): | |
if n[1] == 0: | |
edge = str("\t".join(("L", str(n1), "-", str(n[0]), "+", overlap))) | |
edges.append(edge) | |
else: | |
edge = str("\t".join(("L", str(n1), "-", str(n[0]), "-", overlap))) | |
edges.append(edge) | |
for n in nodes[n1].end: | |
if n[0] in nodes: # and (n[0] not in ignore_nodes): | |
if n[1] == 0: | |
edge = str("\t".join(("L", str(n1), "+", str(n[0]), "+", overlap))) | |
edges.append(edge) | |
else: | |
edge = str("\t".join(("L", str(n1), "+", str(n[0]), "-", overlap))) | |
edges.append(edge) | |
for e in edges: | |
f.write(e) | |
f.close() | |
def write_b_sb_gfa(nodes, list_of_nodes=None, k=1, ignore_nodes=None, output_file="output_file.gfa"): | |
""" | |
:param nodes: Dictionary of nodes object. | |
:param list_of_nodes: A list of node ids of the path or nodes we want to generate a GFA file for. | |
:param k: overlap between two nodes. | |
:param ignore_nodes: nodes that I don't need to write their edges out | |
:param output_file: path to output file | |
:return: writes a gfa file | |
""" | |
if ignore_nodes is None: | |
ignore_nodes = set() | |
if list_of_nodes is None: | |
list_of_nodes = list(nodes.keys()) | |
# if os.path.exists(output_file): | |
# print("File already exists, do you want to overwrite. y/n: ") | |
# choice = input().lower() | |
# if choice == "y": | |
# output_file = output_file | |
# elif choice == "n": | |
# sys.exit() | |
# else: | |
# print("invalid choice") | |
# sys.exit() | |
f = open(output_file, "w+") | |
# gfa file header | |
# f.write("H\tVN:Z:1.0\n") | |
for n1 in list_of_nodes: | |
# writing nodes in gfa file | |
if nodes[n1].seq == "": | |
line = str("\t".join(("S", str(n1), str("A" * nodes[n1].seq_len)))) | |
else: | |
line = str("\t".join(("S", str(n1), nodes[n1].seq))) | |
f.write(line + "\n") | |
# getting the edges | |
edges = [] | |
overlap = str(int(k) - 1) + "M\n" | |
if n1 in ignore_nodes: | |
continue | |
for n in nodes[n1].start: | |
# I think the ignore nodes should be here | |
# because I want to ignore the end nodes of a bubble branch | |
# I only ned the node, not it's edges | |
# Or maybe when I separate the branches, remove the edges to the rest of the graph | |
# checking if the node was removed from graph.nodes at some point then I don't write that edge | |
# also checking if I need to ignore the edges here | |
if n[0] in nodes: # and (n[0] not in ignore_nodes): | |
if n[1] == 0: | |
edge = str("\t".join(("L", str(n1), "-", str(n[0]), "+", overlap))) | |
edges.append(edge) | |
else: | |
edge = str("\t".join(("L", str(n1), "-", str(n[0]), "-", overlap))) | |
edges.append(edge) | |
for n in nodes[n1].end: | |
if n[0] in nodes: # and (n[0] not in ignore_nodes): | |
if n[1] == 0: | |
edge = str("\t".join(("L", str(n1), "+", str(n[0]), "+", overlap))) | |
edges.append(edge) | |
else: | |
edge = str("\t".join(("L", str(n1), "+", str(n[0]), "-", overlap))) | |
edges.append(edge) | |
for e in edges: | |
f.write(e) | |
f.close() | |
########################################################################################### | |
def read_vg(vg_file_path): | |
""" | |
:param vg_file_path: vg graph file. | |
:return: Dictionary of node ids and Node objects. | |
""" | |
if not os.path.exists(vg_file_path): | |
print("the vg file path you gave does not exist, please try again!") | |
sys.exit() | |
nodes = dict() | |
edges = [] | |
with stream.open(vg_file_path, "rb") as in_stream: | |
for data in in_stream: | |
graph = vg_pb2.Graph() | |
graph.ParseFromString(data) | |
for n in graph.node: | |
# I am only saving the length of the sequence | |
nodes[n.id] = Node(n.id) | |
nodes[n.id].seq_len = len(n.sequence) | |
for e in graph.edge: | |
edges.append(e) | |
for e in edges: | |
k = getattr(e, "from") | |
neighbor = e.to | |
from_start = e.from_start | |
to_end = e.to_end | |
if from_start is True and to_end is True: # from start to end L x - y - | |
nodes[k].start.append((neighbor, 1)) | |
nodes[neighbor].end.append((k, 0)) | |
elif from_start is True and to_end is False: # from start to start L x - y + | |
nodes[k].start.append((neighbor, 0)) | |
nodes[neighbor].start.append((k, 0)) | |
elif from_start is False and to_end is False: # from end to start L x + y + | |
nodes[k].end.append((neighbor, 0)) | |
nodes[neighbor].start.append((k, 1)) | |
elif from_start is False and to_end is True: # from end to end L x + y - | |
nodes[k].end.append((neighbor, 1)) | |
nodes[neighbor].end.append((k, 1)) | |
return nodes | |
def read_gfa(gfa_file_path, modified=False): | |
""" | |
:param gfa_file_path: gfa graph file. | |
:param modified: if I'm reading my modified GFA with extra information for the nodes | |
:return: Dictionary of node ids and Node objects. | |
""" | |
if not os.path.exists(gfa_file_path): | |
print("the gfa file path you gave does not exists, please try again!") | |
sys.exit() | |
nodes = dict() | |
edges = [] | |
with open(gfa_file_path, "r") as lines: | |
for line in lines: | |
if line.startswith("S"): | |
if modified: | |
line = line.split("\t") | |
n_id = int(line[1]) | |
n_len = len(line[2]) | |
nodes[n_id] = Node(n_id) | |
nodes[n_id].seq_len = n_len | |
nodes[n_id].seq = str(line[2]) | |
specifications = str(line[3]) | |
specifications = specifications.split(":") | |
nodes[n_id].which_chain = int(specifications[0]) | |
nodes[n_id].which_sb = int(specifications[1]) | |
nodes[n_id].which_b = int(specifications[2]) | |
nodes[n_id].which_allele = int(specifications[3]) | |
else: | |
line = line.split() | |
n_id = int(line[1]) | |
n_len = len(line[2]) | |
nodes[n_id] = Node(n_id) | |
nodes[n_id].seq_len = n_len | |
nodes[n_id].seq = str(line[2]) | |
elif line.startswith("L"): | |
edges.append(line) | |
for e in edges: | |
line = e.split() | |
k = int(line[1]) | |
neighbor = int(line[3]) | |
if line[2] == "-": | |
from_start = True | |
else: | |
from_start = False | |
if line[4] == "-": | |
to_end = True | |
else: | |
to_end = False | |
if from_start is True and to_end is True: # from start to end L x - y - | |
if (neighbor, 1) not in nodes[k].start: | |
nodes[k].start.append((neighbor, 1)) | |
if (k, 0) not in nodes[neighbor].end: | |
nodes[neighbor].end.append((k, 0)) | |
elif from_start is True and to_end is False: # from start to start L x - y + | |
if (neighbor, 0) not in nodes[k].start: | |
nodes[k].start.append((neighbor, 0)) | |
if (k, 0) not in nodes[neighbor].start: | |
nodes[neighbor].start.append((k, 0)) | |
elif from_start is False and to_end is False: # from end to start L x + y + | |
if (neighbor, 0) not in nodes[k].end: | |
nodes[k].end.append((neighbor, 0)) | |
if (k, 1) not in nodes[neighbor].start: | |
nodes[neighbor].start.append((k, 1)) | |
elif from_start is False and to_end is True: # from end to end L x + y - | |
if (neighbor, 1) not in nodes[k].end: | |
nodes[k].end.append((neighbor, 1)) | |
if (k, 1) not in nodes[neighbor].end: | |
nodes[neighbor].end.append((k, 1)) | |
return nodes | |
########################################################################################### | |
def remove_edges(graph, node): | |
for neighbor in node.start: | |
if neighbor[1] == 0: | |
graph.nodes[neighbor[0]].start.remove((node.id, 0)) | |
else: | |
graph.nodes[neighbor[0]].end.remove((node.id, 0)) | |
for neighbor in node.end: | |
if neighbor[1] == 0: | |
graph.nodes[neighbor[0]].start.remove((node.id, 1)) | |
else: | |
graph.nodes[neighbor[0]].end.remove((node.id, 1)) | |
def remove_chains(graph, keep_ends=False): | |
nodes_to_remove = [] | |
for chain in graph.bubble_chains.values(): | |
nodes_to_remove += chain.list_chain() | |
# removing the endsWhy, so superbubbles can still function in case a source or a sink for a superbubble | |
# was the source or the sink of superbubble | |
if keep_ends: | |
for n in chain.ends: | |
nodes_to_remove.remove(n) | |
for n in nodes_to_remove: | |
remove_edges(graph, graph.nodes[n]) | |
try: | |
del graph.nodes[n] | |
except KeyError: | |
# So sometimes the same node can be present in two chains and the reason is that | |
# two chains can be adjacent and only a tip is disrupting | |
# print("there are {} of {}".format(nodes_to_remove.count(n), n)) | |
continue | |
def remove_nodes(graph, list_of_nodes): | |
for n in list_of_nodes: | |
try: | |
remove_edges(graph, graph.nodes[n]) | |
del graph.nodes[n] | |
except KeyError: | |
continue | |
def remove_lonely_nodes(graph): | |
nodes_to_remove = [] | |
for n in graph.nodes: | |
if (len(graph.noddes[n].start) == 0) and (len(graph.noddes[n].end) == 0): | |
nodes_to_remove.append(n) | |
for node in nodes_to_remove: | |
del graph.nodes[node] | |
def write_bubbles(graph, output_file): | |
chain_ends = set() | |
inner_nodes = [] | |
for chain in graph.bubble_chains.values(): | |
inner_nodes += chain.list_chain(keep_chain_ends=False) | |
for n in chain.ends: | |
chain_ends.add(n) | |
sb_ends = set() | |
sb_inner_nodes = [] | |
for sb in graph.superbubbles.values(): | |
sb_ends.add(sb.source.id) | |
sb_ends.add(sb.sink.id) | |
sb_inner_nodes += [x.id for x in sb.inside] | |
sb_ends = sb_ends.union(chain_ends) | |
all_nodes = inner_nodes + sb_inner_nodes + list(sb_ends) | |
print("The new graph has {} nodes".format(len(all_nodes))) | |
# ignore is a set because write_gfa function will check membership on it several times | |
# and sets have constant time membership checking. | |
write_gfa(nodes=graph.nodes, list_of_nodes=all_nodes, k=graph.k, output_file=output_file) | |
def change_membership(graph, chain, key): | |
# for chain in change_membership_list: | |
for node in chain.list_chain(): | |
graph.nodes[node].which_chain = key | |
def merge_sb_with_bchains(graph): | |
# I have two cases here, the superbubble is in the middle of a bubble chain which | |
# Second case is the superbubble on the side | |
sb_to_remove = [] | |
for sb_k, sb_v in graph.superbubbles.items(): | |
source_chain = sb_v.source.which_chain | |
sink_chain = sb_v.sink.which_chain | |
# case 2: superbubble at the end of chain | |
if source_chain == 0 or sink_chain == 0: | |
if source_chain != 0: | |
# bchains_to_change_membership.append(source_chain) | |
sb_to_remove.append(sb_k) | |
graph.bubble_chains[source_chain].bubbles["superbubbles"].append(sb_v) | |
graph.bubble_chains[source_chain].ends.remove(sb_v.source.id) | |
graph.bubble_chains[source_chain].ends.append(sb_v.sink.id) | |
change_membership(graph, graph.bubble_chains[source_chain], source_chain) | |
elif sink_chain != 0: | |
# bchains_to_change_membership.append(sink_chain) | |
sb_to_remove.append(sb_k) | |
graph.bubble_chains[sink_chain].bubbles["superbubbles"].append(sb_v) | |
graph.bubble_chains[sink_chain].ends.remove(sb_v.sink.id) | |
graph.bubble_chains[sink_chain].ends.append(sb_v.source.id) | |
change_membership(graph, graph.bubble_chains[sink_chain], sink_chain) | |
# case 1: superbubble is in the middle | |
if source_chain != 0 and sink_chain != 0: | |
# now I need to merge two chains (source_chain and sink_chain) and change membership to one of them | |
# I chose to remove sink_chain, so the new chain is going to be the source one | |
# bchains_to_remove.append(sink_chain) | |
sb_to_remove.append(sb_k) | |
graph.bubble_chains[source_chain].ends.remove(sb_v.source.id) | |
graph.bubble_chains[sink_chain].ends.remove(sb_v.sink.id) | |
graph.bubble_chains[source_chain].ends = graph.bubble_chains[source_chain].ends + graph.bubble_chains[sink_chain].ends | |
graph.bubble_chains[source_chain].bubbles["superbubbles"].append(sb_v) | |
graph.bubble_chains[source_chain].bubbles["superbubbles"] += graph.bubble_chains[sink_chain].bubbles["superbubbles"] | |
graph.bubble_chains[source_chain].bubbles["bubbles"] += graph.bubble_chains[sink_chain].bubbles["bubbles"] | |
change_membership(graph, graph.bubble_chains[source_chain], source_chain) | |
del graph.bubble_chains[sink_chain] | |
# try: | |
# graph.bubble_chains[source_chain].ends.remove(sb_v.source.id) | |
# except ValueError: | |
# pdb.set_trace() | |
# continue | |
# # graph.bubble_chains[sb_v.source.which_chain].ends.append(sb_v.sink.id) | |
# try: | |
# graph.bubble_chains[sink_chain].ends.remove(sb_v.sink.id) | |
# except ValueError: | |
# pdb.set_trace() | |
# continue | |
# # graph.bubble_ | |
# # now the new ends are | |
# new_ends = graph.bubble_chains[source_chain].ends + graph.bubble_chains[sink_chain].ends | |
# bchains_to_remove.append(sink_chain) | |
# # bchains_to_change_membership.append(source_chain) | |
# graph.bubble_chains[source_chain].bubbles["bubbles"] += graph.bubble_chains[sink_chain].bubbles["bubbles"] | |
# graph.bubble_chains[source_chain].ends = new_ends | |
# removing the superbubbles that has been merged with a chain | |
for k in sb_to_remove: | |
del graph.superbubbles[k] | |
# | |
# for c in bchains_to_remove: | |
# del graph.bubble_chains[c] | |
# change_membership(graph, bchains_to_change_membership) | |
# so after soring the bubbles and superbubbles, I need to fill in this information inside the nodes | |
# so I can write the modified GFA file | |
def fill_bubble_order(graph): | |
b_counter = 0 | |
for chain_num, chain in graph.bubble_chains.items(): | |
if len(chain.sorted) == 0: | |
chain.sort() | |
for idx, bubble in enumerate(chain.sorted): | |
b_counter += 1 | |
if isinstance(bubble, BubbleChain.Bubble): | |
# randomly assigning which branch is zero and which is 1 | |
for allel, node in enumerate(bubble.branches): | |
node.which_allele = int(allel) | |
node.which_chain = int(chain_num) | |
# node.which_b = int(idx + 1) | |
node.which_b = b_counter | |
# assigning the ends of the bubble to which chain | |
for node in bubble.ends: | |
node.which_chain = int(chain_num) | |
# node.which_b = int(idx + 1) | |
node.which_b = b_counter | |
if isinstance(bubble, Superbubble): | |
for node in bubble.list_superbubble(): | |
graph.nodes[node].which_sb = idx | |
""" | |
import Graph | |
import functions | |
import find_superbubbles | |
import find_bubbles | |
graph = Graph.Graph() | |
graph.read_gfa("testing/testing_phaseb/modified_gfa.gfa") | |
find_superbubbles.find_superbubbles(graph) | |
find_bubbles.find_bubbles(graph) | |
functions.merge_sb_with_bchains(graph) | |
functions.merge_sb_with_bchains(graph) | |
functions.fill_bubble_order(graph) | |
""" |