# Source code for sardana.macroserver.msparameter

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module contains the definition of the macroserver parameters for
macros"""

__all__ = ["WrongParam", "MissingParam", "SupernumeraryParam",
           "UnknownParamObj", "WrongParamType", "MissingRepeat",
           "SupernumeraryRepeat", "TypeNames", "Type", "ParamType",
           "ElementParamType", "ElementParamInterface",
           "AttrParamType", "AbstractParamTypes", "ParamDecoder"]

__docformat__ = 'restructuredtext'

from copy import deepcopy
from lxml import etree
from taurus.core.util.containers import CaselessDict

from sardana import ElementType, INTERFACES_EXPANDED
from sardana.sardanautils import is_non_str_seq
from sardana.macroserver.msbase import MSBaseObject
from sardana.macroserver.msexception import MacroServerException, \
    UnknownMacro, UnknownMacroLibrary


class OptionalParamClass(dict):
    """Dictionary subclass used to build the ``Optional`` marker object.

    During construction every name reported by ``dir()``, except a short
    list of exempted ones, is shadowed on the instance by
    :meth:`raise_error`. Fetching one of those names from the instance and
    calling it therefore raises ``RuntimeError``.
    """

    def __init__(self, obj):
        super(OptionalParamClass, self).__init__(obj)
        # "items" is left usable because the python 3.5 implementation of
        # json needs it
        untouched = ('__setattr__', 'raise_error', '__class__',
                     '__dict__', '__weakref__', 'items')
        shadowed = [attr for attr in dir(self) if attr not in untouched]
        for attr in shadowed:
            setattr(self, attr, self.raise_error)
        # finally shadow __setattr__ itself on the instance
        self.__setattr__ = self.raise_error

    def __repr__(self):
        return 'Optional'

    def raise_error(*args, **kwargs):
        """Raise ``RuntimeError`` regardless of the arguments received."""
        raise RuntimeError('can not be accessed')


# Marker object for optional parameters: ParamDecoder.decodeNormal compares
# the value to decode with this object by identity and decodes it as None
Optional = OptionalParamClass({'___optional_parameter__': True})


class WrongParam(MacroServerException):
    """Macro server exception whose ``type`` is 'Wrong parameter'."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Wrong parameter'


class MissingParam(WrongParam):
    """Parameter exception whose ``type`` is 'Missing parameter'."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Missing parameter'


class SupernumeraryParam(WrongParam):
    """Parameter exception whose ``type`` is 'Supernumerary parameter'."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Supernumerary parameter'


class UnknownParamObj(WrongParam):
    """Parameter exception whose ``type`` is 'Unknown parameter'."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Unknown parameter'


class WrongParamType(WrongParam):
    """Parameter exception whose ``type`` is 'Unknown parameter type'."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Unknown parameter type'


class MissingRepeat(WrongParam):
    """Parameter exception whose ``type`` is 'Missing repeat'."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Missing repeat'


class SupernumeraryRepeat(WrongParam):
    """Parameter exception whose ``type`` is 'Supernumerary repeat'."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Supernumerary repeat'


class TypeNames:
    """Class that holds the list of registered macro parameter types

    Every registered type name is also stored as an attribute of the
    instance, the attribute value being the name itself.
    """

    def __init__(self):
        self._type_names = {}
        self._pending_type_names = {}

    def addType(self, name):
        """Register a new macro parameter type

        :param name: (str) name of the type; it becomes an attribute of
                     this object
        """
        setattr(self, name, name)
        self._type_names[name] = name
        # a registered name is not pending anymore
        self._pending_type_names.pop(name, None)

    def removeType(self, name):
        """Remove a macro parameter type

        :param name: (str) name of the type to remove
        :raises AttributeError: if this object has no attribute called name
        """
        delattr(self, name)
        # a missing dict key is reported with KeyError (not ValueError);
        # pop() with a default keeps the removal tolerant as intended
        self._type_names.pop(name, None)

    def __str__(self):
        return str(list(self._type_names))

