#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module contains the class definition for the macro server door"""
__all__ = ["MacroProxy", "BaseInputHandler", "MSDoor"]
__docformat__ = 'restructuredtext'
import weakref
import collections
from typing import Any, Optional, Dict
from taurus.core.util.log import Logger
from sardana import ElementType
from sardana.sardanaevent import EventType
from sardana.macroserver.macro import Macro
from sardana.macroserver.msbase import MSObject
from sardana.macroserver.msparameter import Type
from sardana.macroserver.msexception import MacroServerException
from sardana.taurus.core.tango.sardana.motion import Motion
class MacroProxy(object):
def __init__(self, door, macro_meta):
self._door = weakref.ref(door)
self._macro_meta = weakref.ref(macro_meta)
@property
def door(self):
return self._door()
@property
def macro_info(self):
return self._macro_meta()
def __call__(self, *args, **kwargs):
door = self.door
parent_macro = door.get_running_macro()
parent_macro.syncLog()
executor = parent_macro.executor
opts = dict(parent_macro=parent_macro, executor=executor)
kwargs.update(opts)
eargs = [self.macro_info.name]
eargs.extend(args)
return parent_macro.execMacro(*eargs, **kwargs)
class MacroProxyCache(dict):
def __init__(self, door):
self._door = weakref.ref(door)
self.rebuild()
@property
def door(self):
return self._door()
def rebuild(self):
self.clear()
door = self.door
macros = self.door.get_macros()
for macro_name, macro_meta in list(macros.items()):
self[macro_name] = MacroProxy(door, macro_meta)
class BaseInputHandler(object):
def __init__(self):
self._input = input
def input(self, input_data=None):
if input_data is None:
input_data = {}
prompt = input_data.get('prompt')
if prompt is None:
return self._input()
else:
return self._input(prompt)
[docs]
class MSDoor(MSObject):
"""Sardana door object"""
def __init__(self, **kwargs):
self._state = Macro.Ready
self._status = None
self._result = None
self._macro_status = None
self._record_data = None
self._macro_proxy_cache = None
self._input_handler = BaseInputHandler()
self._pylab_handler = None
kwargs['elem_type'] = ElementType.Door
MSObject.__init__(self, **kwargs)
# macros may use a custom log level output, add it to the Logger class
if not hasattr(self, "Output"):
Logger.addLevelName(15, "OUTPUT")
def output(loggable, msg, *args, **kw):
loggable.getLogObj().log(Logger.Output, msg, *args, **kw)
Logger.output = output
if not hasattr(self, "Result"):
Logger.addLevelName(18, "RESULT")
def get_macro_executor(self):
return self.macro_server.macro_manager.getMacroExecutor(self)
macro_executor = property(get_macro_executor)
def get_running_macro(self):
return self.macro_executor.getRunningMacro()
running_macro = property(get_running_macro)
def get_last_macro(self):
return self.macro_executor.getLastMacro()
last_macro = property(get_last_macro)
def get_macro_data(self):
macro = self.running_macro
if macro is None:
macro = self.last_macro
if macro is None:
raise MacroServerException("No macro has run so far " +
"or the macro data was not preserved.")
data = macro.data
return data
def set_pylab_handler(self, ph):
self._pylab_handler = ph
def get_pylab_handler(self):
return self._pylab_handler
pylab_handler = property(get_pylab_handler, set_pylab_handler)
def get_pylab(self):
ph = self.pylab_handler
if ph is None:
import matplotlib.pylab
ph = matplotlib.pylab
return ph
pylab = property(get_pylab)
def set_pyplot_handler(self, ph):
self._pyplot_handler = ph
def get_pyplot_handler(self):
return self._pyplot_handler
pyplot_handler = property(get_pyplot_handler, set_pyplot_handler)
def get_pyplot(self):
ph = self.pyplot_handler
if ph is None:
import matplotlib.pyplot
ph = matplotlib.pyplot
return ph
pyplot = property(get_pyplot)
def set_input_handler(self, ih):
self._input_handler = ih
def get_input_handler(self):
return self._input_handler
input_handler = property(get_input_handler, set_input_handler)
def append_prompt(self, prompt, msg):
if '?' in prompt:
prefix, suffix = prompt.rsplit('?', 1)
if not prefix.endswith(' '):
prefix += ' '
prompt = prefix + msg + '?' + suffix
else:
prompt += msg + ' '
return prompt
def input(self, msg, *args, **kwargs):
kwargs['data_type'] = kwargs.get('data_type', Type.String)
kwargs['allow_multiple'] = kwargs.get('allow_multiple', False)
if args:
msg = msg % args
if not msg.endswith(' '):
msg += ' '
dv = kwargs.get('default_value')
if dv is not None and not isinstance(dv, list):
dv = '[' + str(dv) + ']'
msg = self.append_prompt(msg, dv)
macro = kwargs.pop('macro', self.macro_executor.getRunningMacro())
if macro is None:
macro = self
input_data = dict(prompt=msg, type='input')
input_data.update(kwargs)
data_type = kwargs['data_type']
is_seq = not isinstance(data_type, str) and \
isinstance(data_type, collections.abc.Sequence)
if is_seq:
handle = self._handle_seq_input
else:
handle = self._handle_type_input
return handle(macro, input_data, data_type)
def _handle_seq_input(self, obj, input_data, data_type):
valid = False
allow_multiple = input_data['allow_multiple']
while not valid:
result = self.input_handler.input(input_data)
if allow_multiple:
r, dt = set(result), set(data_type)
if r.issubset(dt):
break
else:
if result in data_type:
break
obj.warning("Please give a valid option")
return result
def _handle_type_input(self, obj, input_data, data_type):
type_obj = self.type_manager.getTypeObj(data_type)
valid = False
while not valid:
result = self.input_handler.input(input_data)
try:
result_type = type_obj.getObj(result)
if result_type is None:
raise Exception("Must give a value")
valid = True
return result_type
except:
dtype = str(data_type).lower()
obj.warning("Please give a valid %s.", dtype)
def get_report_logger(self):
return self.macro_server.report_logger
report_logger = property(get_report_logger)
def report(self, msg: str, *args: Any, **kwargs: Any) -> None:
"""
Record a log message in the sardana report (if enabled) with default
level **INFO**. The msg is the message format string, and the args are
the arguments which are merged into msg using the string formatting
operator. (Note that this means that you can use keywords in the
format string, together with a single dictionary argument.)
*kwargs* are the same as :meth:`logging.Logger.debug` plus an optional
level kwargs which has default value **INFO**
Example::
self.report("this is an official report!")
:param msg: the message to be recorded
:param args: list of arguments
:param kwargs: list of keyword arguments"""
self.macro_server.report(msg, *args, **kwargs)
def get_state(self):
return self._state
def set_state(self, state, propagate=1):
self._state = state
self.fire_event(EventType("state", priority=propagate), state)
state = property(get_state, set_state)
def get_status(self):
return self._status
def set_status(self, status, propagate=1):
self._status = status
self.fire_event(EventType("status", priority=propagate), status)
status = property(get_status, set_status)
def get_result(self):
return self._result
def set_result(self, result, propagate=1):
self._result = result
self.fire_event(EventType("result", priority=propagate), result)
result = property(get_result, set_result)
def get_macro_status(self):
return self._macro_status
def set_macro_status(self, macro_status, propagate=1):
self._macro_status = macro_status
self.fire_event(EventType("macrostatus", priority=propagate),
macro_status)
macro_status = property(get_macro_status, set_macro_status)
def get_reserved_elements(self):
reserved_macro_objects = self.macro_executor._reserved_macro_objs
reserved_macro_objects_list = []
for key, value in reserved_macro_objects.items():
aux_dict = {}
elements_list = []
for element in value:
if isinstance(element, Motion):
motion_elements = element.names
elements_list.extend(motion_elements)
continue
elements_list.append(str(element))
aux_dict["elements"] = elements_list
reserved_macro_objects_list.append(aux_dict)
return reserved_macro_objects_list
reserved_elements = property(get_reserved_elements)
def get_record_data(self):
return self._record_data
def set_record_data(self, record_data, codec=None, propagate=1):
self._record_data = record_data
self.fire_event(EventType("recorddata", priority=propagate),
(codec, record_data))
record_data = property(get_record_data, set_record_data)
def get_env(self, key: Optional[str] = None, macro_name: Optional[str] = None) -> Dict:
"""Gets the environment with the context for this door matching the
given parameters:
- macro_name defines the context where to look for the environment. If
None, the global environment is used. If macro name is given the
environment in the context of that macro is given
- If key is None it returns the complete environment, otherwise
key must be a string containing the environment variable name.
:param key:
environment variable name [default: None, meaning all environment]
:param macro_name:
local context for a given macro [default: None, meaning no macro
context is used]
:raises: UnknownEnv"""
return self.macro_server.environment_manager.getAllDoorEnv(self.name)
def set_env(self, key, value):
return self.macro_server.set_env(key, value)
def _build_macro_proxy_cache(self):
self._macro_proxy_cache = MacroProxyCache(self)
def get_macro_proxies(self):
if self._macro_proxy_cache is None:
self._macro_proxy_cache = MacroProxyCache(self)
return self._macro_proxy_cache
def run_macro(self, par_str_list, asynch=False):
if isinstance(par_str_list, str):
par_str_list = par_str_list,
return self.macro_executor.run(par_str_list, asynch=asynch)
def __getattr__(self, name):
"""Get methods from macro server"""
return getattr(self.macro_server, name)