Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
find_exons_number/find_exons.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
301 lines (226 sloc)
12.2 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script finds exons of the genes of interest | |
@author: Anastasiia Petrova | |
@contact: anastasiia.petrova(at)mpi-bn.mpg.de | |
""" | |
import argparse | |
import sys | |
import os | |
import re | |
import time | |
import logging | |
import subprocess | |
import numpy as np | |
from collections import defaultdict | |
import textwrap | |
logger = logging.getLogger('find_exons') | |
logger.setLevel(logging.INFO) | |
formatter = logging.Formatter("%(asctime)s : %(message)s", "%Y-%m-%d %H:%M") | |
def parse_args(): | |
parser = argparse.ArgumentParser(prog = '', description = textwrap.dedent(''' | |
This script produces a tsv-file to show the number of exons for the list of genes of interest. | |
'''), epilog='That is what you need to make this script work for you. Enjoy it') | |
required_arguments = parser.add_argument_group('required arguments') | |
required_arguments.add_argument('--gtf_genes', help='a .gtf file with genes', required=True) | |
required_arguments.add_argument('--gtf_exons', help='a .gtf file with exons', required=True) | |
required_arguments.add_argument('--genes_of_interest', nargs='*', dest='genes_of_interest', help='a .txt file with genes of interest containing one columnt with gene names', required=True) | |
#all other arguments are optional | |
parser.add_argument('--output_directory', default='output', const='output', nargs='?', help='output directory, by default ./output/') | |
parser.add_argument('--silent', action='store_true', help='while working with data write the information only into ./find_exons.log') | |
parser.add_argument('--exact', action='store_true', help='choose this parameter if you want to look for exact gene matches') | |
args = parser.parse_args() | |
return args | |
def check_directory(directory): | |
if not os.path.exists(directory): | |
os.makedirs(directory) | |
print('a new directory ' + directory + ' was created') | |
def check_existing_input_files(args): | |
print('checking the input files') | |
if not os.path.isfile(args.gtf_genes): | |
print('please make sure the .gtf file with genes exists') | |
sys.exit() | |
if not os.path.isfile(args.gtf_exons): | |
print('please make sure the .gtf file with exons exists') | |
sys.exit() | |
if len(args.genes_of_interest) == 1: | |
if not os.path.isfile(args.genes_of_interest[0]): | |
#print('working with one gene of interest') | |
#genes_of_interest = args.genes_of_interest | |
print('please provide a file with genes of interest in .txt format containint one column with names of genes you are interested in') | |
else: | |
print('reading from the file with genes') | |
#there is a file with list of genes. Extract them and return in an array | |
genes = [] | |
with open(args.genes_of_interest[0]) as genes_file: | |
for line in genes_file: | |
line_array = re.split(r'\t', line.rstrip('\n')) | |
for gene in line_array: | |
if ' ' in gene: | |
print('please make sure that genes in the file with genes of interest are separated with tabs or are written each on a new line') | |
sys.exit() | |
else: | |
genes.append(gene) | |
genes_file.close() | |
genes_of_interest = genes | |
else: #there is a list of genes of interest | |
#print('working with a list of genes') | |
#genes_of_interest = args.genes_of_interest | |
print('please provide only one file with genes of interest') | |
sys.exit() | |
return genes_of_interest | |
def procede_gtf_files(gtf_genes, gtf_exons, genes_of_interest, exact): | |
logger.info("working with gtf files") | |
all_genes = {} | |
grep = "grep -i " | |
#make a command for grep with or function | |
for gene in genes_of_interest: | |
grep = grep + "-e " + gene + ". " #. makes grep to look for genes starting with the name from the list of genes of interest | |
#finally add to the grep command so that we only look for protein coding | |
small_gtf_genes = subprocess.getoutput(grep + gtf_genes + "| grep \"protein_coding\"") #extract only the rows containing protein_coding | |
if len(small_gtf_genes) != 0: | |
#save the genes to the dictionary | |
for genes_line in re.split(r'\n', small_gtf_genes): | |
genes_array = re.split(r'\t', genes_line) | |
gene_start = int(genes_array[3]) | |
gene_end = int(genes_array[4]) | |
gene_length = gene_end - gene_start | |
attributes = re.split(r'; ', genes_array[8]) | |
for attribut in attributes: | |
if attribut.startswith("gene_name"): | |
gene_full_name = re.split(r'\s', attribut)[1].replace('"', '') | |
all_genes[gene_full_name] = all_genes.get(gene_full_name, {}) | |
exons = [] | |
#all_genes[gene_full_name] = {'gene_start': gene_start, 'gene_end': gene_end, 'gene_length': gene_length, 'exons': exons} | |
all_genes[gene_full_name] = {'gene_length': gene_length, 'exons': exons} | |
#now save the information about the exons to the diictionary | |
small_gtf_exons = subprocess.getoutput(grep + gtf_exons + "| grep \"protein_coding\" | grep \"transcript_support_level\"") | |
for exons_line in re.split(r'\n', small_gtf_exons): | |
genes_array = re.split(r'\t', exons_line) | |
exon_start = int(genes_array[3]) | |
exon_end = int(genes_array[4]) | |
exon_length = exon_end - exon_start | |
attributes = re.split(r'; ', genes_array[8]) | |
for attribut in attributes: | |
if attribut.startswith("gene_name"): | |
gene_full_name = re.split(r'\s', attribut)[1].replace('"', '') | |
elif attribut.startswith("exon_number"): | |
exon_number = int(re.split(r'\s', attribut)[1]) | |
elif attribut.startswith("transcript_support_level"): | |
tsl = re.split(r'\s', attribut)[1].replace('"', '') #do not cast on int, because the tsl can also be NA | |
if tsl == "NA": | |
tsl = 6 #"convert" the tsl to int, so that we can easily compare it afterwards | |
else: | |
tsl = int(tsl) | |
elif attribut.startswith("transcript_name"): | |
transcript_name = re.split(r'\s', attribut)[1].replace('"', '') | |
elif attribut.startswith("transcript_id"): | |
transcript_id = re.split(r'\s', attribut)[1].replace('"', '') | |
elif attribut.startswith("gene_id"): | |
gene_id = re.split(r'\s', attribut)[1].replace('"', '') | |
if gene_full_name in all_genes.keys(): | |
all_genes[gene_full_name]['exons'].append([exon_length, exon_number, tsl, exon_start, exon_end, transcript_name, transcript_id]) | |
all_genes[gene_full_name]['gene_id'] = gene_id | |
else: | |
logger.info("none of the requested genes were found in the .gtf file") | |
sys.exit() | |
return all_genes | |
def make_plot(x_array, y_array, gene_names, output_directory, figure_name, color_name): | |
the_max = max(max(x_array), max(y_array)) | |
plt.xlim(0, the_max) | |
plt.ylim(0, the_max) | |
fig, ax = plt.subplots() | |
plt.plot([0, the_max], [0, the_max], '--', color = "black", linewidth = 0.5) | |
plt.plot(x_array, y_array, 'o', color = color_name) | |
plt.xlabel("whole gene length") | |
plt.ylabel("sum length of exons") | |
for i,j,name in zip(x_array, y_array, gene_names): | |
ax.annotate('%s' %name, xy=(i,j), xytext=(1,0), textcoords='offset points', size='xx-small') | |
fig.savefig(os.path.join(output_directory, figure_name)) | |
plt.show() | |
def write_outputfile(all_genes, output_directory): | |
gene_names = [] | |
gene_lengths = [] | |
exons_lengths = [] | |
scores = [] | |
genes_with_transcript_names = {} | |
#find the sum length of exons for each gene | |
for gene in all_genes: | |
if all_genes[gene]['exons'] != []: | |
#sort after transcript_name, exon_number and tsl | |
sorted_exons = sorted(all_genes[gene]['exons'], key = lambda x: (x[5], x[1], x[2])) | |
check_exon = 1 | |
sum_length = 0 | |
tsls = [] | |
transcript_name = sorted_exons[0][5] #save the transcript name of the first exon | |
transcript_id = sorted_exons[0][6] #save the transcript_id of the first exon | |
sum_length = sorted_exons[0][0] #save the length of the first exon | |
tsls.append(sorted_exons[0][2]) #transcript support level of the first exon | |
for i in range(1, len(sorted_exons)): | |
if sorted_exons[i][5] != transcript_name: #this is a new transcript | |
#save the previous transcript_name | |
score = "%.2f" % round((sum_length * 100)/all_genes[gene]['gene_length'], 2) #which is actually the percentage of the sum length of the exons and the whole length of the gene | |
mean_tsl = int(np.mean(tsls)) #find the average tsl for this transcript | |
if mean_tsl > 5: | |
mean_tsl = "NA" | |
exons_number = len(tsls) | |
gene_transcript_name = gene + "_" + transcript_name | |
genes_with_transcript_names[gene_transcript_name] = genes_with_transcript_names.get(transcript_name, {}) | |
genes_with_transcript_names[gene_transcript_name] = {'gene': gene, 'sum_length': sum_length, 'gene_length': all_genes[gene]['gene_length'], 'score': score, 'mean_tsl': mean_tsl, 'exons_number': exons_number, 'gene_id': all_genes[gene]['gene_id'], 'transcript_id': transcript_id} | |
tsls = [] | |
tsls.append(sorted_exons[i][2]) | |
transcript_name = sorted_exons[i][5] | |
transcript_id = sorted_exons[i][6] | |
sum_length = sorted_exons[i][0] | |
elif i == len(sorted_exons) - 1: #this is the last transcript | |
sum_length = sum_length + sorted_exons[i][0] | |
tsls.append(sorted_exons[i][2]) | |
score = "%.2f" % round((sum_length * 100)/all_genes[gene]['gene_length'], 2) | |
mean_tsl = int(np.mean(tsls)) #find the average tsl for this transcript | |
if mean_tsl > 5: | |
mean_tsl = "NA" | |
exons_number = len(tsls) | |
gene_transcript_name = gene + "_" + transcript_name | |
genes_with_transcript_names[gene_transcript_name] = genes_with_transcript_names.get(transcript_name, {}) | |
genes_with_transcript_names[gene_transcript_name] = {'gene': gene, 'sum_length': sum_length, 'gene_length': all_genes[gene]['gene_length'], 'score': score, 'mean_tsl': mean_tsl, 'exons_number': exons_number, 'gene_id': all_genes[gene]['gene_id'], 'transcript_id': transcript_id} | |
else: #this is still the same transcript name | |
tsls.append(sorted_exons[i][2]) #save the tsl to find the average tsl later on | |
sum_length = sum_length + sorted_exons[i][0] #save the length of this exon | |
#else do nothing, there are no exons found corresponding to this gene | |
genes_with_transcript_names = sorted(genes_with_transcript_names.items(), key = lambda x: (x[1]['gene'], x[0])) | |
logger.info("writing the output file") | |
#now write an output file containing names of genes, their length, the sum length of exons and the prozent of exons in the gene length as score | |
output_file = open(os.path.join(output_directory, "genes_exons_correlation.txt"), 'w') | |
header = ["#gene_name", "transcript_name", "mean_tsl", "exons_len_percentage", "gene_length", "exons_number", "exons_sum_length", "gene_id", "transcript_id"] | |
output_file.write('\t'.join(header) + '\n') #write the header | |
for transcript in genes_with_transcript_names: | |
output_file.write('\t'.join([transcript[1]['gene'], transcript[0].replace(transcript[1]['gene'] + '_', ''), str(transcript[1]['mean_tsl']), str(transcript[1]['score']), str(transcript[1]['gene_length']), str(transcript[1]['exons_number']), str(transcript[1]['sum_length']), str(transcript[1]['gene_id']), str(transcript[1]['transcript_id'])]) + '\n') | |
output_file.close() | |
def main(): | |
start = time.time() | |
args = parse_args() | |
print("find_exons.py was called using these parameters:") | |
print(vars(args)) | |
args.genes = check_existing_input_files(args) | |
#check if there is an existing directory that user gave as input, otherwise create this directory from the path provided from the user | |
check_directory(args.output_directory) | |
fh = logging.FileHandler(os.path.join(args.output_directory, "find_exons.log")) | |
fh.setLevel(logging.INFO) | |
fh.setFormatter(formatter) | |
logger.addHandler(fh) | |
ch = logging.StreamHandler() | |
ch.setLevel(logging.INFO) | |
ch.setFormatter(formatter) | |
logger.addHandler(ch) | |
#if user do not want to see the information about the status of jobs, remove the handler, that writes to the terminal | |
if args.silent: | |
logger.removeHandler(ch) | |
#logger.info("find_exons.py was called using these parameters:") | |
#logger.info(vars(args)) | |
all_genes = procede_gtf_files(args.gtf_genes, args.gtf_exons, args.genes, args.exact) | |
write_outputfile(all_genes, args.output_directory) | |
logger.info("find_exons needed %s seconds to generate the output" % (round(time.time() - start, 2))) | |
for handler in logger.handlers: | |
handler.close() | |
logger.removeFilter(handler) | |
if __name__ == "__main__": | |
main() |