#    def __getattr__(self, name):
#        if name not in self._pending_type_names:
#            self._pending_type_names[name] = name
#        return self._pending_type_names[name]


# This instance of TypeNames is intended to provide access to types to the
# Macros in a "Type.Motor" fashion: TypeNames.addType() stores every
# registered name as an attribute of this object
Type = TypeNames()


class ParamType(MSBaseObject):
    """Base class of the macro parameter types.

    A parameter value is obtained from its string representation by
    calling the class attribute ``type_class`` (``str`` here).
    """

    All = 'All'

    # Capabilities
    ItemList = 'ItemList'
    ItemListEvents = 'ItemListEvents'

    capabilities = []

    type_class = str

    def __init__(self, macro_server, name):
        init_kwargs = dict(name=name, full_name=name,
                           macro_server=macro_server,
                           elem_type=ElementType.ParameterType)
        MSBaseObject.__init__(self, **init_kwargs)

    def getName(self):
        """Return the name of this parameter type."""
        return self.name

    def getObj(self, str_repr):
        """Return ``type_class`` called with the string representation.

        :param str_repr: string representation of the parameter value
        """
        converter = self.__class__.type_class
        return converter(str_repr)

    @classmethod
    def hasCapability(cls, cap):
        """Tell whether cap is listed in the class ``capabilities``."""
        return cap in cls.capabilities

    def serialize(self, *args, **kwargs):
        """Return the MSBaseObject serialization with 'composed' False."""
        serialized = MSBaseObject.serialize(self, *args, **kwargs)
        serialized['composed'] = False
        return serialized
class ElementParamType(ParamType):
    """Parameter type whose value is an element looked up by name.

    The element is searched in the pools of the macro server and, when no
    pool provides it, among the macros and the macro libraries of the macro
    server.
    """

    capabilities = ParamType.ItemList, ParamType.ItemListEvents

    def __init__(self, macro_server, name):
        ParamType.__init__(self, macro_server, name)

    def accepts(self, elem):
        """Tell whether the type of elem equals the name of this type."""
        return elem.getType() == self._name

    def getObj(self, name, pool=ParamType.All, cache=False):
        """Return the element called name.

        :param name: name of the element to look for
        :param pool: ParamType.All (default) to search in all the pools of
                     the macro server, otherwise the value passed to
                     macro_server.get_pool() to select a single pool
        :param cache: not used by this implementation
        :return: the first accepted pool element, otherwise the macro or
                 the macro library with that name
        :raises UnknownParamObj: if nothing with that name is found
        """
        macro_server = self.macro_server
        if pool == ParamType.All:
            pools = macro_server.get_pools()
        else:
            pools = (macro_server.get_pool(pool),)
        for pool in pools:
            elem_info = pool.getObj(name, elem_type=self._name)
            if elem_info is not None and self.accepts(elem_info):
                return elem_info
        # not a pool object, maybe it is a macro server object (perhaps a macro
        # code or a macro library
        try:
            return macro_server.get_macro(name)
        except UnknownMacro:
            pass

        try:
            return macro_server.get_macro_lib(name)
        except UnknownMacroLibrary:
            pass
        # neither pool nor macroserver contains any element with this name
        raise UnknownParamObj('%s with name %s does not exist' %
                              (self._name, name))

    def getObjDict(self, pool=ParamType.All, cache=False):
        """Return a CaselessDict (name -> element) of the accepted elements.

        :param pool: ParamType.All (default) or the value passed to
                     macro_server.get_pool() to select a single pool
        :param cache: not used by this implementation
        """
        macro_server = self.macro_server
        objs = CaselessDict()
        if pool == ParamType.All:
            pools = macro_server.get_pools()
        else:
            pools = (macro_server.get_pool(pool),)
        for pool in pools:
            for elem_info in pool.getElements():
                if self.accepts(elem_info):
                    objs[elem_info.name] = elem_info
        # NOTE(review): both loops below iterate over get_macros(); judging
        # by the variable names the first one was presumably meant to iterate
        # over the macro libraries - confirm against the MacroServer API
        macros = macro_server.get_macros()
        for macro_lib_name, macro_lib in list(macros.items()):
            if self.accepts(macro_lib):
                objs[macro_lib_name] = macro_lib
        for macro_name, macro in list(macro_server.get_macros().items()):
            if self.accepts(macro):
                objs[macro_name] = macro

        return objs

    def getObjListStr(self, pool=ParamType.All, cache=False):
        """Return the list of keys of :meth:`getObjDict`."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.keys())

    def getObjList(self, pool=ParamType.All, cache=False):
        """Return the list of values of :meth:`getObjDict`."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.values())

    def serialize(self, *args, **kwargs):
        """Return the ParamType serialization with 'composed' set to True."""
        kwargs = ParamType.serialize(self, *args, **kwargs)
        kwargs['composed'] = True
        return kwargs


