Source code for sardana.pool.poolmeasurementgroup

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool library. It defines the base classes
for the measurement group and its measurement configuration."""

__all__ = ["PoolMeasurementGroup", "MeasurementConfiguration",
           "ControllerConfiguration", "ChannelConfiguration",
           "SynchronizerConfiguration", "build_measurement_configuration"]

__docformat__ = 'restructuredtext'

import threading
import weakref
from typing import Any, Optional, Dict, Union, List

from taurus.core.tango.tangovalidator import TangoAttributeNameValidator

import sardana
from sardana import State, ElementType, TYPE_EXP_CHANNEL_ELEMENTS
from sardana.sardanaevent import EventType
from sardana.pool.pooldefs import AcqMode, SynchParam, AcqSynch, \
    SynchDomain, AcqSynchType

from sardana.util.string import camel_to_snake
from sardana.pool.poolgroupelement import PoolGroupElement
from sardana.pool.poolacquisition import PoolAcquisition
from sardana.pool.poolsynchronization import SynchDescription
from sardana.pool.poolexternal import PoolExternalObject
from sardana.pool.poolcontroller import PoolController

from sardana.taurus.core.tango.sardana import PlotType, Normalization


# ----------------------------------------------
# Measurement Group Configuration information
# ----------------------------------------------
# dict <str, obj> with (at least) keys:
#    - 'controllers' : dict<Controller, dict> where:
#        - key: ctrl
#        - value: dict<str, dict> with (at least) keys:
#                - 'timer' : the timer channel name / timer channel id
#                - 'monitor' : the monitor channel name / monitor channel id
#                - 'synchronization' : 'Gate'/'Software'
#                - 'channels' where value is a dict<str, obj> with (at least)
#                   keys:
#                    - 'id' : the channel name ( channel id )
#                    optional keys:
#                    - 'enabled' : True/False (default is True)
#                    any hints:
#                    - 'output' : True/False (default is True)
#                    - 'plot_type' : 'No'/'1D'/'2D' (default is 'No')
#                    - 'plot_axes' : list<str> where str is channel
#                                    name/'step#'/'index#' (default is [])
#                    - 'label' : preferred label (default is channel name)
#                    - 'scale' : <float, float> with min/max (defaults to
#                                channel range if it is defined)
#                    - 'plot_color' : int representing RGB
#    optional keys:
#    - 'label' : measurement group label (defaults to measurement group name)
#    - 'description' : measurement group description

# <MeasurementGroupConfiguration>
#  <timer>UxTimer</timer>
#  <monitor>CT1</monitor>
# </MeasurementGroupConfiguration>

# Example: 2 NI cards, where channel 1 of card 1 is wired to channel 1 of
# card 2 at configuration time we should set:

# ni0ctrl.setCtrlPar(0, 'synchronization', AcqSynch.SoftwareTrigger)
# ni0ctrl.setCtrlPar(0, 'timer', 1) # channel 1 is the timer
# ni0ctrl.setCtrlPar(0, 'monitor', 4) # channel 4 is the monitor
# ni1ctrl.setCtrlPar(0, 'synchronization', AcqSynch.HardwareTrigger)
# ni1ctrl.setCtrlPar(0, 'master', 0)

# when we count for 1.5 seconds:
# ni1ctrl.Load(1.5)
# ni0ctrl.Load(1.5)
# ni1ctrl.Start()
# ni0ctrl.Start()

"""

"""


def _to_fqdn(name, logger=None):
    full_name = name
    # try to use Taurus 4 to retrieve FQDN
    try:
        from taurus.core.tango.tangovalidator import TangoDeviceNameValidator
        full_name, _, _ = TangoDeviceNameValidator().getNames(name)
    # if Taurus3 in use just continue
    except ImportError:
        pass
    if full_name is None:
        full_name = name
    if full_name != name and logger:
        msg = ("PQDN full name is deprecated in favor of FQDN full name."
               " Re-apply configuration in order to upgrade.")
        logger.warning(msg)
    return full_name


def _filter_ctrls(ctrls, enabled=None):
    if enabled is None:
        return ctrls

    filtered_ctrls = []
    for ctrl in ctrls:
        if ctrl.enabled == enabled:
            filtered_ctrls.append(ctrl)
    return filtered_ctrls


def _get_timerable_ctrls(ctrls, acq_synch=None, enabled=None):
    """Collect timerable controllers from a per-synchronization mapping.

    :param ctrls: mapping from acquisition synchronization to a list of
        controller configuration items
    :param acq_synch: ``None`` to take the controllers of every key, a list
        of keys to take the controllers of those keys (in the given order),
        or a single key
    :param enabled: forwarded to :func:`_filter_ctrls`
    :return: list of the selected controllers, filtered by ``enabled``
    """
    if acq_synch is None:
        groups = list(ctrls.values())
    elif isinstance(acq_synch, list):
        groups = [ctrls[key] for key in acq_synch]
    else:
        groups = [ctrls[acq_synch]]

    # always build a fresh list so the stored per-key lists are not exposed
    selected = []
    for group in groups:
        selected += group

    return _filter_ctrls(selected, enabled)


def _get_timerable_channels(ctrls, acq_synch=None, enabled=None):
    """Collect the channels of the selected timerable controllers.

    Controllers are selected with :func:`_get_timerable_ctrls`; the same
    ``enabled`` criterion is then applied to the channels of each one.

    :param ctrls: mapping from acquisition synchronization to a list of
        controller configuration items
    :param acq_synch: selection of keys, see :func:`_get_timerable_ctrls`
    :param enabled: ``True``/``False``/``None`` filter applied to both the
        controllers and their channels
    :return: flat list of channel configuration items
    """
    return [channel
            for ctrl in _get_timerable_ctrls(ctrls, acq_synch, enabled)
            for channel in ctrl.get_channels(enabled)]


class ConfigurationItem(object):
    """Container of configuration attributes related to a given element.

    Wrap an element to pretend its API. Manage the element's configuration.
    Hold an information whether the element is enabled. By default it is
    enabled.

    .. note::
        The ConfigurationItem class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, element: Any, attrs: Optional[Dict] = None) -> None:
        """Construct a wrapper around the element

        :param element: element to wrap (only a weak reference is kept)
        :param attrs: configuration attributes and their values; they are
            copied into the instance dictionary and so may override
            ``enabled``
        """
        self._element = weakref.ref(element)
        self.enabled = True
        if attrs is not None:
            self.__dict__.update(attrs)

    def __getattr__(self, item):
        """Delegate lookup of attributes missing on the item to the element.

        :raises AttributeError: if the item has no element reference yet,
            if the element is the item itself, or if the element does not
            provide the attribute
        """
        # ``element`` reads ``_element``; when either of them is missing
        # (e.g. an instance created without running __init__) delegating
        # would re-enter this method forever.
        if item in ("_element", "element"):
            raise AttributeError(item)
        try:
            element = self.element
        except AttributeError:
            raise AttributeError(
                "{0!r} object has no attribute {1!r}".format(
                    type(self).__name__, item)) from None
        # A self-referencing item (see ExternalControllerConfiguration) has
        # nothing to delegate to; report the attribute as missing instead
        # of recursing until RecursionError.
        if element is self:
            raise AttributeError(
                "{0!r} object has no attribute {1!r}".format(
                    type(self).__name__, item))
        return getattr(element, item)

    def get_element(self):
        """Returns the element associated with this item

        The result is ``None`` when the weakly referenced element no longer
        exists.
        """
        return self._element()

    def set_element(self, element):
        """Sets the element for this item (kept as a weak reference)"""
        self._element = weakref.ref(element)

    element = property(get_element)
