Source code for galaxy.tools.toolbox.watcher
import logging
import os.path
import threading
import time
try:
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
can_watch = True
except ImportError:
Observer = None
FileSystemEventHandler = object
PollingObserver = None
can_watch = False
from galaxy.util.hash_util import md5_hash_file
from galaxy.web.stack import register_postfork_function
log = logging.getLogger(__name__)
def get_observer_class(config_value, default, monitor_what_str):
    """Translate a watcher configuration value into an observer class.

    :param config_value: raw configuration value; any falsy value (``None``,
        ``False``, empty string) is replaced by ``default``.
    :param default: value used when ``config_value`` is falsy.
    :param monitor_what_str: human readable description of what would be
        monitored; only used to build log and error messages.
    :returns: ``Observer`` for ``true``/``yes``/``on``/``auto``,
        ``PollingObserver`` for ``polling`` and ``None`` for
        ``false``/``no``/``off`` (matching is case-insensitive). ``None`` is
        also returned for ``auto`` when the watchdog import failed.
    :raises Exception: if the value is not recognized, or if watching was
        explicitly requested but the watchdog library is unavailable.
    """
    config_value = str(config_value or default).lower()
    if config_value in ("true", "yes", "on", "auto"):
        expect_observer = True
        observer_class = Observer
    elif config_value == "polling":
        expect_observer = True
        observer_class = PollingObserver
    elif config_value in ("false", "no", "off"):
        expect_observer = False
        observer_class = None
    else:
        # This function is shared by several watch options, so name the thing
        # being configured instead of always blaming ``watch_tools``.
        message = "Unrecognized value for the %s watch config option: %s" % (monitor_what_str, config_value)
        raise Exception(message)

    # Observer/PollingObserver are None when the watchdog import failed.
    if expect_observer and observer_class is None:
        message = "Watchdog library unavailable, cannot monitor %s." % monitor_what_str
        if config_value == "auto":
            # "auto" means "watch if possible", so degrade quietly.
            log.info(message)
        else:
            raise Exception(message)
    return observer_class
def get_tool_conf_watcher(reload_callback, tool_cache=None):
    """Create a :class:`ToolConfWatcher` for the given callback and cache.

    :param reload_callback: forwarded to the watcher as ``reload_callback``.
    :param tool_cache: forwarded to the watcher as ``tool_cache``.
    :returns: the newly constructed ``ToolConfWatcher``.
    """
    watcher = ToolConfWatcher(reload_callback=reload_callback, tool_cache=tool_cache)
    return watcher
def get_tool_data_dir_watcher(tool_data_tables, config):
    """Build the watcher for the tool-data directory.

    Reads ``config.watch_tool_data_dir`` (treated as ``None`` when the
    attribute is absent, with a default of ``"False"``) and resolves it
    through :func:`get_observer_class`.

    :returns: a ``ToolDataWatcher`` when an observer class was resolved,
        otherwise a ``NullWatcher``.
    """
    observer_class = get_observer_class(
        getattr(config, "watch_tool_data_dir", None),
        default="False",
        monitor_what_str="tool-data directory",
    )
    if observer_class is None:
        return NullWatcher()
    return ToolDataWatcher(observer_class, tool_data_tables=tool_data_tables)
def get_tool_watcher(toolbox, config):
    """Build the watcher for tool files.

    Reads ``config.watch_tools`` (treated as ``None`` when the attribute is
    absent, with a default of ``"False"``) and resolves it through
    :func:`get_observer_class`.

    :returns: a ``ToolWatcher`` when an observer class was resolved,
        otherwise a ``NullWatcher``.
    """
    observer_class = get_observer_class(
        getattr(config, "watch_tools", None),
        default="False",
        monitor_what_str="tools",
    )
    if observer_class is None:
        return NullWatcher()
    return ToolWatcher(toolbox, observer_class=observer_class)
