Skip to content
Permalink
master
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
# -*- coding: utf-8 -*-
"""
This script has been developed on x86_64 Linux (Debian 7.5 "Wheezy"):
Python 2.7.3 (lxml.etree 2.3.2)
Python 3.2.3 (lxml.etree 2.3.2)
Attention
This is a prototype implementation intended as reference for further
development or as blueprint for a re-implementation in a different
language.
"""
import os as os
import sys as sys
import platform as platf
import argparse as argparse
import logging as logging
import fnmatch as fnm
import traceback as trb
import difflib as diffl
import collections as col
import re as re
import shlex as slx
try:
from lxml import etree
from lxml.etree import XMLSyntaxError
except ImportError as ie:
sys.stderr.write('\n\nModule lxml is not properly installed - cannot execute.\n')
sys.stderr.write('Please refer to: http://lxml.de/installation.html\n')
sys.stderr.write('If it is installed, make sure it is in your PYTHONPATH\n\n')
raise ie
__author__ = 'Peter Ebert'
__contact__ = 'pebert@mpi-inf.mpg.de'
__version__ = '0.3'
# ==================================================================
# Configuration
# The following should be set in an external configuration file,
# but since this is only a prototype implementation to be deployed
# as a single file, we do it this way
# Specify paths in the XML tree for the relevant information
AMD_TO_XML_INFO = {'inputs': 'inputs/filetype/identifier',
'outputs': 'outputs/filetype/identifier',
'references': 'references/filetype/identifier',
'parameters': 'software/tool/command_line'}
AMD_TO_XML_COM = {'inputs': 'inputs/filetype/comment',
'outputs': 'outputs/filetype/comment',
'references': 'references/filetype/comment',
'parameters': 'software/tool/comment'}
PROCESSNAME = 'name'
PROCESSVER = 'version'
# Identify sections in the analysis metadata file
# if line starts with: [ or (
# see parse_amd() method
AMD_SECTION_RE = "[\[\(]" # used for reg exp matching
AMD_SECTION_STR = "[]()" # used to strip section heading clean
# Placeholders are enclosed in curly braces
# see check_tool_parameters() method
PLACEHOLDER = "\{.+\}"
# Specify section names (headings) to be expected in analysis metadata
# and the sections required to be present in a analysis metadata file
# see parse_amd() method
SECTION_NAMES = ['description', 'inputs', 'outputs', 'references', 'parameters', 'metrics']
SECTION_REQUIRED = ['description', 'inputs', 'outputs', 'references', 'parameters']
# The analysis metadata file is tab-separated since ' ' is too likely
# to appear in the value field
# see parse_amd() method
SPLIT_KEY_VAL = '\t'
# Specify where certain information is expected
# see check_analysis_metadata() method
PROCESS_NAME_IN = 'description'
FILETYPE_IDS_IN = ['inputs', 'outputs', 'references']
PARAMETER_IDS_IN = 'parameters'
COMMENTS_IN = ['inputs', 'outputs', 'references', 'parameters']
# ==================================================================
def set_logging(debug):
"""
Create two loggers as a simple workaround to strictly separate
stdout from stderr
:return: empty
:rtype: None
"""
logger = logging.getLogger('OutLog')
logger.setLevel(logging.DEBUG)
form = logging.Formatter('[%(levelname)s] %(message)s')
outhdl = logging.StreamHandler(stream=sys.stdout)
outhdl.setFormatter(form)
outhdl.setLevel(logging.INFO)
logger.addHandler(outhdl)
logger = logging.getLogger('ErrLog')
logger.setLevel(logging.DEBUG)
errhdl = logging.StreamHandler(stream=sys.stderr)
errhdl.setFormatter(form)
errhdl.setLevel(logging.DEBUG if debug else logging.ERROR)
logger.addHandler(errhdl)
return
def log_version_info():
"""
Only printed if debugging is set, to help track dubious
behavior potentially related to incompatible module versions
:return: empty
:rtype: None
"""
errlog().debug('==========')
errlog().debug('Python version: {}'.format(platf.python_version()))
errlog().debug('System: {} - {}'.format(platf.system(), platf.machine()))
errlog().debug('System detail: {}'.format(platf.platform()))
errlog().debug('lxml.etree version: {}'.format(etree.__version__))
errlog().debug('==========')
return
def errlog():
return logging.getLogger('ErrLog')
def outlog():
return logging.getLogger('OutLog')
def get_cmdline_args():
""" Define epilog text and parse command line arguments
:return: parsed command line arguments
:rtype: Namespace
"""
epilog_text = ' ==== IMPORTANT: char escaping in XML ====\n'
quot = ' \t" --> "\n'
apos = ' \t\' --> '\n'
less = ' \t< --> &lt;\n'
greater = ' \t> --> &gt;\n'
amper = ' \t& --> &amp;\n'
frame_bottom = ' ========================================='
reminder = quot + apos + less + greater + amper + frame_bottom
epilog_text += reminder
parser = argparse.ArgumentParser(add_help=True, epilog=epilog_text,
description=' Validate one or more XML process files according to\n'
' a supplied XSD schema file. If analysis metadata files\n'
' are provided, these will be validated against the process\n'
' XML files.',
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--version', '-v', action='version', version=__version__)
parser.add_argument('--debug', '-d', dest='debug', action='store_true', default=False,
help='Log debug (status) messages to stderr. Otherwise, only actual error messages will'
' print to stderr.')
parser.add_argument('--schema', '-s', required=True, type=str, dest='schema',
help='Full path to XSD schema file.')
parser.add_argument('--process', '-p', required=True, nargs='+', dest='process',
help='Full path to the XML process files to be validated against the XSD.')
parser.add_argument('--analysis', '-a', nargs='*', default=[], dest='analysis',
help='Full path to analysis metadata files to be validated against the XML.'
' Note that the first analysis metadata file will be validated against'
' the first process, the second against the second and so on. If there'
' is exactly one process file and many analysis metadata files, then all'
' of them will be checked against this process.')
parser.add_argument('--strict', dest='strict', action='store_true', default=False,
help='Turn warnings into errors; warning messages will print to stderr and, if applicable,'
' the exit status will be changed to 1.')
parser.add_argument('--annoying', dest='annoying', action='store_true', default=False,
help='Complain about empty comment fields in the process XML. Note that empty comments'
' will only result in errors and a non-zero exit status if --strict is set.')
parser.add_argument('--process-batch', dest='processbatch', action='store_true', default=False,
help='If set, the parameter(s) specified via --process/-p are assumed to be'
' top-level folders. The script will walk down these folders and collect'
' all .xml files and validate these. Note that validating analysis metadata'
' files against the collected process file(s) is not supported.')
parser.add_argument('--no-wildcards', '-n', dest='nowildcards', action='store_true', default=False,
help='If set, it is assumed that each placeholder in the command lines of the process'
' XML is matched exactly to a filetype identifier or parameter identifier in'
' the analysis metadata file. By default, the validator applies a heuristic'
' to match placeholders like "{*_common_suffix}" in the command line to'
' the names "file1_common_suffix", "file2_common_suffix" and so on listed'
' in the analysis metadata file.')
args = parser.parse_args()
return args
def collect_process_files(topdir):
"""
:param topdir:
:return:
"""
def err(x):
raise OSError('Error encountered at {} - original error: {}'.format(x.filename, str(x)))
processes = []
for root, dirs, files in os.walk(topdir,
followlinks=False,
onerror=err):
if files:
xml_files = fnm.filter(files, '*.xml')
for fn in xml_files:
processes.append(os.path.join(root, fn))
return processes
def check_file_identifiers(process, amd_section, xmlpath, checked):
"""
Check for (case-insensitive) exact matches between the filetype
identifiers in the process and the keys of the analysis metadata section.
This methods checks the strict requirement that all filetype identifiers
in the process XML must be present in the analysis metadata file.
:param amd_section: one analysis metadata section
:type: dict
:param process: the process XML
:type: etree._ElementTree
:param xmlpath: the path in the process XML for iterative checking
:type: str
:param checked: a set containing all file identifiers that have been found
:type: set
:return: zero if all identifiers were found
:rtype: int
"""
retcode = 0
for elem in process.iterfind(xmlpath):
identifier = re.compile(elem.text.strip(), re.IGNORECASE)
found = False
for k in amd_section.keys():
if identifier.search(k) is not None:
found = True
break
if found:
checked.add(elem.text.strip())
else:
errlog().error('In path {}, identifier unmatched: {}'.format(xmlpath, identifier.pattern))
retcode = 1
return retcode
def check_wildcard_placeholders(unmatched, parameters, ftype_ids):
"""
Apply a heuristic to match wildcard placeholders by
exhaustive search for longest substring matches between
the placeholders and the parameters/filetype identifiers.
Keeps only the longest matches that are matched by
at least two parameters.
:param unmatched: the unmatched placeholders from the process
:type: set
:param parameters: all parameters from the analysis metadata file
:type: set
:param ftype_ids: all filetype identifiers from the process
:type: set
:return: all placeholders found via wildcard matching
:rtype: set
"""
# simple brute force all-vs-all comparisons
found = set()
longest_matches = col.defaultdict(list)
to_check = parameters.union(ftype_ids)
for u in unmatched:
leng = len(u)
# from Python docs:
# SequenceMatcher computes and caches detailed information about the second sequence,
# so if you want to compare one sequence against many sequences, use set_seq2() to set
# the commonly used sequence once and call set_seq1() repeatedly,
# once for each of the other sequences.
sqm = diffl.SequenceMatcher(a='', b=u)
longest = 0
for c in to_check:
sqm.set_seq1(c) # that is seq 'a'
m = sqm.find_longest_match(0, len(c), 0, leng)
if m.size > 0 and m.size >= longest:
longest = m.size
longest_matches[u].append((longest, c))
# reduce to only longest matches
for plh, mlist in longest_matches.items():
lm = max([t[0] for t in mlist])
mlist = list(filter(lambda t: t[0] == lm, mlist))
longest_matches[plh] = mlist
# now simply check if a placeholder matches at least two parameter/file keys
# (otherwise, a wildcard notation would not be needed)
debuglist = col.defaultdict(list)
for plh, mlist in longest_matches.items():
if len(mlist) >= 2:
found.add(plh)
for tup in mlist:
debuglist[plh].append(tup[1])
errlog().debug('Found wildcard parameters: {}'.format(_pprint_debug_output(debuglist)))
return found
def _pprint_debug_output(mapping):
"""
:param mapping: the wildcard name mapped to all actual parameter names
:type: dict
:return: nicely formatted string
:rtype: str
"""
out = '\n'
for wildc, params in mapping.items():
out += wildc + ' : ' + ' | '.join(params) + '\n'
out = out.rstrip('\n')
return out
def check_tool_parameters(process, amd_section, xmlpath, ftype_ids, args):
"""
Check for (case-insensitive) exact matches between the parameter
identifiers in the process XML and the identifiers in the AMD. Note
that this requires that all parameter placeholder are enclosed in curly
braces. One the plus side, this ensures that all parameters specified
in the process XML are part of the analysis metadata file. However, if
the command line contains indeed characters enclosed in curly braces, e.g.
as part of regular expression, the leads to a false positive and either a
warning or an error message if the --strict option is set.
To avoid that filetype identifiers are taken to be parameters, all components
of the command lines are checked against the list of already identified
filetype ids.
:param amd_section: one analysis metadata section
:type: dict
:param process: the process XML
:type: etree._ElementTree
:param xmlpath: the path in the process XML for iterative checking
:type: str
:param ftype_ids: all filetype identifiers found so far
:type: set
:param args: command line parameters
:type: Namespace object
:return: zero if all identifiers were found
:rtype: int
"""
retcode = 0
placeholder = re.compile(PLACEHOLDER)
parameters = set(amd_section.keys())
unmatched = set()
for elem in process.iterfind(xmlpath):
components = slx.split(elem.text)
for c in components:
mobj = placeholder.search(c)
if mobj is not None:
param = c[mobj.start():mobj.end()].strip('{}')
if param in ftype_ids or param in parameters:
continue # is filetype placeholder or exact match
else:
if args.nowildcards:
errlog().error('Process parameter {} has no exact match in analysis metadata file'.format(param))
retcode = 1
else:
unmatched.add(param)
if len(unmatched) > 0:
errlog().debug('Checking for wildcard parameters')
found = check_wildcard_placeholders(unmatched, parameters, ftype_ids)
unmatched -= found
if len(unmatched) > 0:
if args.strict:
errlog().error('Unmatched parameters in process XML: {}'.format(unmatched))
retcode = 1
else:
outlog().warning('Potentially unmatched parameters in process XML: {}'.format(unmatched))
return retcode
def check_comments(process, xmlpath, strict):
"""
:param process: the process XML
:type: etree._ElementTree
:param xmlpath: the path in the XML tree to the comment tag
:type: str
:param strict: report missing comments as errors
:type: bool
:return: non-zero if comments were missing and strict is true
:type: int
"""
retcode = 0
for elem in process.iterfind(xmlpath):
comment = elem.text
# if there is no comment, elem.text is None (and not the empty string)
# thus we have to check multiple conditions
if comment is None or not comment.strip() or re.search("no\s+comment(s)?", comment.strip()) is not None:
msg = 'Missing comment in XML path: {}'.format(xmlpath)
if strict:
retcode = 1
errlog().error(msg)
else:
outlog().warning(msg)
return retcode
def check_analysis_metadata(process, processname, processver, analysis, args):
"""
Check if process name is in the description of the analysis
metadata file, if all file identifiers are found and if all parameters
of the command lines are present
:param process: the process XML
:type: etree._ElementTree
:param processname: the process name as inferred from the filename
:type: str
:param analysis: the analysis metadata file content
:type: dict
:param args: command line parameters
:type: Namespace object
:return: zero if metadata are valid
:rtype: int
"""
retcode = 0
errlog().debug('Checking analysis metadata')
proc_re = re.compile("^" + processname + "(v|ver|version)?" + processver + "$", re.IGNORECASE)
if not any([proc_re.search(v) is not None for v in analysis[PROCESS_NAME_IN].values()]):
errlog().error('Cannot find process name {}v{}'
' in analysis metadata section "{}"'.format(processname, processver, PROCESS_NAME_IN))
retcode = 1
else:
errlog().debug('Checking filetype identifiers')
ftype_ids = set()
for s in FILETYPE_IDS_IN:
retcode |= check_file_identifiers(process, analysis[s], AMD_TO_XML_INFO[s], ftype_ids)
errlog().debug('Checking tool parameters')
retcode |= check_tool_parameters(process, analysis[PARAMETER_IDS_IN],
AMD_TO_XML_INFO[PARAMETER_IDS_IN], ftype_ids, args)
errlog().debug('Check of analysis metadata complete')
return retcode
def parse_amd(filepath, strict):
"""
This parses the analysis metadata file into a dictionary.
Note that, if the syntax of the analysis metadata files is strict,
this task can be accomplished by Python's Configparser module; this
requires then all section headers to be enclosed in [ ]
This method ignores all lines starting with #, i.e. they are interpreted
as comments
:param filepath:
:type: str
:param strict: turn warnings into errors
:type: bool
:return: the analysis metadata
:rtype: dict
"""
analysis_dict = {}
sec_start = re.compile(AMD_SECTION_RE) # identify sections with common pattern
with open(filepath, 'r') as infile:
current_section = None
section_dict = {}
for line in infile:
if not line.strip() or line.startswith('#'):
continue
line = line.strip()
if sec_start.match(line): # assume new section starts
section_name = line.strip(AMD_SECTION_STR).lower()
if section_name in SECTION_NAMES:
if current_section:
analysis_dict.update({current_section: section_dict})
current_section = section_name
section_dict = {}
else:
if strict:
errlog().error('Unknown section: {}'.format(line))
else:
outlog().warning('Unknown section: {}'.format(line))
else:
try:
k, v = line.split(SPLIT_KEY_VAL)
except ValueError as ve:
errlog().error('Malformed line: {}'.format(line))
raise ve
section_dict.update({k.strip(): v.strip()})
# add last/active section - bug spotted by Florian Schmidt
analysis_dict.update({current_section: section_dict})
reqs = set(SECTION_REQUIRED)
found = set(analysis_dict.keys())
missing = reqs - found
if missing:
errlog().error('Missing section in analysis metadata file: {}'.format(missing))
raise KeyError('Missing keys in analysis dict: {}'.format(missing))
return analysis_dict
def parse_xml(filepath, xmlparser):
"""
:param filepath:
:type: str
:param xmlparser: an XMLParser with encoding set according to XSD schema
:type: etree.XMLParser
:return: the process and its name
:rtype: etree._ElementTree
"""
try:
with open(filepath, 'rb') as xmlfile:
process = etree.parse(xmlfile, xmlparser)
return process
except (XMLSyntaxError, OSError) as err:
errlog().error('Cannot parse XML process file: {}'.format(filepath))
errlog().error('Error: {} - code {}'.format(err.msg, err.code))
errlog().error('Is the file readable with your user account?')
errlog().error('Is the file an XML version 1 file?')
errlog().error('Does the file contain invalid characters?'
' Please run "python mdvalid.py --help"'
' to see how to escape special characters in XML.')
raise err
def get_process_info(process):
"""
:param process: the parsed process XML
type: etree._ElementTree
:return: name and version of process
:rtype: str, str
"""
name = process.find(PROCESSNAME)
if name is None:
errlog().error('No process name defined in process XML')
raise XMLSyntaxError('Malformed process XML', None, None, None)
name = name.text
version = process.find(PROCESSVER)
if version is None:
errlog().error('No process version defined in process XML')
raise XMLSyntaxError('Malformed process XML', None, None, None)
version = version.text
name = name.strip()
try:
int(version)
except ValueError as ve:
errlog().error('Process version seems not to be a simple integer: {}'.format(version))
raise ve
return name, version
def validate_files(files, schema, encoding, args):
"""
:param files: the path to the XML process and the analysis metadata file
:type: tuple of str,str
:param schema:
:type: etree.XMLSchema
:param encoding: the document encoding as set in the XSD schema file
:type: str
:param args: the command line parameters
:type: Namespace object
:return: zero if all files are valid
:rtype: int
"""
retcode = 0
xmlfile, amdfile = files
xmlparser = etree.XMLParser(encoding=encoding, strip_cdata=False)
outlog().info('Checking: [XML] {} | [AMD] {}'.format(os.path.basename(xmlfile), os.path.basename(amdfile)))
try:
process = parse_xml(xmlfile, xmlparser)
processname, processver = get_process_info(process)
except Exception:
retcode = 1
else:
if schema.validate(process):
errlog().debug('XML file {} validates against schema'.format(os.path.basename(xmlfile)))
if args.annoying:
for c in COMMENTS_IN:
errlog().debug('Checking for comments')
retcode |= check_comments(process, AMD_TO_XML_COM[c], args.strict)
if amdfile:
analysis = parse_amd(amdfile, args.strict)
rc = check_analysis_metadata(process, processname, processver, analysis, args)
if rc == 1:
outlog().error('AMD file {} does not validate against process XML'.format(os.path.basename(amdfile)))
retcode |= rc
else:
errorlog = schema.error_log
errlog().error('XML file {} not valid: {}'.format(os.path.basename(xmlfile), errorlog.last_error))
retcode = 1
finally:
status = 'FAIL' if retcode > 0 else 'OK'
outlog().info('{}: [XML] {} | [AMD] {}\n====='.format(status, os.path.basename(xmlfile), os.path.basename(amdfile)))
return retcode
def create_schema(schema_path):
"""
:param schema_path:
:type: str
:return: XML schema and the file encoding
:rtype: etree.XMLSchema, str
"""
try:
with open(schema_path, 'rb') as xsdfile:
xsd_doc = etree.parse(xsdfile)
encoding = xsd_doc.docinfo.encoding
schema = etree.XMLSchema(xsd_doc)
return schema, encoding
except (XMLSyntaxError, OSError) as err:
errlog().error('Cannot parse XSD schema file: {}'.format(schema_path))
errlog().error('Is the file readable with your user account?')
errlog().error('Is the file an XSD schema file?')
raise err
def run():
"""
:return: return code
:rtype: int
"""
retcode = 0
args = get_cmdline_args()
set_logging(args.debug)
if args.debug:
log_version_info()
errlog().debug('Logging set')
try:
errlog().debug('Reading schema file and building parser...')
schema, encoding = create_schema(args.schema)
errlog().debug('XSD schema built')
# works only in Python 3.x
### for t in itertools.zip_longest(args.process, args.analysis, fillvalue='')
# so we make the cases explicit
if args.processbatch:
processes = []
for path in args.process:
assert os.path.isdir(path), 'This path is not a directory: {} -' \
' please specify only directories in --process-batch' \
' mode'.format(path)
subtree_proc = collect_process_files(path)
processes.extend(subtree_proc)
setattr(args, 'process', sorted(processes))
setattr(args, 'analysis', [])
if len(args.process) == len(args.analysis):
checklist = zip(args.process, args.analysis)
elif len(args.process) == 1 and len(args.analysis) > 1:
checklist = [(args.process[0], amd) for amd in args.analysis]
elif len(args.process) > len(args.analysis):
checklist = []
num_amd = len(args.analysis) - 1 # lists start at index 0
for i in range(len(args.process)):
amdf = '' if i > num_amd else args.analysis[i]
checklist.append((args.process[i], amdf))
else:
errlog().error('Combination of process and analysis metadata files cannot be handled.')
retcode = 1
checklist = []
for t in checklist:
retcode |= validate_files(t, schema, encoding, args)
except Exception as e:
errlog().error('====== Message of the exception')
errlog().error(str(e))
if args.debug:
errlog().error('====== Stack Traceback ======')
errlog().error(trb.format_exc())
errlog().error('=============================')
retcode = 1
finally:
return retcode
if __name__ == '__main__':
retcode = run()
sys.exit(retcode)