# Source code for sardana.pool.poolacquisition

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool library. It defines the class for an
acquisition"""

# Names exported by a star-import of this module.
__all__ = [
    "get_acq_ctrls",
    "AcquisitionState",
    "AcquisitionMap",
    "PoolCTAcquisition",
    "Pool0DAcquisition",
    "PoolIORAcquisition",
    "PoolAcquisitionHardware",
    "PoolAcquisitionSoftware",
    "PoolAcquisitionSoftwareStart",
]

# Markup language used by the docstrings of this module.
__docformat__ = "restructuredtext"

import time
import weakref
import datetime
import traceback
import functools
import threading
from typing import Any, Dict, Union, List, Sequence, Optional, Tuple

from taurus.core.util.log import DebugIt
from taurus.core.util.enumeration import Enumeration

from sardana import AttrQuality, SardanaValue, State, ElementType, \
    TYPE_TIMERABLE_ELEMENTS

from sardana.sardanathreadpool import get_thread_pool
from sardana.pool import AcqSynch, AcqMode
from sardana.pool.poolaction import ActionContext, PoolAction, \
    OperationContext
from sardana.pool.poolsynchronization import PoolSynchronization
from sardana.pool.poolexception import PoolException
from sardana.pool.poolelement import PoolElement
from sardana.pool.poolcontroller import PoolController
import sardana

#: enumeration representing possible acquisition states
AcquisitionState = Enumeration(
    "AcquisitionState",
    (
        "Stopped",
        #    "StoppedOnError",
        #    "StoppedOnAbort",
        "Acquiring",
        "Invalid",
    ),
)

#: short alias of the acquisition state enumeration
AS = AcquisitionState
#: one-element tuple with the state meaning "acquisition in progress"
AcquiringStates = (AS.Acquiring,)
#: one-element tuple with the state meaning "acquisition stopped"
StoppedStates = (AS.Stopped,)  # MS.StoppedOnError, MS.StoppedOnAbort

#: translation of acquisition states into sardana states
#: (there is no entry for ``AS.Stopped``)
AcquisitionMap = {
    # AS.Stopped           : State.On,
    AS.Acquiring: State.Moving,
    AS.Invalid: State.Invalid,
}


def is_value_error(value):
    """Tell whether ``value`` is a :class:`SardanaValue` carrying an error.

    :param value: object to be checked
    :return: True if ``value`` is a SardanaValue whose ``error`` attribute
        is truthy, False in any other case
    """
    return bool(isinstance(value, SardanaValue) and value.error)


def get_acq_ctrls(
    ctrls: Sequence["sardana.pool.poolmeasurementgroup.ControllerConfiguration"]
) -> List["AcqController"]:
    """Converts configuration controllers into acquisition controllers.

    Takes care about converting their internals as well: every configuration
    controller gets wrapped into a new :class:`AcqController`.

    :param ctrls: sequence of configuration controllers objects
    :return: list of acquisition controllers, in the same order as ``ctrls``

    .. note::
        The get_acq_ctrls function has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the function) may occur if
        deemed necessary by the core developers.
    """
    return [AcqController(ctrl) for ctrl in ctrls]


def get_timerable_ctrls(
    ctrls: Sequence["sardana.pool.poolmeasurementgroup.ControllerConfiguration"],
    acq_mode: sardana.pool.AcqMode
) -> List["AcqController"]:
    """Converts timerable configuration controllers into acquisition
    controllers.

    Take care about converting their internals as well.
    Take care about assigning master according to acq_mode: the controller's
    timer in timer mode, its monitor in monitor mode and None for any other
    (not None) mode. When ``acq_mode`` is None no master is assigned at all.

    :param ctrls: sequence of configuration controllers objects
    :param acq_mode: acquisition mode (timer/monitor)
    :return: list of acquisition controllers, in the same order as ``ctrls``

    .. note::
        The get_timerable_ctrls function has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the function) may occur if
        deemed necessary by the core developers.
    """
    action_ctrls = []
    for ctrl in ctrls:
        # a fresh dict per controller: AcqController stores it on the item
        attrs = {}
        if acq_mode is not None:
            if acq_mode is AcqMode.Timer:
                master = ctrl.timer
            elif acq_mode is AcqMode.Monitor:
                master = ctrl.monitor
            else:
                master = None
            attrs['master'] = master
        action_ctrls.append(AcqController(ctrl, attrs))
    return action_ctrls


def get_timerable_items(
    ctrls: List["sardana.pool.poolmeasurementgroup.ControllerConfiguration"],
    master: "sardana.pool.poolmeasurementgroup.ChannelConfiguration",
    acq_mode: sardana.pool.AcqMode = AcqMode.Timer
) -> Tuple[List["AcqController"],
           Union["AcqConfigurationItem",
                 "sardana.pool.poolmeasurementgroup.ChannelConfiguration"]]:
    """Converts timerable configuration items into acquisition items.

    The timerable items are controllers and master. Convert these into
    the corresponding acquisition items.

    Take care about converting their internals as well.
    Take care about assigning master according to acq_mode.

    :param ctrls: sequence of configuration controllers objects
    :param master: master configuration object
    :param acq_mode: acquisition mode (timer/monitor)
    :return: two-element tuple: list of acquisition controllers and the
        acquisition item wrapping ``master``. If none of the channels wraps
        ``master`` it is returned unchanged.

    .. note::
        The get_timerable_items function has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the function) may occur if
        deemed necessary by the core developers.
    """
    acq_ctrls = get_timerable_ctrls(ctrls, acq_mode)
    # Search the AcqConfigurationItem object wrapping the master and stop at
    # the first match, so the remaining channels are never compared against
    # the already found wrapper.
    for ctrl in acq_ctrls:
        for channel in ctrl.get_channels():
            if channel.configuration == master:
                return acq_ctrls, channel
    return acq_ctrls, master


class ActionArgs(object):
    """Holder of the positional and keyword arguments of an action call."""

    def __init__(self, args, kwargs=None):
        """Stores the arguments.

        :param args: positional arguments, stored as given
        :param kwargs: keyword arguments, stored as given; when None
            (default) a new empty dict is stored instead
        """
        self.kwargs = {} if kwargs is None else kwargs
        self.args = args


class AcqConfigurationItem(object):
    """Wrapper for configuration item that will be used in an action.

    The configuration item is kept by weak reference. Attributes which are
    not found on the wrapper itself are looked up on the configuration item.

    .. note::
        The AcqConfigurationItem class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, configuration: "sardana.pool.poolmeasurementgroup.ConfigurationItem", attrs: Optional[Dict] = None) -> None:
        """Constructs action item from a configuration item.

        Eventually it can be enriched with attrs.

        :param configuration: item configuration object
        :param attrs: extra attributes to be inserted (copied directly into
            the instance ``__dict__``)
        """
        self._configuration = weakref.ref(configuration)
        self.enabled = True

        if attrs is not None:
            self.__dict__.update(attrs)

    def __getattr__(self, item):
        """Delegates the lookup of missing attributes to the configuration.

        :param item: name of the attribute
        :return: the value of the attribute of the configuration item
        :raises: AttributeError if the configuration item is not available
            or does not have such attribute
        """
        # Read the weak reference straight from the instance dict: going
        # through ``self.configuration`` would re-enter __getattr__ and
        # recurse endlessly when ``_configuration`` is not set yet (e.g. on
        # an instance created with ``__new__`` by the copy module).
        ref = self.__dict__.get("_configuration")
        configuration = None if ref is None else ref()
        if configuration is None:
            raise AttributeError(
                "{!r} object has no attribute {!r} (configuration is not "
                "available)".format(type(self).__name__, item))
        return getattr(configuration, item)

    def get_configuration(self):
        """Returns the configuration item associated with this item or None
        if it does not exist anymore"""
        return self._configuration()

    def set_configuration(self, configuration):
        """Sets the configuration item for this item (by weak reference)"""
        self._configuration = weakref.ref(configuration)

    # read-only: use set_configuration() to change it
    configuration = property(get_configuration)


