#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the class which
controls finding, loading/unloading of device pool controller plug-ins."""
__all__ = ["ControllerManager"]
__docformat__ = 'restructuredtext'
import os
import re
import sys
import copy
import types
import inspect
from typing import Sequence, Any, Tuple, Optional, List
from collections import OrderedDict
from taurus.core import ManagerState
from taurus.core.util.log import Logger
from taurus.core.util.singleton import Singleton
import sardana
from sardana.sardanamodulemanager import ModuleManager
from sardana.pool import controller
from sardana.pool.poolexception import UnknownController
from sardana.pool.poolmetacontroller import ControllerLibrary, ControllerClass
CONTROLLER_TEMPLATE = '''
class @controller_name@(MotorController):
"""This class representes a Sardana motor controller."""
ctrl_features = []
MaxDevice = 1024
ctrl_properties = {}
ctrl_attributes = {}
axis_attributes = {}
def __init__(self, inst, props, *args, **kwargs):
MotorController.__init__(self, inst, props, *args, **kwargs)
'''
[docs]
class ControllerManager(Singleton, Logger):
"""The singleton class responsible for managing controller plug-ins."""
DEFAULT_CONTROLLER_DIRECTORIES = 'poolcontrollers',
def __init__(self):
"""Initialization. Nothing to be done here for now."""
pass
[docs]
def init(self, *args, **kwargs):
"""Singleton instance initialization."""
name = self.__class__.__name__
self._state = ManagerState.UNINITIALIZED
self.call__init__(Logger, name)
self._pool = None
self.reInit()
[docs]
def reInit(self):
"""Singleton re-initialization."""
if self._state == ManagerState.INITED:
return
#: dict<str, metacontroller.ControllerLibray>
#: key - module name (without path and without extension)
#: value - ControllerLibrary object representing the module
self._modules = {}
#: dict<str, <metacontroller.ControllerClass>
#: key - controller name
#: value - ControllerClass object representing the controller
self._controller_dict = {}
#: list<str>
#: elements are absolute paths
self._controller_path = []
l = []
for _, klass in inspect.getmembers(controller, inspect.isclass):
if not issubclass(klass, controller.Controller):
continue
l.append(klass)
self._base_classes = l
self._state = ManagerState.INITED
[docs]
def cleanUp(self):
"""Singleton clean up."""
if self._state == ManagerState.CLEANED:
return
# if self._modules:
# ModuleManager().unloadModules(self._modules.keys())
self._controller_path = None
self._controller_dict = None
self._modules = None
self._state = ManagerState.CLEANED
[docs]
def set_pool(self, pool):
self._pool = pool
[docs]
def get_pool(self):
return self._pool
[docs]
def setControllerPath(self, controller_path: Sequence[str], reload=True):
"""Registers a new list of controller directories in this manager.
:param controller_path: a sequence of absolute paths where this
manager should look for controllers
.. warning::
as a consequence all the controller modules will be reloaded.
This means that if any reference to an old controller object was
kept it will refer to an old module (which could possibly generate
problems of type class A != class A)."""
p = []
for item in controller_path:
p.extend(item.split(os.pathsep))
# filter empty and commented paths
p = [i for i in p if i and not i.startswith("#")]
# add basic dummy controller directory(ies)
pool_dir = os.path.dirname(os.path.abspath(__file__))
for ctrl_dir in self.DEFAULT_CONTROLLER_DIRECTORIES:
ctrl_dir = os.path.join(pool_dir, ctrl_dir)
if ctrl_dir not in p:
p.append(ctrl_dir)
self._controller_path = p
controller_file_names = self._findControllerLibNames()
for mod_name, file_name in controller_file_names.items():
dir_name = os.path.dirname(file_name)
path = [dir_name]
try:
self.reloadControllerLib(mod_name, path, reload=reload)
except Exception:
pass
[docs]
def getControllerPath(self) -> Sequence[str]:
"""Returns the current sequence of absolute paths used to look for
controllers.
:return: sequence of absolute paths
"""
return self._controller_path
def _findControllerLibNames(self, path=None):
"""internal method"""
path = path or self.getControllerPath()
ret = OrderedDict()
for p in reversed(path):
try:
for f in os.listdir(p):
name, ext = os.path.splitext(f)
if not name[0].isalpha():
continue
if ext.endswith('py'):
ret[name] = os.path.abspath(os.path.join(p, f))
except:
self.debug("'%s' is not a valid path" % p)
return ret
def _fromNameToFileName(self, lib_name, path=None):
"""internal method"""
path = path or self.getControllerPath()[0]
f_name = lib_name
if not f_name.endswith('.py'):
f_name += '.py'
if os.path.isabs(f_name):
path, _ = os.path.split(f_name)
if not path in self.getControllerPath():
raise Exception("'%s' is not part of the PoolPath" % path)
else:
f_name = os.path.join(path, f_name)
return f_name
[docs]
def getOrCreateControllerLib(self, lib_name: str, controller_name: Optional[str] = None) -> Tuple[str, str, int]:
"""
Gets the exiting controller lib or creates a new controller lib file.
If name is not None, a controller template code for the given
controller name is appended to the end of the file.
:param lib_name: module name, python file name, or full file
name (with path)
:param controller_name: an optional controller name. If
given a controller template code is appended to the
end of the file [default: None, meaning no controller
code is added)
:return: a sequence with three items: full_filename, code, line number
line number is 0 if no controller is created or n representing
the first line of code for the given controller name.
"""
# if only given the module name
controller_lib = self.getControllerLib(lib_name)
if controller_name is None:
line_nb = 0
if controller_lib is None:
f_name, code = self.createControllerLib(lib_name), ''
else:
f_name = controller_lib.get_file_name()
f = open(f_name)
code = f.read()
f.close()
else:
# if given controller name
if controller_lib is None:
f_name, code, line_nb = self.createController(
lib_name, controller_name)
else:
controller = controller_lib.get_controller(controller_name)
if controller is None:
f_name, code, line_nb = self.createController(
lib_name, controller_name)
else:
_, line_nb = controller.getCode()
f_name = controller.getFileName()
f = open(f_name)
code = f.read()
f.close()
return [f_name, code, line_nb]
[docs]
def setControllerLib(self, lib_name: str, code: str):
"""Creates a new controller library file with the given name and code.
The new module is imported and becomes imediately available.
:param lib_name: name of the new library
:param code: python code of the new library"""
f_name = self._fromNameToFileName(lib_name)
f = open(f_name, 'w')
f.write(code)
f.flush()
f.close()
_, name = os.path.split(f_name)
mod, _ = os.path.splitext(name)
self.reloadControllerLib(mod)
[docs]
def createControllerLib(self, lib_name, path=None):
"""Creates a new empty controller library (python module)"""
f_name = self._fromNameToFileName(lib_name, path)
if os.path.exists(f_name):
raise Exception(
"Unable to create controller lib: '%s' already exists" % f_name)
f = open(f_name, 'w')
f.close()
return f_name
[docs]
def createController(self, lib_name, controller_name):
"""Creates a new controller"""
f_name = self._fromNameToFileName(lib_name)
create = not os.path.exists(f_name)
template = ''
if create:
template += 'from sardana.pool.controller import *\n\n'
line_nb = 4
else:
template += '\n'
t = open(f_name, 'rU')
line_nb = -1
for line_nb, _ in enumerate(t):
pass
line_nb += 3
t.close()
f = open(f_name, 'a+')
try:
dir_name = os.path.realpath(__file__)
dir_name = os.path.dirname(dir_name)
template_fname = 'controller_template.txt'
template_fname = os.path.join(dir_name, template_fname)
f_templ = open(template_fname, 'r')
template += f_templ.read()
f_templ.close()
except:
self.debug(
"Failed to open template controller file. Using simplified template")
template += CONTROLLER_TEMPLATE
if f_templ:
f_templ.close()
template = template.replace('@controller_name@', controller_name)
try:
f.write(template)
f.flush()
f.seek(0)
code = f.read()
finally:
f.close()
return f_name, code, line_nb
[docs]
def reloadController(self, controller_name: str, path: Optional[Sequence[str]] = None) -> None:
"""Reloads the module corresponding to the given controller name
:raises: :exc:`sardana.pool.poolexception.UnknownController`
in case the controller is unknown or :exc:`ImportError` if
the reload process is not successfull
:param controller_name: controller class name
:param path: a list of absolute path to search for libraries
[default: None, meaning the current ControllerPath
will be used]"""
self.reloadControllers([controller_name], path=path)
[docs]
def reloadControllers(self,
controller_names: Sequence[str],
path: Optional[Sequence[str]] = None
):
"""Reloads the modules corresponding to the given controller names
:raises: :exc:`sardana.pool.poolexception.UnknownController`
in case the controller is unknown or :exc:`ImportError` if
the reload process is not successful
:param controller_names: a list of controller class names
:param path: a list of absolute path to search for libraries
[default: None, meaning the current ControllerPath
will be used]"""
module_names = []
for controller_name in controller_names:
module_name = self.getControllerMetaClass(
controller_name).get_module_name()
module_names.append(module_name)
self.reloadControllerLibs(module_names, path=path)
[docs]
def reloadControllerLibs(self,
module_names: Sequence[str],
path: Optional[Sequence[str]] = None,
reload: bool = True
):
"""Reloads the given library(=module) names
:raises: :exc:`sardana.pool.poolexception.UnknownController`
in case the controller is unknown or :exc:`ImportError` if
the reload process is not successful
:param module_names: a list of module names
:param path: a list of absolute path to search for libraries
[default: None, meaning the current ControllerPath
will be used]"""
ret = []
for module_name in module_names:
try:
m = self.reloadControllerLib(module_name, path, reload=reload)
if m:
ret.append(m)
except:
self.info("Failed to reload controller library %s", module_name)
self.debug("Failed to reload controller library %s details",
module_name, exc_info=1)
return ret
[docs]
def reloadControllerLib(self, module_name: str, path: Optional[Sequence[str]] = None, reload: bool = True) -> sardana.pool.poolmetacontroller.ControllerLibrary:
"""Reloads the given library(=module) names
:raises: :exc:`sardana.pool.poolexception.UnknownController`
in case the controller is unknown or :exc:`ImportError` if
the reload process is not successful
:param module_name: controller library name (=python module name)
:param path: a list of absolute path to search for libraries
[default: None, meaning the current ControllerPath
will be used]
:return: the ControllerLib object for the reloaded controller lib
"""
path = path or self.getControllerPath()
# reverse the path order:
# more priority elements last. This way if there are repeated elements
# they first ones (lower priority) will be overwritten by the last ones
if path:
path = copy.copy(path)
path.reverse()
# if there was previous Controller Lib info remove it
if module_name in self._modules:
self._modules.pop(module_name)
m, exc_info = None, None
try:
m = ModuleManager().reloadModule(module_name, path, reload=reload)
except:
exc_info = sys.exc_info()
controller_lib = None
params = dict(module=m, name=module_name, pool=self.get_pool())
if m is None or exc_info is not None:
params['exc_info'] = exc_info
controller_lib = ControllerLibrary(**params)
self._modules[module_name] = controller_lib
else:
controller_lib = ControllerLibrary(**params)
lib_contains_controllers = False
abs_file = controller_lib.file_path
for _, klass in inspect.getmembers(m, inspect.isclass):
if issubclass(klass, controller.Controller):
# if it is a class defined in some other class forget it to
# avoid replicating the same controller in different
# controller files
# use normcase to treat case insensitivity of paths on
# certain platforms e.g. Windows
if os.path.normcase(inspect.getabsfile(klass)) !=\
os.path.normcase(abs_file):
continue
lib_contains_controllers = True
self.addController(controller_lib, klass)
if lib_contains_controllers:
self._modules[module_name] = controller_lib
return controller_lib
[docs]
def addController(self, controller_lib, klass):
"""Adds a new controller class"""
controller_name = klass.__name__
exists = controller_lib.has_controller(controller_name)
if exists:
action = "Updating"
else:
action = "Adding"
self.debug("%s controller %s" % (action, controller_name))
try:
controller_class = ControllerClass(pool=self.get_pool(),
lib=controller_lib, klass=klass)
#self._setControllerTypes(klass, controller_class)
controller_lib.add_controller(controller_class)
self._controller_dict[controller_name] = controller_class
except:
self.warning("Failed to add controller class %s", controller_name,
exc_info=1)
if exists:
action = "Updated"
else:
action = "Added"
self.debug("%s controller %s" % (action, controller_name))
[docs]
def getControllerNames(self):
return sorted(self._controller_dict.keys())
[docs]
def getControllerLibNames(self):
return sorted(self._modules.keys())
[docs]
def getControllerLibs(self, filter=None):
ret, expr = [], None
if filter is not None:
expr = re.compile(filter, re.IGNORECASE)
for name, lib in self._modules.items():
if lib.has_errors() or (expr is not None and expr.match(name) is None):
continue
ret.append(lib)
ret.sort()
return ret
[docs]
def getControllers(self, filter=None):
if filter is None:
return sorted(self._controller_dict.values())
expr = re.compile(filter, re.IGNORECASE)
ret = sorted([kls for n, kls in self._controller_dict.items()
if not expr.match(n) is None])
return ret
[docs]
def getControllerLib(self, name):
if os.path.isabs(name):
abs_file_name = name
for lib in list(self._modules.values()):
if lib.file_path == abs_file_name:
return lib
elif name.count(os.path.extsep):
file_name = name
for lib in list(self._modules.values()):
if lib.file_name == file_name:
return lib
module_name = name
return self._modules.get(module_name)
[docs]
def getControllerClass(self, controller_name):
return self.getControllerMetaClass(controller_name).klass
def _getPlainControllerInfo(self, controller_names):
ret = []
for controller_name in controller_names:
controller_class = self.getControllerMetaClass(controller_name)
if controller_class is not None:
ret += controller_class.getInfo()
return ret
[docs]
def decodeControllerParameters(self, in_par_list):
if len(in_par_list) == 0:
raise RuntimeError('Controller name not specified')
controller_name_or_klass = in_par_list[0]
controller_class = controller_name_or_klass
if isinstance(controller_class, str):
controller_class = self.getControllerClass(controller_class)
if controller_class is None:
raise UnknownController("Unknown controller %s" %
controller_name_or_klass)
from sardana.macroserver.msparameter import ParamDecoder
out_par_list = ParamDecoder(controller_class, in_par_list)
return controller_class, in_par_list, out_par_list
[docs]
def strControllerParamValues(self, par_list: List[str]) -> List[str]:
"""Creates a short string representation of the parameter values list.
:param par_list: list of strings representing the parameter values.
:return: a list containning an abreviated version of the par_list
argument.
"""
ret = []
for p in par_list:
param_str = str(p)
if len(param_str) > 9:
param_str = param_str[:9] + "..."
ret.append(param_str)
return ret