class ToolConfWatcher(object):
    """Poll a set of files and invoke a callback when their content changes.

    A daemon thread runs :meth:`check`, which compares modification times and
    md5 hashes of the monitored paths roughly once per second. When a tool
    cache is supplied, its ``cleanup()`` result can trigger a reload as well.
    """

    def __init__(self, reload_callback, tool_cache=None):
        """
        :param reload_callback: zero-argument callable invoked when a reload
            is required.
        :param tool_cache: optional object exposing ``cleanup()``.
        """
        # Maps monitored path -> last recorded mtime (None if the file did
        # not exist when it was registered).
        self.paths = {}
        self.cache = tool_cache
        self._active = False
        self._lock = threading.Lock()
        self.thread = threading.Thread(target=self.check, name="ToolConfWatcher.thread")
        self.thread.daemon = True
        self.reload_callback = reload_callback

    def start(self):
        """Mark the watcher active and register the thread start as a postfork function."""
        if not self._active:
            self._active = True
            register_postfork_function(self.thread.start)

    def check(self):
        """Check for changes in self.paths or self.cache and call the event handler."""
        with self._lock:
            hashes = {key: None for key in self.paths}
        while self._active:
            do_reload = self._poll_once(hashes)
            if not do_reload and self.cache:
                removed_ids = self.cache.cleanup()
                if removed_ids:
                    do_reload = True
            if do_reload:
                self.reload_callback()
            time.sleep(1)

    def _poll_once(self, hashes):
        """Scan every monitored path once and return True if a reload is needed.

        :param hashes: dict mapping path -> last seen md5 hash; updated in place.
        """
        do_reload = False
        with self._lock:
            paths = list(self.paths)
        for path in paths:
            try:
                if not os.path.exists(path):
                    continue
                mod_time = self.paths.get(path)
                if not hashes.get(path):
                    # First sighting of this file: record a baseline hash.
                    initial_hash = md5_hash_file(path)
                    if not initial_hash:
                        continue
                    hashes[path] = initial_hash
                new_mod_time = os.path.getmtime(path)
                # mod_time is None when the file did not exist at registration
                # time; comparing a float with None raises on Python 3.
                if mod_time is None or new_mod_time > mod_time:
                    new_hash = md5_hash_file(path)
                    # Remember the newest mtime even when the content is
                    # unchanged, so the file is not re-hashed on every pass.
                    with self._lock:
                        self.paths[path] = new_mod_time
                    if hashes[path] != new_hash:
                        hashes[path] = new_hash
                        log.debug("The file '%s' has changes.", path)
                        do_reload = True
            except (IOError, OSError):
                # in rare cases `path` may be deleted between `os.path.exists` calls
                # and reading the file from the filesystem. We do not want the watcher
                # thread to die in these cases. Forget the stale hash; the path
                # stays registered so it is picked up again if it reappears.
                hashes.pop(path, None)
                if self.cache:
                    self.cache.cleanup()
                do_reload = True
        return do_reload

    def monitor(self, path):
        """Register ``path`` for polling and make sure the watcher is started.

        The current modification time is recorded, or ``None`` if it cannot
        be read (for example because the file does not exist yet).
        """
        try:
            mod_time = os.path.getmtime(path)
        except OSError:
            mod_time = None
        with self._lock:
            self.paths[path] = mod_time
        if not self._active:
            self.start()

    def watch_file(self, tool_conf_file):
        """Monitor ``tool_conf_file``; :meth:`monitor` also starts the watcher if needed."""
        self.monitor(tool_conf_file)
class ToolWatcher(object):
    """Track tool files and tool directories that should be observed.

    NOTE(review): ``start()`` and ``monitor()`` are called below but are not
    defined in the visible part of this class; presumably they are provided
    elsewhere - confirm.
    """

    def __init__(self, toolbox, observer_class):
        """
        :param toolbox: stored on the instance as ``toolbox``.
        :param observer_class: called with no arguments to create ``self.observer``.
        """
        self.toolbox = toolbox
        # absolute tool file path -> tool id
        self.tool_file_ids = {}
        # absolute directory path -> callback
        self.tool_dir_callbacks = {}
        # absolute directory path -> same path, for every monitored directory
        self.monitored_dirs = {}
        self.observer = observer_class()
        self.event_handler = ToolFileEventHandler(self)
        self.start()

    def watch_file(self, tool_file, tool_id):
        """Record ``tool_id`` for ``tool_file`` and monitor its parent directory."""
        absolute_file = os.path.abspath(tool_file)
        self.tool_file_ids[absolute_file] = tool_id
        self._monitor_once(os.path.dirname(absolute_file))

    def watch_directory(self, tool_dir, callback):
        """Record ``callback`` for ``tool_dir`` and monitor that directory."""
        absolute_dir = os.path.abspath(tool_dir)
        self.tool_dir_callbacks[absolute_dir] = callback
        self._monitor_once(absolute_dir)

    def _monitor_once(self, directory):
        """Start monitoring ``directory`` unless it is already registered."""
        if directory in self.monitored_dirs:
            return
        self.monitored_dirs[directory] = directory
        self.monitor(directory)