class AcqController(AcqConfigurationItem):
    """Wrapper for controller configuration that will be used in an action.

    Besides wrapping the controller it wraps each of its channels into an
    :class:`AcqConfigurationItem`.

    .. note::
        The AcqController class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, configuration: "sardana.pool.poolmeasurementgroup.ControllerConfiguration", attrs: Optional[Dict] = None) -> None:
        """Constructs action controller from a configuration controller.

        Eventually it can be enriched with attrs. If attrs contains the
        ``master`` key and its value is equal to one of the controller's
        configuration channels, the stored ``master`` attribute is the
        acquisition item wrapping that channel.

        :param configuration: controller configuration object
        :param attrs: extra attributes to be inserted (not modified)
        """
        master = None
        if attrs is not None:
            # work on a copy: the master substitution below must not leak
            # into the dict owned by the caller
            attrs = dict(attrs)
            master = attrs.get('master')
        self._channels = []
        self._channels_enabled = []
        self._channels_disabled = []
        ch_attrs = {'controller': self}
        # query the enabled/disabled channels once instead of per channel
        enabled_channels = configuration.get_channels(enabled=True)
        disabled_channels = configuration.get_channels(enabled=False)
        for conf_channel in configuration.get_channels():
            action_channel = AcqConfigurationItem(conf_channel, ch_attrs)
            self._channels.append(action_channel)
            if conf_channel in enabled_channels:
                self._channels_enabled.append(action_channel)
            if conf_channel in disabled_channels:
                self._channels_disabled.append(action_channel)
            if master is None:
                continue
            if master == conf_channel:
                attrs['master'] = action_channel
                # master found: skip the comparison for remaining channels
                master = None
        AcqConfigurationItem.__init__(self, configuration, attrs)

    def get_channels(self, enabled=None):
        """Returns the acquisition channels of this controller.

        :param enabled: None (default) selects all channels, a true value
            only the enabled ones and a false value only the disabled ones
        :return: a new list with the selected channels
        """
        if enabled is None:
            return list(self._channels)
        elif enabled:
            return list(self._channels_enabled)
        else:
            return list(self._channels_disabled)


class AcquisitionBaseContext(OperationContext):
    """Operation context which resets the controller dictionaries of its
    pool action when the context is left."""

    def exit(self):
        """Resets the controller dictionaries of the pool action and then
        delegates to :meth:`OperationContext.exit`, returning its result."""
        self._pool_action._reset_ctrl_dicts()
        return OperationContext.exit(self)


class PoolAcquisition(PoolAction):
    """Acquisition action which is internally composed of sub-actions.

    Handle acquisition of experimental channels of the following types:

    * timerable (C/T, 1D and 2D) synchronized by software or hardware
      trigger/gate/start
    * 0D

    Synchronized by T/G elements or software synchronizer.
    """

    def __init__(self, main_element, name="Acquisition"):
        """Creates the sub-actions (software, software start, 0D, hardware
        and synchronization), all of them sharing ``main_element``.

        :param main_element: element owning this action
        :param name: name of the action, used as prefix of the sub-action
            names
        """
        PoolAction.__init__(self, main_element, name)
        zerodname = name + ".0DAcquisition"
        hwname = name + ".HardwareAcquisition"
        swname = name + ".SoftwareAcquisition"
        sw_start_name = name + ".SoftwareStartAcquisition"
        synchname = name + ".Synchronization"

        # arguments of the sub-actions, filled by prepare(); None means
        # that the given sub-action does not participate
        self._sw_acq_args = None
        self._sw_start_acq_args = None
        self._0d_acq_args = None
        self._hw_acq_args = None
        self._synch_args = None

        self._sw_acq = PoolAcquisitionSoftware(main_element, name=swname)
        self._sw_start_acq = PoolAcquisitionSoftwareStart(
            main_element, name=sw_start_name)
        self._0d_acq = Pool0DAcquisition(main_element, name=zerodname)
        self._hw_acq = PoolAcquisitionHardware(main_element, name=hwname)
        self._synch = PoolSynchronization(main_element, name=synchname)
        self._handled_first_active = False

    def event_received(self, *args, **kwargs):
        """Callback executed on event of software synchronizer.

        Reacts on start, active, passive or end type of events. Events of
        type "state" are ignored.

        :param args: three-element sequence; the second element is the event
            type (its ``name`` is used) and the third one the event index
        """
        timestamp = time.time()
        _, type_, index = args
        name = type_.name
        if name == "state":
            return
        t_fmt = '%Y-%m-%d %H:%M:%S.%f'
        t_str = datetime.datetime.fromtimestamp(timestamp).strftime(t_fmt)
        msg = '%s event with id: %d received at: %s' % (name, index, t_str)
        self.debug(msg)
        if name == "start":
            if self._sw_start_acq_args is not None:
                self._sw_start_acq._wait()
                self._sw_start_acq._set_busy()
                self.debug('Executing software start acquisition.')
                self._sw_start_acq._started = True
                get_thread_pool().add(self._sw_start_acq.run,
                                      self._sw_start_acq._set_ready,
                                      *self._sw_start_acq_args.args,
                                      **self._sw_start_acq_args.kwargs)
        elif name == "active":
            # this code is not thread safe, but for the moment we assume that
            # only one EventGenerator will work at the same time
            if self._handled_first_active:
                # do not block on subsequent triggers: skip them instead
                timeout = 0
            else:
                # wait without limit only for the very first trigger
                timeout = None
                self._handled_first_active = True
            if self._sw_acq_args is not None:
                if not self._sw_acq._wait(timeout):
                    msg = ('Skipping trigger: software acquisition is still'
                           ' in progress.')
                    self.debug(msg)
                    return
                else:
                    self._sw_acq._set_busy()
                    self.debug('Executing software acquisition.')
                    self._sw_acq_args.kwargs.update({'index': index})
                    self._sw_acq._started = True
                    get_thread_pool().add(self._sw_acq.run,
                                          self._sw_acq._set_ready,
                                          *self._sw_acq_args.args,
                                          **self._sw_acq_args.kwargs)
            if self._0d_acq_args is not None:
                if not self._0d_acq._wait(timeout):
                    msg = ('Skipping trigger: ZeroD acquisition is still in'
                           ' progress.')
                    self.debug(msg)
                    return
                else:
                    self._0d_acq._set_busy()
                    self.debug('Executing ZeroD acquisition.')
                    self._0d_acq_args.kwargs.update({'index': index})
                    self._0d_acq._started = True
                    self._0d_acq._stopped = False
                    self._0d_acq._aborted = False
                    self._0d_acq._released = False
                    get_thread_pool().add(self._0d_acq.run,
                                          self._0d_acq._set_ready,
                                          *self._0d_acq_args.args,
                                          **self._0d_acq_args.kwargs)
        elif name == "passive":
            # TODO: _0d_acq_args comparison may not be necessary
            if (self._0d_acq_args is not None
                    and not self._0d_acq._is_ready()):
                self.debug('Stopping ZeroD acquisition.')
                self._0d_acq.stop_action()

    def prepare(self, config, acq_mode, value, synch_description=None,
                moveable=None, software_synchronizer_initial_domain=None,
                nb_starts=1, **kwargs):
        """Prepare measurement process.

        Organize sub-action arguments and loads configuration parameters to
        the hardware controllers.

        :param config: measurement group configuration
        :param acq_mode: acquisition mode (timer/monitor)
        :param value: value passed to the controllers' ``PrepareOne``
        :param synch_description: synchronization description. Its
            ``repetitions`` and ``passive_time`` are read unconditionally,
            so despite the default it must not be None.
        :param moveable: moveable used for validating/configuring the
            inputs of the synchronizers
        :param software_synchronizer_initial_domain: passed to the
            synchronization sub-action
        :param nb_starts: number of starts
        :param kwargs: extra keyword arguments passed to every sub-action
        :raises: PoolException if the moveable is not configured on the
            input of an enabled synchronizer; RuntimeError if one of the
            timerable controllers is offline
        """
        self._sw_acq_args = None
        self._sw_start_acq_args = None
        self._0d_acq_args = None
        self._hw_acq_args = None
        self._synch_args = None
        self._handled_first_active = False
        ctrls_hw = []
        ctrls_sw = []
        ctrls_sw_start = []

        repetitions = synch_description.repetitions
        latency = synch_description.passive_time

        # Prepare controllers synchronized by hardware
        acq_sync_hw = [AcqSynch.HardwareTrigger, AcqSynch.HardwareStart,
                       AcqSynch.HardwareGate]
        ctrls = config.get_timerable_ctrls(acq_synch=acq_sync_hw,
                                           enabled=True)
        if len(ctrls) > 0:
            ctrls_hw = get_timerable_ctrls(ctrls, acq_mode)
            hw_args = (ctrls_hw, value, repetitions, latency)
            hw_kwargs = {}
            hw_kwargs.update(kwargs)
            self._hw_acq_args = ActionArgs(hw_args, hw_kwargs)

        # Prepare controllers synchronized by software Trigger and Gate
        acq_sync_sw = [AcqSynch.SoftwareGate, AcqSynch.SoftwareTrigger]
        ctrls = config.get_timerable_ctrls(acq_synch=acq_sync_sw,
                                           enabled=True)
        if len(ctrls) > 0:
            if acq_mode is AcqMode.Timer:
                master = config.get_master_timer_software()
            elif acq_mode is AcqMode.Monitor:
                master = config.get_master_monitor_software()
            ctrls_sw, master_sw = get_timerable_items(ctrls, master,
                                                      acq_mode)
            sw_args = (ctrls_sw, value, master_sw)
            sw_kwargs = {'synch': True}
            sw_kwargs.update(kwargs)
            self._sw_acq_args = ActionArgs(sw_args, sw_kwargs)

        # Prepare controllers synchronized by software Start
        ctrls = config.get_timerable_ctrls(acq_synch=AcqSynch.SoftwareStart,
                                           enabled=True)
        if len(ctrls) > 0:
            if acq_mode is AcqMode.Timer:
                master = config.get_master_timer_software_start()
            elif acq_mode is AcqMode.Monitor:
                master = config.get_master_monitor_software_start()
            ctrls_sw_start, master_sw_start = get_timerable_items(
                ctrls, master, acq_mode)
            sw_start_args = (ctrls_sw_start, value, master_sw_start,
                             repetitions, latency)
            sw_start_kwargs = {'synch': True}
            sw_start_kwargs.update(kwargs)
            self._sw_start_acq_args = ActionArgs(sw_start_args,
                                                 sw_start_kwargs)

        # Prepare 0D controllers
        ctrls = config.get_zerod_ctrls(enabled=True)
        if len(ctrls) > 0:
            ctrls_acq_0d = get_acq_ctrls(ctrls)
            zerod_args = (ctrls_acq_0d,)
            zerod_kwargs = {'synch': True}
            zerod_kwargs.update(kwargs)
            self._0d_acq_args = ActionArgs(zerod_args, zerod_kwargs)

        # Prepare synchronizer controllers
        ctrls = config.get_synch_ctrls(enabled=True)
        ctrls_synch = get_acq_ctrls(ctrls)
        synch_args = (ctrls_synch, synch_description)
        synch_kwargs = {
            'moveable': moveable,
            'software_synchronizer_initial_domain':
                software_synchronizer_initial_domain
        }
        synch_kwargs.update(kwargs)
        self._synch_args = ActionArgs(synch_args, synch_kwargs)

        # Validate moveable and eventually configure active_input (multiplexor)
        for synch in config.get_synchs(enabled=True):
            moveable_on_input = synch.moveable_on_input
            if moveable_on_input is None:
                continue
            # multiplexor mode
            elif isinstance(moveable_on_input, dict):
                if moveable is None:
                    input_id = None
                else:
                    # the moveable may be configured either by its full name
                    # or by its name
                    input_id = moveable_on_input.get(moveable.full_name)
                    if input_id is None:
                        input_id = moveable_on_input.get(moveable.name)
                    if input_id is None:
                        raise PoolException(
                            "moveable {} is not configured on {} input".format(
                                moveable, synch.name)
                        )
                synch.controller.set_axis_par(
                    synch.axis, "active_input", input_id
                )
            # coupled mode
            # NOTE(review): with moveable=None this condition fails with
            # AttributeError - confirm whether coupled mode may be used
            # without a moveable
            elif moveable_on_input not in (moveable.full_name, moveable.name):
                raise PoolException(
                    "moveable {} is not configured on {} input".format(
                        moveable, synch.name)
                )

        # Load the configuration to the timerable controllers
        # TODO: apply the configuration only if necessary
        # Checking only the "changed" flag is not enough, one needs to check
        # if the controllers were not used with different measurement groups
        # configurations meanwhile (see: sardana-org/sardana#1171) in this
        # case the configuration must be applied even if it was not changed
        # if config.changed:
        ctrls = ctrls_hw + ctrls_sw_start + ctrls_sw
        for ctrl in ctrls:
            pool_ctrl = ctrl.element
            if not pool_ctrl.is_online():
                raise RuntimeError('The controller {0} is '
                                   'offline'.format(pool_ctrl.name))
            pool_ctrl.set_ctrl_par('acquisition_mode', acq_mode)
            pool_ctrl.operator = self.main_element
            pool_ctrl.set_ctrl_par('timer', ctrl.timer.axis)
            pool_ctrl.set_ctrl_par('monitor', ctrl.monitor.axis)
            synch = config.get_acq_synch_by_controller(pool_ctrl)
            pool_ctrl.set_ctrl_par('synchronization', synch)
            if ctrl.is_referable():
                for channel in ctrl.get_channels():
                    value_ref_enabled = channel.value_ref_enabled
                    pool_ctrl.set_axis_par(channel.axis,
                                           "value_ref_enabled",
                                           value_ref_enabled)
                    if value_ref_enabled:
                        pool_ctrl.set_axis_par(channel.axis,
                                               "value_ref_pattern",
                                               channel.value_ref_pattern)

        config.changed = False

        # Call synchronizer controllers prepare method
        self._prepare_synch_ctrls(ctrls_synch, nb_starts)

        # Call hardware and software start controllers prepare method
        ctrls = ctrls_hw + ctrls_sw_start
        self._prepare_ctrls(ctrls, value, repetitions, latency, nb_starts)

        # Call software controllers prepare method
        # (for them every repetition is a separate start)
        nb_starts = nb_starts * repetitions
        repetitions = 1
        self._prepare_ctrls(ctrls_sw, value, repetitions, latency, nb_starts)

    @staticmethod
    def _prepare_ctrls(ctrls, value, repetitions, latency, nb_starts):
        """Calls ``PrepareOne`` of each controller on its master axis."""
        for ctrl in ctrls:
            axis = ctrl.master.axis
            pool_ctrl = ctrl.element
            pool_ctrl.ctrl.PrepareOne(axis, value, repetitions, latency,
                                      nb_starts)

    @staticmethod
    def _prepare_synch_ctrls(ctrls, nb_starts):
        """Calls ``PrepareOne`` of each synchronizer controller on the axis
        of each of its channels."""
        for ctrl in ctrls:
            for chn in ctrl.get_channels():
                axis = chn.axis
                pool_ctrl = ctrl.element
                pool_ctrl.ctrl.PrepareOne(axis, nb_starts)

    def is_running(self):
        """Checks if acquisition is running.

        Acquisition is running if any of its sub-actions is running.
        """
        return self._sw_start_acq.is_running()\
            or self._0d_acq.is_running()\
            or self._sw_acq.is_running()\
            or self._hw_acq.is_running()\
            or self._synch.is_running()

    def run(self, *args, **kwargs):
        """Runs acquisition according to previous preparation.

        The arguments are not used: the sub-actions are run with the
        arguments stored by :meth:`prepare`.
        """
        for elem in self.get_elements():
            elem.put_state(None)
            # TODO: temporarily clear value buffers at the beginning of the
            # acquisition instead of doing it in the finish hook of each
            # acquisition sub-actions. See extensive explanation in the
            # constructor of PoolAcquisitionTimerable.
            try:
                elem.clear_value_buffer()
            except AttributeError:
                continue
            try:
                elem.clear_value_ref_buffer()
            except AttributeError:
                continue
            # clean also the pseudo counters, even the ones that do not
            # participate directly in the acquisition
            for pseudo_elem in elem.get_pseudo_elements():
                pseudo_elem().clear_value_buffer()

        if self._hw_acq_args is not None:
            self._hw_acq._wait()
            self._hw_acq._set_busy()
            try:
                self._hw_acq.run(*self._hw_acq_args.args,
                                 **self._hw_acq_args.kwargs,
                                 cb=self._hw_acq._set_ready)
            except BaseException:
                # never leave the sub-action busy if it failed to start
                self._hw_acq._set_ready()
                raise

        # software synchronized sub-actions are driven by the events of the
        # synchronization sub-action (see event_received)
        if self._sw_acq_args is not None\
                or self._sw_start_acq_args is not None\
                or self._0d_acq_args is not None:
            self._synch.add_listener(self)

        if self._synch_args is not None:
            self._synch._wait()
            self._synch._set_busy()
            try:
                self._synch.run(*self._synch_args.args,
                                **self._synch_args.kwargs,
                                cb=self._synch._set_ready)
            except BaseException:
                # never leave the sub-action busy if it failed to start
                self._synch._set_ready()
                raise

    def release_action(self):
        """Calls ``release_action`` of every sub-action."""
        self._synch.release_action()
        self._hw_acq.release_action()
        self._sw_start_acq.release_action()
        self._sw_acq.release_action()
        self._0d_acq.release_action()

    def _get_action_for_element(self, element):
        """Returns the sub-action responsible for the given element.

        :param element: element for which the sub-action is searched
        :return: the sub-action handling this element
        :raises: RuntimeError if the sub-action could not be determined
        """
        elem_type = element.get_type()
        if elem_type in TYPE_TIMERABLE_ELEMENTS:
            config = self.main_element.configuration
            try:
                acq_synch = config.get_acq_synch_by_channel(element)
            # when configuration was not yet set and one sets the
            # measurement group's integration time (this may happen on Tango
            # device initialization when memorized attributes are set) we
            # fallback to software acquisition
            except KeyError:
                acq_synch = AcqSynch.SoftwareTrigger
            if acq_synch in (AcqSynch.SoftwareTrigger,
                             AcqSynch.SoftwareGate):
                return self._sw_acq
            elif acq_synch == AcqSynch.SoftwareStart:
                return self._sw_start_acq
            elif acq_synch in (AcqSynch.HardwareTrigger,
                               AcqSynch.HardwareGate,
                               AcqSynch.HardwareStart):
                return self._hw_acq
        elif elem_type == ElementType.ZeroDExpChannel:
            return self._0d_acq
        elif elem_type == ElementType.TriggerGate:
            return self._synch
        # reached for an unknown element type and also for a timerable
        # element with an unknown synchronization
        raise RuntimeError("Could not determine action for element %s" %
                           element)

    def clear_elements(self):
        """Clears all elements from this action"""
        # NOTE(review): this is a no-op, the elements of the sub-actions are
        # left untouched - confirm whether this is intended

    def add_element(self, element: sardana.pool.poolelement.PoolElement) -> None:
        """Adds a new element to this action.

        The element is added to the sub-action responsible for it.

        :param element: the new element to be added
        """
        action = self._get_action_for_element(element)
        action.add_element(element)

    def remove_element(self, element: sardana.pool.poolelement.PoolElement) -> None:
        """Removes an element from this action.

        The element is removed from the sub-action responsible for it.
        If the element is not part of this action, a ValueError is raised.

        :param element: the element to be removed
        :raises: ValueError"""
        action = self._get_action_for_element(element)
        action.remove_element(element)

    def get_elements(self, copy_of: bool = False) -> Sequence[sardana.pool.poolelement.PoolElement]:
        """Returns a sequence of all elements involved in this action.

        :param copy_of: not used: the result is always a new sequence built
            from the elements of the sub-actions
        :return: a sequence of all elements involved in this action.
        """
        return (self._hw_acq.get_elements()
                + self._sw_acq.get_elements()
                + self._sw_start_acq.get_elements()
                + self._0d_acq.get_elements()
                + self._synch.get_elements())

    def get_pool_controller_list(self) -> List[PoolController]:
        """Returns a list of all controller elements involved in this action.

        :return: a list of all controller elements involved in this action.
        """
        return self._pool_ctrl_list

    def get_pool_controllers(self) -> Dict[PoolController, Sequence[PoolElement]]:
        """Returns a dict of all controller elements involved in this action.

        The controllers of the synchronization sub-action are not included.

        :return: a dict of all controller elements involved in this action.
        """
        ret = {}
        ret.update(self._hw_acq.get_pool_controllers())
        ret.update(self._sw_acq.get_pool_controllers())
        ret.update(self._sw_start_acq.get_pool_controllers())
        ret.update(self._0d_acq.get_pool_controllers())
        return ret

    pool_controllers = property(get_pool_controllers)

    def read_value(self, ret: Optional[Dict] = None, serial: bool = False) -> Dict[sardana.pool.poolelement.PoolElement, sardana.sardanavalue.SardanaValue]:
        """Reads value information of all elements involved in this action

        :param ret: output map parameter that should be filled with value
            information. If None is given (default), a new map is created
            and returned
        :param serial: If False (default) perform controller HW value
            requests in parallel. If True, access is serialized.
        :return: a map containing value information per element
        """
        # TODO: this is broken now - fix it
        # NOTE(review): the _ct_acq attribute is not created by __init__
        ret = self._ct_acq.read_value(ret=ret, serial=serial)
        ret.update(self._0d_acq.read_value(ret=ret, serial=serial))
        return ret
class PoolAcquisitionBase(PoolAction):
    """Base class for sub-acquisition.

    Adds a ready/busy flag, implemented with a :class:`threading.Event`,
    on top of :class:`PoolAction`.

    .. note::
        The PoolAcquisitionBase class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.

    .. todo: Think of moving the ready/busy mechanism to PoolAction
    """

    def __init__(self, main_element, name):
        PoolAction.__init__(self, main_element, name)
        # channels taking part in the acquisition (filled by start_action)
        self._channels = []
        # index handed over to start_action by the sub-classes
        self._index = None
        # the action is created in the "ready" state
        self._ready = threading.Event()
        self._ready.set()

    def _is_ready(self):
        """Return True if the ready flag is set."""
        return self._ready.is_set()

    def _wait(self, timeout=None):
        """Block until the ready flag is set or ``timeout`` expires.

        :param timeout: forwarded to :meth:`threading.Event.wait`
        :return: the result of :meth:`threading.Event.wait`
        """
        return self._ready.wait(timeout)

    def _set_ready(self, _=None):
        """Set the ready flag. The optional argument is ignored."""
        self._ready.set()

    def _is_busy(self):
        """Return True if the ready flag is not set."""
        return not self._ready.is_set()

    def _set_busy(self):
        """Clear the ready flag."""
        self._ready.clear()


class PoolAcquisitionTimerable(PoolAcquisitionBase):
    """Base class for acquisitions of timerable channels.

    Implements a generic start_action method. action_loop method must be
    implemented by the sub-class.

    .. note::
        The PoolAcquisitionTimerable class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.
    """

    OperationContextClass = AcquisitionBaseContext

    def __init__(self, main_element, name):
        PoolAcquisitionBase.__init__(self, main_element, name)
        self._nb_states_per_value = None
        self._acq_sleep_time = None
        # controller -> channels maps, (re)built by start_action
        self._pool_ctrl_dict_loop = None
        self._pool_ctrl_dict_ref = None
        self._pool_ctrl_dict_value = None

        # TODO: for the moment we can not clear value buffers at the end of
        # the acquisition. This is because of the pseudo counters that are
        # based on channels synchronized by hardware and software.
        # These two acquisition actions finish at different moment so the
        # pseudo counter will loose the value buffer of some of its physicals
        # if we clear the buffer at the end.
        # Whenever there will be solution for that, after refactoring of the
        # acquisition actions, uncomment this line
        # self.add_finish_hook(self.clear_value_buffers, True)

    def get_read_value_ref_ctrls(self):
        """Return the controller -> channels map used to read value refs."""
        return self._pool_ctrl_dict_ref

    def read_value_ref(
            self, ret: Optional[Dict] = None, serial: bool = False
    ) -> Dict[sardana.pool.poolelement.PoolElement,
              Union[Any, Exception, None]]:
        """Reads value ref information of all elements involved in this
        action

        The read is executed inside an :class:`ActionContext`.

        :param ret: output map parameter that should be filled with value
                    information. If None is given (default), a new map is
                    created and returned
        :param serial: If False (default) perform controller HW value
                       requests in parallel. If True, access is serialized.
        :return: a map containing value information per element
        """
        with ActionContext(self):
            return self.raw_read_value_ref(ret=ret, serial=serial)

    def raw_read_value_ref(
            self, ret: Optional[Dict] = None, serial: bool = False
    ) -> Dict[sardana.pool.poolelement.PoolElement,
              sardana.sardanavalue.SardanaValue]:
        """**Unsafe**. Reads value ref information of all referable elements
        involved in this acquisition

        :param ret: output map parameter that should be filled with value
                    information. If None is given (default), a new map is
                    created and returned
        :param serial: If False (default) perform controller HW value
                       requests in parallel. If True, access is serialized.
        :return: a map containing value information per element
        """
        if ret is None:
            ret = {}

        if serial:
            read = self._raw_read_value_ref_serial
        else:
            read = self._raw_read_value_ref_concurrent

        value_info = self._value_info

        with value_info:
            # one "finish_one" is expected per controller to be read
            value_info.init(len(self.get_read_value_ref_ctrls()))
            read(ret)
            value_info.wait()
        return ret

    def _raw_read_value_ref_serial(self, ret):
        """Internal method. Read value ref in a serial mode"""
        for pool_ctrl in self.get_read_value_ref_ctrls():
            self._raw_read_ctrl_value_ref(ret, pool_ctrl)
        return ret

    def _raw_read_value_ref_concurrent(self, ret):
        """Internal method. Read value ref in a concurrent mode"""
        th_pool = get_thread_pool()
        for pool_ctrl in self.get_read_value_ref_ctrls():
            th_pool.add(self._raw_read_ctrl_value_ref, None, ret, pool_ctrl)
        return ret

    def _raw_read_ctrl_value_ref(self, ret, pool_ctrl):
        """Internal method. Read controller value ref information and store
        it in ret parameter"""
        try:
            axes = [elem.axis for elem in self._pool_ctrl_dict_ref[pool_ctrl]]
            value_infos = pool_ctrl.raw_read_axis_value_refs(axes)
            ret.update(value_infos)
        finally:
            # always notify, even on error, so that the waiter is released
            self._value_info.finish_one()

    def _process_value_buffer(self, acquirable, value, final=False):
        """Push a value read in the loop to the acquirable.

        An error value is logged and set with ``put_value``; any other
        value extends the value buffer. Both use ``propagate=2``.

        :param acquirable: element that owns the value
        :param value: value (or error value) read from the controller
        :param final: True if this is the read done after the loop ended;
            only affects the log message
        """
        final_str = "final " if final else ""
        if is_value_error(value):
            self.error("Loop %sread value error for %s" %
                       (final_str, acquirable.name))
            msg = "Details: " + "".join(
                traceback.format_exception(*value.exc_info))
            self.debug(msg)
            acquirable.put_value(value, propagate=2)
        else:
            acquirable.extend_value_buffer(value, propagate=2)

    def _process_value_ref_buffer(self, acquirable, value_ref, final=False):
        """Push a value reference read in the loop to the acquirable.

        An error value is logged and set with ``put_value_ref``; any other
        value extends the value reference buffer. Both use ``propagate=2``.

        :param acquirable: element that owns the value reference
        :param value_ref: value reference (or error value) read from the
            controller
        :param final: True if this is the read done after the loop ended;
            only affects the log message
        """
        final_str = "final " if final else ""
        if is_value_error(value_ref):
            self.error("Loop read ref %svalue error for %s" %
                       (final_str, acquirable.name))
            msg = "Details: " + "".join(
                traceback.format_exception(*value_ref.exc_info))
            self.debug(msg)
            acquirable.put_value_ref(value_ref, propagate=2)
        else:
            acquirable.extend_value_ref_buffer(value_ref, propagate=2)

    def in_acquisition(self, states: Dict[PoolElement, State]) -> bool:
        """Determines if we are in acquisition or if the acquisition has ended
        based on the current unit trigger modes and states returned by the
        controller(s)

        :param states: a map containing state information as returned by
                       read_state_info
        :return: returns True if in acquisition or False otherwise
        """
        # explicit bool: the acquisition is on while any element is in action
        return any(self._is_in_action(states[elem][0][0])
                   for elem in states)

    @DebugIt()
    def start_action(self, ctrls, value, master, repetitions, latency,
                     index, acq_sleep_time, nb_states_per_value,
                     **kwargs):
        """
        Prepares everything for acquisition and starts it

        .. note::
            ``ctrls`` is modified in place: when ``master`` is given its
            controller is moved to the end of the list.

        :param ctrls: List of enabled pool acquisition controllers
        :param value: integration time/monitor counts
        :param master: master channel is the last one to start
        :param repetitions: repetitions
        :param latency: forwarded to the controllers' PreLoadOne/LoadOne
        :param index: stored and used by sub-classes when appending values
        :param acq_sleep_time: sleep time between state queries; if None
            the pool's ``acq_loop_sleep_time`` is used
        :param nb_states_per_value: how many state queries between
            readouts; if None the pool's ``acq_loop_states_per_value`` is
            used
        :param kwargs: accepted but not used by this method
        :raises Exception: if PreLoadOne or PreStartOne return a false
            value, or if StartOne or StartAll fail
        """
        pool = self.pool

        self._aborted = False
        self._stopped = False
        self._released = False

        self._index = index

        self._acq_sleep_time = acq_sleep_time
        if self._acq_sleep_time is None:
            self._acq_sleep_time = pool.acq_loop_sleep_time

        self._nb_states_per_value = nb_states_per_value
        if self._nb_states_per_value is None:
            self._nb_states_per_value = pool.acq_loop_states_per_value

        # make sure the controller which has the master channel is the last to
        # be called
        if master is not None:
            ctrls.remove(master.controller)
            ctrls.append(master.controller)

        # controllers that will be read during the action
        self._set_pool_ctrl_dict_loop(ctrls)
        # split controllers to read value and value reference
        self._split_ctrl(ctrls)

        # channels that are acquired (only enabled)
        self._channels = []

        def load(channel, value, repetitions, latency=0):
            """Run the complete load sequence on the channel's controller"""
            axis = channel.axis
            pool_ctrl = channel.controller
            ctrl = pool_ctrl.ctrl
            ctrl.PreLoadAll()
            res = ctrl.PreLoadOne(axis, value, repetitions, latency)
            if not res:
                msg = ("%s.PreLoadOne(%d) returned False" %
                       (pool_ctrl.name, axis))
                raise Exception(msg)
            ctrl.LoadOne(axis, value, repetitions, latency)
            ctrl.LoadAll()

        with ActionContext(self):
            # PreLoadAll, PreLoadOne, LoadOne and LoadAll
            for ctrl in ctrls:
                # TODO find solution for master now sardana only use timer
                load(ctrl.timer, value, repetitions, latency)

            # TODO: remove when the action allows to use tango attributes
            # NOTE(review): ctrls is used as a list above, so this pop is
            # expected to raise and be ignored - confirm and clean up
            try:
                ctrls.pop('__tango__')
            except Exception:
                pass

            # PreStartAll on all enabled controllers
            for ctrl in ctrls:
                pool_ctrl = ctrl.element
                pool_ctrl.ctrl.PreStartAll()

            # PreStartOne & StartOne on all enabled elements
            for ctrl in ctrls:
                channels = ctrl.get_channels(enabled=True)

                # make sure that the master timer/monitor is started as
                # the last one
                channels.remove(ctrl.master)
                channels.append(ctrl.master)
                for channel in channels:
                    axis = channel.axis
                    pool_ctrl = ctrl.element
                    ret = pool_ctrl.ctrl.PreStartOne(axis, value)
                    if not ret:
                        msg = ("%s.PreStartOne(%d) returns False" %
                               (ctrl.name, axis))
                        raise Exception(msg)
                    try:
                        pool_ctrl = ctrl.element
                        pool_ctrl.ctrl.StartOne(axis, value)
                    except Exception as e:
                        self.debug(e, exc_info=True)
                        channel.set_state_info((State.Fault, None),
                                               propagate=2)
                        msg = ("%s.StartOne(%d) failed" %
                               (ctrl.name, axis))
                        # keep the controller error as the explicit cause
                        raise Exception(msg) from e

                    self._channels.append(channel)

            # set the state of all elements to Moving and inform their
            # listeners
            for channel in self._channels:
                channel.set_state_info((State.Moving, None), propagate=2)

            # StartAll on all enabled controllers
            for ctrl in ctrls:
                try:
                    pool_ctrl = ctrl.element
                    pool_ctrl.ctrl.StartAll()
                except Exception as e:
                    channels = ctrl.get_channels(enabled=True)
                    self.debug(e, exc_info=True)
                    for channel in channels:
                        channel.set_state_info((State.Fault, None),
                                               propagate=2)
                    msg = ("%s.StartAll() failed" % ctrl.name)
                    # keep the controller error as the explicit cause
                    raise Exception(msg) from e

    def _set_pool_ctrl_dict_loop(self, ctrls):
        """Build the controller -> channels map read during the loop.

        Only controllers whose types include ``ElementType.CTExpChannel``
        are taken into account.
        """
        ctrl_channels = {}
        for ctrl in ctrls:
            pool_ctrl = ctrl.element
            # only CT will be read in the loop, 1D and 2D not
            if ElementType.CTExpChannel not in ctrl.get_ctrl_types():
                continue
            ctrl_channels[pool_ctrl] = [
                channel.element
                for channel in ctrl.get_channels(enabled=True)]
        self._pool_ctrl_dict_loop = ctrl_channels

    def _split_ctrl(self, ctrls):
        """Build the maps of channels to read by value and by value ref.

        Channels of non referable controllers are always read by value.
        For referable controllers a channel with value ref enabled is read
        by reference (and additionally by value if it has pseudo
        elements); the remaining channels are read by value. Referable
        controllers are only added to a map if it gets at least one of
        their channels.
        """
        ctrl_channels_value = {}
        ctrl_channels_ref = {}
        for ctrl in ctrls:
            pool_ctrl = ctrl.element
            pool_channels_value = []
            if not ctrl.is_referable():
                for channel in ctrl.get_channels(enabled=True):
                    pool_channels_value.append(channel.element)
                ctrl_channels_value[pool_ctrl] = pool_channels_value
                continue
            pool_channels_ref = []
            for channel in ctrl.get_channels(enabled=True):
                if channel.value_ref_enabled:
                    pool_channels_ref.append(channel.element)
                    if channel.has_pseudo_elements():
                        pool_channels_value.append(channel.element)
                else:
                    pool_channels_value.append(channel.element)
            if len(pool_channels_value) > 0:
                ctrl_channels_value[pool_ctrl] = pool_channels_value
            if len(pool_channels_ref) > 0:
                ctrl_channels_ref[pool_ctrl] = pool_channels_ref
        self._pool_ctrl_dict_value = ctrl_channels_value
        self._pool_ctrl_dict_ref = ctrl_channels_ref

    def _reset_ctrl_dicts(self):
        """Forget the controller -> channels maps of the last start."""
        self._pool_ctrl_dict_loop = None
        self._pool_ctrl_dict_value = None
        self._pool_ctrl_dict_ref = None

    def clear_value_buffers(self):
        """Clear the value buffer of every acquired channel."""
        for channel in self._channels:
            channel.clear_value_buffer()


