Source code for sardana.sardanamodulemanager

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Sardana library. It defines the base
classes for module manager"""




__all__ = ["ModuleManager"]

__docformat__ = 'restructuredtext'

import imp
import sys
import threading
import importlib

from taurus.core import ManagerState
from taurus.core.util.log import Logger
from taurus.core.util.singleton import Singleton

from .sardanamanager import SardanaIDManager

_MINIMUM_EXCLUDE = [
    'sys',
    'os.path',
    '__builtin__',
    '__main__',
    'sardana',
    'taurus',
    'PyTango'
]


class PathContext(object):

    def __init__(self, path):
        self.path = path

    def __enter__(self):
        self.orig_path = list(sys.path)
        if self.path is not None:
            sys.path = self.path + sys.path

    def __exit__(self, etype, evalue, etraceback):
        sys.path = self.orig_path


class PathManager(SardanaIDManager):

    def __init__(self):
        SardanaIDManager.__init__(self)
        self._orig_path = list(sys.path)
        self._path_lock = threading.Lock()
        #: dict<int(path_id), list<int(path start index), list<str(path)>>
        self._path_info = {}

    @staticmethod
    def _decode_path(path):
        p = []
        for item in path:
            p.extend(item.split(":"))
        return p

    def add_python_path(self, path):
        """Adds a new path to the python path.

        :param path:
            a sequence of strings each string may contain an absolute path or a
            list of ":" or "\n" separated absolute paths
        :type path: list<str>
        :return:
            a path id identifying the changes that were made to sys.path. This
            ID can be used in :meth:`remove_path` to remove only the added path
        :rtype: int"""
        path = self._decode_path(path)
        path_len = len(path)
        pif = self._path_info

        with self._path_lock:
            path_id = self.get_new_id()

            for _, p_info in list(pif.items()):
                p_info[0] += path_len

            pif[path_id] = [0, path]
            sys.path = path + sys.path
        return path_id

    def remove_python_path(self, path_id):
        """Removes the path given by the path_id

        :param path_id:
            a path id identifying specific changes that were made to sys.path
        :type path_id: int"""
        pif = self._path_info
        start, path = pif[path_id]
        path_len = len(path)

        with self._path_lock:
            sys.path = sys.path[:start + 1] + sys.path[start + path_len:]
            del pif[path_id]

    def reset_python_path(self):
        with self._path_lock:
            sys.path = list(self._orig_path)
            self._path_info = {}


[docs]class ModuleManager(Singleton, Logger): """This class handles python module loading/reloading and unloading.""" def __init__(self): """ Initialization. Nothing to be done here for now.""" pass
[docs] def init(self, *args, **kwargs): """Singleton instance initialization.""" name = self.__class__.__name__ self._state = ManagerState.UNINITIALIZED self._path_manager = PathManager() self.call__init__(Logger, name) self.reInit()
[docs] def reInit(self): if self._state == ManagerState.INITED: return # dict<str, module> # key - module name (without path and without extension) # value - python module object reference self._modules = {} self._state = ManagerState.INITED
[docs] def cleanUp(self): if self._state == ManagerState.CLEANED: return self.unloadModules() self._modules = None self._state = ManagerState.CLEANED
[docs] def reset_python_path(self): return self._path_manager.reset_python_path()
[docs] def remove_python_path(self, path_id): return self._path_manager.remove_python_path(path_id)
[docs] def add_python_path(self, path): return self._path_manager.add_python_path(path)
[docs] def findFullModuleName(self, module_name, path=None): mfile = None try: mfile, pathname, _ = imp.find_module(module_name, path) finally: if mfile is not None: mfile.close() return pathname
[docs] def isValidModule(self, module_name, path=None): """ Method to verify is a module is loadable. """ m, mfile = None, None fake_name = "_" + module_name + "_" try: mfile, pathname, desc = imp.find_module(module_name, path) self.info("(re)loading module %s...", module_name) m = imp.load_module(fake_name, mfile, pathname, desc) except Exception as e: msg = "Invalid module %s" % module_name # special treatment of HKL controller - see sardana#394 # to be reconsidered after implementing SEP19 if (isinstance(e, ModuleNotFoundError) and module_name == "HklPseudoMotorController"): msg += ". The dependency package libhkl5 is probably not installed." msg += " Safe to ignore if you're not using this controller." self.warning(msg) else: self.error(msg) self.debug("Details:", exc_info=1) return False, sys.exc_info() finally: if mfile is not None: mfile.close() del sys.modules[fake_name] return True, None
[docs] def reloadModule(self, module_name, path=None, reload=True): """Loads/reloads the given module name""" valid, _ = self.isValidModule(module_name, path) if not valid: return None if not reload: return self.loadModule(module_name, path=path) self.unloadModule(module_name) m, mfile = None, None try: mfile, pathname, desc = imp.find_module(module_name, path) self.info("(re)loading module %s...", module_name) m = imp.load_module(module_name, mfile, pathname, desc) except: self.error("Error (re)loading module %s", module_name) self.debug("Details:", exc_info=1) raise finally: if mfile is not None: mfile.close() if m is None: return None self._modules[module_name] = m return m
[docs] def deep_reload_module(self, module_name, path=None, exclude=None): if module_name in sys.modules: module = sys.modules[module_name] else: module = self.loadModule(module_name, path) excl = list(_MINIMUM_EXCLUDE) if exclude is not None: excl.extend(exclude) import sardana.util.deepreload with PathContext(path): try: self.info("deep reloading module %s...", module_name) sardana.util.deepreload.reload(module, excl) except: self.error("Error deep reloading module %s", module_name) self.debug("Details:", exc_info=1) raise
[docs] def loadModule(self, module_name, path=None): """Loads the given module name. If the module has been already loaded into this python interpreter, nothing is done. :param module_name: the module to be loaded. :type module_name: :obj:`str` :param path: list of paths to look for modules [default: None] :type path: seq<str> or None :return: python module :raises: ImportError""" if module_name in sys.modules: return sys.modules[module_name] with PathContext(path): self.info("loading module %s...", module_name) try: module = importlib.import_module(module_name) except: self.error("Error loading module %s", module_name) self.debug("Details:", exc_info=1) raise self._modules[module_name] = module return module
[docs] def unloadModule(self, module_name): """Unloads the given module name""" if module_name in self._modules: self.debug("unloading module %s" % module_name) assert(module_name in sys.modules) self._modules.pop(module_name) del sys.modules[module_name]
[docs] def unloadModules(self, module_list=None): """Unloads the given module name""" modules = module_list or list(self._modules.keys()) for module in modules: self.unloadModule(module)
[docs] def getModule(self, module_name): """Returns the module object for the given module name""" m = self._modules.get(module_name) if m is None: m = self.reloadModule(module_name) return m
[docs] def getModuleNames(self): module_names = sorted(self._modules.keys()) return module_names