#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module contains the definition of the macroserver parameters for
macros"""
# Public names exported by ``from <this module> import *``
__all__ = ["WrongParam", "MissingParam", "SupernumeraryParam",
"UnknownParamObj", "WrongParamType", "MissingRepeat",
"SupernumeraryRepeat", "TypeNames", "Type", "ParamType",
"ElementParamType", "ElementParamInterface",
"AttrParamType", "AbstractParamTypes", "ParamDecoder"]
# Markup format used by the docstrings of this module
__docformat__ = 'restructuredtext'
from copy import deepcopy
from lxml import etree
from taurus.core.util.containers import CaselessDict
from sardana import ElementType, INTERFACES_EXPANDED
from sardana.sardanautils import is_non_str_seq
from sardana.macroserver.msbase import MSBaseObject
from sardana.macroserver.msexception import MacroServerException, \
UnknownMacro, UnknownMacroLibrary
class OptionalParamClass(dict):
    """Dictionary based marker whose attributes are disabled after creation.

    During construction almost every attribute name reported by ``dir()`` is
    shadowed on the instance by :meth:`raise_error`, so explicit attribute
    access such as ``obj.keys()`` raises ``RuntimeError``. Implicit special
    method lookups (``repr()``, item access, ...) are resolved on the type
    and therefore keep working.
    """

    def __init__(self, obj):
        super(OptionalParamClass, self).__init__(obj)
        # names that must stay intact; 'items' is required by the
        # python 3.5 implementation of json
        untouched = ('__setattr__', 'raise_error', '__class__',
                     '__dict__', '__weakref__', 'items')
        for attr_name in dir(self):
            if attr_name not in untouched:
                setattr(self, attr_name, self.raise_error)
        # finally shadow __setattr__ itself on the instance
        setattr(self, '__setattr__', self.raise_error)

    def __repr__(self):
        return 'Optional'

    def raise_error(*args, **kwargs):
        """Reject any access; accepts whatever arguments it is called with."""
        raise RuntimeError('can not be accessed')


# Singleton used as default value to mark a macro parameter as optional
Optional = OptionalParamClass({'___optional_parameter__': True})
class WrongParam(MacroServerException):
    """Macro server error signalling an invalid macro parameter.

    Sets the ``type`` attribute to ``'Wrong parameter'``.
    """

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Wrong parameter'
class MissingParam(WrongParam):
    """:class:`WrongParam` variant tagged as ``'Missing parameter'``."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Missing parameter'
class SupernumeraryParam(WrongParam):
    """:class:`WrongParam` variant tagged as ``'Supernumerary parameter'``."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Supernumerary parameter'
class UnknownParamObj(WrongParam):
    """:class:`WrongParam` variant tagged as ``'Unknown parameter'``."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Unknown parameter'
class WrongParamType(WrongParam):
    """:class:`WrongParam` variant tagged as ``'Unknown parameter type'``."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Unknown parameter type'
class MissingRepeat(WrongParam):
    """:class:`WrongParam` variant tagged as ``'Missing repeat'``."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Missing repeat'
class SupernumeraryRepeat(WrongParam):
    """:class:`WrongParam` variant tagged as ``'Supernumerary repeat'``."""

    def __init__(self, *args):
        super().__init__(*args)
        self.type = 'Supernumerary repeat'
class TypeNames:
    """Class that holds the list of registered macro parameter types.

    Every registered type name is stored in an internal dictionary and is
    additionally exposed as an instance attribute whose value is the name
    itself.
    """

    def __init__(self):
        self._type_names = {}
        self._pending_type_names = {}

    def addType(self, name):
        """Register a new macro parameter type.

        :param name: (str) name of the type; it also becomes an attribute
            of this object
        """
        setattr(self, name, name)
        self._type_names[name] = name
        self._pending_type_names.pop(name, None)

    def removeType(self, name):
        """Remove a macro parameter type.

        :param name: (str) name of the type to be removed
        :raises AttributeError: if this object has no attribute ``name``
        """
        delattr(self, name)
        # a missing dictionary entry is tolerated; note that deleting a
        # missing key raises KeyError (not ValueError), hence pop()
        self._type_names.pop(name, None)

    def __str__(self):
        return str(list(self._type_names.keys()))