class PoolAcquisitionHardware(PoolAcquisitionTimerable):
    """Acquisition action for controllers synchronized by hardware

    .. note::
        The PoolAcquisitionHardware class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.

    .. todo:: Try to move the action loop logic to base class it is
        basically the same as in PoolAcquisitionSoftwareStart.
    """

    def __init__(self, main_element, name="AcquisitionHardware"):
        PoolAcquisitionTimerable.__init__(self, main_element, name)

    def start_action(self, ctrls, value, repetitions, latency,
                     acq_sleep_time=None, nb_states_per_value=None,
                     **kwargs):
        """Start the acquisition without master channel and without index.

        See :meth:`PoolAcquisitionTimerable.start_action` for the meaning
        of the parameters.
        """
        PoolAcquisitionTimerable.start_action(self, ctrls, value, None,
                                              repetitions, latency, None,
                                              acq_sleep_time,
                                              nb_states_per_value, **kwargs)

    def get_read_value_ctrls(self):
        """Return the controller -> channels map used to read values."""
        return self._pool_ctrl_dict_value

    @DebugIt()
    def action_loop(self):
        """Poll states and values until the acquisition ends or the action
        is released, then do a final read of values and value refs and
        schedule the final state update as finish hooks."""
        i = 0

        states, values, value_refs = {}, {}, {}
        for channel in self._channels:
            element = channel.element
            states[element] = None

        nap = self._acq_sleep_time
        nb_states_per_value = self._nb_states_per_value

        while not self.was_released():
            self.read_state_info(ret=states)
            if self.in_acquisition(states):
                for acquirable, state_info in list(states.items()):
                    state_info = acquirable._from_ctrl_state_info(state_info)
                    acquirable.set_state_info(state_info, propagate=1)
            else:
                # states will be set as finish hook
                # in order to update values with the last value readout
                break

            # read value every n times
            if not i % nb_states_per_value:
                self.read_value(ret=values)
                for acquirable, value in list(values.items()):
                    self._process_value_buffer(acquirable, value)
                self.read_value_ref(ret=value_refs)
                for acquirable, value_ref in list(value_refs.items()):
                    self._process_value_ref_buffer(acquirable, value_ref)

            time.sleep(nap)
            i += 1

        with ActionContext(self):
            self.raw_read_value(ret=values)
            self.raw_read_value_ref(ret=value_refs)

        for acquirable, state_info in list(states.items()):
            if acquirable in values:
                value = values[acquirable]
                self._process_value_buffer(acquirable, value, final=True)
            if acquirable in value_refs:
                value_ref = value_refs[acquirable]
                self._process_value_ref_buffer(acquirable, value_ref,
                                               final=True)
            state_info = acquirable._from_ctrl_state_info(state_info)
            set_state_info = functools.partial(acquirable.set_state_info,
                                               state_info,
                                               propagate=2,
                                               safe=True)
            self.add_finish_hook(set_state_info, False)


