Source code for galaxy.objectstore.rods

"""
Object Store plugin for the Integrated Rule-Oriented Data Store (iRODS)

The module is named rods to avoid conflicting with the PyRods module, irods
"""

import logging
import os
import time
from posixpath import (
    basename as path_basename,
    dirname as path_dirname,
    join as path_join
)

try:
    import irods
except ImportError:
    irods = None

from galaxy.exceptions import (
    ObjectInvalid,
    ObjectNotFound
)
from galaxy.util.path import safe_relpath
from ..objectstore import (
    DiskObjectStore,
    local_extra_dirs
)

IRODS_IMPORT_MESSAGE = ('The Python irods package is required to use this '
                        'feature, please install it')

log = logging.getLogger(__name__)


class IRODSObjectStore(DiskObjectStore):
    """
    Galaxy object store based on iRODS

    Objects are stored as iRODS data objects beneath a single root
    collection.  A local directory (``config.object_store_cache_path``) is
    used as a file cache by `get_filename()` and `update_from_file()`.
    """

    def __init__(self, config, file_path=None, extra_dirs=None):
        """
        Connect to iRODS and make sure the root collection exists.

        :param config: Galaxy configuration; the attributes read here are
            ``object_store_cache_path``, ``irods_default_resource`` and
            ``irods_root_collection_path``.
        :param file_path: passed through to `DiskObjectStore`.
        :param extra_dirs: passed through to `DiskObjectStore`.
        :raises AssertionError: if the irods package is missing, the
            connection fails, or the root collection cannot be determined
            or created.
        """
        super(IRODSObjectStore, self).__init__(config, file_path=file_path, extra_dirs=extra_dirs)
        assert irods is not None, IRODS_IMPORT_MESSAGE
        self.cache_path = config.object_store_cache_path
        self.default_resource = config.irods_default_resource or None

        # Connect to iRODS (AssertionErrors will be raised if anything goes wrong)
        self.rods_env, self.rods_conn = rods_connect()

        # if the root collection path in the config is unset or relative, try to use a sensible default
        configured_root = config.irods_root_collection_path
        if configured_root is None or not configured_root.startswith('/'):
            rods_home = self.rods_env.rodsHome
            assert rods_home != '', "Unable to initialize iRODS Object Store: rodsHome cannot be determined and irods_root_collection_path in Galaxy config is unset or not absolute."
            if configured_root is None:
                self.root_collection_path = path_join(rods_home, 'galaxy_data')
            else:
                self.root_collection_path = path_join(rods_home, configured_root)
        else:
            self.root_collection_path = configured_root

        # will return a collection object regardless of whether it exists
        self.root_collection = irods.irodsCollection(self.rods_conn, self.root_collection_path)

        if self.root_collection.getId() == -1:
            log.warning("iRODS root collection does not exist, will attempt to create: %s", self.root_collection_path)
            self.root_collection.upCollection()
            # The creation call must not live inside the assert expression:
            # asserts are stripped under `python -O`, which would silently
            # skip creating the collection.
            status = self.root_collection.createCollection(path_basename(self.root_collection_path))
            assert status == 0, "iRODS root collection creation failed: %s" % self.root_collection_path
            self.root_collection = irods.irodsCollection(self.rods_conn, self.root_collection_path)
            assert self.root_collection.getId() != -1, "iRODS root collection creation claimed success but still does not exist"

        if self.default_resource is None:
            self.default_resource = self.rods_env.rodsDefResource

        log.info("iRODS data for this instance will be stored in collection: %s, resource: %s", self.root_collection_path, self.default_resource)

    def __get_rods_path(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None, strip_dat=True, **kwargs):
        """
        Build the absolute iRODS path for `obj` beneath the root collection.

        :param dir_only: if true, return the collection path only (no
            object name component is appended).
        :param extra_dir: optional subdirectory placed under the root.
        :param alt_name: name to use instead of ``dataset_<obj.id>``.
        :param strip_dat: when true a trailing ``.dat`` is removed from
            `alt_name`; when false ``.dat`` is appended to the default name.
        :raises ObjectInvalid: if `extra_dir` is not normalized or
            `alt_name` would resolve outside the constructed directory.

        `base_dir` and `extra_dir_at_root` are accepted but not used.
        """
        # extra_dir should never be constructed from provided data but just
        # make sure there are no shenanigans afoot
        if extra_dir and extra_dir != os.path.normpath(extra_dir):
            log.warning('extra_dir is not normalized: %s', extra_dir)
            raise ObjectInvalid("The requested object is invalid")
        # ensure that any parent directory references in alt_name would not
        # result in a path not contained in the directory path constructed here
        if alt_name:
            if not safe_relpath(alt_name):
                log.warning('alt_name would locate path outside dir: %s', alt_name)
                raise ObjectInvalid("The requested object is invalid")
            # alt_name can contain parent directory references, but iRODS will
            # not follow them, so if they are valid we normalize them out
            alt_name = os.path.normpath(alt_name)
        path = ""
        if extra_dir is not None:
            path = extra_dir

        # extra_dir_at_root is ignored - since the iRODS plugin does not use
        # the directory hash, there is only one level of subdirectory.

        if not dir_only:
            # the .dat extension is stripped when stored in iRODS
            # TODO: is the strip_dat kwarg the best way to implement this?
            if strip_dat and alt_name and alt_name.endswith('.dat'):
                alt_name = os.path.splitext(alt_name)[0]
            default_name = 'dataset_%s' % obj.id
            if not strip_dat:
                default_name += '.dat'
            path = path_join(path, alt_name if alt_name else default_name)

        path = path_join(self.root_collection_path, path)
        return path

    def __get_cache_path(self, obj, **kwargs):
        """
        Return the local cache file path for `obj`: the basename of its
        iRODS path (with ``.dat`` kept) inside `self.cache_path`.
        """
        # FIXME: does not handle collections
        # FIXME: collisions could occur here
        return os.path.join(self.cache_path, path_basename(self.__get_rods_path(obj, strip_dat=False, **kwargs)))

    def __clean_cache_entry(self, obj, **kwargs):
        """
        Remove the local cache file for `obj`, ignoring a missing file.
        """
        # FIXME: does not handle collections
        try:
            os.unlink(self.__get_cache_path(obj, **kwargs))
        except OSError:
            # it is expected that we'll call this method a lot regardless of
            # whether we think the cached file exists
            pass

    def __get_rods_handle(self, obj, mode='r', **kwargs):
        """
        Return an `irodsCollection` when ``dir_only`` is set in `kwargs`,
        otherwise the result of `irods.irodsOpen()` on the object's path.
        """
        if kwargs.get('dir_only', False):
            return irods.irodsCollection(self.rods_conn, self.__get_rods_path(obj, **kwargs))
        else:
            return irods.irodsOpen(self.rods_conn, self.__get_rods_path(obj, **kwargs), mode)

    def __mkcolls(self, rods_path):
        """
        An os.makedirs() for iRODS collections. `rods_path` is the desired collection to create.

        :raises AssertionError: if `rods_path` is not beneath the root
            collection or a collection cannot be created.
        """
        assert rods_path.startswith(self.root_collection_path + '/'), '__mkcolls(): Creating collections outside the root collection is not allowed (requested path was: %s)' % rods_path
        mkcolls = []
        c = irods.irodsCollection(self.rods_conn, rods_path)
        # walk upwards collecting every missing collection
        while c.getId() == -1:
            assert c.getCollName().startswith(self.root_collection_path + '/'), '__mkcolls(): Attempted to move above the root collection: %s' % c.getCollName()
            mkcolls.append(c.getCollName())
            c.upCollection()
        # create them top-down (the list was gathered bottom-up)
        for collname in reversed(mkcolls):
            log.debug('Creating collection %s', collname)
            ci = irods.collInp_t()
            ci.collName = collname
            status = irods.rcCollCreate(self.rods_conn, ci)
            assert status == 0, '__mkcolls(): Failed to create collection: %s' % collname

    @local_extra_dirs
    def exists(self, obj, **kwargs):
        """
        Return True if `irods.rcObjStat()` returns something other than
        None for the object's iRODS path.
        """
        doi = irods.dataObjInp_t()
        doi.objPath = self.__get_rods_path(obj, **kwargs)
        log.debug('exists(): checking: %s', doi.objPath)
        return irods.rcObjStat(self.rods_conn, doi) is not None

    @local_extra_dirs
    def create(self, obj, **kwargs):
        """
        Create the object (or, with ``dir_only``, the collection) if it does
        not already exist, creating parent collections as needed.

        :raises AssertionError: if iRODS reports a failure.
        """
        if not self.exists(obj, **kwargs):
            rods_path = self.__get_rods_path(obj, **kwargs)
            log.debug('create(): %s', rods_path)
            dir_only = kwargs.get('dir_only', False)
            # short circuit collection creation since most of the time it will
            # be the root collection which already exists
            collection_path = rods_path if dir_only else path_dirname(rods_path)
            if collection_path != self.root_collection_path:
                self.__mkcolls(collection_path)
            if not dir_only:
                # rcDataObjCreate is used instead of the irodsOpen wrapper so
                # that we can prevent overwriting
                doi = irods.dataObjInp_t()
                doi.objPath = rods_path
                doi.createMode = 0o640
                doi.dataSize = 0  # 0 actually means "unknown", although literally 0 would be preferable
                irods.addKeyVal(doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource)
                status = irods.rcDataObjCreate(self.rods_conn, doi)
                assert status >= 0, 'create(): rcDataObjCreate() failed: %s: %s: %s' % (rods_path, status, irods.strerror(status))

    @local_extra_dirs
    def empty(self, obj, **kwargs):
        """
        Return True if the object's size is 0.

        :raises ObjectNotFound: if the handle has no ``getSize`` attribute
            (the handle is None when the object cannot be opened).
        """
        assert 'dir_only' not in kwargs, 'empty(): `dir_only` parameter is invalid here'
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            return h.getSize() == 0
        except AttributeError:
            # h is None
            raise ObjectNotFound()

    def size(self, obj, **kwargs):
        """
        Return the object's size, or 0 if the object cannot be opened.
        """
        assert 'dir_only' not in kwargs, 'size(): `dir_only` parameter is invalid here'
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            return h.getSize()
        except AttributeError:
            # h is None
            return 0

    @local_extra_dirs
    def delete(self, obj, entire_dir=False, **kwargs):
        """
        Delete the object from iRODS and drop its local cache entry.

        :returns: True on success, False if the object does not exist or
            the deletion failed (failures are logged, not raised).
        :raises NotImplementedError: if `entire_dir` is true.
        """
        assert 'dir_only' not in kwargs, 'delete(): `dir_only` parameter is invalid here'
        rods_path = self.__get_rods_path(obj, **kwargs)
        # __get_rods_path prepends self.root_collection_path but we are going
        # to ensure that it's valid anyway for safety's sake
        assert rods_path.startswith(self.root_collection_path + '/'), 'ERROR: attempt to delete object outside root collection (path was: %s)' % rods_path
        if entire_dir:
            # TODO
            raise NotImplementedError()
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            # note: PyRods' irodsFile.delete() does not set force
            status = h.delete()
            assert status == 0, '%d: %s' % (status, irods.strerror(status))
            return True
        except AttributeError:
            log.warning('delete(): operation failed: object does not exist: %s', rods_path)
        except AssertionError as e:
            # delete() does not raise on deletion failure
            log.error('delete(): operation failed: %s', e)
        finally:
            # remove the cached entry (finally is executed even when the try
            # contains a return)
            self.__clean_cache_entry(obj, **kwargs)
        return False

    @local_extra_dirs
    def get_data(self, obj, start=0, count=-1, **kwargs):
        """
        Read and return data from the object.

        :param start: offset to seek to before reading.
        :param count: number of bytes to read; -1 reads to the end.
        :raises ObjectNotFound: if the object cannot be opened.
        """
        log.debug('get_data(): %s', self.__get_rods_path(obj, **kwargs))
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            h.seek(start)
        except AttributeError:
            raise ObjectNotFound()
        if count == -1:
            return h.read()
        else:
            return h.read(count)
        # TODO: make sure implicit close is okay, DiskObjectStore actually
        # reads data into a var, closes, and returns the var

    @local_extra_dirs
    def get_filename(self, obj, **kwargs):
        """
        Return the absolute path of a local cached copy of the object,
        fetching it from iRODS on a cache miss.

        :raises ObjectNotFound: if the object does not exist in iRODS.
        :raises NotImplementedError: if ``dir_only`` is passed.
        :raises AssertionError: if the transfer from iRODS fails.
        """
        log.debug("get_filename(): called on %s %s. For better performance, avoid this method and use get_data() instead.", obj.__class__.__name__, obj.id)
        cached_path = self.__get_cache_path(obj, **kwargs)

        if not self.exists(obj, **kwargs):
            raise ObjectNotFound()

        # TODO: implement or define whether dir_only is valid
        if 'dir_only' in kwargs:
            raise NotImplementedError()

        # cache hit
        if os.path.exists(cached_path):
            return os.path.abspath(cached_path)

        # cache miss
        # TODO: thread this
        incoming_path = os.path.join(os.path.dirname(cached_path), "__incoming_%s" % os.path.basename(cached_path))
        doi = irods.dataObjInp_t()
        doi.objPath = self.__get_rods_path(obj, **kwargs)
        doi.dataSize = 0  # TODO: does this affect performance? should we get size?
        doi.numThreads = 0
        # TODO: might want to VERIFY_CHKSUM_KW
        log.debug('get_filename(): caching %s to %s', doi.objPath, incoming_path)

        # do the iget
        status = irods.rcDataObjGet(self.rods_conn, doi, incoming_path)

        # if incoming already exists, we'll wait for another process or thread
        # to finish caching
        if status != irods.OVERWRITE_WITHOUT_FORCE_FLAG:
            assert status == 0, 'get_filename(): iget %s failed (%s): %s' % (doi.objPath, status, irods.strerror(status))
            # POSIX rename is atomic
            # TODO: rename without clobbering
            os.rename(incoming_path, cached_path)
            log.debug('get_filename(): cached %s to %s', doi.objPath, cached_path)

        # another process or thread is caching, wait for it
        while not os.path.exists(cached_path):
            # TODO: force restart after mod time > some configurable, or
            # otherwise deal with this potential deadlock and interrupted
            # transfers
            time.sleep(5)
            log.debug("get_filename(): waiting on incoming '%s' for %s %s", incoming_path, obj.__class__.__name__, obj.id)

        return os.path.abspath(cached_path)

    @local_extra_dirs
    def update_from_file(self, obj, file_name=None, create=False, **kwargs):
        """
        Upload a local file to iRODS as the content of `obj`.

        :param file_name: local file to upload; defaults to the object's
            cache path.
        :param create: if false and the object does not exist in iRODS,
            `ObjectNotFound` is raised instead of creating it.
        :raises ObjectNotFound: see `create`.
        :raises AssertionError: if the upload fails.
        """
        assert 'dir_only' not in kwargs, 'update_from_file(): `dir_only` parameter is invalid here'

        # do not create if not requested
        if not create and not self.exists(obj, **kwargs):
            raise ObjectNotFound()

        if file_name is None:
            file_name = self.__get_cache_path(obj, **kwargs)

        # put will create if necessary
        doi = irods.dataObjInp_t()
        doi.objPath = self.__get_rods_path(obj, **kwargs)
        doi.createMode = 0o640
        doi.dataSize = os.stat(file_name).st_size
        doi.numThreads = 0
        irods.addKeyVal(doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource)
        # force flag so that an existing data object is overwritten
        irods.addKeyVal(doi.condInput, irods.FORCE_FLAG_KW, '')
        # TODO: might want to VERIFY_CHKSUM_KW
        log.debug('update_from_file(): updating %s to %s', file_name, doi.objPath)

        # do the iput
        status = irods.rcDataObjPut(self.rods_conn, doi, file_name)
        assert status == 0, 'update_from_file(): iput %s failed (%s): %s' % (doi.objPath, status, irods.strerror(status))

    def get_object_url(self, obj, **kwargs):
        """
        Always returns None.
        """
        return None

    def get_store_usage_percent(self):
        """
        Always returns 0.0.
        """
        return 0.0