class ToolDataWatcher(object):
    """Track tool-data directories that should be observed.

    NOTE(review): ``start()`` and ``monitor()`` are called below but are not
    defined in the visible part of this class; presumably they are provided
    elsewhere - confirm.
    """

    def __init__(self, observer_class, tool_data_tables):
        """
        :param observer_class: called with no arguments to create ``self.observer``.
        :param tool_data_tables: stored on the instance as ``tool_data_tables``.
        """
        self.tool_data_tables = tool_data_tables
        # absolute directory path -> same path, for every monitored directory
        self.monitored_dirs = {}
        # path -> last md5 hash recorded for it by LocFileEventHandler
        self.path_hash = {}
        self.observer = observer_class()
        self.event_handler = LocFileEventHandler(self)
        self.start()

    def watch_directory(self, tool_data_dir):
        """Monitor ``tool_data_dir`` unless its absolute path is already registered."""
        absolute_dir = os.path.abspath(tool_data_dir)
        if absolute_dir in self.monitored_dirs:
            return
        self.monitored_dirs[absolute_dir] = absolute_dir
        self.monitor(absolute_dir)
class LocFileEventHandler(FileSystemEventHandler):
    """Reload tool data tables when a ``.loc`` file changes.

    NOTE(review): nothing in the visible code calls ``_handle``; presumably
    an event hook defined elsewhere dispatches to it - confirm.
    """

    def __init__(self, loc_watcher):
        """
        :param loc_watcher: object providing ``path_hash`` and ``tool_data_tables``.
        """
        self.loc_watcher = loc_watcher

    def _handle(self, event):
        """Reload the data tables for a changed ``.loc`` file once it is stable."""
        # Modified events only carry src_path; move events carry both
        # src_path and dest_path, and only the destination matters here.
        target = getattr(event, 'dest_path', None) or event.src_path
        target = os.path.abspath(target)
        if not target.endswith(".loc"):
            return
        cur_hash = md5_hash_file(target)
        if not cur_hash:
            return
        watcher = self.loc_watcher
        if watcher.path_hash.get(target) == cur_hash:
            # Content is identical to what was last loaded.
            return
        time.sleep(0.5)
        if cur_hash != md5_hash_file(target):
            # We're still modifying the file, it'll be picked up later
            return
        watcher.path_hash[target] = cur_hash
        watcher.tool_data_tables.reload_tables(path=target)
class ToolFileEventHandler(FileSystemEventHandler):
    """Reload or register tools when watched tool files change.

    NOTE(review): nothing in the visible code calls ``_handle``; presumably
    an event hook defined elsewhere dispatches to it - confirm.
    """

    def __init__(self, tool_watcher):
        """
        :param tool_watcher: object providing ``tool_file_ids``,
            ``tool_dir_callbacks`` and ``toolbox``.
        """
        self.tool_watcher = tool_watcher

    def _handle(self, event):
        """React to a file system event for a tool file.

        A path with a known tool id is reloaded through the toolbox.
        Otherwise, an ``.xml`` file in a directory with a registered callback
        is passed to that callback; a truthy result is recorded as the tool
        id for that path.
        """
        # modified events will only have src path, move events will
        # have dest_path and src_path but we only care about dest. So
        # look at dest if it exists else use src.
        path = getattr(event, 'dest_path', None) or event.src_path
        path = os.path.abspath(path)
        tool_id = self.tool_watcher.tool_file_ids.get(path, None)
        if tool_id:
            try:
                self.tool_watcher.toolbox.reload_tool_by_id(tool_id)
            except Exception:
                # Best effort: a failing reload must not propagate out of the
                # handler, but it should not vanish without a trace either.
                log.exception("Failed to reload tool '%s' after a change to '%s'", tool_id, path)
        elif path.endswith(".xml"):
            directory = os.path.dirname(path)
            dir_callback = self.tool_watcher.tool_dir_callbacks.get(directory, None)
            if dir_callback:
                # Use the resolved path (the destination for move events)
                # rather than event.src_path: the callback was selected by
                # this path's directory, and tool_file_ids is looked up by
                # absolute path above.
                tool_id = dir_callback(path)
                if tool_id:
                    self.tool_watcher.tool_file_ids[path] = tool_id