#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
import copy
import time
import atexit
import threading
import PyTango
from sardana.macroserver.macros.test import BaseMacroExecutor
from taurus.core.util.codecs import CodecFactory
from sardana import sardanacustomsettings
[docs]
class TangoAttrCb(object):
    '''Common base for the Tango event callbacks of the macro executor.

    The class itself does not handle any event; it only remembers the
    macro executor so that subclasses can write received data to it.
    '''

    def __init__(self, tango_macro_executor):
        '''Store the macro executor this callback reports to.

        :param tango_macro_executor: object on which subclasses set the
            data extracted from the received events
        '''
        self._tango_macro_executor = tango_macro_executor
[docs]
class TangoResultCb(TangoAttrCb):
    '''Tango event callback for the Result attribute.

    Every received event overwrites ``_result`` on the macro executor.
    '''

    def push_event(self, *args, **kwargs):
        '''Store the content of a Result event on the macro executor.

        The event object is the first positional argument. If its ``err``
        flag is set, its ``errors`` are stored; otherwise the ``value`` of
        its ``attr_value`` is stored.
        '''
        event = args[0]
        outcome = event.errors if event.err else event.attr_value.value
        self._tango_macro_executor._result = outcome
[docs]
class TangoLogCb(TangoAttrCb):
    '''Tango event callback for the log attributes
    (e.g. Output, Error, Critical).

    Received values are appended to the executor buffer named after the
    log (``_<log_name>``) and, when present, to its common buffer.
    '''

    def __init__(self, tango_macro_executor, log_name):
        '''Remember the executor and the name of the log being followed.

        :param tango_macro_executor: object owning the log buffers
        :param log_name: name of the log; ``'_%s' % log_name`` must be
            the name of a buffer attribute on the executor
        '''
        super(TangoLogCb, self).__init__(tango_macro_executor)
        self._log_name = log_name

    def push_event(self, *args, **kwargs):
        '''Append the value carried by a log event to the buffers.

        The event object is the first positional argument. Events whose
        ``attr_value`` is falsy are ignored.
        '''
        event = args[0]
        if not event.attr_value:
            return
        entry = event.attr_value.value
        executor = self._tango_macro_executor
        # Buffer dedicated to this log level, e.g. ``_output``.
        getattr(executor, '_%s' % self._log_name).append(entry)
        # The common buffer is optional: None means it is not in use.
        shared = executor._common
        if shared is not None:
            shared.append(entry)
[docs]
class TangoStatusCb(TangoAttrCb):
    '''Callback class for Tango events of the MacroStatus attribute.

    Decodes the macro status entries carried by each event, records every
    state in the executor's ``_state_buffer`` and sets the executor's
    ``_started_event`` / ``_done_event`` according to the state.
    '''
    # States that set the executor's ``_started_event``.
    START_STATES = ['start']
    # States that set the executor's ``_done_event``.
    DONE_STATES = ['finish', 'stop', 'exception']

    def push_event(self, *args, **kwargs):
        '''Callback method receiving the event.

        The event object is the first positional argument. Error events
        set the executor's ``_done_event``. Events without ``attr_value``
        or with an empty encoded payload are ignored. Otherwise the
        payload is decoded with the codec named by its format and each
        decoded macro status entry is processed in order.

        For an entry in state ``'exception'`` the ``exc_stack`` lines are
        concatenated into the executor's ``_exception``; a missing
        ``exc_stack`` yields an empty string.
        '''
        event_data = args[0]
        executor = self._tango_macro_executor
        if event_data.err:
            # NOTE(review): the errors are stored on this callback object,
            # not on the executor, and nothing in this file reads them
            # back - confirm whether the executor's _state_buffer was
            # the intended target.
            self._state_buffer = event_data.errors
            executor._done_event.set()
        attr_value = event_data.attr_value
        if attr_value is None:
            return
        v = attr_value.value
        # v is indexed as (format, encoded data); nothing to decode if the
        # data part is empty.
        if not len(v[1]):
            return
        codec = CodecFactory().getCodec(v[0])
        _, data = codec.decode(v)
        for macro_status in data:
            state = macro_status['state']
            if state == 'exception':
                # 'exc_stack' is optional in the entry: fall back to an
                # empty sequence so a missing stack does not raise here
                # and prevent the done event from being set below.
                exc_stack = macro_status.get('exc_stack') or ()
                executor._exception = ''.join(exc_stack)
            if state in self.START_STATES:
                executor._started_event.set()
            elif state in self.DONE_STATES:
                executor._done_event.set()
            executor._state_buffer.append(state)
