Source code for sardana.pool.poolaction

#!/usr/bin/env python

# This file is part of Sardana
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <>.

"""This module is part of the Python Pool libray. It defines the class for an
abstract action over a set of pool elements"""

__all__ = ["PoolActionItem", "OperationInfo", "ActionContext", "PoolAction",

__docformat__ = 'restructuredtext'

import sys
import weakref
import traceback
import threading

from collections import OrderedDict

from taurus.core.util.log import Logger

from sardana import State
from sardana.sardanathreadpool import get_thread_pool
from sardana.pool.poolobject import PoolObject

[docs]class PoolActionItem(object): """The base class for an atomic action item""" def __init__(self, element): self._element = weakref.ref(element)
[docs] def get_element(self): """Returns the element associated with this item""" return self._element()
[docs] def set_element(self, element): """Sets the element for this item""" self._element = weakref.ref(element)
element = property(get_element)
[docs]class OperationInfo(object): """Stores synchronization data for a certain operation""" def __init__(self): """Constructor""" self.state_count = 0 self.state_lock = threading.Lock() self.state_event_lock = threading.Lock() self.state_event = threading.Event()
[docs] def init(self, count): """Initializes this operation with a certain count""" self.state_count = count self.state_event.clear() if count == 0: self.state_event.set()
[docs] def wait(self, timeout=None): """waits for the operation to finish""" return self.state_event.wait(timeout)
[docs] def finish_one(self): """Notifies this operation that one step was finished""" with self.state_event_lock: self.state_count = self.state_count - 1 if self.state_count < 1: self.state_count = 0 self.state_event.set()
[docs] def acquire(self): """Acquires this operation lock""" self.state_lock.acquire()
[docs] def release(self): """Releases this operation lock""" self.state_lock.release()
def __enter__(self): return self.acquire() def __exit__(self, exc_type, exc_value, exc_traceback): return self.release()
class BaseOperationContext(object): """Stores operation context""" def __init__(self, pool_action): self._pool_action = pool_action def enter(self): """Enters operation context""" pool_action = self._pool_action for element in pool_action.get_elements(): element.lock() element.set_operation(pool_action) for ctrl in pool_action.get_pool_controller_list(): ctrl.lock() def exit(self): """Leaves operation context""" pool_action = self._pool_action for element in reversed(pool_action.get_elements()): element.clear_operation() element.unlock() for ctrl in reversed(pool_action.get_pool_controller_list()): ctrl.unlock() pool_action.finish_action() return False def __enter__(self): return self.enter() def __exit__(self, exc_type, exc_value, exc_traceback): return self.exit() class OperationContext(BaseOperationContext): """Concrete operation context""" def enter(self): pool_action = self._pool_action for element in pool_action.get_elements(): element.set_operation(pool_action) def exit(self): pool_action = self._pool_action for element in reversed(pool_action.get_elements()): element.clear_operation() pool_action.finish_action() return False def __enter__(self): return self.enter() def __exit__(self, exc_type, exc_value, exc_traceback): return self.exit()
[docs]class ActionContext(object): """Stores an atomic action context""" def __init__(self, pool_action): self._pool_action = pool_action
[docs] def enter(self): """Enters operation""" pool_action = self._pool_action for element in pool_action.get_elements(): element.lock() for ctrl in pool_action.get_pool_controller_list(): ctrl.lock()
[docs] def exit(self): """Leaves operation""" pool_action = self._pool_action for element in reversed(pool_action.get_elements()): element.unlock() for ctrl in reversed(pool_action.get_pool_controller_list()): ctrl.unlock() return False
def __enter__(self): return self.enter() def __exit__(self, exc_type, exc_value, exc_traceback): return self.exit()
[docs]class PoolAction(Logger): """A generic class to handle any type of operation (like motion or acquisition)""" OperationContextClass = OperationContext def __init__(self, main_element, name="GlobalAction"): Logger.__init__(self, name) self._action_run_lock = threading.Lock() self._main_element = weakref.ref(main_element) self._aborted = False self._stopped = False self._released = False self._elements = [] self._pool_ctrl_dict = {} self._pool_ctrl_list = [] self._finish_hooks = OrderedDict() self._started = False self._running = False self._state_info = OperationInfo() self._value_info = OperationInfo()
[docs] def get_main_element(self): """Returns the main element for this action :return: sardana.pool.poolelement.PoolElement""" return self._main_element()
main_element = property(get_main_element)
[docs] def get_pool(self): """Returns the pool object for this action :return: sardana.pool.pool.Pool""" return self.main_element.pool
pool = property(get_pool)
[docs] def clear_elements(self): """Clears all elements from this action""" self._elements = [] self._pool_ctrl_dict = {} self._pool_ctrl_list = []
[docs] def add_element(self, element): """Adds a new element to this action. :param element: the new element to be added :type element: sardana.pool.poolelement.PoolElement""" ctrl = element.controller ctrl_items = self._pool_ctrl_dict.get(ctrl) if ctrl_items is None: ctrl_items = [] self._pool_ctrl_dict[ctrl] = ctrl_items self._pool_ctrl_list.append(ctrl) self._pool_ctrl_list.sort(key=PoolObject.get_id) self._elements.append(element) ctrl_items.append(element) # make sure elements are ordered by ID so that a multiple lock always # locks and unlocks in the same order self._elements.sort(key=PoolObject.get_id) ctrl_items.sort(key=PoolObject.get_id)
[docs] def remove_element(self, element): """Removes an element from this action. If the element is not part of this action, a ValueError is raised. :param element: the new element to be removed :type element: sardana.pool.poolelement.PoolElement :raises: ValueError""" ctrl = element.controller #element = weakref.ref(element) try: idx = self._elements.index(element) except ValueError: raise ValueError("action doesn't contain %s" % del self._elements[idx] ctrl_items = self._pool_ctrl_dict[ctrl] del ctrl_items[ctrl_items.index(element)] if not len(ctrl_items): del self._pool_ctrl_dict[ctrl] del self._pool_ctrl_list[self._pool_ctrl_list.index(ctrl)]
[docs] def get_elements(self, copy_of=False): """Returns a sequence of all elements involved in this action. :param copy_of: If False (default) the internal container of elements is returned. If True, a copy of the internal container is returned instead :type copy_of: bool :return: a sequence of all elements involved in this action. :rtype: seq<sardana.pool.poolelement.PoolElement>""" elements = self._elements if copy_of: elements = tuple(elements) return elements
[docs] def get_pool_controller_list(self): """Returns a list of all controller elements involved in this action. :return: a list of all controller elements involved in this action. :rtype: list<sardana.pool.poolelement.PoolController>""" return self._pool_ctrl_list
[docs] def get_pool_controllers(self): """Returns a dict of all controller elements involved in this action. :return: a dict of all controller elements involved in this action. :rtype: dict<sardana.pool.poolelement.PoolController, seq<sardana.pool.poolelement.PoolElement>>""" return self._pool_ctrl_dict
pool_controllers = property(get_pool_controllers) def _is_in_action(self, state): """Determines if the given state is a busy state (Moving or Running) or not. :return: True if state is a busy state or False otherwise :rtype: bool""" return state == State.Moving or state == State.Running
[docs] def is_running(self): """Determines if this action is running or not :return: True if action is running or False otherwise :rtype: bool""" return self._running
def _is_started(self): """Determines if this action is started or not. :return: True if action is started or False otherwise :rtype: bool ..warning:: This method was added as a workaround for the lack of proper synchronization between the software synchronizer and the acquisition actions.""" return self._started
[docs] def run(self, *args, **kwargs): """Runs this action""" self._running = True synch = kwargs.pop("synch", False) if synch: try: with self.OperationContextClass(self) as context: self.start_action(*args, **kwargs) self._started = False self.action_loop() finally: self._started = False self._running = False else: context = self.OperationContextClass(self) context.enter() try: self.start_action(*args, **kwargs) except: context.exit() self._running = False raise finally: self._started = False cb = kwargs.pop("cb", None) get_thread_pool().add(self._asynch_action_loop, cb, context)
[docs] def start_action(self, *args, **kwargs): """Start procedure for this action. Default implementation raises NotImplementedError :raises: NotImplementedError""" raise NotImplementedError("start_action must be implemented in " "subclass")
[docs] def set_finish_hooks(self, hooks): """Set finish hooks for this action. :param hooks: an ordered dictionary where keys are the hooks and values is a flag if the hook is permanent (not removed after the execution) :type hooks: OrderedDict or None """ self._finish_hooks = hooks
[docs] def add_finish_hook(self, hook, permanent=True): """Append one finish hook to this action. :param hook: hook to be appended :type hook: callable :param permanent: flag if the hook is permanent (not removed after the execution) :type permanent: boolean """ self._finish_hooks[hook] = permanent
[docs] def remove_finish_hook(self, hook): """Remove finish hook. """ self._finish_hooks.pop(hook)
[docs] def finish_action(self): """Finishes the action execution. If a finish hook is defined it safely executes it. Otherwise nothing happens""" hooks = self._finish_hooks for hook, permanent in list(hooks.items()): try: hook() except: self.warning("Exception running function finish hook", exc_info=1) finally: if not permanent: hooks.pop(hook)
[docs] def stop_action(self, *args, **kwargs): """Stop procedure for this action.""" self._stopped = True for pool_ctrl, elements in list(self.pool_controllers.items()): pool_ctrl.stop_elements(elements)
[docs] def abort_action(self, *args, **kwargs): """Aborts procedure for this action""" self._aborted = True for pool_ctrl, elements in list(self.pool_controllers.items()): pool_ctrl.abort_elements(elements)
[docs] def release_action(self): self._released = True
[docs] def emergency_break(self): """Tries to execute a stop. If it fails try an abort""" self._stopped = True for pool_ctrl, elements in list(self.pool_controllers.items()): pool_ctrl.emergency_break(elements)
[docs] def was_stopped(self): """Determines if the action has been stopped from outside :return: True if action has been stopped from outside or False otherwise :rtype: bool""" return self._stopped
[docs] def was_aborted(self): """Determines if the action has been aborted from outside :return: True if action has been aborted from outside or False otherwise :rtype: bool""" return self._aborted
[docs] def was_released(self): """Determines if the action has been released from outside :return: True if action has been released from outside or False otherwise :rtype: bool""" return self._released
[docs] def was_action_interrupted(self): """Determines if the action has been interruped from outside (either from an abort or a stop). :return: True if action has been interruped from outside or False otherwise :rtype: bool""" return self.was_aborted() or self.was_stopped()
def _asynch_action_loop(self, context): """Internal method. Asynchronous action loop""" try: self.action_loop() finally: context.exit() self._running = False
[docs] def action_loop(self): """Action loop for this action. Default implementation raises NotImplementedError :raises: NotImplementedError""" raise NotImplementedError( "action_loop must be implemented in subclass")
[docs] def read_state_info(self, ret=None, serial=False): """Reads state information of all elements involved in this action :param ret: output map parameter that should be filled with state information. If None is given (default), a new map is created an returned :type ret: dict :param serial: If False (default) perform controller HW state requests in parallel. If True, access is serialized. :type serial: bool :return: a map containing state information per element :rtype: dict<sardana.pool.poolelement.PoolElement, stateinfo>""" with ActionContext(self): return self.raw_read_state_info(ret=ret, serial=serial)
[docs] def raw_read_state_info(self, ret=None, serial=False): """**Unsafe**. Reads state information of all elements involved in this action :param ret: output map parameter that should be filled with state information. If None is given (default), a new map is created an returned :type ret: dict :param serial: If False (default) perform controller HW state requests in parallel. If True, access is serialized. :type serial: bool :return: a map containing state information per element :rtype: dict<sardana.pool.poolelement.PoolElement, stateinfo>""" if ret is None: ret = {} read = self._raw_read_state_info_concurrent if serial: read = self._raw_read_state_info_serial state_info = self._state_info with state_info: state_info.init(len(self.pool_controllers)) read(ret) state_info.wait() return ret
def _raw_read_state_info_serial(self, ret): """Internal method. Read state in a serial mode""" for pool_ctrl in self.pool_controllers: self._raw_read_ctrl_state_info(ret, pool_ctrl) return ret def _raw_read_state_info_concurrent(self, ret): """Internal method. Read state in a concurrent mode""" th_pool = get_thread_pool() for pool_ctrl in self.pool_controllers: th_pool.add(self._raw_read_ctrl_state_info, None, ret, pool_ctrl) return ret def _get_ctrl_error_state_info(self, pool_ctrl): """Internal method. Returns the controller error in form of a tuple<sardana.State, str>""" exc_t, exc_v, trb = sys.exc_info() if exc_t is None: if pool_ctrl.is_online(): return State.Fault, "Unknown controller error" else: if pool_ctrl.is_online(): err_msg = "".join( traceback.format_exception(exc_t, exc_v, trb)) return State.Fault, "Unexpected controller error:\n" + err_msg return State.Fault, pool_ctrl.get_ctrl_error_str() def _raw_read_ctrl_state_info(self, ret, pool_ctrl): """Internal method. Read controller information and store it in ret parameter""" try: axes = [elem.axis for elem in self.pool_controllers[pool_ctrl]] state_infos, error = pool_ctrl.raw_read_axis_states(axes) if error: pool_ctrl.warning("Read state error") for elem, (state_info, exc_info) in list(state_infos.items()): if exc_info is not None: pool_ctrl.debug("Axis %s error details:", elem.axis, exc_info=exc_info) ret.update(state_infos) except: self.error("Something wrong happend: Error should have been caught" "by ctrl.read_axis_states") self.debug("Details: ", exc_info=1) state_info = self._get_ctrl_error_state_info(pool_ctrl) for elem in self.pool_controllers[pool_ctrl]: ret[elem] = state_info finally: self._state_info.finish_one()
[docs] def get_read_value_ctrls(self): return self.pool_controllers
[docs] def read_value(self, ret=None, serial=False): """Reads value information of all elements involved in this action :param ret: output map parameter that should be filled with value information. If None is given (default), a new map is created an returned :type ret: dict :param serial: If False (default) perform controller HW value requests in parallel. If True, access is serialized. :type serial: bool :return: a map containing value information per element :rtype: dict<:class:~`sardana.pool.poolelement.PoolElement`, (value object, Exception or None)>""" with ActionContext(self): return self.raw_read_value(ret=ret, serial=serial)
[docs] def raw_read_value(self, ret=None, serial=False): """**Unsafe**. Reads value information of all elements involved in this action :param ret: output map parameter that should be filled with value information. If None is given (default), a new map is created an returned :type ret: dict :param serial: If False (default) perform controller HW value requests in parallel. If True, access is serialized. :type serial: bool :return: a map containing value information per element :rtype: dict<:class:~`sardana.pool.poolelement.PoolElement, :class:`sardana.sardanavalue.SardanaValue` >""" if ret is None: ret = {} read = self._raw_read_value_concurrent if serial: read = self._raw_read_value_serial value_info = self._value_info with value_info: value_info.init(len(self.get_read_value_ctrls())) read(ret) value_info.wait() return ret
def _raw_read_value_serial(self, ret): """Internal method. Read value in a serial mode""" for pool_ctrl in self.get_read_value_ctrls(): self._raw_read_ctrl_value(ret, pool_ctrl) return ret def _raw_read_value_concurrent(self, ret): """Internal method. Read value in a concurrent mode""" th_pool = get_thread_pool() for pool_ctrl in self.get_read_value_ctrls(): th_pool.add(self._raw_read_ctrl_value, None, ret, pool_ctrl) return ret def _raw_read_ctrl_value(self, ret, pool_ctrl): """Internal method. Read controller value information and store it in ret parameter""" try: axes = [elem.axis for elem in self.pool_controllers[pool_ctrl]] value_infos = pool_ctrl.raw_read_axis_values(axes) ret.update(value_infos) finally: self._value_info.finish_one()
[docs] def get_read_value_loop_ctrls(self): return self.pool_controllers
[docs] def read_value_loop(self, ret=None, serial=False): """Reads value information of all elements involved in this action :param ret: output map parameter that should be filled with value information. If None is given (default), a new map is created an returned :type ret: dict :param serial: If False (default) perform controller HW value requests in parallel. If True, access is serialized. :type serial: bool :return: a map containing value information per element :rtype: dict<:class:~`sardana.pool.poolelement.PoolElement`, (value object, Exception or None)>""" with ActionContext(self): return self.raw_read_value_loop(ret=ret, serial=serial)
[docs] def raw_read_value_loop(self, ret=None, serial=False): """**Unsafe**. Reads value information of all elements involved in this action :param ret: output map parameter that should be filled with value information. If None is given (default), a new map is created an returned :type ret: dict :param serial: If False (default) perform controller HW value requests in parallel. If True, access is serialized. :type serial: bool :return: a map containing value information per element :rtype: dict<:class:~`sardana.pool.poolelement.PoolElement, :class:`sardana.sardanavalue.SardanaValue` >""" if ret is None: ret = {} read = self._raw_read_value_concurrent_loop if serial: read = self._raw_read_value_serial_loop value_info = self._value_info with value_info: value_info.init(len(self.get_read_value_loop_ctrls())) read(ret) value_info.wait() return ret
def _raw_read_value_serial_loop(self, ret): """Internal method. Read value in a serial mode""" for pool_ctrl in self.get_read_value_loop_ctrls(): self._raw_read_ctrl_value(ret, pool_ctrl) return ret def _raw_read_value_concurrent_loop(self, ret): """Internal method. Read value in a concurrent mode""" th_pool = get_thread_pool() for pool_ctrl in self.get_read_value_loop_ctrls(): th_pool.add(self._raw_read_ctrl_value, None, ret, pool_ctrl) return ret