class ControllerConfiguration(ConfigurationItem):
    """Container of configuration attributes related to a given controller.

    Inherit behavior from
    :class:`~sardana.pool.poolmeasurementgroup.ConfigurationItem`
    and additionally hold information about its enabled/disabled channels.
    By default it is disabled.

    .. note::
        The ControllerConfiguration class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, element, attrs=None):
        """Construct a disabled controller item without channels.

        :param element: controller to wrap
        :param attrs: (optional) configuration attributes and their values
        """
        ConfigurationItem.__init__(self, element, attrs)
        # a controller becomes enabled when an enabled channel is added
        self.enabled = False
        self._channels = []
        self._channels_enabled = []
        self._channels_disabled = []

    def add_channel(self, channel_item):
        """Aggregate a channel configuration item.

        The channel is sorted into the enabled or disabled bucket according
        to its ``enabled`` attribute at the time of the call; adding an
        enabled channel enables the controller.
        """
        self._channels.append(channel_item)
        if channel_item.enabled:
            self.enabled = True
            self._channels_enabled.append(channel_item)
        else:
            self._channels_disabled.append(channel_item)

    def remove_channel(self, channel_item):
        """Remove a channel configuration item.

        The controller becomes disabled when its last enabled channel is
        removed.

        :raises ValueError: if the channel was not aggregated by this
            controller
        """
        self._channels.remove(channel_item)
        # Look the channel up in the buckets instead of trusting its current
        # ``enabled`` flag: the flag may have changed since add_channel()
        # without update_state() having been called.
        if channel_item in self._channels_enabled:
            self._channels_enabled.remove(channel_item)
            if not self._channels_enabled:
                self.enabled = False
        elif channel_item in self._channels_disabled:
            self._channels_disabled.remove(channel_item)

    def update_state(self):
        """Update internal state based on the aggregated channels.

        Rebuild the enabled/disabled buckets from the current ``enabled``
        attribute of every channel and enable the controller if at least
        one channel is enabled.
        """
        self.enabled = False
        self._channels_enabled = []
        self._channels_disabled = []
        for channel_item in self._channels:
            if channel_item.enabled:
                self.enabled = True
                self._channels_enabled.append(channel_item)
            else:
                self._channels_disabled.append(channel_item)

    def get_channels(self, enabled: Optional[bool] = None) -> List[Any]:
        """Return aggregated channels.

        :param enabled: which channels to return

            - True - only enabled
            - False - only disabled
            - None - all
        :return: a new list, so callers may modify it freely
        """
        if enabled is None:
            return list(self._channels)
        elif enabled:
            return list(self._channels_enabled)
        else:
            return list(self._channels_disabled)

    def validate(self):
        """Validate the controller configuration (nothing to check here)."""
        pass
class TimerableControllerConfiguration(ControllerConfiguration):
    """Container of configuration attributes related to a given
    timerable controller.

    Inherit behavior from
    :class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration`
    and additionally validate *timer* and *monitor* configuration.

    .. note::
        The TimerableControllerConfiguration class has been included in
        Sardana on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """

    def update_timer(self):
        """Resolve the ``timer`` attribute to a channel item."""
        self._update_master("timer")

    def update_monitor(self):
        """Resolve the ``monitor`` attribute to a channel item."""
        self._update_master("monitor")

    def _update_master(self, role):
        """Replace the ``role`` attribute by the matching channel item.

        When no value is configured, the enabled channel with the lowest
        index is chosen. Otherwise the configured value is compared with the
        ``full_name`` of the aggregated channels. In both cases the
        attribute ends up as ``None`` if no channel qualifies.

        :param role: attribute name, ``"timer"`` or ``"monitor"``
        """
        configured = getattr(self, role, None)
        resolved = None
        if configured is None:
            lowest = float("+inf")
            for candidate in self._channels_enabled:
                if not candidate.index > lowest:
                    resolved = candidate
                    lowest = candidate.index
        else:
            for candidate in self._channels:
                if candidate.full_name == configured:
                    resolved = candidate
                    break
        setattr(self, role, resolved)

    def validate(self):
        """Check that an enabled controller has an enabled timer or monitor.

        :raises ValueError: if the controller is enabled while both its
            timer and its monitor channels are disabled
        """
        if not self.enabled:
            return
        if self.timer.enabled or self.monitor.enabled:
            return
        err_msg = 'channel {0} used as timer and channel ' \
                  '{1} used as monitor are disabled. One of them ' \
                  'must be enabled.'.format(self.timer.name,
                                            self.monitor.name)
        raise ValueError(err_msg)
class ExternalControllerConfiguration(ControllerConfiguration):
    """Container of configuration attributes related to a given
    external controller.

    Inherit behavior from
    :class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration`.

    .. note::
        The ExternalControllerConfiguration class has been included in
        Sardana on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, element, attrs=None):
        """Construct the item for an external controller.

        :param element: value stored as the item's ``full_name``
        :param attrs: (optional) configuration attributes and their values
        """
        # There is no element object to wrap: the item references itself
        # and keeps the given value only as its full name.
        ControllerConfiguration.__init__(self, self, attrs)
        self.full_name = element

    def __getattr__(self, item):
        """Report attributes missing on the item as missing.

        The wrapped element is the item itself, so delegating the lookup to
        the element (as the base class does) would recurse without end.

        :raises AttributeError: always
        """
        raise AttributeError(
            "{0!r} object has no attribute {1!r}".format(
                type(self).__name__, item))
class ChannelConfiguration(ConfigurationItem):
    """Container of configuration attributes related to a given experimental
    channel.

    Inherit behavior from
    :class:`~sardana.pool.poolmeasurementgroup.ConfigurationItem` without
    adding or overriding anything.

    .. note::
        The ChannelConfiguration class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """
