Source code for sardana.macroserver.macros.test.macroexecutor

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

from taurus.core.util.singleton import Singleton
import time


[docs]class BaseMacroExecutor(object): """Abstract MacroExecutor class. Inherit from it if you want to create your own macro executor. """ log_levels = ['debug', 'output', 'info', 'warning', 'critical', 'error'] def __init__(self): # macro result self._result = None # macro exception status self._exception = None # buffer for state history self._state_buffer = [] # log buffers, one for each level for level in self.log_levels: setattr(self, '_%s' % level, None) # common buffer, any registered log level will be appended here self._common = None def _clean(self): """In case of reuse of the macro executor object this method executes the necessary cleanups. Extend if you need to clean your particular setups. """ self._result = None self._exception = None self._state_buffer = [] for level in self.log_levels: log_buffer = getattr(self, '_%s' % level) if not log_buffer is None: log_buffer.__init__() if self._common: self._common.__init__()
[docs] def run(self, macro_name, macro_params=None, sync=True, timeout=None): """Execute macro. :param macro_name: (string) name of macro to be executed :param macro_params: (list<string>) macro parameters (default is macro_params=None for macros without parameters or with the default values) :param sync: (bool) whether synchronous or asynchronous call (default is sync=True) :param timeout: (float) timeout (in s) that will be passed to the wait method, in case of synchronous execution; None means wait infinitely In asyncrhonous execution method :meth:`~wait` has to be explicitly called. """ if macro_params is None: macro_params = [] self._clean() self._run(macro_name, macro_params) if sync: self.wait(timeout)
def _run(self, macro_name, macro_params): """Method responsible for triggering the macro execution. Must be implemented in your macro executor. :param macro_name: (string) name of macro to be executed :param macro_params: (list<string>) macro parameters (default is macro_params=None for macros without parameters or with the default values) """ raise NotImplementedError('Method _run not implemented in class %s' % self.__class__.__name__)
[docs] def wait(self, timeout=None): """ Wait until macro is done. Use it in asynchronous executions. :param timeout: (float) waiting timeout (in s); None means wait infinitely """ if timeout is not None and timeout <= 0: timeout = None self._wait(timeout) # TODO: workaround: this sleep is necessary to perform multiple tests. time.sleep(2)
def _wait(self, timeout): """Method responsible for waiting until macro is done. Must be implemented in your macro executor. :param timeout: (float) waiting timeout (in s) """ raise NotImplementedError('Method _wait not implemented in class %s' % self.__class__.__name__)
[docs] def stop(self, started_event_timeout=3.0): """Stop macro execution. Execute macro in synchronous way before using this method. :param started_event_timeout: (float) waiting timeout for started event """ self._stop(started_event_timeout)
def _stop(self, started_event_timeout=3.0): """ Method responsible for stopping the macro execution. Must be implemented in your macro executor. :param started_event_timeout: (float) waiting timeout for started event """ raise NotImplementedError('Method _stop not implemented in class %s' % self.__class__.__name__)
[docs] def registerLog(self, log_level): """Start registering log messages. :param log_level: (str) string indicating the log level """ log_buffer_name = '_%s' % log_level setattr(self, log_buffer_name, []) self._registerLog(log_level)
def _registerLog(self, log_level): """ Method responsible for starting log registration. Must be implemented in your macro executor. :param log_level: (str) string indicating the log level """ raise NotImplementedError('Method _registerLog not implemented in ' 'class %s' % self.__class__.__name__)
[docs] def unregisterLog(self, log_level): """Stop registering log messages. :param log_level: (str) string indicating the log level """ self._unregisterLog(log_level)
def _unregisterLog(self, log_level): """Method responsible for stopping log registration. Must be implemented in your macro executor. :param log_level: (str) string indicating the log level """ raise NotImplementedError('Method _unregisterLog not implemented in ' 'class %s' % self.__class__.__name__)
[docs] def getLog(self, log_level): """Get log messages. :param log_level: (str) string indicating the log level :return: (seq<str>) list of strings with log messages """ log_buffer_name = '_%s' % log_level log = getattr(self, log_buffer_name) return log
[docs] def registerAll(self): """Register for macro result, all log levels and common buffer. """ for log_level in self.log_levels: self.registerLog(log_level) self.registerResult() self.createCommonBuffer()
[docs] def unregisterAll(self): """Unregister macro result, all log levels and common buffer. """ for log_level in self.log_levels: self.unregisterLog(log_level) self.unregisterResult()
[docs] def registerResult(self): """Register for macro result """ self._registerResult()
def _registerResult(self): """Method responsible for registering for macro result. Must be implemented in your macro executor. """ raise NotImplementedError('Method _registerResult not implemented in ' 'class %s' % self.__class__.__name__)
[docs] def unregisterResult(self): """Unregister macro result. """ self._unregisterResult()
def _unregisterResult(self): """Method responsible for unregistering for macro result. Must be implemented in your macro executor. """ raise NotImplementedError('Method _unregisterResult not implemented in' ' class %s' % self.__class__.__name__)
[docs] def getResult(self): """Get macro result. :return: (seq<str>) list of strings with Result messages """ return self._result
[docs] def createCommonBuffer(self): """Create a common buffer, where all the registered logs will be stored. """ self._common = []
[docs] def getCommonBuffer(self): """Get common buffer. Method getCommonBuffer can only be used if at least one buffer exists. :return: (seq<str>) list of strings with messages from all log levels .. seealso:: :meth:`~createCommonBuffer` """ return self._common
[docs] def getState(self): """Get macro execution state. :return: (str) """ state = None if len(self._state_buffer) > 0: state = self._state_buffer[-1] return state
[docs] def getStateBuffer(self): """Get buffer (history) of macro execution states. :return: (seq<str>) """ return self._state_buffer
[docs] def getExceptionStr(self): """Get macro exception type representation (None if the macro state is not exception). :return: (str) """ return self._exception
[docs]class MacroExecutorFactory(Singleton): """A scheme-agnostic factory for MacroExecutor instances Example:: f = MacroExecutorFactory() f.getMacroExecutor('tango://my/door/name') #returns a TangoMacroExecutor Note: For the moment, only TangoMacroExecutor is supported """
[docs] def getMacroExecutor(self, door_name=None): """ Returns a macro executor instance (a subclass of :class:`BaseMacroExecutor`) depending on the door being used. """ if door_name is None: from sardana import sardanacustomsettings door_name = getattr(sardanacustomsettings, 'UNITTEST_DOOR_NAME') #====================================================================== # TODO: Once SEP3 is done, it will define a better way to get the scheme # from a model name (including customized default schemes) # For the moment I implement it by calling an internal member of # TaurusManager from taurus.core import TaurusManager scheme = TaurusManager().getScheme(door_name) #====================================================================== if scheme == 'tango': return self._getTangoMacroExecutor(door_name) else: raise ValueError('No MacroExecutor supported for scheme %s' % scheme)
def _getTangoMacroExecutor(self, door_name): from sardana.tango.macroserver.test import TangoMacroExecutor return TangoMacroExecutor(door_name=door_name)
if __name__ == '__main__': from sardana import sardanacustomsettings door_name = getattr(sardanacustomsettings, 'UNITTEST_DOOR_NAME') print(MacroExecutorFactory().getMacroExecutor(door_name))