Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
De-TE-ctor/detector.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
135 lines (96 sloc)
3.79 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import click | |
import os | |
import tempfile | |
import subprocess | |
import shlex | |
BUILD_DB_CMD = 'makeblastdb -dbtype prot -in %s -out %s' | |
RUN_BLAST_CMD = 'blastp -query %s -db %s -out %s -outfmt 6 -num_threads %d' | |
def read_blast_line(line): | |
""" | |
Parses a line of tabular blast output, returns a dict with the header | |
:param line: line from a blast file to parse | |
:return: dict with field names a keys and values the values from the line | |
""" | |
header = ['qseqid', 'sseqid', 'pident', 'length', 'mismatch', 'gapopen', 'qstart', 'qend', 'sstart', 'send', | |
'evalue', 'bitscore'] | |
types = [str, str, float, int, int, int, int, int, int, int, float, float] | |
return {h: t(v) for h, v, t in zip(header, line.strip().split(), types)} | |
@click.group() | |
def cli(): | |
""" | |
Detector can be used to detect transposable elements in a fasta file. | |
usage: detector <command> | |
potential commands: | |
build : build the database | |
blast : blasts a fasta file agains a database | |
analyze : checks the blast output if TEs are present | |
""" | |
pass | |
@cli.command() | |
@click.argument('path', type=click.Path(exists=True)) | |
@click.argument('db', type=click.Path()) | |
def build(path, db): | |
""" | |
Builds the blast database. | |
usage: detector build PATH DB | |
Fasta files in PATH should have the extension .fasta ! | |
The output from makeblastdb will be written to DB. | |
""" | |
click.secho("Building the database from FASTA files... ", fg='green', bold=True) | |
click.echo("Input path: %s" % path) | |
full_fasta = tempfile.mktemp() | |
fasta_count = 0 | |
# find fasta files and concatenate them to a temporary file | |
with open(full_fasta, 'w') as f_out: | |
for file in os.listdir(path): | |
if file.lower().endswith('.fasta'): | |
click.echo("\tFound file: %s" % file) | |
fasta_count += 1 | |
with (open(os.path.join(path, file), 'r')) as f_in: | |
for line in f_in: | |
f_out.write(line) | |
click.echo("Concatenated %d fasta files in temporary file %s" % (fasta_count, full_fasta)) | |
click.echo("\nCreating Blast database") | |
cmd = BUILD_DB_CMD % (full_fasta, db) | |
click.echo("Executing command: %s" % cmd) | |
subprocess.call(shlex.split(cmd)) | |
# delete temporary file | |
os.remove(full_fasta) | |
# done | |
click.secho("All done!", fg='green', bold=True) | |
@cli.command() | |
@click.option('--threads', default=1) | |
@click.argument('fasta', type=click.Path()) | |
@click.argument('db') | |
@click.argument('output', type=click.Path()) | |
def blast(fasta, db, output, threads): | |
""" | |
Runs Blast. | |
usage: detector blast FASTA DB OUTPUT | |
Blasts (blastp) the file defined by FASTA against a database DB and stores the output in OUTPUT | |
""" | |
click.secho("Running BLASTP... ", fg='green', bold=True) | |
cmd = RUN_BLAST_CMD % (fasta, db, output, threads) | |
click.echo("Executing command: %s" % cmd) | |
subprocess.call(shlex.split(cmd)) | |
# done | |
click.secho("All done!", fg='green', bold=True) | |
@cli.command() | |
@click.argument('blast_results', type=click.File()) | |
@click.argument('output', type=click.File(mode='w')) | |
def analyze(blast_results, output): | |
""" | |
Analyzes blast output | |
""" | |
click.secho("Analyzing file %s ... " % blast_results.name, fg='green', bold=True) | |
transposable_elements = set() | |
for line in blast_results: | |
data = read_blast_line(line) | |
if data['pident'] > 70 and data['evalue'] < 0.05 and data['length'] > 50: | |
transposable_elements.add(data['qseqid']) | |
if len(transposable_elements) > 0: | |
click.echo("Found %d potential TEs, writing list to %s ..." % (len(transposable_elements), output.name)) | |
for te in transposable_elements: | |
print(te, file=output) | |
# done | |
click.secho("All done!", fg='green', bold=True) | |