# def __getattr__(self, name):
# if name not in self._pending_type_names:
# self._pending_type_names[name] = name
# return self._pending_type_names[name]
# Module level instance of TypeNames. It is intended to give the macros
# access to the registered parameter types in a "Type.Motor" fashion, i.e.
# every registered type name is available as an attribute of this object.
Type = TypeNames()
# Parameter type definitions
class ParamType(MSBaseObject):
    """Macro parameter type registered in the macro server.

    The string representation of a parameter is converted into a value by
    calling the class attribute ``type_class`` (``str`` here).
    """

    # value of the ``pool`` argument meaning "all pools"
    All = 'All'

    # Capabilities
    ItemList = 'ItemList'
    ItemListEvents = 'ItemListEvents'

    capabilities = []
    type_class = str

    def __init__(self, macro_server, name):
        super().__init__(name=name, full_name=name,
                         macro_server=macro_server,
                         elem_type=ElementType.ParameterType)

    def getName(self):
        """Return the name of this parameter type."""
        return self.name

    def getObj(self, str_repr):
        """Convert ``str_repr`` by calling ``type_class`` of this class.

        :param str_repr: (str) string representation of the value
        :return: result of ``type_class(str_repr)``
        """
        converter = type(self).type_class
        return converter(str_repr)

    @classmethod
    def hasCapability(cls, cap):
        """Tell whether ``cap`` is listed in ``capabilities`` of the class."""
        return cap in cls.capabilities

    def serialize(self, *args, **kwargs):
        """Serialize as the base object does, adding ``composed=False``."""
        data = super().serialize(*args, **kwargs)
        data['composed'] = False
        return data
class ElementParamType(ParamType):
    """Parameter type whose values are elements looked up by name.

    Elements are searched in the pools of the macro server and, if not found
    there, among the macros and macro libraries of the macro server.
    """

    capabilities = ParamType.ItemList, ParamType.ItemListEvents

    def __init__(self, macro_server, name):
        super().__init__(macro_server, name)

    def accepts(self, elem):
        """Tell whether the type reported by ``elem`` equals this type name."""
        return elem.getType() == self._name

    def getObj(self, name, pool=ParamType.All, cache=False):
        """Return the element called ``name``.

        :param name: (str) element name
        :param pool: pool to search in; ``ParamType.All`` searches all pools
        :param cache: not used by this implementation
        :return: the first accepted pool element, otherwise the macro or the
            macro library with that name
        :raises UnknownParamObj: if nothing with that name is found
        """
        ms = self.macro_server
        candidates = (ms.get_pools() if pool == ParamType.All
                      else (ms.get_pool(pool),))
        for pool_obj in candidates:
            found = pool_obj.getObj(name, elem_type=self._name)
            if found is not None and self.accepts(found):
                return found
        # not a pool object, maybe it is a macro server object (perhaps a
        # macro code or a macro library)
        try:
            return ms.get_macro(name)
        except UnknownMacro:
            pass
        try:
            return ms.get_macro_lib(name)
        except UnknownMacroLibrary:
            pass
        # neither pool nor macroserver contains any element with this name
        raise UnknownParamObj('%s with name %s does not exist' %
                              (self._name, name))

    def getObjDict(self, pool=ParamType.All, cache=False):
        """Return a case insensitive dict of the accepted elements.

        :param pool: pool to search in; ``ParamType.All`` searches all pools
        :param cache: not used by this implementation
        :return: (CaselessDict) accepted elements indexed by name
        """
        ms = self.macro_server
        accepted = CaselessDict()
        candidates = (ms.get_pools() if pool == ParamType.All
                      else (ms.get_pool(pool),))
        for pool_obj in candidates:
            for elem_info in pool_obj.getElements():
                if self.accepts(elem_info):
                    accepted[elem_info.name] = elem_info
        # NOTE(review): both loops below iterate over get_macros(); the first
        # one names its items "macro libraries" - confirm whether a different
        # call was intended there
        for lib_name, lib in list(ms.get_macros().items()):
            if self.accepts(lib):
                accepted[lib_name] = lib
        for macro_name, macro in list(ms.get_macros().items()):
            if self.accepts(macro):
                accepted[macro_name] = macro
        return accepted

    def getObjListStr(self, pool=ParamType.All, cache=False):
        """Return the names (keys of :meth:`getObjDict`) as a list."""
        return list(self.getObjDict(pool=pool, cache=cache).keys())

    def getObjList(self, pool=ParamType.All, cache=False):
        """Return the elements (values of :meth:`getObjDict`) as a list."""
        return list(self.getObjDict(pool=pool, cache=cache).values())

    def serialize(self, *args, **kwargs):
        """Serialize as the base class does, but with ``composed=True``."""
        data = super().serialize(*args, **kwargs)
        data['composed'] = True
        return data