class PoolAcquisitionSoftware(PoolAcquisitionTimerable):
    """Acquisition action for controllers synchronized by software

    .. note::
        The PoolAcquisitionSoftware class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, main_element, name="AcquisitionSoftware", slaves=None):
        PoolAcquisitionTimerable.__init__(self, main_element, name)

        if slaves is None:
            slaves = ()
        # actions stopped by this action once its acquisition loop ends
        self._slaves = slaves

    def get_read_value_ctrls(self):
        """Return the controller -> channels map used to read values."""
        # technical debt in order to work both in case of meas group and
        # single channel
        if self._pool_ctrl_dict_value is not None:
            return self._pool_ctrl_dict_value
        else:
            return self._pool_ctrl_dict

    def get_read_value_ref_ctrls(self):
        """Return the controller -> channels map used to read value refs."""
        # technical debt in order to work both in case of meas group and
        # single channel
        if self._pool_ctrl_dict_ref is not None:
            return self._pool_ctrl_dict_ref
        else:
            return self._pool_ctrl_dict

    def get_read_value_loop_ctrls(self):
        """Return the controller -> channels map read during the loop."""
        return self._pool_ctrl_dict_loop

    def start_action(self, ctrls, value, master, index, acq_sleep_time=None,
                     nb_states_per_value=None, **kwargs):
        """Start a single repetition (repetitions=1, latency=0).

        See :meth:`PoolAcquisitionTimerable.start_action` for the meaning
        of the parameters.
        """
        PoolAcquisitionTimerable.start_action(self, ctrls, value, master, 1,
                                              0, index, acq_sleep_time,
                                              nb_states_per_value, **kwargs)

    @DebugIt()
    def action_loop(self):
        """Poll states and loop values until the acquisition ends or the
        action is released, stop the slaves, then do a final read and
        append the results to the buffers at the stored index."""
        states, values, value_refs = {}, {}, {}
        for channel in self._channels:
            element = channel.element
            states[element] = None

        nap = self._acq_sleep_time
        nb_states_per_value = self._nb_states_per_value

        i = 0
        while not self.was_released():
            self.read_state_info(ret=states)
            if self.in_acquisition(states):
                for acquirable, state_info in list(states.items()):
                    state_info = acquirable._from_ctrl_state_info(state_info)
                    acquirable.set_state_info(state_info, propagate=1)
            else:
                # states will be set as finish hook
                # in order to update values with the last value readout
                break

            # read value every n times
            if not i % nb_states_per_value:
                self.read_value_loop(ret=values)
                for acquirable, value in list(values.items()):
                    acquirable.put_value(value, quality=AttrQuality.Changing)

            time.sleep(nap)
            i += 1

        # best effort: a slave failing to stop must not break this action
        for slave in self._slaves:
            try:
                slave.stop_action()
            except Exception:
                self.warning("Unable to stop slave acquisition %s",
                             slave.getLogName())
                self.debug("Details", exc_info=1)

        with ActionContext(self):
            self.raw_read_value(ret=values)
            self.raw_read_value_ref(ret=value_refs)

        for acquirable, state_info in list(states.items()):
            if acquirable in values:
                value = values[acquirable]
                if is_value_error(value):
                    self.error("Loop final read value error for: %s" %
                               acquirable.name)
                    msg = "Details: " + "".join(
                        traceback.format_exception(*value.exc_info))
                    self.debug(msg)
                # the value (error or not) is appended in any case
                acquirable.get_value_attribute().set_quality(
                    AttrQuality.Valid)
                acquirable.append_value_buffer(value, self._index,
                                               propagate=2)
            if acquirable in value_refs:
                value_ref = value_refs[acquirable]
                if is_value_error(value_ref):
                    self.error("Loop final read value ref error for: %s" %
                               acquirable.name)
                    msg = "Details: " + "".join(
                        traceback.format_exception(*value_ref.exc_info))
                    self.debug(msg)
                acquirable.append_value_ref_buffer(value_ref, self._index)
            state_info = acquirable._from_ctrl_state_info(state_info)
            set_state_info = functools.partial(acquirable.set_state_info,
                                               state_info,
                                               propagate=2,
                                               safe=True)
            self.add_finish_hook(set_state_info, False)


class PoolAcquisitionSoftwareStart(PoolAcquisitionTimerable):
    """Acquisition action for controllers synchronized by software start

    .. note::
        The PoolAcquisitionSoftwareStart class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.

    .. todo:: Try to move the action loop logic to base class it is
        basically the same as in PoolAcquisitionHardware.
    """

    def __init__(self, main_element, name="AcquisitionSoftwareStart"):
        PoolAcquisitionTimerable.__init__(self, main_element, name)

    def get_read_value_ctrls(self):
        """Return the controller -> channels map used to read values."""
        # technical debt in order to work both in case of meas group and
        # single channel
        return self._pool_ctrl_dict_value

    def start_action(self, ctrls, value, master, repetitions, latency,
                     acq_sleep_time=None, nb_states_per_value=None,
                     **kwargs):
        """Start the acquisition without index.

        See :meth:`PoolAcquisitionTimerable.start_action` for the meaning
        of the parameters.
        """
        PoolAcquisitionTimerable.start_action(self, ctrls, value, master,
                                              repetitions, latency, None,
                                              acq_sleep_time,
                                              nb_states_per_value, **kwargs)

    @DebugIt()
    def action_loop(self):
        """Poll states and values until the acquisition ends or the action
        is released, then do a final read of values and value refs and
        schedule the final state update as finish hooks."""
        i = 0

        states, values, value_refs = {}, {}, {}
        for channel in self._channels:
            element = channel.element
            states[element] = None

        nap = self._acq_sleep_time
        nb_states_per_value = self._nb_states_per_value

        while not self.was_released():
            self.read_state_info(ret=states)
            if self.in_acquisition(states):
                for acquirable, state_info in list(states.items()):
                    state_info = acquirable._from_ctrl_state_info(state_info)
                    acquirable.set_state_info(state_info, propagate=1)
            else:
                # states will be set as finish hook
                # in order to update values with the last value readout
                break

            # read value every n times
            if not i % nb_states_per_value:
                self.read_value(ret=values)
                for acquirable, value in list(values.items()):
                    if is_value_error(value):
                        self.error("Loop read value error for %s" %
                                   acquirable.name)
                        msg = "Details: " + "".join(
                            traceback.format_exception(*value.exc_info))
                        self.debug(msg)
                        acquirable.put_value(value)
                    else:
                        acquirable.extend_value_buffer(value)
                self.read_value_ref(ret=value_refs)
                for acquirable, value_ref in list(value_refs.items()):
                    if is_value_error(value_ref):
                        self.error("Loop read value ref error for %s" %
                                   acquirable.name)
                        # report and store the failing value ref itself,
                        # not the leftover variable of the value loop above
                        msg = "Details: " + "".join(
                            traceback.format_exception(*value_ref.exc_info))
                        self.debug(msg)
                        acquirable.put_value_ref(value_ref)
                    else:
                        acquirable.extend_value_ref_buffer(value_ref)

            time.sleep(nap)
            i += 1

        with ActionContext(self):
            self.raw_read_value(ret=values)
            self.raw_read_value_ref(ret=value_refs)

        for acquirable, state_info in list(states.items()):
            if acquirable in values:
                value = values[acquirable]
                if is_value_error(value):
                    self.error("Loop final read value error for: %s" %
                               acquirable.name)
                    msg = "Details: " + "".join(
                        traceback.format_exception(*value.exc_info))
                    self.debug(msg)
                    acquirable.put_value(value)
                else:
                    acquirable.extend_value_buffer(value, propagate=2)
            if acquirable in value_refs:
                value_ref = value_refs[acquirable]
                if is_value_error(value_ref):
                    self.error("Loop final read value ref error for: %s" %
                               acquirable.name)
                    msg = "Details: " + "".join(
                        traceback.format_exception(*value_ref.exc_info))
                    self.debug(msg)
                    acquirable.put_value_ref(value_ref)
                else:
                    acquirable.extend_value_ref_buffer(value_ref, propagate=2)
            state_info = acquirable._from_ctrl_state_info(state_info)
            set_state_info = functools.partial(acquirable.set_state_info,
                                               state_info,
                                               propagate=2,
                                               safe=True)
            self.add_finish_hook(set_state_info, False)


class PoolCTAcquisition(PoolAcquisitionTimerable):
    """..todo:: remove it, still used by pseudo counter"""

    def __init__(self, main_element, name="CTAcquisition", slaves=None):
        # NOTE: the base class constructor called below sets _channels again
        self._channels = None

        if slaves is None:
            slaves = ()
        # actions stopped by this action once its acquisition loop ends
        self._slaves = slaves

        PoolAcquisitionTimerable.__init__(self, main_element, name)

    def get_read_value_loop_ctrls(self):
        """Return the controller -> channels map read during the loop."""
        return self._pool_ctrl_dict_loop

    def in_acquisition(self, states: Dict[PoolElement, State]) -> bool:
        """Determines if we are in acquisition or if the acquisition has ended
        based on the current unit trigger modes and states returned by the
        controller(s)

        :param states: a map containing state information as returned by
                       read_state_info
        :return: returns True if in acquisition or False otherwise
        """
        # explicit bool: the acquisition is on while any element is in action
        return any(self._is_in_action(states[elem][0][0])
                   for elem in states)

    @DebugIt()
    def action_loop(self):
        """Send a first value event, poll states and loop values until the
        acquisition ends, stop the slaves, then do a final read and
        schedule the final state update as finish hooks."""
        i = 0

        states, values = {}, {}
        for element in self._channels:
            states[element] = None
            # values[element] = None

        nap = self._acq_sleep_time
        nb_states_per_value = self._nb_states_per_value

        # read values to send a first event when starting to acquire
        with ActionContext(self):
            self.raw_read_value_loop(ret=values)
            for acquirable, value in list(values.items()):
                acquirable.put_value(value, propagate=2)

        while True:
            self.read_state_info(ret=states)
            if self.in_acquisition(states):
                for acquirable, state_info in list(states.items()):
                    state_info = acquirable._from_ctrl_state_info(state_info)
                    acquirable.set_state_info(state_info, propagate=1)
            else:
                # states will be set as finish hook
                # in order to update values with the last value readout
                break

            # read value every n times
            if not i % nb_states_per_value:
                self.read_value_loop(ret=values)
                for acquirable, value in list(values.items()):
                    acquirable.put_value(value)

            time.sleep(nap)
            i += 1

        # best effort: a slave failing to stop must not break this action
        for slave in self._slaves:
            try:
                slave.stop_action()
            except Exception:
                self.warning("Unable to stop slave acquisition %s",
                             slave.getLogName())
                self.debug("Details", exc_info=1)

        with ActionContext(self):
            self.raw_read_state_info(ret=states)
            self.raw_read_value_loop(ret=values)

        for acquirable, state_info in list(states.items()):
            # first update the element state so that value calculation
            # that is done after takes the updated state into account
            acquirable.set_state_info(state_info, propagate=0)
            if acquirable in values:
                value = values[acquirable]
                acquirable.put_value(value, propagate=2)
            state_info = acquirable._from_ctrl_state_info(state_info)
            set_state_info = functools.partial(acquirable.set_state_info,
                                               state_info,
                                               propagate=2,
                                               safe=True)
            self.add_finish_hook(set_state_info, False)


class Pool0DAcquisition(PoolAcquisitionBase):
    """Acquisition action for 0D channels.

    Values are read in a loop until the action is released, stopped or
    aborted; afterwards the accumulated value of each channel is appended
    to its value buffer.
    """

    def __init__(self, main_element, name="0DAcquisition"):
        PoolAcquisitionBase.__init__(self, main_element, name)

    def start_action(self, conf_ctrls, index, acq_sleep_time=None,
                     nb_states_per_value=None, **kwargs):
        """Prepares everything for acquisition and starts it.

        The buffer of every enabled channel is cleared and its state is
        set to Moving.

        :param conf_ctrls: controller configurations; their enabled
            channels are acquired
        :param index: index used when appending the accumulated value to
            the value buffer
        :param acq_sleep_time: sleep time between value readouts; if None
            the pool's ``acq_loop_sleep_time`` is used
        :param nb_states_per_value: stored (if None the pool's
            ``acq_loop_states_per_value`` is used)
        :param kwargs: accepted but not used by this method
        """
        pool = self.pool

        # TODO: rollback this change when a proper synchronization between
        # acquisition actions will be develop.
        # Now the meta acquisition action is resetting them to 0.
        # self._aborted = False
        # self._stopped = False

        self._index = index

        self._acq_sleep_time = acq_sleep_time
        if self._acq_sleep_time is None:
            self._acq_sleep_time = pool.acq_loop_sleep_time

        self._nb_states_per_value = nb_states_per_value
        if self._nb_states_per_value is None:
            self._nb_states_per_value = pool.acq_loop_states_per_value

        # channels that are acquired (only enabled)
        self._channels = []

        with ActionContext(self):
            # set the state of all elements to Moving and inform their
            # listeners
            for conf_ctrl in conf_ctrls:
                for conf_channel in conf_ctrl.get_channels(enabled=True):
                    conf_channel.clear_buffer()
                    conf_channel.set_state_info((State.Moving, None),
                                                propagate=2)
                    self._channels.append(conf_channel)

    def in_acquisition(self, states: Dict[PoolElement, State]) -> bool:
        """Determines if we are in acquisition or if the acquisition has ended
        based on the current unit trigger modes and states returned by the
        controller(s)

        :param states: a map containing state information as returned by
                       read_state_info
        :return: returns True if in acquisition or False otherwise
        """
        # explicit bool: the acquisition is on while any element is in action
        return any(self._is_in_action(states[elem][0]) for elem in states)

    def action_loop(self):
        """Read values until released, stopped or aborted, append the
        accumulated values to the buffers and schedule the final state
        update as finish hooks."""
        states, values = {}, {}
        for conf_channel in self._channels:
            element = conf_channel.element
            states[element] = None
            values[element] = None

        nap = self._acq_sleep_time
        while not self.was_released():
            self.read_value(ret=values)
            for acquirable, value in list(values.items()):
                acquirable.put_current_value(value, propagate=0)
            # checked after the read so that one last readout is done
            if self._stopped or self._aborted:
                break
            time.sleep(nap)

        for element in self._channels:
            value = element.accumulated_value.value_obj
            element.append_value_buffer(value, self._index, propagate=2)

        with ActionContext(self):
            self.raw_read_state_info(ret=states)

        for acquirable, state_info in list(states.items()):
            state_info = acquirable._from_ctrl_state_info(state_info)
            set_state_info = functools.partial(acquirable.set_state_info,
                                               state_info,
                                               propagate=2,
                                               safe=True)
            self.add_finish_hook(set_state_info, False)

    def stop_action(self, *args, **kwargs):
        """Stop procedure for this action."""
        self._stopped = True

    def abort_action(self, *args, **kwargs):
        """Aborts procedure for this action"""
        self._aborted = True


class PoolIORAcquisition(PoolAction):
    """Acquisition action for IO register elements."""

    def __init__(self, pool, name="IORAcquisition"):
        self._channels = None
        PoolAction.__init__(self, pool, name)

    def start_action(self, *args, **kwargs):
        """Nothing needs to be prepared for this action."""
        pass

    def in_acquisition(self, states):
        """Always report the action as being in acquisition.

        :param states: ignored
        :return: True
        """
        return True

    @DebugIt()
    def action_loop(self):
        """Send a first value event, then poll states and values while
        :meth:`in_acquisition` reports True; the final values and states
        are propagated from the finish hook."""
        i = 0

        states, values = {}, {}
        for element in self._channels:
            states[element] = None
            values[element] = None

        # read values to send a first event when starting to acquire
        self.read_value(ret=values)
        for acquirable, value in list(values.items()):
            acquirable.put_value(value, propagate=2)

        while True:
            self.read_state_info(ret=states)

            if self.in_acquisition(states):
                for acquirable, state_info in list(states.items()):
                    state_info = acquirable._from_ctrl_state_info(state_info)
                    acquirable.set_state_info(state_info, propagate=1)
            else:
                # states will be set as finish hook
                # in order to update values with the last value readout
                break

            # read value every n times
            if not i % 5:
                self.read_value(ret=values)
                for acquirable, value in list(values.items()):
                    acquirable.put_value(value)

            i += 1
            time.sleep(0.01)

        self.read_state_info(ret=states)

        # first update the element state so that value calculation
        # that is done after takes the updated state into account
        for acquirable, state_info in list(states.items()):
            acquirable.set_state_info(state_info, propagate=0)

        # Do NOT send events before we exit the OperationContext, otherwise
        # we may be asked to start another action before we leave the context
        # of the current action. Instead, send the events in the finish hook
        # which is executed outside the OperationContext

        def finish_hook(*args, **kwargs):
            # read values and propagate the change to all listeners
            self.read_value(ret=values)
            for acquirable, value in list(values.items()):
                acquirable.put_value(value, propagate=2)

            # finally set the state and propagate to all listeners
            for acquirable, state_info in list(states.items()):
                acquirable.set_state_info(state_info, propagate=2)

        self.set_finish_hook(finish_hook)