[docs]
class TangoMacroExecutor(BaseMacroExecutor):
    '''
    Macro executor implemented using Tango communication with the Door device
    '''
    # Class-level flag: PyTango.ApiUtil.cleanup is registered with atexit
    # only by the first instance created.
    _api_util_cleanup_registered = False

    def __init__(self, door_name=None):
        '''Create the proxy to the Door device.

        :param door_name: name of the Door device; when None the value of
            ``sardanacustomsettings.UNITTEST_DOOR_NAME`` is used
        '''
        super(TangoMacroExecutor, self).__init__()
        if door_name is None:
            door_name = sardanacustomsettings.UNITTEST_DOOR_NAME
        self._door = PyTango.DeviceProxy(door_name)
        # The events are created in _clean().
        self._done_event = None
        self._started_event = None
        if not TangoMacroExecutor._api_util_cleanup_registered:
            # remove whenever PyTango#390 gets fixed
            atexit.register(PyTango.ApiUtil.cleanup)
            TangoMacroExecutor._api_util_cleanup_registered = True

    def _clean(self):
        '''Recreates threading Events in case the macro executor is reused.'''
        super(TangoMacroExecutor, self)._clean()
        self._started_event = threading.Event()
        self._done_event = threading.Event()

    def _run(self, macro_name, macro_params):
        '''reimplemented from :class:`BaseMacroExecutor`

        Subscribes to the change events of the Door's ``macrostatus``
        attribute and then calls the Door's ``RunMacro`` command.

        :param macro_name: name of the macro to execute
        :param macro_params: macro parameters; copied, the passed object
            is not modified
        '''
        # preparing RunMacro command input arguments: the macro name
        # followed by its parameters
        argin = copy.copy(macro_params)
        argin.insert(0, macro_name)
        # registering for MacroStatus events
        self._status_id = self._door.subscribe_event(
            'macrostatus',
            PyTango.EventType.CHANGE_EVENT,
            TangoStatusCb(self))
        # executing RunMacro command
        self._door.RunMacro(argin)

    def _wait(self, timeout):
        '''reimplemented from :class:`BaseMacroExecutor`

        Waits up to ``timeout`` for the done event, sleeps 3 more seconds
        and unsubscribes from the ``macrostatus`` events. Does nothing if
        the done event was not created yet.
        '''
        # TODO: In case of timeout = inf if the macro executor runs a macro
        # with wrong parameters it'll never awake of the done_event wait
        # Pending to remove this comment when Sardana resolves the bug.
        done = self._done_event
        if not done:
            return
        done.wait(timeout)
        # add an extra sleep time for the following cases:
        # - macro executing another macro - internal macro may finish but
        #   the external may still not
        # - in case of stopping a macro, the events are emitted: first
        #   finish and then stop
        # TODO: base the wait condition on the events from door state
        # attribute
        time.sleep(3)
        self._door.unsubscribe_event(self._status_id)

    def _stop(self, started_event_timeout=3.0):
        '''reimplemented from :class:`BaseMacroExecutor`

        Waits up to ``started_event_timeout`` seconds for the started
        event and then calls the Door's ``StopMacro`` command.

        :param started_event_timeout: maximum time, in seconds, to wait
            for the started event
        :raises Exception: if ``StopMacro`` fails with PyTango.DevFailed
        '''
        self._started_event.wait(started_event_timeout)
        try:
            self._door.StopMacro()
        except PyTango.DevFailed as dev_failed:
            raise Exception("Unable to Stop macro: %s" % dev_failed)

    def _registerLog(self, log_level):
        '''reimplemented from :class:`BaseMacroExecutor`

        Subscribes to the change events of the Door attribute named
        ``log_level`` and stores the subscription id in the attribute
        ``_<log_level>_id``.
        '''
        callback = TangoLogCb(self, log_level)
        subscription = self._door.subscribe_event(
            log_level,
            PyTango.EventType.CHANGE_EVENT,
            callback)
        setattr(self, '_%s_id' % log_level, subscription)

    def _unregisterLog(self, log_level):
        '''reimplemented from :class:`BaseMacroExecutor`

        Unsubscribes using the id stored in ``_<log_level>_id``.
        '''
        subscription = getattr(self, '_%s_id' % log_level)
        self._door.unsubscribe_event(subscription)

    def _registerResult(self):
        '''reimplemented from :class:`BaseMacroExecutor`

        Subscribes to the change events of the Door's ``result`` attribute
        and stores the subscription id in ``_result_id``.
        '''
        self._result_id = self._door.subscribe_event(
            'result',
            PyTango.EventType.CHANGE_EVENT,
            TangoResultCb(self))

    def _unregisterResult(self):
        '''reimplemented from :class:`BaseMacroExecutor`

        Unsubscribes using the id stored in ``_result_id``.
        '''
        self._door.unsubscribe_event(self._result_id)

    def getData(self):
        '''Returns the data object for the last executed macro

        Reads the Door's ``RecordData`` attribute and decodes it with
        ``CodecFactory``.

        :return: (obj)
        '''
        record_data = self._door.RecordData
        codec_factory = CodecFactory()
        return codec_factory.decode(record_data)