#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.tango-controls.org/static/sardana/latest/doc/html/index.html
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the classes
for the synchronization"""
__all__ = ["PoolSynchronization", "SynchDescription", "TGChannel"]
import copy
import time
import threading
from functools import partial
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.log import DebugIt
from sardana import State
from sardana.sardanathreadpool import get_thread_pool
from sardana.pool.pooldefs import SynchDomain, SynchParam
from sardana.pool.poolaction import ActionContext, PoolActionItem, PoolAction
from sardana.util.funcgenerator import FunctionGenerator
# The purpose of this class was inspired on the CTAcquisition concept
class TGChannel(PoolActionItem):
"""An item involved in the trigger/gate generation.
Maps directly to a trigger object
.. note::
The TGChannel class has been included in Sardana
on a provisional basis. Backwards incompatible changes
(up to and including removal of the module) may occur if
deemed necessary by the core developers.
"""
def __init__(self, trigger_gate, info=None):
PoolActionItem.__init__(self, trigger_gate)
if info:
self.__dict__.update(info)
def __getattr__(self, name):
return getattr(self.element, name)
[docs]class SynchDescription(list):
"""Synchronization description. It is composed from groups - repetitions
of equidistant synchronization events. Each group is described by
:class:`~sardana.pool.pooldefs.SynchParam` parameters which may have
values in :class:`~sardana.pool.pooldefs.SynchDomain` domains.
"""
@property
def repetitions(self):
repetitions = 0
for group in self:
repetitions += group[SynchParam.Repeats]
return repetitions
@property
def delay_time(self):
return self._get_param(SynchParam.Delay)
@property
def active_time(self):
return self._get_param(SynchParam.Active)
@property
def total_time(self):
return self._get_param(SynchParam.Total)
@property
def passive_time(self):
return self.total_time - self.active_time
@property
def integration_time(self):
integration_time = self.active_time
if isinstance(integration_time, float):
return integration_time
elif len(integration_time) == 0:
raise Exception("The synchronization description group has not "
"been initialized")
elif len(integration_time) > 1:
raise Exception("There are more than one synchronization "
"description groups")
else:
raise Exception("Synchronization description wrong format")
def _get_param(self, param, domain=SynchDomain.Time):
"""
Extract parameter from synchronization description and its groups. If
there is only one group in the synchronization description then
returns float with the value. Otherwise a list of floats with
different values.
:param param: parameter type
:type param: :class:`~sardana.pool.pooldefs.SynchParam`
:param domain: domain
:type param: :class:`~sardana.pool.pooldefs.SynchDomain`
:return: parameter value(s)
:rtype float or [float]
"""
if len(self) == 1:
return self[0][param][domain]
values = []
for group in self:
value = group[param][domain]
repeats = group[SynchParam.Repeats]
values += [value] * repeats
return values
[docs] def to_dial(self, sign, offset):
"""Convert position domain group parameters to dial position
Maintain time domain group parameters (if present) as they are.
Formula is: pos = sign * dial + offset.
:param sign: sign (1 or -1) to apply in the formula
:type sign: int
:param offset: offset to apply in the formula
:type offset: float
:return: new synchronization description in dial position
:rtype: :class:`~sardana.pool.poolsynchronization.SynchDescription`
"""
synch = copy.deepcopy(self)
for group in synch:
try:
pos = group[SynchParam.Initial][SynchDomain.Position]
except KeyError:
pass
else:
group[SynchParam.Initial][SynchDomain.Position] = (pos - offset) / sign
for param in (SynchParam.Delay,
SynchParam.Active,
SynchParam.Total):
try:
disp = group[param][SynchDomain.Position]
except KeyError:
continue
group[param][SynchDomain.Position] = disp / sign
return synch
[docs] @staticmethod
def from_json(synch_description_json):
"""JSON decode synchronization description data structure and cast
SynchParam and SynchDomain keys from strings to enums.
.. todo:: At some point remove the backwards compatibility
for memorized values created with Python 2. In Python 2 IntEnum was
serialized to "<class>.<attr>" e.g. "SynchDomain.Time" and we were
using a class method `fromStr` to interpret the enumeration objects.
"""
synch_description_with_str_keys = CodecFactory().decode(('json', synch_description_json))
synch_description = SynchDescription()
for group_str in synch_description_with_str_keys:
group = {}
for param_str, conf_str in group_str.items():
try:
param = SynchParam(int(param_str))
except ValueError:
param = SynchParam.fromStr(param_str)
if isinstance(conf_str, dict):
conf = {}
for domain_str, value in conf_str.items():
try:
domain = SynchDomain(int(domain_str))
except ValueError:
domain = SynchDomain.fromStr(domain_str)
conf[domain] = value
else:
conf = conf_str
group[param] = conf
synch_description.append(group)
return synch_description
[docs]class PoolSynchronization(PoolAction):
"""Synchronization action.
It coordinates trigger/gate elements and software synchronizer.
.. todo: Think of moving the ready/busy mechanism to PoolAction
"""
def __init__(self, main_element, name="Synchronization"):
PoolAction.__init__(self, main_element, name)
# Even if rest of Sardana is using "." in logger names use "-" as
# sepator. This is in order to avoid confusion about the logger
# hierary - by default python logging use "." to indicate loggers'
# hirarchy in case parent-children relation is established between the
# loggers.
# TODO: review if it is possible in Sardana to use a common separator.
soft_synch_name = main_element.name + "-SoftSynch"
self._synch_soft = FunctionGenerator(name=soft_synch_name)
self._listener = None
self._ready = threading.Event()
self._ready.set()
def _is_ready(self):
return self._ready.is_set()
def _wait(self, timeout=None):
return self._ready.wait(timeout)
def _set_ready(self, _=None):
self._ready.set()
def _is_busy(self):
return not self._ready.is_set()
def _set_busy(self):
self._ready.clear()
[docs] def add_listener(self, listener):
self._listener = listener
[docs] def start_action(self, ctrls, synch_description, moveable=None,
software_synchronizer_initial_domain=None,
*args, **kwargs):
"""Start synchronization action.
:param ctrls: list of enabled trigger/gate controllers
:type ctrls: list
:param synch_description: synchronization description
:type synch_description:
:class:`~sardana.pool.poolsynchronization.SynchDescription`
:param moveable: (optional) moveable object used as the
synchronization source in the Position domain
:type moveable: :class:`~sardna.pool.poolmotor.PoolMotor` or
:class:`~sardana.pool.poolpseudomotor.PoolPseudoMotor`
:param software_synchronizer_initial_domain: (optional) -
initial domain for software synchronizer, can be either
:obj:`~sardana.pool.pooldefs.SynchDomain.Time` or
:obj:`~sardana.pool.pooldefs.SynchDomain.Position`
"""
def pre_synch_one(pool_ctrl, axis, synch_description):
ret = pool_ctrl.ctrl.PreSynchOne(axis, synch_description)
if not ret:
msg = ("%s.PreSynchOne(%d) returns False" %
(ctrl.name, axis))
raise Exception(msg)
with ActionContext(self):
# loads synchronization description
for ctrl in ctrls:
pool_ctrl = ctrl.element
pool_ctrl.ctrl.PreSynchAll()
dial_synch_description = None
for channel in ctrl.get_channels(enabled=True):
axis = channel.axis
# for backwards compatibility only translates to dial position
# when one use "moveable on input" feature
if (moveable is not None
and channel.moveable_on_input is not None):
if dial_synch_description is None:
dial_synch_description = synch_description.to_dial(
sign=moveable.sign.value,
offset=moveable.offset.value
)
pre_synch_one(pool_ctrl, axis, dial_synch_description)
pool_ctrl.ctrl.SynchOne(axis, dial_synch_description)
else:
pre_synch_one(pool_ctrl, axis, synch_description)
pool_ctrl.ctrl.SynchOne(axis, synch_description)
pool_ctrl.ctrl.SynchAll()
# attaching listener (usually acquisition action)
# to the software trigger gate generator
if self._listener is not None:
if software_synchronizer_initial_domain is not None:
self._synch_soft.initial_domain = software_synchronizer_initial_domain
self._synch_soft.set_configuration(synch_description)
self._synch_soft.add_listener(self._listener)
remove_acq_listener = partial(self._synch_soft.remove_listener,
self._listener)
self.add_finish_hook(remove_acq_listener, False)
self._synch_soft.add_listener(
self.main_element.on_element_changed)
remove_mg_listener = partial(self._synch_soft.remove_listener,
self.main_element)
self.add_finish_hook(remove_mg_listener, False)
# subscribing to the position change events to generate events
# in position domain
if moveable is not None:
position = moveable.get_position_attribute()
position.add_listener(self._synch_soft)
remove_pos_listener = partial(position.remove_listener,
self._synch_soft)
self.add_finish_hook(remove_pos_listener, False)
# start software synchronizer
if self._listener is not None:
self._synch_soft.start()
get_thread_pool().add(self._synch_soft.run)
# PreStartAll on all controllers
for ctrl in ctrls:
pool_ctrl = ctrl.element
pool_ctrl.ctrl.PreStartAll()
# PreStartOne & StartOne on all elements
for ctrl in ctrls:
pool_ctrl = ctrl.element
for channel in ctrl.get_channels(enabled=True):
axis = channel.axis
ret = pool_ctrl.ctrl.PreStartOne(axis)
if not ret:
raise Exception("%s.PreStartOne(%d) returns False"
% (pool_ctrl.name, axis))
pool_ctrl.ctrl.StartOne(axis)
# set the state of all elements to inform their listeners
self._channels = []
for ctrl in ctrls:
for channel in ctrl.get_channels(enabled=True):
channel.set_state(State.Moving, propagate=2)
self._channels.append(channel)
# StartAll on all controllers
for ctrl in ctrls:
pool_ctrl = ctrl.element
pool_ctrl.ctrl.StartAll()
[docs] def is_triggering(self, states):
"""Determines if we are synchronizing or not based on the states
returned by the controller(s) and the software synchronizer.
:param states: a map containing state information as returned by
read_state_info: ((state, status), exception_error)
:type states: dict<PoolElement, tuple(tuple(int, str), str))
:return: returns True if is triggering or False otherwise
:rtype: bool
"""
for elem in states:
state_info_idx = 0
state_idx = 0
state_tggate = states[elem][state_info_idx][state_idx]
if self._is_in_action(state_tggate):
return True
return False
[docs] @DebugIt()
def action_loop(self):
"""action_loop method
"""
states = {}
for channel in self._channels:
element = channel.element
states[element] = None
# Triggering loop
# TODO: make nap configurable (see motion or acquisition loops)
nap = 0.01
while True:
self.read_state_info(ret=states)
if not self.is_triggering(states):
break
time.sleep(nap)
# Set element states after ending the triggering
for element, state_info in list(states.items()):
with element:
element.clear_operation()
state_info = element._from_ctrl_state_info(state_info)
element.set_state_info(state_info, propagate=2)
# wait for software synchronizer to finish
if self._listener is not None:
while True:
if not self._synch_soft.is_started():
break
time.sleep(0.01)