class ElementParamInterface(ElementParamType):
    """Element parameter type which accepts elements by interface.

    An element is accepted when the name of this type is listed among the
    interfaces that INTERFACES_EXPANDED associates to the element type.
    """

    def __init__(self, macro_server, name):
        ElementParamType.__init__(self, macro_server, name)
        bases, doc = INTERFACES_EXPANDED.get(name)
        self._interfaces = bases

    def accepts(self, elem):
        """Tell whether elem implements the interface named as this type.

        When INTERFACES_EXPANDED provides no interfaces for the type of the
        element, the comparison of :meth:`ElementParamType.accepts` is used.
        """
        elem_type = elem.getType()
        # look the entry up before subscripting it: an unknown element type
        # must reach the fallback below instead of failing on ``None[0]``
        interface_info = INTERFACES_EXPANDED.get(elem_type)
        elem_interfaces = None
        if interface_info is not None:
            elem_interfaces = interface_info[0]
        if elem_interfaces is None:
            return ElementParamType.accepts(self, elem)
        return self._name in elem_interfaces

    def getObj(self, name, pool=ParamType.All, cache=False):
        """Return the element called name which implements this interface.

        :param name: name of the element to look for
        :param pool: ParamType.All (default) to search in all the pools of
                     the macro server, otherwise the value passed to
                     macro_server.get_pool() to select a single pool
        :param cache: not used by this implementation
        :return: the pool element, otherwise the macro or the macro library
                 with that name
        :raises RuntimeError: if the element exists in more than one pool
                              and the default pool can not be obtained
        :raises UnknownParamObj: if nothing with that name is found
        """
        macro_server = self.macro_server
        if pool == ParamType.All:
            poolObjs = macro_server.get_pools()
        else:
            poolObjs = (macro_server.get_pool(pool),)
        # Here, we check if the element exist in various pools:
        # if only in one pool: we use this pool
        # if in >1 pool: check env var DefaultPool and use it
        # other cases raise exception
        elements = []
        for poolObj in poolObjs:
            elem_info = poolObj.getElementWithInterface(name, self._name)
            if elem_info is not None and self.accepts(elem_info):
                elements.append(elem_info)
        if len(elements) == 0:
            msg = '{} with name {} does not exist in any Pool {}'.format(
                self._name, name, [p.name for p in poolObjs])
        elif len(elements) == 1:
            # Element only exist in one Pool. We use it.
            return elements[0]
        else:
            # Ambiguity, check for env var DefaultPool and
            # return the element in this pool if it exists
            try:
                default_pool = macro_server.get_default_pool()
            except RuntimeError as e:
                msg = ("Ambiguity detected. Element defined in more "
                       "than one Pool (Hint: Try setting the DefaultPool "
                       "environmental variable e.g. by executing "
                       "`senv DefaultPool <poolName>` macro)")
                raise RuntimeError(msg) from e
            for elem_pool in elements:
                if elem_pool.pool == default_pool.name:
                    self.debug("Using element {} in {}".format(
                        elem_pool.name, elem_pool.pool))
                    return elem_pool
            # Element not in DefaultPool
            msg = ("{} with name {} does not exist in {} but exists in other "
                   "pools {} (Hint: Try changing the DefaultPool "
                   "environmental variable e.g. by executing "
                   "`senv DefaultPool <poolName>` macro)".format(
                       self._name, name, default_pool.name,
                       [e.pool for e in elements]))
        # not a pool object, maybe it is a macro server object (perhaps a macro
        # class or a macro library
        try:
            return macro_server.get_macro(name)
        except UnknownMacro:
            if self._name == "MacroCode":
                msg += (" (Hint: if {} is not a typo, "
                        "try adding macro library first "
                        "e.g. by executing `addmaclib <macro module name>` "
                        "macro)".format(name))
        try:
            return macro_server.get_macro_lib(name)
        except UnknownMacroLibrary:
            if self._name == "MacroLibrary":
                msg += (" (Hint: if {0} is not a typo, "
                        "try adding macro library first "
                        "e.g. by executing `addmaclib {0}` "
                        "macro)").format(name)
        # neither pool nor macroserver contains any element with this name
        raise UnknownParamObj(msg)

    def getObjDict(self, pool=ParamType.All, cache=False):
        """Return the elements implementing this interface, by name.

        When the macro server reports the interface as one of its own, the
        result of macro_server.get_elements_with_interface() is returned;
        otherwise a CaselessDict is filled with the accepted pool elements.

        :param pool: ParamType.All (default) or the value passed to
                     macro_server.get_pool() to select a single pool
        :param cache: not used by this implementation
        """
        macro_server = self.macro_server
        objs = CaselessDict()
        if macro_server.is_macroserver_interface(self._name):
            return macro_server.get_elements_with_interface(self._name)

        if pool == ParamType.All:
            pools = macro_server.get_pools()
        else:
            pools = (macro_server.get_pool(pool),)
        for pool in pools:
            elements = pool.getElementsWithInterface(self._name)
            for elem_info in list(elements.values()):
                if self.accepts(elem_info):
                    objs[elem_info.name] = elem_info
        return objs

    def getObjListStr(self, pool=ParamType.All, cache=False):
        """Return the list of keys of :meth:`getObjDict`."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.keys())

    def getObjList(self, pool=ParamType.All, cache=False):
        """Return the list of values of :meth:`getObjDict`."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.values())


