"""Utilities for loading and reasoning about unparsed tools in directories."""
import fnmatch
import glob
import logging
import os
import re
import sys
import yaml
from galaxy.util import checkers
from .parser import get_tool_source
from ..tools import loader
log = logging.getLogger(__name__)
PATH_DOES_NOT_EXIST_ERROR = "Could not load tools from path [%s] - this path does not exist."
PATH_AND_RECURSIVE_ERROR = "Cannot specify a single file and recursive."
LOAD_FAILURE_ERROR = "Failed to load tool with path %s."
TOOL_LOAD_ERROR = object()
TOOL_REGEX = re.compile(r"<tool\s")
YAML_EXTENSIONS = [".yaml", ".yml", ".json"]
CWL_EXTENSIONS = YAML_EXTENSIONS + [".cwl"]
EXCLUDE_WALK_DIRS = ['.hg', '.git', '.venv']
def load_exception_handler(path, exc_info):
"""Default exception handler for use by load_tool_elements_from_path."""
log.warning(LOAD_FAILURE_ERROR % path, exc_info=exc_info)
def _load_tools_from_path(
path,
load_exception_handler,
recursive,
register_load_errors,
loader_func,
enable_beta_formats,
):
loaded_objects = []
for possible_tool_file in find_possible_tools_from_path(
path,
recursive=recursive,
enable_beta_formats=enable_beta_formats,
):
try:
tool_element = loader_func(possible_tool_file)
loaded_objects.append((possible_tool_file, tool_element))
except Exception:
exc_info = sys.exc_info()
load_exception_handler(possible_tool_file, exc_info)
if register_load_errors:
loaded_objects.append((possible_tool_file, TOOL_LOAD_ERROR))
return loaded_objects
def looks_like_a_tool(path_or_uri_like, invalid_names=[], enable_beta_formats=False):
"""Quick check to see if a file looks like it may be a tool file.
Whether true in a strict sense or not, lets say the intention and
purpose of this procedure is to serve as a filter - all valid tools must
"looks_like_a_tool" but not everything that looks like a tool is actually
a valid tool.
invalid_names may be supplied in the context of the tool shed to quickly
rule common tool shed XML files.
"""
path = resolved_path(path_or_uri_like)
if path is UNRESOLVED_URI:
# Assume the path maps to a real tool.
return True
looks = False
if os.path.basename(path) in invalid_names:
return False
if looks_like_a_tool_xml(path):
looks = True
if not looks and enable_beta_formats:
for tool_checker in BETA_TOOL_CHECKERS.values():
if tool_checker(path):
looks = True
break
return looks
[docs]def is_a_yaml_with_class(path, classes):
"""Determine if a file is a valid YAML with a supplied ``class`` entry."""
if not _has_extension(path, YAML_EXTENSIONS):
return False
with open(path, "r") as f:
try:
as_dict = yaml.safe_load(f)
except Exception:
return False
if not isinstance(as_dict, dict):
return False
file_class = as_dict.get("class", None)
return file_class in classes
[docs]def looks_like_a_cwl_artifact(path, classes=None):
"""Quick check to see if a file looks like it may be a CWL artifact."""
if not _has_extension(path, CWL_EXTENSIONS):
return False
with open(path, "r") as f:
try:
as_dict = yaml.safe_load(f)
except Exception:
return False
if not isinstance(as_dict, dict):
return False
file_class = as_dict.get("class", None)
if classes is not None and file_class not in classes:
return False
file_cwl_version = as_dict.get("cwlVersion", None)
return file_cwl_version is not None
def _find_tool_files(path_or_uri_like, recursive, enable_beta_formats):
path = resolved_path(path_or_uri_like)
if path is UNRESOLVED_URI:
# Pass the URI through and assume it maps to a real tool.
return [path_or_uri_like]
is_file = not os.path.isdir(path)
if not os.path.exists(path):
raise Exception(PATH_DOES_NOT_EXIST_ERROR)
elif is_file and recursive:
raise Exception(PATH_AND_RECURSIVE_ERROR)
elif is_file:
return [os.path.abspath(path)]
else:
if enable_beta_formats:
if not recursive:
files = glob.glob(path + "/*")
else:
files = _find_files(path, "*")
else:
if not recursive:
files = glob.glob(path + "/*.xml")
else:
files = _find_files(path, "*.xml")
return [os.path.abspath(_) for _ in files]
def _has_extension(path, extensions):
return any(path.endswith(e) for e in extensions)
def _find_files(directory, pattern='*'):
if not os.path.exists(directory):
raise ValueError("Directory not found {}".format(directory))
matches = []
for root, dirnames, filenames in os.walk(directory):
# exclude some directories (like .hg) from traversing
dirnames[:] = [dir for dir in dirnames if dir not in EXCLUDE_WALK_DIRS]
for filename in filenames:
full_path = os.path.join(root, filename)
if fnmatch.filter([full_path], pattern):
matches.append(os.path.join(root, filename))
return matches
UNRESOLVED_URI = object()
def resolved_path(path_or_uri_like):
"""If this is a simple file path, return the path else UNRESOLVED_URI."""
if "://" not in path_or_uri_like:
return path_or_uri_like
elif path_or_uri_like.startswith("file://"):
return path_or_uri_like[len("file://"):]
else:
return UNRESOLVED_URI
BETA_TOOL_CHECKERS = {
'yaml': looks_like_a_tool_yaml,
'cwl': looks_like_a_tool_cwl,
}
__all__ = (
"find_possible_tools_from_path",
"is_a_yaml_with_class",
"is_tool_load_error",
"load_tool_elements_from_path",
"load_tool_sources_from_path",
"looks_like_a_cwl_artifact",
"looks_like_a_tool_cwl",
"looks_like_a_tool_xml",
"looks_like_a_tool_yaml",
)