# monkeypatch an strerror method into the irods module
def _rods_strerror(errno):
    """
    The missing `strerror` for iRODS error codes

    On first use, every negative integer attribute of the `irods` module is
    indexed by value and the resulting map is cached on the module as
    ``__rods_strerror_map``.

    :param errno: iRODS status code to look up.
    :returns: the name of the matching `irods` module attribute, or
        ``'GALAXY_NO_ERRNO_MAPPING_FOUND'`` if there is none.
    """
    if not hasattr(irods, '__rods_strerror_map'):
        # Build locally and publish in one assignment so a failure part way
        # through cannot leave a partially filled cache on the module.
        error_names = {}
        for name in dir(irods):
            value = getattr(irods, name)
            if isinstance(value, int) and value < 0:
                error_names[value] = name
        irods.__rods_strerror_map = error_names
    return irods.__rods_strerror_map.get(errno, 'GALAXY_NO_ERRNO_MAPPING_FOUND')


if irods is not None:
    irods.strerror = _rods_strerror


def rods_connect():
    """
    Open an authenticated iRODS connection described by the current iRODS
    environment.

    The environment is read with `irods.getRodsEnv()`, a connection is made
    with `irods.rcConnect()`, and the login uses the password obtained from
    `irods.obfGetPw()`.

    :returns: tuple of ``(environment, connection)``.
    :raises AssertionError: if any of the steps reports a non-zero status.
    """
    env_status, rods_env = irods.getRodsEnv()
    assert env_status == 0, 'connect(): getRodsEnv() failed (%s): %s' % (env_status, irods.strerror(env_status))

    connection, conn_err = irods.rcConnect(
        rods_env.rodsHost,
        rods_env.rodsPort,
        rods_env.rodsUserName,
        rods_env.rodsZone,
    )
    assert conn_err.status == 0, 'connect(): rcConnect() failed (%s): %s' % (conn_err.status, conn_err.msg)

    pw_status, password = irods.obfGetPw()
    assert pw_status == 0, 'connect(): getting password with obfGetPw() failed (%s): %s' % (pw_status, irods.strerror(pw_status))

    login_status = irods.clientLoginWithObfPassword(connection, password)
    assert login_status == 0, 'connect(): logging in with clientLoginWithObfPassword() failed (%s): %s' % (login_status, irods.strerror(login_status))

    return rods_env, connection