class SynchronizerConfiguration(ConfigurationItem):
    """Container of configuration attributes related to a given
    synchronizer element.

    Inherit behavior from
    :class:`~sardana.pool.poolmeasurementgroup.ConfigurationItem`.
    By default it is disabled.

    .. note::
        The SynchronizerConfiguration class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, element, attrs=None):
        """Construct a disabled item around the synchronizer element.

        :param element: synchronizer element to wrap
        :param attrs: (optional) configuration attributes and their values
        """
        super().__init__(element, attrs)
        # set after the attributes are applied, so it always wins
        self.enabled = False
def build_measurement_configuration(user_elements):
    """Create a minimal measurement configuration data structure from the
    user_elements list.

    .. highlight:: none

    Minimal configuration data structure::

        dict <str, dict> with keys:
          - 'controllers' : where value is a dict<str, dict> where:
            - key: controller's full name
            - value: dict<str, dict> with keys:
              - 'channels' where value is a dict<str, obj> where:
                - key: channel's full name
                - value: dict<str, obj> with keys:
                  - 'index' : where value is the channel's index <int>

    .. highlight:: default

    Elements of external type are grouped under the ``'__tango__'``
    controller key; trigger/gate elements are left out. The index of a
    channel is its position in ``user_elements``.

    .. note::
        The build_measurement_configuration function has been included in
        Sardana on a provisional basis. Backwards incompatible changes
        (up to and including removal of the function) may occur if
        deemed necessary by the core developers.

    :param user_elements: sequence of elements providing ``get_type()``,
        ``full_name`` and (for non external ones) ``controller``
    :return: the minimal configuration dictionary
    """
    controllers = {}
    external_entries = []

    for position, elem in enumerate(user_elements):
        kind = elem.get_type()
        if kind == ElementType.External:
            # handled after the loop, all under one pseudo controller
            external_entries.append((position, elem))
        elif kind != ElementType.TriggerGate:
            owner_name = elem.controller.full_name
            if owner_name not in controllers:
                controllers[owner_name] = {'channels': {}}
            controllers[owner_name]['channels'][elem.full_name] = \
                {'index': position}

    if external_entries:
        controllers['__tango__'] = {
            'channels': {elem.full_name: {'index': position}
                         for position, elem in external_entries}
        }

    return {"controllers": controllers}
[docs] class MeasurementConfiguration(object): """Configuration of a measurement. Accepts import and export from/to a serializable data structure (based on dictionaries/lists and strings). Provides getter methods that facilitate extracting of information e.g. controllers of different types, master timers/monitors, etc. .. note:: The build_measurement_configuration function has been included in Sardana on a provisional basis. Backwards incompatible changes (up to and including removal of the function) may occur if deemed necessary by the core developers. """ DFT_DESC = 'General purpose measurement configuration' def __init__(self, parent=None): """Initialize measurement configuration object :param parent: (optional) object that this measurement configuration refers to (usually :class:`~sardana.pool.poolmeasurementgroup.PoolMeasurementGroup)` """ self._parent = None if parent is not None: self._parent = weakref.proxy(parent) self._config = None # Structure to store the controllers and their channels self._timerable_ctrls = {} self._zerod_ctrls = [] self._synch_ctrls = {} self._other_ctrls = [] self._master_timer_sw = None self._master_monitor_sw = None self._master_timer_sw_start = None self._master_monitor_sw_start = None self._label = None self._description = None self._user_config = {} self._channel_acq_synch = {} self._ctrl_acq_synch = {} self.changed = False # provide back. compatibility for value_ref_{enabled,pattern} # config parameters created with Sardana < 3. self._value_ref_compat = False # provide back. compatibility for synchronizer, timer and monitor # config parameters set on external channels created with Sardana < 3. self._external_ctrl_compat = False
[docs] def get_acq_synch_by_channel(self, channel: Union["sardana.pool.PoolBaseChannel", ChannelConfiguration]) -> sardana.pool.pooldefs.AcqSynch: """Return acquisition synchronization configured for this element. :param channel: channel to look for its acquisition synchronization or :class:`~sardana.pool.poolmeasurementgroup.ChannelConfiguration` :return: acquisition synchronization """ if isinstance(channel, ChannelConfiguration): channel = channel.element return self._channel_acq_synch[channel]
[docs] def get_acq_synch_by_controller(self, controller: Union[PoolController, ControllerConfiguration]) -> sardana.pool.pooldefs.AcqSynch: """Return acquisition synchronization configured for this controller. :param controller: controller to look for its acquisition synchronization or :class:`~sardana.pool.poolmeasurementgroup.ControllerConfiguration` :return: acquisition synchronization """ if isinstance(controller, ConfigurationItem): controller = controller.element return self._ctrl_acq_synch[controller]
[docs] def get_timerable_ctrls(self, acq_synch: Optional[sardana.pool.pooldefs.AcqSynch] = None, enabled: Optional[bool] = None) -> List[ControllerConfiguration]: # noqa """Return timerable controllers. Allow to filter controllers based on acquisition synchronization or whether these are enabled/disabled. :param acq_synch: (optional) filter controller based on acquisition synchronization :param enabled: (optional) filter controllers whether these are enabled/disabled: - :obj:`True` - enabled only - :obj:`False` - disabled only - :obj:`None` - all :return: timerable controllers that fulfils the filtering criteria """ return _get_timerable_ctrls(self._timerable_ctrls, acq_synch, enabled)
[docs] def get_timerable_channels(self, acq_synch: Optional[sardana.pool.pooldefs.AcqSynch] = None, enabled: Optional[bool] = None) -> List[ChannelConfiguration]: # noqa """Return timerable channels. Allow to filter channels based on acquisition synchronization or whether these are enabled/disabled. :param acq_synch: (optional) filter controller based on acquisition synchronization :param enabled: (optional) filter controllers whether these are enabled/disabled: - :obj:`True` - enabled only - :obj:`False` - disabled only - :obj:`None` - all :return: timerable channels that fulfils the filtering criteria """ return _get_timerable_channels(self._timerable_ctrls, acq_synch, enabled)
[docs] def get_zerod_ctrls(self, enabled: Optional[bool] = None) -> List[ControllerConfiguration]: # noqa """Return 0D controllers. Allow to filter controllers whether these are enabled/disabled. :param enabled: (optional) filter controllers whether these are enabled/disabled: - :obj:`True` - enabled only - :obj:`False` - disabled only - :obj:`None` - all :return: 0D controllers that fulfils the filtering criteria """ return _filter_ctrls(self._zerod_ctrls, enabled)
[docs] def get_synch_ctrls(self, enabled: Optional[bool] = None) -> List[ControllerConfiguration]: # noqa """Return synchronizer (currently only trigger/gate) controllers. Allow to filter controllers whether these are enabled/disabled. :param enabled: (optional) filter controllers whether these are enabled/disabled: - :obj:`True` - enabled only - :obj:`False` - disabled only - :obj:`None` - all :return: synchronizer controllers that fulfils the filtering criteria """ return _filter_ctrls(self._synch_ctrls, enabled)
[docs] def get_synchs(self, enabled: Optional[bool] = None) -> List[SynchronizerConfiguration]: # noqa """Return synchronizers (currently only trigger/gate). Allow to filter synchronizers whether these are enabled/disabled. :param enabled: (optional) filter synchronizers whether these are enabled/disabled: - :obj:`True` - enabled only - :obj:`False` - disabled only - :obj:`None` - all :return: synchronizers that fulfils the filtering criteria """ synchs = [] synch_ctrls = self.get_synch_ctrls(enabled) for ctrl in synch_ctrls: synchs.extend(ctrl.get_channels(enabled)) return synchs
[docs] def get_master_timer_software(self) -> ChannelConfiguration: # noqa """Return master timer in software acquisition. :return: master timer in software acquisition """ return self._master_timer_sw
[docs] def get_master_monitor_software(self) -> ChannelConfiguration: # noqa """Return master monitor in software acquisition. :return: master monitor in software acquisition """ return self._master_monitor_sw
[docs] def get_master_timer_software_start(self) -> ChannelConfiguration: # noqa """Return master timer in software start acquisition. :return: master timer in software start acquisition """ return self._master_monitor_sw_start
[docs] def get_master_monitor_software_start(self) -> ChannelConfiguration: # noqa """Return master monitor in software start acquisition. :return: master monitor in software start acquisition """ return self._master_timer_sw_start
[docs] def get_configuration_for_user(self): """Return measurement configuration serializable data structure.""" return self._user_config
[docs] def set_configuration_from_user(self, cfg): """Set measurement configuration from serializable data structure. Setting of the configuration includes the validation process. Setting of invalid configuration raises an exception and leaves the object as it was before the setting process. Thanks to that it is not necessary that the client application does the validation. The configuration parameters for given channels/controllers may differ depending on their types e.g. 0D channel does not support timer parameter while C/T does. .. todo:: Raise exceptions when setting _Synchronization_ parameter for external channels, 0D and PSeudoCounters. """ if not self._parent._is_online(cfg): self._parent.error("Some controllers of this measurement group are offline!") # Return because it won't be possible to know the controller types. # Before returning, assign "user configuration" to at least be able to # read it from the client (spock etc.). self._user_config = cfg return pool = self._parent.pool label = cfg.get('label', self._parent.name) description = cfg.get('description', self.DFT_DESC) timerable_ctrls = {AcqSynch.HardwareGate: [], AcqSynch.HardwareStart: [], AcqSynch.HardwareTrigger: [], AcqSynch.SoftwareStart: [], AcqSynch.SoftwareTrigger: [], AcqSynch.SoftwareGate: []} zerod_ctrls = [] synch_ctrls = [] other_ctrls = [] master_timer_sw = None master_monitor_sw = None master_timer_sw_start = None master_monitor_sw_start = None master_timer_idx_sw = float("+inf") master_monitor_idx_sw = float("+inf") master_timer_idx_sw_start = float("+inf") master_monitor_idx_sw_start = float("+inf") user_elem_ids = {} channel_acq_synch = {} ctrl_acq_synch = {} user_config = {} user_config['controllers'] = {} user_config['label'] = label user_config['description'] = description for ctrl_name, ctrl_data in list(cfg['controllers'].items()): # backwards compatibility for measurement groups created before # implementing feature-372: # https://sourceforge.net/p/sardana/tickets/372/ # 
WARNING: this is one direction backwards compatibility - it just # reads channels from the units, but does not write channels to the # units back if 'units' in ctrl_data: ctrl_data = ctrl_data['units']['0'] # discard controllers which don't have items (garbage) ch_count = len(ctrl_data['channels']) if ch_count == 0: continue external = ctrl_name in ['__tango__'] if external: ctrl = ctrl_name else: ctrl = pool.get_element_by_full_name(ctrl_name) assert ctrl.get_type() == ElementType.Controller user_config['controllers'][ctrl_name] = user_config_ctrl = {} ctrl_conf = {} conf_synch = None # The external controllers should not have synchronizer if external: for parameter in ['synchronizer', 'timer', 'monitor']: if parameter in ctrl_data: if self._external_ctrl_compat: msg = ( '{} is deprecated for external controllers ' 'e.g. Tango, since 3.0.3. Re-apply configuration ' 'in order to upgrade.' ).format(parameter) self._parent.warning(msg) else: raise ValueError( 'External controller does not allow ' 'to have {}'.format(parameter)) else: synchronizer = ctrl_data.get('synchronizer', 'software') if synchronizer is None or synchronizer == 'software': ctrl_conf['synchronizer'] = 'software' user_config_ctrl['synchronizer'] = 'software' else: user_config_ctrl['synchronizer'] = synchronizer pool_synch = pool.get_element_by_full_name(synchronizer) pool_synch_ctrl = pool_synch.controller conf_synch_ctrl = None conf_synch = None for conf_ctrl_created in synch_ctrls: if pool_synch_ctrl == conf_ctrl_created.element: conf_synch_ctrl = conf_ctrl_created for conf_synch_created in \ conf_ctrl_created.get_channels(): if pool_synch == conf_synch_created.element: conf_synch = conf_synch_created break break if conf_synch_ctrl is None: conf_synch_ctrl = \ ControllerConfiguration(pool_synch_ctrl) synch_ctrls.append(conf_synch_ctrl) if conf_synch is None: conf_synch = SynchronizerConfiguration(pool_synch) conf_synch_ctrl.add_channel(conf_synch) ctrl_conf['synchronizer'] = conf_synch try: 
synchronization = ctrl_data['synchronization'] except KeyError: synchronization = AcqSynchType.Trigger ctrl_conf['synchronization'] = synchronization user_config_ctrl['synchronization'] = synchronization acq_synch = None if external: ctrl_item = ExternalControllerConfiguration(ctrl) elif ctrl.is_timerable(): is_software = synchronizer == 'software' acq_synch = AcqSynch.from_synch_type(is_software, synchronization) ctrl_acq_synch[ctrl] = acq_synch timer = ctrl_data.get("timer") ctrl_conf["timer"] = timer monitor = ctrl_data.get("monitor") ctrl_conf["monitor"] = monitor ctrl_item = TimerableControllerConfiguration(ctrl, ctrl_conf) else: ctrl_item = ControllerConfiguration(ctrl, ctrl_conf) ctrl_enabled = False if 'channels' in ctrl_data: user_config_ctrl['channels'] = user_config_channel = {} for ch_name, ch_data in list(ctrl_data['channels'].items()): if external: validator = TangoAttributeNameValidator() full_name = ch_data.get('full_name', ch_name) params = validator.getUriGroups(full_name) params['pool'] = pool channel = PoolExternalObject(**params) else: try: channel = pool.get_element_by_full_name(ch_name) except KeyError: if not ch_data['enabled']: user_config_channel[ch_name] = ch_data continue raise ValueError( '{} is not defined'.format(ch_data['name'])) ch_data = self._fill_channel_data(channel, ch_data) user_config_channel[ch_name] = ch_data ch_item = ChannelConfiguration(channel, ch_data) ch_item.controller = ctrl_item ctrl_item.add_channel(ch_item) if ch_item.enabled: if external: id_ = channel.full_name else: id_ = channel.id user_elem_ids[ch_item.index] = id_ if ch_item.enabled: ctrl_enabled = True if acq_synch is not None: channel_acq_synch[channel] = acq_synch if not external and ctrl.is_timerable(): ctrl_item.update_timer() ctrl_item.update_monitor() msg_error = '' if ctrl_item.timer is None: timer_name = ctrl_data['timer'] ch_timer = pool.get_element_by_full_name(timer_name) msg_error += 'Channel {0} is not present but used as ' \ 'timer. 
'.format(ch_timer.name) if ctrl_item.monitor is None: monitor_name = ctrl_data['monitor'] ch_monitor = pool.get_element_by_full_name(monitor_name) msg_error += 'Channel {0} is not present but used as ' \ 'monitor.'.format(ch_monitor.name) if len(msg_error) > 0: raise ValueError(msg_error) if ctrl_item.enabled: user_config_ctrl['timer'] = ctrl_item.timer.full_name user_config_ctrl['monitor'] = ctrl_item.monitor.full_name else: user_config_ctrl['timer'] = ctrl_data['timer'] user_config_ctrl['monitor'] = ctrl_data['monitor'] # Update synchronizer state if ctrl_enabled and conf_synch is not None: conf_synch.enabled = True ctrl_item.validate() if external: other_ctrls.append(ctrl_item) elif ctrl.is_timerable(): timerable_ctrls[acq_synch].append(ctrl_item) # Find master timer/monitor the system take the channel with # less index if not ctrl_item.enabled: # Skip controllers disabled pass elif acq_synch in (AcqSynch.SoftwareTrigger, AcqSynch.SoftwareGate): if ctrl_item.timer.index < master_timer_idx_sw: master_timer_sw = ctrl_item.timer master_timer_idx_sw = ctrl_item.timer.index if ctrl_item.monitor.index < master_monitor_idx_sw: master_monitor_sw = ctrl_item.monitor master_monitor_idx_sw = ctrl_item.monitor.index elif acq_synch == AcqSynch.SoftwareStart: if ctrl_item.timer.index < master_timer_idx_sw_start: master_timer_sw_start = ctrl_item.timer master_timer_idx_sw_start = ctrl_item.timer.index if ctrl_item.monitor.index < master_monitor_idx_sw_start: master_monitor_sw_start = ctrl_item.monitor master_monitor_idx_sw_start = ctrl_item.monitor.index elif ctrl.get_ctrl_types()[0] == ElementType.ZeroDExpChannel: zerod_ctrls.append(ctrl_item) # Update synchronizer controller states for conf_synch_ctrl in synch_ctrls: conf_synch_ctrl.update_state() mnt_grp_timer = cfg.get('timer') if mnt_grp_timer is not None: msg = ("global timer is deprecated since release 3.3.3 " "Re-apply configuration in order to upgrade.") self._parent.warning(msg) mnt_grp_monitor = cfg.get('monitor') 
if mnt_grp_monitor is not None: msg = ("global monitor is deprecated since release 3.3.3 " "Re-apply configuration in order to upgrade.") self._parent.warning(msg) # Update internals values self._label = label self._description = description self._timerable_ctrls = timerable_ctrls self._zerod_ctrls = zerod_ctrls self._synch_ctrls = synch_ctrls self._other_ctrls = other_ctrls self._master_timer_sw = master_timer_sw self._master_monitor_sw = master_monitor_sw self._master_timer_sw_start = master_timer_sw_start self._master_monitor_sw_start = master_monitor_sw_start self._user_config = user_config self._channel_acq_synch = channel_acq_synch self._ctrl_acq_synch = ctrl_acq_synch # sorted ids may not be consecutive (if a channel is disabled) indexes = sorted(user_elem_ids.keys()) user_elem_ids_list = [user_elem_ids[idx] for idx in indexes] for conf_synch_ctrl in synch_ctrls: for conf_synch in conf_synch_ctrl.get_channels(enabled=True): user_elem_ids_list.append(conf_synch.id) self._parent.set_user_element_ids(user_elem_ids_list) # force assignment of user elements to the measurement group # in order to update the list of dependent elements (listeners) # of the element e.g. to prevent undefinition of an element added # to the measurement group. self._parent.get_user_elements() self.changed = True
def _fill_channel_data(self, channel, channel_data):
    """Complete the given channel dictionary with default values.

    For the keys filled with defaults, a value already present in
    ``channel_data`` is preserved. ``source`` and ``ndim`` are always
    recomputed from ``channel``.

    :param channel: channel element described by the dictionary
    :param channel_data: channel dictionary, modified in place; it must
        already contain the ``index`` key
    :return: the same ``channel_data`` object
    :raises KeyError: if ``index`` is missing in ``channel_data``
    :raises ValueError: if value reference keys are present for a channel
        which is not referable and the compatibility flag is not set
    """
    name = channel.name
    ctype = channel.get_type()
    full_name = channel.full_name

    # choose ndim (stays None for unknown types and for external channels
    # without a config)
    zero_dim_types = (ElementType.CTExpChannel,
                      ElementType.PseudoCounter,
                      ElementType.ZeroDExpChannel,
                      ElementType.IORegister)
    ndim = None
    if ctype in zero_dim_types:
        ndim = 0
    elif ctype == ElementType.OneDExpChannel:
        ndim = 1
    elif ctype == ElementType.TwoDExpChannel:
        ndim = 2
    elif ctype == ElementType.External:
        config = channel.get_config()
        if config is not None:
            ndim = int(config.data_format)

    if ctype != ElementType.External and channel.is_referable():
        channel_data.setdefault('value_ref_enabled', False)
        channel_data.setdefault('value_ref_pattern', '')
    elif ('value_ref_enabled' in channel_data
            or 'value_ref_pattern' in channel_data):
        if not self._value_ref_compat:
            msg = ('The channel {} is not referable. You can not set '
                   'the enabled and/or the pattern parameters.').format(
                name)
            raise ValueError(msg)
        # compatibility mode: warn and drop the keys instead of failing
        msg = 'value_ref_pattern/value_ref_enabled is deprecated ' \
              'for non-referable channels since 3.0.3. Re-apply ' \
              'configuration in order to upgrade.'
        self._parent.warning(msg)
        channel_data.pop('value_ref_enabled', None)
        channel_data.pop('value_ref_pattern', None)

    # Definitively should be initialized by measurement group
    # index MUST be here already (asserting this in the following line)
    channel_data['index'] = channel_data['index']
    channel_data.setdefault('name', name)
    channel_data.setdefault('full_name', full_name)
    channel_data['source'] = channel.get_source()
    channel_data.setdefault('enabled', True)
    channel_data.setdefault('label', channel_data['name'])
    channel_data['ndim'] = ndim

    # Probably should be initialized by measurement group
    channel_data.setdefault('output', True)

    # Perhaps should NOT be initialized by measurement group
    channel_data.setdefault('plot_type', PlotType.No)
    channel_data.setdefault('plot_axes', [])
    channel_data.setdefault('conditioning', '')
    channel_data.setdefault('normalization', Normalization.No)

    # TODO: think of filling other keys: data_type, data_units, nexus_path
    # here instead of filling them on the Taurus extension level
    if ctype != ElementType.External:
        ctrl_name = channel.controller.full_name
        channel_data.setdefault('_controller_name', ctrl_name)

    return channel_data
class PoolMeasurementGroup(PoolGroupElement):
    """Measurement group element of the pool."""

    def __init__(self, **kwargs):
        """Create the measurement group and apply its configuration.

        :param kwargs: keyword arguments forwarded to the
            :class:`PoolGroupElement` constructor (``elem_type`` is forced
            to ``ElementType.MeasurementGroup``). When no ``configuration``
            item is given, one is built from the user elements.
        """
        # attributes are initialized before calling the base constructor
        # (order kept from the original code)
        self._state_lock = threading.Lock()
        self._monitor_count = None
        self._nb_starts = 1
        self._pending_starts = 0
        self._acquisition_mode = AcqMode.Timer
        self._config = MeasurementConfiguration(self)
        self._moveable = None
        self._moveable_obj = None
        # by default software synchronizer initial domain is set to Position
        self._software_synchronizer_initial_domain = SynchDomain.Position
        self._synch_description = SynchDescription()

        kwargs['elem_type'] = ElementType.MeasurementGroup
        PoolGroupElement.__init__(self, **kwargs)

        user_cfg = kwargs.get("configuration")
        if user_cfg is None:
            user_cfg = build_measurement_configuration(
                self.get_user_elements())
        self.set_configuration_from_user(user_cfg)
def init_attribute_values(self, attr_values: Optional[Dict[str, Any]] = None) -> None:
    """Initialize attributes with values.

    Set values to attributes as passed in `attr_values`. In lack of
    attribute value do nothing. A failure to set one attribute is recorded
    in ``_failed_init_attrs`` and logged at debug level; it does not stop
    the initialization of the remaining attributes.

    :param attr_values: map of attribute names and values; the handled
        items are popped from it
    """
    super().init_attribute_values(attr_values)
    if attr_values is None:
        attr_values = {}

    attr_name = "Configuration"
    configuration = attr_values.pop(attr_name, None)
    if configuration is not None:
        # compatibility flags are on only while this configuration is
        # being applied
        self._config._value_ref_compat = True
        self._config._external_ctrl_compat = True
        try:
            self._config.set_configuration_from_user(configuration)
        except Exception:
            # do not swallow KeyboardInterrupt/SystemExit
            self._failed_init_attrs.append(attr_name)
            self.debug(
                "{} failed to init with {}".format(attr_name,
                                                   configuration),
                exc_info=True
            )
        finally:
            self._config._value_ref_compat = False
            self._config._external_ctrl_compat = False

    attrs = [
        "AcquisitionMode",
        "SoftwareSynchronizerInitialDomain",
        "Moveable"
    ]
    for attr_name in attrs:
        value = attr_values.pop(attr_name, None)
        if value is not None:
            # attribute names map to the snake_case properties
            prop_name = camel_to_snake(attr_name)
            try:
                setattr(self, prop_name, value)
            except Exception:
                self._failed_init_attrs.append(attr_name)
                self.debug(
                    "{} failed to init with {}".format(attr_name, value),
                    exc_info=True
                )
def _create_action_cache(self):
    """Return a new acquisition action named after this element."""
    return PoolAcquisition(self, "%s.Acquisition" % self._name)


def _calculate_states(self, state_info=None):
    """Calculate state and status, taking software acquisition into account.

    The result of the base class calculation is overridden with
    ``State.Moving`` when it is On or Unknown while the software
    synchronizer or any of the software/0D acquisition actions is started.

    :param state_info: forwarded to the base class implementation
    :return: tuple with state and status
    """
    state, status = PoolGroupElement._calculate_states(self, state_info)
    # check if software synchronizer is occupied
    synch_soft = self.acquisition._synch._synch_soft
    acq_sw = self.acquisition._sw_acq
    acq_sw_start = self.acquisition._sw_start_acq
    acq_0d = self.acquisition._0d_acq
    if state not in (State.On, State.Unknown):
        return state, status
    software_busy = (synch_soft.is_started()
                     or acq_sw._is_started()
                     or acq_sw_start._is_started()
                     or acq_0d._is_started())
    if software_busy:
        state = State.Moving
        status += "\nSoftware synchronization is in progress"
    return state, status
def on_element_changed(self, evt_src, evt_type, evt_value):
    """React to an event of an element; only 'state' events are handled.

    On a state event the state and status are recalculated and set while
    holding the state lock.

    :param evt_src: event source (not used)
    :param evt_type: event type; only its ``name`` is checked
    :param evt_value: event value (not used)
    """
    if evt_type.name != 'state':
        return
    with self._state_lock:
        new_state, new_status = self._calculate_states()
        self.set_state(new_state, propagate=2)
        self.set_status(new_status)
def get_pool_controllers(self):
    """Return the pool controllers as reported by the acquisition action."""
    acquisition = self.get_acquisition()
    return acquisition.get_pool_controllers()
def get_pool_controller_by_name(self, name):
    """Return the first pool controller matching the given name.

    The comparison is case insensitive and is done against both the name
    and the full name of each controller.

    :param name: controller name or full name
    :return: the matching controller or None if there is no match
    """
    wanted = name.lower()
    for candidate in self.get_pool_controllers():
        if candidate.name.lower() == wanted:
            return candidate
        if candidate.full_name.lower() == wanted:
            return candidate
    return None
def add_user_element(self, element, index=None):
    """Add a user element, silently skipping repeated TriggerGate elements.

    Overrides the base behavior: a TriggerGate element which is already
    present in the group is not added again and None is returned.

    :param element: element to be added
    :param index: forwarded to the base class implementation
    :return: result of the base class implementation or None if skipped
    """
    already_present = element in self._user_elements
    # skipping TriggerGate element if already present
    if already_present and element.get_type() is ElementType.TriggerGate:
        return
    return PoolGroupElement.add_user_element(self, element, index)
def rename_element(self, old_name: str, new_name: str, propagate: int = 1) -> None:
    """Store the new name as the configuration label and notify listeners.

    A "configuration" event carrying the configuration object is always
    fired.

    :param old_name: old name of the element (not used here)
    :param new_name: new name of the element
    :param propagate: 0 for not propagating, 1 to propagate,
        2 propagate with priority; used as the event priority
    """
    config = self._config
    config['label'] = new_name
    event_type = EventType("configuration", priority=propagate)
    self.fire_event(event_type, self._config)
# ------------------------------------------------------------------------- # configuration # ------------------------------------------------------------------------- def _is_internal_element(self, element_id): if self.pool.use_numeric_element_ids: return isinstance(element_id, int) try: # at this time all internal physical and pseudo elements # should be already created self.pool.get_element(id=element_id) except: return False return True def _is_managed_element(self, element): element_type = element.get_type() return (element_type in TYPE_EXP_CHANNEL_ELEMENTS or element_type is ElementType.TriggerGate) @property def configuration(self): return self._config # TODO: Check if it needed
def set_configuration(self, config=None, propagate=1):
    """Assign the raw configuration to the configuration object.

    :param config: configuration to be stored
    :param propagate: when not 0 a "configuration" event with this
        priority is fired
    """
    self._config.configuration = config
    if propagate:
        self.fire_event(EventType("configuration", priority=propagate),
                        config)
def set_configuration_from_user(self, cfg, propagate=1):
    """Apply a user configuration and notify about the changes.

    A "latencytime" event is fired when the set of pool controllers
    differs after applying the configuration.

    :param cfg: user configuration
    :param propagate: when not 0 a "configuration" event with this
        priority, carrying the configuration for user, is fired
    """
    ctrls_before = self.get_pool_controllers()
    self._config.set_configuration_from_user(cfg)
    ctrls_after = self.get_pool_controllers()
    controllers_changed = set(ctrls_before) != set(ctrls_after)
    if controllers_changed:
        self.fire_event(EventType("latencytime", priority=1),
                        self.latency_time)
    if propagate:
        self.fire_event(EventType("configuration", priority=propagate),
                        self._config.get_configuration_for_user())
def get_user_configuration(self):
    """Return the configuration in the form meant for the user."""
    config = self._config
    return config.get_configuration_for_user()
# ------------------------------------------------------------------------- # integration time # -------------------------------------------------------------------------
def get_integration_time(self):
    """Return the integration time of the current synch description."""
    description = self._synch_description
    return description.integration_time


def set_integration_time(self, integration_time, propagate=1):
    """Set a single-repeat synch description for the integration time.

    The total time is the integration time plus the latency time.

    :param integration_time: active time in the time domain
    :param propagate: forwarded to :meth:`set_synch_description`
    """
    total_time = integration_time + self.latency_time
    group = {}
    group[SynchParam.Delay] = {SynchDomain.Time: 0}
    group[SynchParam.Active] = {SynchDomain.Time: integration_time}
    group[SynchParam.Total] = {SynchDomain.Time: total_time}
    group[SynchParam.Repeats] = 1
    self.set_synch_description([group], propagate)


integration_time = property(get_integration_time, set_integration_time,
                            doc="the current integration time")

# -------------------------------------------------------------------------
# monitor count
# -------------------------------------------------------------------------
def get_monitor_count(self):
    """Return the current monitor count."""
    return self._monitor_count


def set_monitor_count(self, monitor_count, propagate=1):
    """Store the monitor count.

    :param monitor_count: new monitor count
    :param propagate: when not 0 a "monitor_count" event with this
        priority is fired
    """
    self._monitor_count = monitor_count
    if propagate:
        self.fire_event(EventType("monitor_count", priority=propagate),
                        monitor_count)


monitor_count = property(get_monitor_count, set_monitor_count,
                         doc="the current monitor count")

# -------------------------------------------------------------------------
# acquisition mode
# -------------------------------------------------------------------------
def get_acquisition_mode(self):
    """Return the current acquisition mode."""
    return self._acquisition_mode


def set_acquisition_mode(self, acquisition_mode, propagate=1):
    """Store the acquisition mode.

    :param acquisition_mode: new acquisition mode
    :param propagate: when not 0 an "acquisition_mode" event with this
        priority is fired
    """
    self._acquisition_mode = acquisition_mode
    if propagate:
        self.fire_event(EventType("acquisition_mode", priority=propagate),
                        acquisition_mode)


acquisition_mode = property(get_acquisition_mode, set_acquisition_mode,
                            doc="the current acquisition mode")

# -------------------------------------------------------------------------
# synch_description
# -------------------------------------------------------------------------
def get_synch_description(self):
    """Return the current synchronization description object."""
    return self._synch_description


def set_synch_description(self, description, propagate=1):
    """Store the synchronization description.

    :param description: synchronization description; it is wrapped in a
        :class:`SynchDescription` before being stored
    :param propagate: when not 0, "integration_time" and
        "synch_description" events with this priority are fired
    """
    self._synch_description = \
        SynchDescription(description)
    if not propagate:
        return
    try:
        integration_time = self._synch_description.integration_time
    except Exception:
        # integration time can not be obtained from this description;
        # KeyboardInterrupt/SystemExit are deliberately not swallowed
        integration_time = None
    self.fire_event(EventType("integration_time", priority=propagate),
                    integration_time)
    self.fire_event(EventType("synch_description", priority=propagate),
                    description)


synch_description = property(get_synch_description,
                             set_synch_description,
                             doc="the current synchronization description")

# -------------------------------------------------------------------------
# moveable
# -------------------------------------------------------------------------
def get_moveable(self):
    """Return the moveable as it was passed to :meth:`set_moveable`."""
    return self._moveable


def set_moveable(self, moveable, propagate=1, to_fqdn=True):
    """Store the moveable and resolve the corresponding pool element.

    A "moveable" event is always fired; its value is the moveable name
    after the optional FQDN conversion.

    :param moveable: full name of the moveable or None
    :param propagate: priority of the fired event
    :param to_fqdn: whether to pass the name through ``_to_fqdn`` before
        looking the element up
    """
    self._moveable = moveable
    if moveable is None:
        self._moveable_obj = None
    else:
        if to_fqdn:
            moveable = _to_fqdn(moveable, logger=self)
        pool = self.pool
        self._moveable_obj = pool.get_element_by_full_name(moveable)
    event_type = EventType("moveable", priority=propagate)
    self.fire_event(event_type, moveable)


moveable = property(get_moveable, set_moveable,
                    doc="moveable source used in synchronization "
                        "description")

# -------------------------------------------------------------------------
# latency time
# -------------------------------------------------------------------------
def get_latency_time(self):
    """Return the largest latency time among the timerable controllers.

    :return: the largest "latency_time" controller parameter or 0 when
        there is no timerable controller or no value is greater than 0
    """
    candidates = [ctrl.get_ctrl_par("latency_time")
                  for ctrl in self.get_pool_controllers()
                  if ctrl.is_timerable()]
    # 0 goes first so it wins over candidates which are not greater
    return max([0] + candidates)


latency_time = property(get_latency_time,
                        doc="latency time between two consecutive "
                            "acquisitions")

# -------------------------------------------------------------------------
# software synchronizer initial domain
# -------------------------------------------------------------------------
def get_software_synchronizer_initial_domain(self):
    """Return the initial domain of the software synchronizer."""
    initial_domain = self._software_synchronizer_initial_domain
    return initial_domain


def set_software_synchronizer_initial_domain(self, domain):
    """Store the initial domain of the software synchronizer.

    :param domain: SynchDomain.Time or SynchDomain.Position
    """
    self._software_synchronizer_initial_domain = domain


software_synchronizer_initial_domain = property(
    fget=get_software_synchronizer_initial_domain,
    fset=set_software_synchronizer_initial_domain,
    doc="software synchronizer initial domain (SynchDomain.Time "
        "or SynchDomain.Position)"
)

# -------------------------------------------------------------------------
# number of starts
# -------------------------------------------------------------------------
def get_nb_starts(self):
    """Return the current number of starts."""
    return self._nb_starts


def set_nb_starts(self, nb_starts, propagate=1):
    """Store the number of starts.

    :param nb_starts: new number of starts
    :param propagate: when not 0 a "nb_starts" event with this priority
        is fired
    """
    self._nb_starts = nb_starts
    if propagate:
        self.fire_event(EventType("nb_starts", priority=propagate),
                        nb_starts)


nb_starts = property(get_nb_starts, set_nb_starts,
                     doc="current number of starts")

# -------------------------------------------------------------------------
# acquisition
# -------------------------------------------------------------------------
def prepare(self):
    """Prepare for measurement.

    Validate that there are user elements and that every enabled
    timerable controller has an enabled master channel for the current
    acquisition mode, then delegate the preparation to the acquisition
    action and arm the pending starts counter.

    :raises RuntimeError: if there are no user elements, the acquisition
        mode is neither Timer nor Monitor, or a master channel is missing
        or disabled
    """
    if not len(self.get_user_elements()):
        # All channels were disabled
        raise RuntimeError('All channels in measurement group '
                           'are disabled.')

    # the controller attribute holding the master channel depends on the
    # acquisition mode
    mode = self._acquisition_mode
    for known_mode, role_name in ((AcqMode.Timer, 'timer'),
                                  (AcqMode.Monitor, 'monitor')):
        if mode == known_mode:
            role = role_name
            break
    else:
        raise RuntimeError('acquisition mode must be either Timer '
                           'or Monitor')

    for ctrl in self._config.get_timerable_ctrls(enabled=True):
        master = getattr(ctrl, role, None)
        if master is None:
            raise RuntimeError(('controller {0} does not have {1} '
                                'configured.').format(ctrl.name, role))
        if not master.enabled:
            raise RuntimeError(
                'channel {0} used as {1} must be enabled'.format(
                    master.name, role))

    value = self._get_value()
    self._pending_starts = self.nb_starts

    self.acquisition.prepare(self.configuration,
                             self.acquisition_mode,
                             value,
                             self._synch_description,
                             self._moveable_obj,
                             self.software_synchronizer_initial_domain,
                             self.nb_starts)
def start_acquisition(self, value=None):
    """Start measurement.

    Consume one pending start, reset the stopped/aborted/released flags
    and, unless in simulation mode, run the acquisition action.

    :param value: not used by this implementation
    :raises RuntimeError: if there are no pending starts, i.e. prepare
        was not called or all prepared starts were already used
    """
    if self._pending_starts == 0:
        raise RuntimeError(
            "prepare is mandatory before starting acquisition")

    self._stopped = self._aborted = self._released = False
    self._pending_starts -= 1

    if self._simulation_mode:
        return
    self.acquisition.run()
def _get_value(self):
    """Return the acquisition value matching the acquisition mode.

    :return: the integration time in Timer mode or the monitor count in
        Monitor mode
    :raises RuntimeError: if the acquisition mode is neither Timer nor
        Monitor (previously this ended in an UnboundLocalError)
    """
    # compare with == and read the same attribute as prepare() does
    mode = self._acquisition_mode
    if mode == AcqMode.Timer:
        return self.get_integration_time()
    if mode == AcqMode.Monitor:
        return self._monitor_count
    raise RuntimeError('acquisition mode must be either Timer '
                       'or Monitor')
def set_acquisition(self, acq_cache):
    """Store the acquisition object as the action cache.

    :param acq_cache: acquisition object
    """
    self.set_action_cache(acq_cache)


def get_acquisition(self):
    """Return the acquisition object kept as the action cache."""
    acq_cache = self.get_action_cache()
    return acq_cache


acquisition = property(fget=get_acquisition, doc="acquisition object")
def stop(self):
    """Stop the measurement group.

    Discard the pending starts and stop the software synchronizer before
    delegating to the base class implementation.
    """
    self._pending_starts = 0
    soft_synch = self.acquisition._synch._synch_soft
    soft_synch.stop()
    PoolGroupElement.stop(self)
def abort(self):
    """Abort the measurement group.

    Discard the pending starts and abort the software synchronizer before
    delegating to the base class implementation.
    """
    self._pending_starts = 0
    soft_synch = self.acquisition._synch._synch_soft
    soft_synch.abort()
    PoolGroupElement.abort(self)
# ------------------------------------------------------------------------- # utils # ------------------------------------------------------------------------- def _is_online(self, cfg): pool = self.pool for ctrl_name in cfg['controllers']: external = ctrl_name in ['__tango__'] if not external: ctrl = pool.get_element_by_full_name(ctrl_name) if not ctrl.is_online(): return False return True # -------------------------------------------------------------------------- # release # --------------------------------------------------------------------------
def release(self):
    """Release the running acquisition.

    When the acquisition is not running only a warning is logged.
    """
    # override PoolBaseElement.release() cause the PoolAcquisition action
    # is composed from many sub-actions and the default
    # PoolBaseElement.get_operation() is not able to get the top action
    operation = self.acquisition
    if operation.is_running():
        self._released = True
        self._state_event = None
        self.info("Release!")
        operation.release_action()
    else:
        self.warning("Operation is not running, can not release")