class ElementParamInterface(ElementParamType):
    """Element parameter type which selects elements by interface.

    An element is accepted when the name of this type is among the interfaces
    that ``INTERFACES_EXPANDED`` lists for the type of the element.
    """

    def __init__(self, macro_server, name):
        ElementParamType.__init__(self, macro_server, name)
        bases, _doc = INTERFACES_EXPANDED.get(name)
        self._interfaces = bases

    def accepts(self, elem):
        """Tell whether ``elem`` implements the interface of this type.

        Falls back to the name comparison of :class:`ElementParamType` when
        no interfaces are known for the type of the element.
        """
        entry = INTERFACES_EXPANDED.get(elem.getType())
        # an unknown element type has no entry at all: check it before
        # indexing so the fallback below is actually reachable
        elem_interfaces = entry[0] if entry is not None else None
        if elem_interfaces is None:
            return ElementParamType.accepts(self, elem)
        return self._name in elem_interfaces

    def getObj(self, name, pool=ParamType.All, cache=False):
        """Return the element called ``name``.

        The element is searched in the selected pools:

        - found in exactly one pool: that element is returned
        - found in more than one pool: the element of the default pool of
          the macro server is returned, if it is one of them
        - otherwise the macro or macro library with that name is returned

        :param name: (str) element name
        :param pool: pool to search in; ``ParamType.All`` searches all pools
        :param cache: not used by this implementation
        :raises RuntimeError: if the element exists in more than one pool and
            obtaining the default pool raises RuntimeError
        :raises UnknownParamObj: if nothing with that name is found
        """
        macro_server = self.macro_server
        if pool == ParamType.All:
            poolObjs = macro_server.get_pools()
        else:
            poolObjs = macro_server.get_pool(pool),

        elements = []
        for poolObj in poolObjs:
            elem_info = poolObj.getElementWithInterface(name, self._name)
            if elem_info is not None and self.accepts(elem_info):
                elements.append(elem_info)

        if len(elements) == 1:
            # Element only exist in one Pool. We use it.
            return elements[0]

        if not elements:
            msg = '{} with name {} does not exist in any Pool {}'.format(
                self._name, name, [p.name for p in poolObjs])
        else:
            # Ambiguity, check for env var DefaultPool and
            # return the element in this pool if it exists
            try:
                default_pool = macro_server.get_default_pool()
            except RuntimeError as e:
                msg = ("Ambiguity detected. Element defined in more "
                       "than one Pool (Hint: Try setting the DefaultPool "
                       "environmental variable e.g. by executing "
                       "`senv DefaultPool <poolName>` macro)")
                raise RuntimeError(msg) from e
            for elem_pool in elements:
                if elem_pool.pool == default_pool.name:
                    self.debug("Using element {} in {}".format(
                        elem_pool.name, elem_pool.pool))
                    return elem_pool
            # Element not in DefaultPool
            msg = ("{} with name {} does not exist in {} but exists in other "
                   "pools {} (Hint: Try changing the DefaultPool "
                   "environmental variable e.g. by executing "
                   "`senv DefaultPool <poolName>` macro)".format(
                       self._name, name, default_pool.name,
                       [e.pool for e in elements]))

        # not a pool object, maybe it is a macro server object (perhaps a
        # macro class or a macro library)
        try:
            return macro_server.get_macro(name)
        except UnknownMacro:
            if self._name == "MacroCode":
                msg += (" (Hint: if {} is not a typo, "
                        "try adding macro library first "
                        "e.g. by executing `addmaclib <macro module name>` "
                        "macro)".format(name))
        try:
            return macro_server.get_macro_lib(name)
        except UnknownMacroLibrary:
            if self._name == "MacroLibrary":
                msg += (" (Hint: if {0} is not a typo, "
                        "try adding macro library first "
                        "e.g. by executing `addmaclib {0}` "
                        "macro)").format(name)
        # neither pool nor macroserver contains any element with this name
        raise UnknownParamObj(msg)

    def getObjDict(self, pool=ParamType.All, cache=False):
        """Return the accepted elements indexed by name.

        When the macro server reports the interface of this type as one of
        its own, the elements are taken directly from the macro server;
        otherwise they are collected from the selected pools.

        :param pool: pool to search in; ``ParamType.All`` searches all pools
        :param cache: not used by this implementation
        """
        macro_server = self.macro_server
        if macro_server.is_macroserver_interface(self._name):
            return macro_server.get_elements_with_interface(self._name)
        objs = CaselessDict()
        if pool == ParamType.All:
            pools = macro_server.get_pools()
        else:
            pools = macro_server.get_pool(pool),
        for pool_obj in pools:
            elements = pool_obj.getElementsWithInterface(self._name)
            for elem_info in list(elements.values()):
                if self.accepts(elem_info):
                    objs[elem_info.name] = elem_info
        return objs

    def getObjListStr(self, pool=ParamType.All, cache=False):
        """Return the names (keys of :meth:`getObjDict`) as a list."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.keys())

    def getObjList(self, pool=ParamType.All, cache=False):
        """Return the elements (values of :meth:`getObjDict`) as a list."""
        obj_dict = self.getObjDict(pool=pool, cache=cache)
        return list(obj_dict.values())
class AttrParamType(ParamType):
    """Parameter type that adds nothing to :class:`ParamType`."""
# Tuple grouping the parameter type classes defined in this module
AbstractParamTypes = (
    ParamType,
    ElementParamType,
    ElementParamInterface,
    AttrParamType,
)
class ParamDecoder:
    """Decoder of macro parameters given as an XML element or as
    (nested) lists of values.

    The decoding takes place on construction. The decoded parameters are
    available via :meth:`getParamList`, by iterating over the decoder, and
    any attribute unknown to the decoder is looked up on the list of decoded
    parameters.
    """

    def __init__(self, type_manager, params_def, raw_params):
        """Create ParamDecoder object and decode macro parameters

        :param type_manager: (sardana.macroserver.mstypemanager.TypeManager)
            type manager object
        :param params_def: list<list> macro parameter definition
        :param raw_params: (lxml.etree._Element or list) xml element
            representing macro with subelements representing parameters or
            list with parameter values
        """
        self.type_manager = type_manager
        self.params_def = params_def
        self.raw_params = raw_params
        self.params = None
        self.decode()

    def decode(self):
        """Decode raw representation of parameters to parameters as passed
        to the prepare or run methods.

        :return: (list) decoded parameters, also stored in ``self.params``
        :raises SupernumeraryParam: if there are more raw parameters than
            parameter definitions
        """
        # make a copy since in case of XML it could be necessary to modify
        # the raw_params - filter out elements different than params
        raw_params = deepcopy(self.raw_params)
        params_def = self.params_def
        # ignore other tags than "param" and "paramrepeat"
        # e.g. sequencer may create tags like "hookPlace"
        if isinstance(raw_params, etree._Element):
            # iterate over a snapshot of the children: removing children
            # while iterating over the element itself makes the iteration
            # miss siblings
            for raw_param in list(raw_params):
                if raw_param.tag not in ("param", "paramrepeat"):
                    raw_params.remove(raw_param)

        # check if too many parameters were passed
        len_params_def = len(params_def)
        if len(raw_params) > len_params_def:
            # 1-tuple: the surplus is always formatted as one single value
            msg = ("%r are supernumerary with respect to definition" %
                   (raw_params[len_params_def:],))
            raise SupernumeraryParam(msg)

        # iterate over definition since missing values may just mean using
        # the default values
        params = []
        for i, param_def in enumerate(params_def):
            try:
                raw_param = raw_params[i]
            except IndexError:
                raw_param = None
            params.append(self.decodeNormal(raw_param, param_def))
        self.params = params
        return self.params

    def decodeNormal(self, raw_param, param_def):
        """Decode and validate parameter

        :param raw_param: (lxml.etree._Element or list) xml element
            representing parameter
        :param param_def: (dict) parameter definition
        :return: decoded parameter: a list of repetitions for a repeat
            parameter, ``None`` when the value is ``Optional``, otherwise the
            object created by the parameter type
        :raises MissingParam: if neither a value nor a default value exists
        :raises WrongParamType: if the parameter type raises ValueError
        :raises WrongParam: if the object is unknown or could not be created
        """
        param_type = param_def["type"]
        name = param_def["name"]
        if isinstance(param_type, list):
            # a list as type denotes a repeat parameter
            return self.decodeRepeat(raw_param, param_def)

        param_type = self.type_manager.getTypeObj(param_type)
        optional_param = False
        try:
            if isinstance(raw_param, etree._Element):
                value = raw_param.get("value")
            else:
                value = raw_param
            # None or [] indicates default value
            if value is None or (isinstance(value, list) and not value):
                value = param_def['default_value']
            if value is None:
                raise MissingParam("'%s' not specified" % name)
            elif value is Optional:
                param = None
                optional_param = True
            else:
                # cast to string to fulfill with ParamType API
                param = param_type.getObj(str(value))
        except ValueError as e:
            raise WrongParamType(str(e)) from e
        except UnknownParamObj as e:
            raise WrongParam(str(e)) from e
        if param is None and not optional_param:
            msg = 'Could not create %s parameter "%s" for "%s"' % \
                (param_type.getName(), name, raw_param)
            raise WrongParam(msg)
        return param

    def decodeRepeat(self, raw_param_repeat, param_repeat_def):
        """Decode and validate repeat parameter

        :param raw_param_repeat: (lxml.etree._Element or list) xml element
            representing param repeat with subelements representing
            repetitions or list representing repetitions
        :param param_repeat_def: (dict) repeat parameter definition
        :return: (list): list with decoded parameter repetitions
        :raises MissingRepeat: if there are less repetitions than ``min``
        :raises SupernumeraryRepeat: if there are more repetitions than
            ``max``
        :raises WrongParam: if a repetition of a single member repeat
            parameter is a non empty list
        """
        name = param_repeat_def['name']
        param_type = param_repeat_def['type']
        min_rep = param_repeat_def['min']
        max_rep = param_repeat_def['max']
        param_repeat = []
        if raw_param_repeat is None:
            raw_param_repeat = param_repeat_def['default_value']
        if raw_param_repeat is None:
            raw_param_repeat = []
        # repeat params with only one member and only one repetition value are
        # allowed - encapsulate it in list and try to decode anyway;
        # for the moment this only works for non XML decoding but could be
        # extended in the future to support XML as well.
        # This must happen before counting the repetitions, otherwise e.g.
        # the characters of a single string value would be counted.
        if not is_non_str_seq(raw_param_repeat)\
                and not isinstance(raw_param_repeat, etree._Element):
            raw_param_repeat = [raw_param_repeat]
        len_rep = len(raw_param_repeat)
        if min_rep and len_rep < min_rep:
            msg = 'Found %d repetitions of param %s, min is %d' % \
                (len_rep, name, min_rep)
            raise MissingRepeat(msg)
        if max_rep and len_rep > max_rep:
            msg = 'Found %d repetitions of param %s, max is %d' % \
                (len_rep, name, max_rep)
            raise SupernumeraryRepeat(msg)
        for raw_repeat in raw_param_repeat:
            if len(param_type) > 1:
                repeat = []
                for i, member_type in enumerate(param_type):
                    try:
                        member_raw = raw_repeat[i]
                    except IndexError:
                        # missing member: let the default value apply
                        member_raw = None
                    repeat.append(self.decodeNormal(member_raw, member_type))
            else:
                # if the repeat parameter is composed of just one member
                # do not encapsulate it in list and pass directly the item
                if isinstance(raw_repeat, etree._Element):
                    # a repetition without sub-element is treated as a
                    # missing member, as in the multi member case above
                    raw_repeat = raw_repeat[0] if len(raw_repeat) else None
                # check if one tries to decode repeat parameter of just one
                # member encapsulated in a list, empty lists are still allowed
                # to indicate default value
                elif isinstance(raw_repeat, list) and len(raw_repeat) > 0:
                    msg = 'Repetitions of just one member must not be lists'
                    raise WrongParam(msg)
                repeat = self.decodeNormal(raw_repeat, param_type[0])
            param_repeat.append(repeat)
        return param_repeat

    def getParamList(self):
        """Return the list of decoded parameters."""
        return self.params

    def __iter__(self):
        return iter(self.params)

    def __getattr__(self, name):
        # __getattr__ is only called when the regular lookup fails; if
        # "params" itself is missing (e.g. object created without running
        # __init__, as copy/pickle do) delegating would recurse forever
        if name == 'params':
            raise AttributeError(name)
        return getattr(self.params, name)
class FlatParamDecoder:
    """Parameter decoder useful for macros with only one repeat parameter
    located at the very last place. It requires that the raw parameters are
    passed as a flat list of strings.

    The decoding takes place on construction. The decoded parameters are
    available via :meth:`getParamList`, by iterating over the decoder, and
    any attribute unknown to the decoder is looked up on the list of decoded
    parameters.
    """

    def __init__(self, type_manager, params_def, raw_params):
        """Create the decoder and decode the parameters.

        :param type_manager: type manager object used to obtain the
            parameter type objects
        :param params_def: list of parameter definitions (dict)
        :param raw_params: flat list with the raw parameter values
        :raises AttributeError: if :meth:`isPossible` rejects ``params_def``
        """
        self.type_manager = type_manager
        self.params_def = params_def
        self.raw_params = raw_params
        self.params = None
        if not self.isPossible(params_def):
            # 1-tuple: the definition is always formatted as a single value
            msg = ("%s parameter definition is not compatible with"
                   " FlatParamDecoder" % (params_def,))
            raise AttributeError(msg)
        self.decode()

    @staticmethod
    def isPossible(params_def):
        """Tell whether ``params_def`` can be decoded by this decoder.

        :param params_def: list of parameter definitions (dict)
        :return: (bool) False if a repeat parameter is not at the last
            position or if a member of the repeat parameter is a list
        """
        last_idx = len(params_def) - 1
        for idx, param_def in enumerate(params_def):
            param_type = param_def["type"]
            if not isinstance(param_type, list):
                continue
            # decide by position: comparing definitions by equality would
            # take a duplicate of the last definition for the last one
            if idx != last_idx:
                # repeat parameter is not the last one
                # it won't be possible to decode it
                return False
            # NOTE(review): the members are used as dicts by decodeNormal,
            # a nested repeat presumably shows up as a member whose "type"
            # is a list and would not be caught here - confirm
            for sub_param_def in param_type:
                if isinstance(sub_param_def, list):
                    # nested repeat parameter
                    # it won't be possible to decode it
                    return False
        return True

    def decode(self):
        """Decode the raw parameters and store them in ``self.params``.

        :return: (list) decoded parameters
        """
        _, self.params = self.decodeNormal(self.raw_params, self.params_def)
        return self.params

    def decodeNormal(self, raw_params, params_def):
        """Decode raw values according to a list of parameter definitions.

        :param raw_params: flat list with the raw parameter values
        :param params_def: list of parameter definitions (dict)
        :return: (tuple) number of consumed raw values and the list of
            decoded parameters
        :raises MissingParam: if a value without default value is missing
        :raises WrongParam: if a repeat parameter demanding values got none,
            or an object is unknown or could not be created
        :raises WrongParamType: if the parameter type raises ValueError
        """
        str_len = len(raw_params)
        obj_list = []
        str_idx = 0
        for par_def in params_def:
            name = par_def['name']
            type_class = par_def['type']
            def_val = par_def['default_value']
            is_repeat = isinstance(type_class, list)
            if str_idx == str_len:
                # all the raw values were already consumed
                if def_val is not None:
                    new_obj = def_val
                elif not is_repeat:
                    raise MissingParam("'%s' not specified" % name)
                else:
                    min_rep = par_def['min']
                    if min_rep and min_rep > 0:
                        msg = "'%s' demands at least %d values" %\
                            (name, min_rep)
                        raise WrongParam(msg)
                    # repeat parameter without values and without default
                    # value: no repetitions (previously the variable was
                    # left unbound or kept the preceding parameter)
                    new_obj = []
            elif is_repeat:
                dec_token, new_obj = self.decodeRepeat(raw_params[str_idx:],
                                                       par_def)
                str_idx += dec_token
            else:
                type_manager = self.type_manager
                type_name = type_class
                # the result is not used below; the call is kept as in the
                # original implementation - presumably it validates the
                # type name, TODO confirm
                type_class = type_manager.getTypeClass(type_name)
                par_type = type_manager.getTypeObj(type_name)
                par_str = raw_params[str_idx]
                try:
                    val = par_type.getObj(par_str)
                except ValueError as e:
                    raise WrongParamType(str(e)) from e
                except UnknownParamObj as e:
                    raise WrongParam(str(e)) from e
                if val is None:
                    msg = 'Could not create %s parameter "%s" for "%s"' % \
                        (par_type.getName(), name, par_str)
                    raise WrongParam(msg)
                new_obj = val
                str_idx += 1
            obj_list.append(new_obj)
        return str_idx, obj_list

    def decodeRepeat(self, raw_params, par_def):
        """Decode as many repetitions of a repeat parameter as possible.

        :param raw_params: flat list with the remaining raw values
        :param par_def: (dict) repeat parameter definition
        :return: (tuple) number of consumed raw values and the list of
            decoded repetitions; a repetition composed of one member is not
            encapsulated in a list
        :raises MissingRepeat: if there are less repetitions than ``min``
        """
        name = par_def['name']
        param_def = par_def['type']
        min_rep = par_def['min']
        max_rep = par_def['max']
        dec_token = 0
        obj_list = []
        rep_nr = 0
        while dec_token < len(raw_params):
            if max_rep is not None and rep_nr == max_rep:
                break
            new_token, new_obj_list = self.decodeNormal(
                raw_params[dec_token:], param_def)
            if new_token == 0:
                # nothing was consumed: going on would never terminate
                break
            dec_token += new_token
            if len(new_obj_list) == 1:
                new_obj_list = new_obj_list[0]
            obj_list.append(new_obj_list)
            rep_nr += 1
        if min_rep is not None and rep_nr < min_rep:
            msg = 'Found %d repetitions of param %s, min is %d' % \
                (rep_nr, name, min_rep)
            raise MissingRepeat(msg)
        return dec_token, obj_list

    def getParamList(self):
        """Return the list of decoded parameters."""
        return self.params

    def __iter__(self):
        return iter(self.params)

    def __getattr__(self, name):
        # __getattr__ is only called when the regular lookup fails; if
        # "params" itself is missing (e.g. object created without running
        # __init__, as copy/pickle do) delegating would recurse forever
        if name == 'params':
            raise AttributeError(name)
        return getattr(self.params, name)