Source code for sardana.pool.poolsynchronization

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.tango-controls.org/static/sardana/latest/doc/html/index.html
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################


"""This module is part of the Python Pool library. It defines the classes
for the synchronization"""

__all__ = ["PoolSynchronization", "MultiSynchDescription", "SynchDescription", "TGChannel"]

import copy
import time
import threading
from functools import partial
from typing import Any, List, Union, Dict, Tuple

import sardana
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.log import DebugIt
from sardana import State
from sardana.sardanathreadpool import get_thread_pool
from sardana.pool.pooldefs import SynchDomain, SynchParam
from sardana.pool.poolaction import ActionContext, PoolActionItem, PoolAction
from sardana.util.funcgenerator import FunctionGenerator
from sardana.pool.poolmotor import PoolMotor
from sardana.pool.poolpseudomotor import PoolPseudoMotor

# The design of the classes below was inspired by the CTAcquisition concept


class TGChannel(PoolActionItem):
    """Action item taking part in the trigger/gate generation.

    Maps directly to a trigger object. Attributes which are not found on
    the item itself are looked up on the wrapped element (see
    :meth:`__getattr__`).

    .. note::
        The TGChannel class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, trigger_gate, info=None):
        """Wrap *trigger_gate* and optionally preload instance attributes.

        :param trigger_gate: object handed over to
            :class:`PoolActionItem` as the item's element
        :param info: (optional) mapping whose items are copied into the
            instance ``__dict__``; ignored when falsy
        """
        PoolActionItem.__init__(self, trigger_gate)
        if not info:
            return
        self.__dict__.update(info)

    def __getattr__(self, name):
        """Forward the lookup of a missing attribute to ``self.element``."""
        target = self.element
        return getattr(target, name)

class SynchDescription(list):
    """Synchronization description.

    It is composed from groups - repetitions of equidistant synchronization
    events. Each group is described by
    :class:`~sardana.pool.pooldefs.SynchParam` parameters which may have
    values in :class:`~sardana.pool.pooldefs.SynchDomain` domains.
    """

    def __init__(self, *args, **kwargs):
        """Cast SynchParam and SynchDomain keys from strings to enums.

        The arguments are forwarded to :class:`list` in order to obtain the
        sequence of groups. Every group is a dictionary; its keys are cast
        to :class:`~sardana.pool.pooldefs.SynchParam` and, when the value is
        a dictionary itself, the keys of that inner dictionary are cast to
        :class:`~sardana.pool.pooldefs.SynchDomain`.
        """
        super().__init__()
        for raw_group in list(*args, **kwargs):
            group = {}
            for raw_param, raw_conf in raw_group.items():
                param = self._to_enum(SynchParam, raw_param)
                if isinstance(raw_conf, dict):
                    conf = {
                        self._to_enum(SynchDomain, raw_domain): value
                        for raw_domain, value in raw_conf.items()
                    }
                else:
                    # e.g. the Repeats parameter which is not per-domain
                    conf = raw_conf
                group[param] = conf
            self.append(group)

    @staticmethod
    def _to_enum(enum_class, key):
        """Cast *key* to a member of *enum_class*.

        First try to interpret *key* as the integer value of the
        enumeration; if that raises :exc:`ValueError` fall back to the
        ``fromStr`` class method of *enum_class*.
        """
        try:
            return enum_class(int(key))
        except ValueError:
            return enum_class.fromStr(key)

    @property
    def repetitions(self):
        """Sum of the ``Repeats`` parameter over all the groups."""
        return sum(group[SynchParam.Repeats] for group in self)

    @property
    def delay_time(self):
        """``Delay`` parameter in the time domain (see :meth:`_get_param`)."""
        return self._get_param(SynchParam.Delay)

    @property
    def active_time(self):
        """``Active`` parameter in the time domain (see :meth:`_get_param`)."""
        return self._get_param(SynchParam.Active)

    @property
    def total_time(self):
        """``Total`` parameter in the time domain (see :meth:`_get_param`)."""
        return self._get_param(SynchParam.Total)

    @property
    def passive_time(self):
        """Total time minus active time.

        A single value when there is only one group, otherwise a list with
        one value per repetition (same layout as :attr:`total_time` and
        :attr:`active_time`).
        """
        total = self.total_time
        active = self.active_time
        if isinstance(total, list):
            # multiple groups: both are per-repetition lists of equal length,
            # subtracting the lists directly would raise TypeError
            return [t - a for t, a in zip(total, active)]
        return total - active

    @property
    def integration_time(self):
        """Active time of a description composed from exactly one group.

        :raises Exception: when the description does not consist of
            exactly one group
        """
        integration_time = self.active_time
        # accept integers as well (e.g. JSON decoded ``1``); otherwise they
        # would end up in len() below and raise an unrelated TypeError
        if isinstance(integration_time, (int, float)):
            return integration_time
        elif len(integration_time) == 0:
            raise Exception("The synchronization description group has not "
                            "been initialized")
        elif len(integration_time) > 1:
            raise Exception("There are more than one synchronization "
                            "description groups")
        else:
            raise Exception("Synchronization description wrong format")

    def _get_param(self, param: SynchParam,
                   domain: SynchDomain = SynchDomain.Time
                   ) -> Union[float, List[float]]:
        """
        Extract parameter from synchronization description and its groups.

        If there is only one group in the synchronization description then
        return the value itself. Otherwise return a list of values in which
        the value of each group is repeated ``Repeats`` times.

        :param param: parameter type
        :param domain: domain
        :return: parameter value(s)
        """
        if len(self) == 1:
            return self[0][param][domain]

        values = []
        for group in self:
            value = group[param][domain]
            repeats = group[SynchParam.Repeats]
            values.extend([value] * repeats)
        return values

    def to_dial(self, sign: int, offset: float) -> "SynchDescription":
        """Convert position domain group parameters to dial position

        Maintain time domain group parameters (if present) as they are.
        Formula is: pos = sign * dial + offset. This object is not modified,
        the conversion is applied to a deep copy.

        :param sign: sign (1 or -1) to apply in the formula
        :param offset: offset to apply in the formula
        :return: new synchronization description in dial position
        """
        synch = copy.deepcopy(self)
        displacement_params = (SynchParam.Delay, SynchParam.Active,
                               SynchParam.Total)
        for group in synch:
            # the initial position is absolute: remove the offset and
            # apply the sign
            try:
                pos = group[SynchParam.Initial][SynchDomain.Position]
            except KeyError:
                pass
            else:
                group[SynchParam.Initial][SynchDomain.Position] = \
                    (pos - offset) / sign
            # the remaining parameters are displacements: only the sign
            # applies
            for param in displacement_params:
                try:
                    disp = group[param][SynchDomain.Position]
                except KeyError:
                    continue
                group[param][SynchDomain.Position] = disp / sign
        return synch
class MultiSynchDescription(dict):
    """Multiple synchronization descriptions.

    It is formed from pairs of
    :class:`~sardana.pool.controller.Synchronizer` names and
    :class:`~sardana.pool.poolsynchronization.SynchDescription` objects.

    A dictionary must be provided where keys are the synchronizer names as
    strings and values are either a
    :class:`~sardana.pool.poolsynchronization.SynchDescription` object
    or the equivalent :class:`List`.

    To assign a description to software synchronized elements, use the key
    `"software"`.

    When, at construction time, all the descriptions compare equal, the
    attributes ``integration_time``, ``passive_time``, ``total_time``,
    ``active_time``, ``delay_time`` and ``repetitions`` are copied from the
    first description. They are neither created when the descriptions
    differ nor refreshed when items are assigned afterwards.
    """

    def __init__(self, *args, **kwargs):
        """Build the dictionary casting all the values.

        The arguments are interpreted as by :class:`dict`.
        """
        super().__init__()
        self.update(*args, **kwargs)
        self._check_all_values_same()

    def _check_all_values_same(self):
        """Expose the common parameters if all the descriptions are equal.

        Does nothing when the dictionary is empty or when at least one
        description differs from the first one.
        """
        values = list(self.values())
        if not values:
            return
        first = values[0]
        if any(value != first for value in values):
            return
        self.integration_time = first.integration_time
        self.passive_time = first.passive_time
        self.total_time = first.total_time
        self.active_time = first.active_time
        self.delay_time = first.delay_time
        self.repetitions = first.repetitions

    def __setitem__(self, key, value):
        """Store *value* cast to
        :class:`~sardana.pool.poolsynchronization.SynchDescription`.
        """
        super().__setitem__(key, SynchDescription(value))

    def update(self, *args, **kwargs):
        """Cast dictionary values to
        :class:`~sardana.pool.poolsynchronization.SynchDescription`
        objects.
        """
        # __setitem__ already performs the cast, do not convert twice
        for key, value in dict(*args, **kwargs).items():
            self[key] = value

    @staticmethod
    def from_json(synch_description_json: str) -> "MultiSynchDescription":
        """JSON decode multi synchronization description data structure
        and return
        :class:`~sardana.pool.poolsynchronization.MultiSynchDescription`.

        :param synch_description_json: json-like multiple synchronization
            description
        :return: new multi synchronization description

        .. todo:: At some point remove the backwards compatibility
          for memorized values created with Python 2. In Python 2 IntEnum was
          serialized to "<class>.<attr>" e.g. "SynchDomain.Time" and we were
          using a class method `fromStr` to interpret the enumeration objects.
        """
        synch_description_dict = CodecFactory().decode(
            ('json', synch_description_json))
        return MultiSynchDescription(synch_description_dict)
class PoolSynchronization(PoolAction):
    """Synchronization action.

    It coordinates trigger/gate elements and software synchronizer.

    .. todo: Think of moving the ready/busy mechanism to PoolAction
    """

    def __init__(self, main_element, name="Synchronization"):
        """Create the action together with its software synchronizer.

        :param main_element: main element of the action; its name is used
            to build the name of the software synchronizer
        :param name: name of the action
        """
        PoolAction.__init__(self, main_element, name)
        # Even if the rest of Sardana is using "." in logger names use "-"
        # as separator. This is in order to avoid confusion about the logger
        # hierarchy - by default python logging uses "." to indicate
        # loggers' hierarchy in case parent-children relation is established
        # between the loggers.
        # TODO: review if it is possible in Sardana to use a common separator.
        soft_synch_name = main_element.name + "-SoftSynch"
        self._synch_soft = FunctionGenerator(name=soft_synch_name)
        self._listener = None
        # ready/busy flag; the action is created in the ready state
        self._ready = threading.Event()
        self._ready.set()

    def _is_ready(self):
        """Return True if the ready flag is set."""
        return self._ready.is_set()

    def _wait(self, timeout=None):
        """Block until the ready flag is set or *timeout* elapses.

        :return: result of :meth:`threading.Event.wait`
        """
        return self._ready.wait(timeout)

    def _set_ready(self, _=None):
        """Set the ready flag; the optional argument is ignored."""
        self._ready.set()

    def _is_busy(self):
        """Return True if the ready flag is not set."""
        return not self._ready.is_set()

    def _set_busy(self):
        """Clear the ready flag."""
        self._ready.clear()

    def add_listener(self, listener):
        """Set the listener which :meth:`start_action` attaches to the
        software synchronizer (replaces any listener set before).
        """
        self._listener = listener

    def start_action(self, ctrls: List,
                     multi_synch_description: MultiSynchDescription,
                     moveable: Union[PoolMotor, PoolPseudoMotor, None] = None,
                     software_synchronizer_initial_domain: Any = None,
                     *args: Any, **kwargs: Any) -> None:
        """Start synchronization action.

        :param ctrls: list of enabled trigger/gate controllers
        :param multi_synch_description: synchronization description for
            each synchronizer.
        :param moveable: (optional) moveable object used as the
            synchronization source in the Position domain
        :param software_synchronizer_initial_domain: (optional) - initial
            domain for software synchronizer, can be either
            :obj:`~sardana.pool.pooldefs.SynchDomain.Time` or
            :obj:`~sardana.pool.pooldefs.SynchDomain.Position`
        :raises Exception: if ``PreSynchOne`` or ``PreStartOne`` of any
            controller returns a false value
        """

        def pre_synch_one(pool_ctrl, axis, synch_description):
            ret = pool_ctrl.ctrl.PreSynchOne(axis, synch_description)
            if not ret:
                # report the controller passed as argument instead of
                # relying on the loop variable of the enclosing scope
                msg = ("%s.PreSynchOne(%d) returns False" %
                       (pool_ctrl.name, axis))
                raise Exception(msg)

        with ActionContext(self):

            # loads synchronization description
            for ctrl in ctrls:
                pool_ctrl = ctrl.element
                pool_ctrl.ctrl.PreSynchAll()
                # NOTE(review): the dial description is computed once per
                # controller, from the first channel using the "moveable on
                # input" feature, and reused for the remaining channels of
                # that controller - confirm this is intended when channels
                # have different synchronization descriptions
                dial_synch_description = None
                for channel in ctrl.get_channels(enabled=True):
                    synch_description = multi_synch_description[channel.name]
                    axis = channel.axis
                    # for backwards compatibility only translates to dial
                    # position when one use "moveable on input" feature
                    if (moveable is not None
                            and channel.moveable_on_input is not None):
                        if dial_synch_description is None:
                            dial_synch_description = \
                                synch_description.to_dial(
                                    sign=moveable.sign.value,
                                    offset=moveable.offset.value
                                )
                        pre_synch_one(pool_ctrl, axis, dial_synch_description)
                        pool_ctrl.ctrl.SynchOne(axis, dial_synch_description)
                    else:
                        pre_synch_one(pool_ctrl, axis, synch_description)
                        pool_ctrl.ctrl.SynchOne(axis, synch_description)
                pool_ctrl.ctrl.SynchAll()

            # attaching listener (usually acquisition action)
            # to the software trigger gate generator
            if self._listener is not None:
                # Get software synchronization
                synch_description = multi_synch_description["software"]
                if software_synchronizer_initial_domain is not None:
                    self._synch_soft.initial_domain = \
                        software_synchronizer_initial_domain
                self._synch_soft.set_configuration(synch_description)
                self._synch_soft.add_listener(self._listener)
                remove_acq_listener = partial(
                    self._synch_soft.remove_listener, self._listener)
                self.add_finish_hook(remove_acq_listener, False)
                # the finish hook must remove the very same listener which
                # was added (the callback, not the main element itself)
                mg_listener = self.main_element.on_element_changed
                self._synch_soft.add_listener(mg_listener)
                remove_mg_listener = partial(
                    self._synch_soft.remove_listener, mg_listener)
                self.add_finish_hook(remove_mg_listener, False)

            # subscribing to the position change events to generate events
            # in position domain
            if moveable is not None:
                position = moveable.get_position_attribute()
                position.add_listener(self._synch_soft)
                remove_pos_listener = partial(position.remove_listener,
                                              self._synch_soft)
                self.add_finish_hook(remove_pos_listener, False)

            # start software synchronizer
            if self._listener is not None:
                self._synch_soft.start()
                get_thread_pool().add(self._synch_soft.run)

            # PreStartAll on all controllers
            for ctrl in ctrls:
                pool_ctrl = ctrl.element
                pool_ctrl.ctrl.PreStartAll()

            # PreStartOne & StartOne on all elements
            for ctrl in ctrls:
                pool_ctrl = ctrl.element
                for channel in ctrl.get_channels(enabled=True):
                    axis = channel.axis
                    ret = pool_ctrl.ctrl.PreStartOne(axis)
                    if not ret:
                        raise Exception("%s.PreStartOne(%d) returns False"
                                        % (pool_ctrl.name, axis))
                    pool_ctrl.ctrl.StartOne(axis)

            # set the state of all elements to inform their listeners
            self._channels = []
            for ctrl in ctrls:
                for channel in ctrl.get_channels(enabled=True):
                    channel.set_state(State.Moving, propagate=2)
                    self._channels.append(channel)

            # StartAll on all controllers
            for ctrl in ctrls:
                pool_ctrl = ctrl.element
                pool_ctrl.ctrl.StartAll()

    def is_triggering(self,
                      states: Dict[sardana.pool.poolelement.PoolElement,
                                   Tuple[Tuple[int, str], str]]) -> bool:
        """Determines if we are synchronizing or not based on the states
        returned by the controller(s) and the software synchronizer.

        :param states: a map containing state information as returned by
            read_state_info: ((state, status), exception_error)
        :return: returns True if is triggering or False otherwise
        """
        state_info_idx = 0
        state_idx = 0
        return any(
            self._is_in_action(state_info[state_info_idx][state_idx])
            for state_info in states.values()
        )

    @DebugIt()
    def action_loop(self):
        """Poll the channels' states until none of them is in action.

        Afterwards update the state information of the elements and, if a
        listener was set, wait until the software synchronizer is no longer
        started.
        """
        # one entry per element of the channels set in start_action
        states = {channel.element: None for channel in self._channels}

        # Triggering loop
        # TODO: make nap configurable (see motion or acquisition loops)
        nap = 0.01
        while True:
            self.read_state_info(ret=states)
            if not self.is_triggering(states):
                break
            time.sleep(nap)

        # Set element states after ending the triggering
        for element, state_info in list(states.items()):
            with element:
                element.clear_operation()
                state_info = element._from_ctrl_state_info(state_info)
                element.set_state_info(state_info, propagate=2)

        # wait for software synchronizer to finish
        if self._listener is not None:
            while self._synch_soft.is_started():
                time.sleep(nap)