class AttrParamType(ParamType):
    pass


AbstractParamTypes = (ParamType, ElementParamType, ElementParamInterface,
                      AttrParamType)


class ParamDecoder:
    """Decoder of macro parameters given as XML element or as (nested) list.

    The decoding takes place on construction; the result is kept in the
    ``params`` attribute.
    """

    def __init__(self, type_manager, params_def, raw_params):
        """Create ParamDecoder object and decode macro parameters

        :param type_manager: (sardana.macroserver.mstypemanager.TypeManager)
            type manager object
        :param params_def: list<list> macro parameter definition
        :param raw_params: (lxml.etree._Element or list) xml element
            representing macro with subelements representing parameters or
            list with parameter values
        """
        self.type_manager = type_manager
        self.params_def = params_def
        self.raw_params = raw_params
        self.params = None
        self.decode()

    def decode(self):
        """Decode raw representation of parameters to parameters as passed
        to the prepare or run methods.

        :return: (list) decoded parameters
        :raises SupernumeraryParam: if more raw parameters than parameter
                                    definitions are given
        """
        # make a copy since in case of XML it could be necessary to modify
        # the raw_params - filter out elements different than params
        raw_params = deepcopy(self.raw_params)
        params_def = self.params_def
        # ignore other tags than "param" and "paramrepeat"
        # e.g. sequencer may create tags like "hookPlace"
        if isinstance(raw_params, etree._Element):
            # iterate over a snapshot of the children: removing children
            # from the element being iterated can skip some of them
            for raw_param in list(raw_params):
                if raw_param.tag not in ("param", "paramrepeat"):
                    raw_params.remove(raw_param)

        params = []
        # check if too many parameters were passed
        len_params_def = len(params_def)
        if len(raw_params) > len_params_def:
            msg = ("%r are supernumerary with respect to definition" %
                   raw_params[len_params_def:])
            raise SupernumeraryParam(msg)
        # iterate over definition since missing values may just mean using
        # the default values
        for i, param_def in enumerate(params_def):
            try:
                raw_param = raw_params[i]
            except IndexError:
                raw_param = None
            obj = self.decodeNormal(raw_param, param_def)
            params.append(obj)
        self.params = params
        return self.params

    def decodeNormal(self, raw_param, param_def):
        """Decode and validate parameter

        :param raw_param: (lxml.etree._Element or list) xml element
            representing parameter
        :param param_def: (dict) parameter definition

        :return: decoded parameter; a list of decoded repetitions in case
                 of a repeat parameter; None if the value is Optional
        :raises MissingParam: if neither value nor default value is given
        :raises WrongParamType: if the type object raises ValueError
        :raises WrongParam: if the object is unknown or can not be created
        """
        param_type = param_def["type"]
        name = param_def["name"]
        optional_param = False
        if isinstance(param_type, list):
            param = self.decodeRepeat(raw_param, param_def)
        else:
            type_manager = self.type_manager
            param_type = type_manager.getTypeObj(param_type)
            try:
                if isinstance(raw_param, etree._Element):
                    value = raw_param.get("value")
                else:
                    value = raw_param
                # None or [] indicates default value
                if value is None or (isinstance(value, list)
                                     and len(value) == 0):
                    value = param_def['default_value']
                if value is None:
                    raise MissingParam("'%s' not specified" % name)
                elif value is Optional:
                    param = None
                    optional_param = True
                else:
                    # cast to string to fulfill with ParamType API
                    param = param_type.getObj(str(value))
            except ValueError as e:
                raise WrongParamType(str(e)) from e
            except UnknownParamObj as e:
                raise WrongParam(str(e)) from e
            if param is None and not optional_param:
                msg = 'Could not create %s parameter "%s" for "%s"' % \
                      (param_type.getName(), name, raw_param)
                raise WrongParam(msg)
        return param

    def decodeRepeat(self, raw_param_repeat, param_repeat_def):
        """Decode and validate repeat parameter

        :param raw_param_repeat: (lxml.etree._Element or list) xml element
            representing param repeat with subelements representing
            repetitions or list representing repetitions
        :param param_repeat_def: (dict) repeat parameter definition

        :return: (list): list with decoded parameter repetitions
        :raises MissingRepeat: if less than 'min' repetitions are given
        :raises SupernumeraryRepeat: if more than 'max' repetitions are
                                     given
        :raises WrongParam: if a repetition of a single member repeat
                            parameter is a non empty list
        """
        name = param_repeat_def['name']
        param_type = param_repeat_def['type']
        min_rep = param_repeat_def['min']
        max_rep = param_repeat_def['max']
        param_repeat = []
        if raw_param_repeat is None:
            raw_param_repeat = param_repeat_def['default_value']
        if raw_param_repeat is None:
            raw_param_repeat = []
        # repeat params with only one member and only one repetition value are
        # allowed - encapsulate it in list and try to decode anyway;
        # for the moment this only works for non XML decoding but could be
        # extended in the future to support XML as well.
        # This is done before counting the repetitions, otherwise the
        # characters of a single string value would be counted
        if not is_non_str_seq(raw_param_repeat)\
                and not isinstance(raw_param_repeat, etree._Element):
            raw_param_repeat = [raw_param_repeat]
        len_rep = len(raw_param_repeat)
        if min_rep and len_rep < min_rep:
            msg = 'Found %d repetitions of param %s, min is %d' % \
                  (len_rep, name, min_rep)
            raise MissingRepeat(msg)
        if max_rep and len_rep > max_rep:
            msg = 'Found %d repetitions of param %s, max is %d' % \
                  (len_rep, name, max_rep)
            raise SupernumeraryRepeat(msg)
        for raw_repeat in raw_param_repeat:
            if len(param_type) > 1:
                repeat = []
                for i, member_type in enumerate(param_type):
                    try:
                        member_raw = raw_repeat[i]
                    except IndexError:
                        # missing member - its default value may apply
                        member_raw = None
                    member = self.decodeNormal(member_raw, member_type)
                    repeat.append(member)
            else:
                # if the repeat parameter is composed of just one member
                # do not encapsulate it in list and pass directly the item
                if isinstance(raw_repeat, etree._Element):
                    raw_repeat = raw_repeat[0]
                # check if one tries to decode repeat parameter of just one
                # member encapsulated in a list, empty lists are still allowed
                # to indicate default value
                elif isinstance(raw_repeat, list) and len(raw_repeat) > 0:
                    msg = 'Repetitions of just one member must not be lists'
                    raise WrongParam(msg)
                repeat = self.decodeNormal(raw_repeat, param_type[0])
            param_repeat.append(repeat)
        return param_repeat

    def getParamList(self):
        """Return the list of decoded parameters."""
        return self.params

    def __iter__(self):
        return iter(self.params)

    def __getattr__(self, name):
        # only called when the regular lookup fails: delegate to the decoded
        # parameters, but never for "params" itself, which would otherwise
        # recurse endlessly when that attribute is not set
        if name == "params":
            raise AttributeError(name)
        return getattr(self.params, name)


