#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.tango-controls.org/static/sardana/latest/doc/html/index.html
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the classes
for the synchronization"""
__all__ = ["PoolSynchronization", "SynchDescription", "TGChannel"]
import copy
import time
import threading
from functools import partial
from typing import Any, List, Union, Dict, Tuple
import sardana
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.log import DebugIt
from sardana import State
from sardana.sardanathreadpool import get_thread_pool
from sardana.pool.pooldefs import SynchDomain, SynchParam
from sardana.pool.poolaction import ActionContext, PoolActionItem, PoolAction
from sardana.util.funcgenerator import FunctionGenerator
from sardana.pool.poolmotor import PoolMotor
from sardana.pool.poolpseudomotor import PoolPseudoMotor
# The purpose of this class was inspired on the CTAcquisition concept
class TGChannel(PoolActionItem):
"""An item involved in the trigger/gate generation.
Maps directly to a trigger object
.. note::
The TGChannel class has been included in Sardana
on a provisional basis. Backwards incompatible changes
(up to and including removal of the module) may occur if
deemed necessary by the core developers.
"""
def __init__(self, trigger_gate, info=None):
PoolActionItem.__init__(self, trigger_gate)
if info:
self.__dict__.update(info)
def __getattr__(self, name):
return getattr(self.element, name)
[docs]
class SynchDescription(list):
"""Synchronization description. It is composed from groups - repetitions
of equidistant synchronization events. Each group is described by
:class:`~sardana.pool.pooldefs.SynchParam` parameters which may have
values in :class:`~sardana.pool.pooldefs.SynchDomain` domains.
"""
@property
def repetitions(self):
repetitions = 0
for group in self:
repetitions += group[SynchParam.Repeats]
return repetitions
@property
def delay_time(self):
return self._get_param(SynchParam.Delay)
@property
def active_time(self):
return self._get_param(SynchParam.Active)
@property
def total_time(self):
return self._get_param(SynchParam.Total)
@property
def passive_time(self):
return self.total_time - self.active_time
@property
def integration_time(self):
integration_time = self.active_time
if isinstance(integration_time, float):
return integration_time
elif len(integration_time) == 0:
raise Exception("The synchronization description group has not "
"been initialized")
elif len(integration_time) > 1:
raise Exception("There are more than one synchronization "
"description groups")
else:
raise Exception("Synchronization description wrong format")
def _get_param(self, param: sardana.pool.pooldefs.SynchDomain, domain: Any = SynchDomain.Time) -> Union[float, List[float]]:
"""
Extract parameter from synchronization description and its groups. If
there is only one group in the synchronization description then
returns float with the value. Otherwise a list of floats with
different values.
:param param: parameter type
:param domain: domain
:return: parameter value(s)
:rtype float or [float]
"""
if len(self) == 1:
return self[0][param][domain]
values = []
for group in self:
value = group[param][domain]
repeats = group[SynchParam.Repeats]
values += [value] * repeats
return values
[docs]
def to_dial(self, sign: int, offset: float) -> "SynchDescription":
"""Convert position domain group parameters to dial position
Maintain time domain group parameters (if present) as they are.
Formula is: pos = sign * dial + offset.
:param sign: sign (1 or -1) to apply in the formula
:param offset: offset to apply in the formula
:return: new synchronization description in dial position
"""
synch = copy.deepcopy(self)
for group in synch:
try:
pos = group[SynchParam.Initial][SynchDomain.Position]
except KeyError:
pass
else:
group[SynchParam.Initial][SynchDomain.Position] = (pos - offset) / sign
for param in (SynchParam.Delay,
SynchParam.Active,
SynchParam.Total):
try:
disp = group[param][SynchDomain.Position]
except KeyError:
continue
group[param][SynchDomain.Position] = disp / sign
return synch
[docs]
@staticmethod
def from_json(synch_description_json):
"""JSON decode synchronization description data structure and cast
SynchParam and SynchDomain keys from strings to enums.
.. todo:: At some point remove the backwards compatibility
for memorized values created with Python 2. In Python 2 IntEnum was
serialized to "<class>.<attr>" e.g. "SynchDomain.Time" and we were
using a class method `fromStr` to interpret the enumeration objects.
"""
synch_description_with_str_keys = CodecFactory().decode(('json', synch_description_json))
synch_description = SynchDescription()
for group_str in synch_description_with_str_keys:
group = {}
for param_str, conf_str in group_str.items():
try:
param = SynchParam(int(param_str))
except ValueError:
param = SynchParam.fromStr(param_str)
if isinstance(conf_str, dict):
conf = {}
for domain_str, value in conf_str.items():
try:
domain = SynchDomain(int(domain_str))
except ValueError:
domain = SynchDomain.fromStr(domain_str)
conf[domain] = value
else:
conf = conf_str
group[param] = conf
synch_description.append(group)
return synch_description
[docs]
class PoolSynchronization(PoolAction):
"""Synchronization action.
It coordinates trigger/gate elements and software synchronizer.
.. todo: Think of moving the ready/busy mechanism to PoolAction
"""
def __init__(self, main_element, name="Synchronization"):
PoolAction.__init__(self, main_element, name)
# Even if rest of Sardana is using "." in logger names use "-" as
# sepator. This is in order to avoid confusion about the logger
# hierary - by default python logging use "." to indicate loggers'
# hirarchy in case parent-children relation is established between the
# loggers.
# TODO: review if it is possible in Sardana to use a common separator.
soft_synch_name = main_element.name + "-SoftSynch"
self._synch_soft = FunctionGenerator(name=soft_synch_name)
self._listener = None
self._ready = threading.Event()
self._ready.set()
def _is_ready(self):
return self._ready.is_set()
def _wait(self, timeout=None):
return self._ready.wait(timeout)
def _set_ready(self, _=None):
self._ready.set()
def _is_busy(self):
return not self._ready.is_set()
def _set_busy(self):
self._ready.clear()
[docs]
def add_listener(self, listener):
self._listener = listener
[docs]
def start_action(self, ctrls: List, synch_description: SynchDescription, moveable: Union[PoolMotor, PoolPseudoMotor, None] = None,
software_synchronizer_initial_domain: Any = None,
*args: Any, **kwargs: Any) -> None:
"""Start synchronization action.
:param ctrls: list of enabled trigger/gate controllers
:param synch_description: synchronization description
:param moveable: (optional) moveable object used as the
synchronization source in the Position domain
:class:`~sardana.pool.poolpseudomotor.PoolPseudoMotor`
:param software_synchronizer_initial_domain: (optional) -
initial domain for software synchronizer, can be either
:obj:`~sardana.pool.pooldefs.SynchDomain.Time` or
:obj:`~sardana.pool.pooldefs.SynchDomain.Position`
"""
def pre_synch_one(pool_ctrl, axis, synch_description):
ret = pool_ctrl.ctrl.PreSynchOne(axis, synch_description)
if not ret:
msg = ("%s.PreSynchOne(%d) returns False" %
(ctrl.name, axis))
raise Exception(msg)
with ActionContext(self):
# loads synchronization description
for ctrl in ctrls:
pool_ctrl = ctrl.element
pool_ctrl.ctrl.PreSynchAll()
dial_synch_description = None
for channel in ctrl.get_channels(enabled=True):
axis = channel.axis
# for backwards compatibility only translates to dial position
# when one use "moveable on input" feature
if (moveable is not None
and channel.moveable_on_input is not None):
if dial_synch_description is None:
dial_synch_description = synch_description.to_dial(
sign=moveable.sign.value,
offset=moveable.offset.value
)
pre_synch_one(pool_ctrl, axis, dial_synch_description)
pool_ctrl.ctrl.SynchOne(axis, dial_synch_description)
else:
pre_synch_one(pool_ctrl, axis, synch_description)
pool_ctrl.ctrl.SynchOne(axis, synch_description)
pool_ctrl.ctrl.SynchAll()
# attaching listener (usually acquisition action)
# to the software trigger gate generator
if self._listener is not None:
if software_synchronizer_initial_domain is not None:
self._synch_soft.initial_domain = software_synchronizer_initial_domain
self._synch_soft.set_configuration(synch_description)
self._synch_soft.add_listener(self._listener)
remove_acq_listener = partial(self._synch_soft.remove_listener,
self._listener)
self.add_finish_hook(remove_acq_listener, False)
self._synch_soft.add_listener(
self.main_element.on_element_changed)
remove_mg_listener = partial(self._synch_soft.remove_listener,
self.main_element)
self.add_finish_hook(remove_mg_listener, False)
# subscribing to the position change events to generate events
# in position domain
if moveable is not None:
position = moveable.get_position_attribute()
position.add_listener(self._synch_soft)
remove_pos_listener = partial(position.remove_listener,
self._synch_soft)
self.add_finish_hook(remove_pos_listener, False)
# start software synchronizer
if self._listener is not None:
self._synch_soft.start()
get_thread_pool().add(self._synch_soft.run)
# PreStartAll on all controllers
for ctrl in ctrls:
pool_ctrl = ctrl.element
pool_ctrl.ctrl.PreStartAll()
# PreStartOne & StartOne on all elements
for ctrl in ctrls:
pool_ctrl = ctrl.element
for channel in ctrl.get_channels(enabled=True):
axis = channel.axis
ret = pool_ctrl.ctrl.PreStartOne(axis)
if not ret:
raise Exception("%s.PreStartOne(%d) returns False"
% (pool_ctrl.name, axis))
pool_ctrl.ctrl.StartOne(axis)
# set the state of all elements to inform their listeners
self._channels = []
for ctrl in ctrls:
for channel in ctrl.get_channels(enabled=True):
channel.set_state(State.Moving, propagate=2)
self._channels.append(channel)
# StartAll on all controllers
for ctrl in ctrls:
pool_ctrl = ctrl.element
pool_ctrl.ctrl.StartAll()
[docs]
def is_triggering(self, states: Dict[sardana.pool.poolelement.PoolElement, Tuple[Tuple[int, str], str]]) -> bool:
"""Determines if we are synchronizing or not based on the states
returned by the controller(s) and the software synchronizer.
:param states: a map containing state information as returned by
read_state_info: ((state, status), exception_error)
:return: returns True if is triggering or False otherwise
"""
for elem in states:
state_info_idx = 0
state_idx = 0
state_tggate = states[elem][state_info_idx][state_idx]
if self._is_in_action(state_tggate):
return True
return False
[docs]
@DebugIt()
def action_loop(self):
"""action_loop method
"""
states = {}
for channel in self._channels:
element = channel.element
states[element] = None
# Triggering loop
# TODO: make nap configurable (see motion or acquisition loops)
nap = 0.01
while True:
self.read_state_info(ret=states)
if not self.is_triggering(states):
break
time.sleep(nap)
# Set element states after ending the triggering
for element, state_info in list(states.items()):
with element:
element.clear_operation()
state_info = element._from_ctrl_state_info(state_info)
element.set_state_info(state_info, propagate=2)
# wait for software synchronizer to finish
if self._listener is not None:
while True:
if not self._synch_soft.is_started():
break
time.sleep(0.01)