class FlatParamDecoder:
    """Parameter decoder useful for macros with only one repeat parameter
    located at the very last place. It requires that the raw parameters are
    passed as a flat list of strings.
    """

    def __init__(self, type_manager, params_def, raw_params):
        """Create FlatParamDecoder object and decode macro parameters

        :param type_manager: type manager object
        :param params_def: macro parameter definition
        :param raw_params: flat list with the parameter values
        :raises AttributeError: if :meth:`isPossible` rejects params_def
        """
        self.type_manager = type_manager
        self.params_def = params_def
        self.raw_params = raw_params
        self.params = None
        if not self.isPossible(params_def):
            msg = ("%s parameter definition is not compatible with"
                   " FlatParamDecoder" % params_def)
            raise AttributeError(msg)
        self.decode()

    @staticmethod
    def isPossible(params_def):
        """Tell whether params_def can be decoded by this decoder.

        :param params_def: macro parameter definition
        :return: (bool) False if a repeat parameter is not the last
                 parameter or if one of its members is a list
        """
        for param_def in params_def:
            param_type = param_def["type"]
            if isinstance(param_type, list):
                if param_def != params_def[-1]:
                    # repeat parameter is not the last one
                    # it won't be possible to decode it
                    return False
                # NOTE(review): the members are tested for being lists
                # themselves, not for having a list as "type" - confirm that
                # this detects nested repeat parameters as intended
                for sub_param_def in param_type:
                    if isinstance(sub_param_def, list):
                        # nested repeat parameter
                        # it won't be possible to decode it
                        return False
        return True

    def decode(self):
        """Decode the raw parameters and store them in ``params``.

        :return: (list) decoded parameters
        """
        params_def = self.params_def
        raw_params = self.raw_params
        _, self.params = self.decodeNormal(raw_params, params_def)
        return self.params

    def decodeNormal(self, raw_params, params_def):
        """Decode the values of raw_params following params_def.

        :param raw_params: flat list with the parameter values
        :param params_def: list of parameter definitions
        :return: (tuple) number of consumed values and list of decoded
                 parameters
        :raises MissingParam: if a parameter has neither value nor default
        :raises WrongParamType: if the type object raises ValueError
        :raises WrongParam: if the object is unknown or can not be created,
                            or if a repeat parameter without values demands
                            at least one
        """
        str_len = len(raw_params)
        obj_list = []
        str_idx = 0
        for par_def in params_def:
            name = par_def['name']
            type_class = par_def['type']
            def_val = par_def['default_value']
            if str_idx == str_len:
                # all the values were consumed: use the default value
                if def_val is not None:
                    new_obj = def_val
                elif not isinstance(type_class, list):
                    raise MissingParam("'%s' not specified" % name)
                else:
                    min_rep = par_def['min']
                    if min_rep is not None and min_rep > 0:
                        msg = "'%s' demands at least %d values" %\
                              (name, min_rep)
                        raise WrongParam(msg)
                    # repeat parameter without values and without default
                    # value: decode it as no repetitions (as ParamDecoder
                    # does) instead of leaving new_obj unassigned
                    new_obj = []
            else:
                if isinstance(type_class, list):
                    data = self.decodeRepeat(raw_params[str_idx:], par_def)
                    dec_token, new_obj = data
                else:
                    type_manager = self.type_manager
                    type_name = type_class
                    # NOTE(review): the result is not used afterwards; the
                    # call is kept in case it validates the type name
                    type_class = type_manager.getTypeClass(type_name)
                    par_type = type_manager.getTypeObj(type_name)
                    par_str = raw_params[str_idx]
                    try:
                        val = par_type.getObj(par_str)
                    except ValueError as e:
                        raise WrongParamType(str(e)) from e
                    except UnknownParamObj as e:
                        raise WrongParam(str(e)) from e
                    if val is None:
                        msg = 'Could not create %s parameter "%s" for "%s"' % \
                              (par_type.getName(), name, par_str)
                        raise WrongParam(msg)
                    dec_token = 1
                    new_obj = val
                str_idx += dec_token
            obj_list.append(new_obj)
        return str_idx, obj_list

    def decodeRepeat(self, raw_params, par_def):
        """Decode as many repetitions as raw_params and 'max' allow.

        :param raw_params: flat list with the values not decoded yet
        :param par_def: (dict) repeat parameter definition
        :return: (tuple) number of consumed values and list of decoded
                 repetitions; a repetition with a single member is not
                 encapsulated in a list
        :raises MissingRepeat: if less than 'min' repetitions are found
        """
        name = par_def['name']
        param_def = par_def['type']
        min_rep = par_def['min']
        max_rep = par_def['max']

        dec_token = 0
        obj_list = []
        rep_nr = 0
        while dec_token < len(raw_params):
            if max_rep is not None and rep_nr == max_rep:
                break
            new_token, new_obj_list = self.decodeNormal(
                raw_params[dec_token:], param_def)
            dec_token += new_token
            if len(new_obj_list) == 1:
                new_obj_list = new_obj_list[0]
            obj_list.append(new_obj_list)
            rep_nr += 1
        # same guard as in ParamDecoder.decodeRepeat: no minimum, no check
        if min_rep and rep_nr < min_rep:
            msg = 'Found %d repetitions of param %s, min is %d' % \
                  (rep_nr, name, min_rep)
            raise MissingRepeat(msg)
        return dec_token, obj_list

    def getParamList(self):
        """Return the list of decoded parameters."""
        return self.params

    def __iter__(self):
        return iter(self.params)

    def __getattr__(self, name):
        # only called when the regular lookup fails: delegate to the decoded
        # parameters, but never for "params" itself, which would otherwise
        # recurse endlessly when that attribute is not set
        if name == "params":
            raise AttributeError(name)
        return getattr(self.params, name)