#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the base classes
for"""
__all__ = ["PoolMotorGroup"]
__docformat__ = 'restructuredtext'
import time
import collections
from sardana import ElementType
from sardana.sardanaattribute import SardanaAttribute
from sardana.pool.poolgroupelement import PoolGroupElement
from sardana.pool.poolmotion import PoolMotion
from PyTango import AttributeProxy
class Position(SardanaAttribute):
def __init__(self, *args, **kwargs):
self._w_value_map = None
super(Position, self).__init__(*args, **kwargs)
for pos_attr in self.obj.get_physical_position_attribute_iterator():
pos_attr.add_listener(self.on_change)
def _has_value(self):
for pos_attr in self.obj.get_physical_position_attribute_iterator():
if not pos_attr.has_value():
return False
return True
def _in_error(self):
for pos_attr in self.obj.get_physical_position_attribute_iterator():
if pos_attr.in_error():
return True
return False
def get_elements(self):
return self.obj.get_user_elements()
def get_element_nb(self):
return len(self.get_user_elements())
def _get_exc_info(self):
for position_attr in self.obj.get_physical_position_attribute_iterator():
if position_attr.error:
return position_attr.get_exc_info()
def _get_timestamp(self):
return max([pos_attr.timestamp for pos_attr in self.obj.get_physical_position_attribute_iterator()])
def _set_value(self, value, exc_info=None, timestamp=None, propagate=1):
raise Exception(
"Cannot set position value for motor group %s" % self.obj.name)
def _get_value(self):
return [position.value for position in self.obj.get_physical_position_attribute_iterator()]
def _get_write_value(self):
return [position.w_value for position in self.obj.get_physical_position_attribute_iterator()]
def _set_write_value(self, w_value, timestamp=None, propagate=1):
assert len(w_value) == self.get_element_nb()
if isinstance(w_value, collections.abc.Sequence):
w_value_map = {}
for v, elem in zip(w_value, self.get_elements()):
w_value_map[elem] = v
else:
w_value_map = w_value
w_value = []
for elem in self.get_elements():
w_value.append(w_value_map[elem])
self._w_value_map = w_value
super(Position, self).set_write_value(w_value, timestamp=timestamp,
propagate=propagate)
def on_change(self, evt_src, evt_type, evt_value):
self.fire_read_event(propagate=evt_type.priority)
def update(self, cache=True, propagate=1):
if cache:
for phy_elem_pos in self.obj.get_low_level_physical_position_attribute_iterator():
if not phy_elem_pos.has_value():
cache = False
break
if not cache:
dial_position_values = self.obj.motion.read_dial_position(
serial=True)
for motion_obj, position_value in \
list(dial_position_values.items()):
motion_obj.put_dial_position(
position_value, propagate=propagate)
[docs]class PoolMotorGroup(PoolGroupElement):
def __init__(self, **kwargs):
self._physical_elements = []
self._in_start_move = False
kwargs['elem_type'] = ElementType.MotorGroup
PoolGroupElement.__init__(self, **kwargs)
on_change = self.on_change
self._position = Position(self, listeners=on_change)
def _create_action_cache(self):
motion_name = "%s.Motion" % self._name
return PoolMotion(self, motion_name)
# --------------------------------------------------------------------------
# Event forwarding
# --------------------------------------------------------------------------
[docs] def on_change(self, evt_src, evt_type, evt_value):
# forward all events coming from attributes to the listeners
self.fire_event(evt_type, evt_value)
[docs] def on_element_changed(self, evt_src, evt_type, evt_value):
name = evt_type.name.lower()
if name in ('state', 'position'):
state, status = self._calculate_states()
if name == 'state':
propagate_state = evt_type.priority
else:
propagate_state = 0
self.set_state(state, propagate=propagate_state)
self.set_status(status, propagate=propagate_state)
[docs] def add_user_element(self, element, index=None):
elem_type = element.get_type()
if elem_type == ElementType.Motor:
pass
elif elem_type == ElementType.PseudoMotor:
# TODO: make this happen
pass
else:
raise Exception("element %s is not a motor" % element.name)
PoolGroupElement.add_user_element(self, element, index=index)
# --------------------------------------------------------------------------
# position
# --------------------------------------------------------------------------
[docs] def get_position_attribute(self):
return self._position
[docs] def get_low_level_physical_position_attribute_iterator(self):
return self.get_physical_elements_attribute_iterator()
[docs] def get_physical_position_attribute_iterator(self):
return self.get_user_elements_attribute_iterator()
[docs] def get_physical_positions_attribute_sequence(self):
return self.get_user_elements_attribute_sequence()
[docs] def get_physical_positions_attribute_map(self):
return self.get_user_elements_attribute_map()
[docs] def get_position(self, cache=True, propagate=1):
"""Returns the user position.
:param cache:
if ``True`` (default) return value in cache, otherwise read value
from hardware
:type cache:
bool
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:type propagate:
int
:return:
the user position
:rtype:
:class:`~sardana.sardanaattribute.SardanaAttribute`"""
position = self._position
position.update(cache=cache, propagate=propagate)
return position
[docs] def set_position(self, positions):
"""Moves the motor group to the specified user positions
:param positions:
the user positions to move to
:type positions:
sequence< :class:`~numbers.Number` >"""
self.start_move(positions)
[docs] def set_write_position(self, w_position, timestamp=None, propagate=1):
"""Sets a new write value for the user position.
:param w_position:
the new write value for user position
:type w_position:
sequence< :class:`~numbers.Number` >
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:type propagate: int"""
self._position.set_write_value(w_position, timestamp=timestamp,
propagate=propagate)
position = property(get_position, set_position,
doc="motor group positions")
# --------------------------------------------------------------------------
# default acquisition channel
# --------------------------------------------------------------------------
[docs] def get_default_attribute(self):
return self.get_position_attribute()
# --------------------------------------------------------------------------
# motion
# --------------------------------------------------------------------------
[docs] def get_motion(self):
return self.get_action_cache()
motion = property(get_motion, doc="motion object")
# --------------------------------------------------------------------------
# motion calculation
# --------------------------------------------------------------------------
[docs] def calculate_motion(self, new_positions, items=None):
user_elements = self.get_user_elements()
if items is None:
items = {}
calculated = {}
for new_position, element in zip(new_positions, user_elements):
calculated[element] = new_position
# TODO: use Sardana attribute configuration and
# get rid of accessing tango - see sardana-org/sardana#663
from sardana.tango.core.util import _check_attr_range
try:
_check_attr_range(element.name, "position", new_position)
except ValueError as e:
# TODO: don't concatenate exception message whenever
# tango-controls/pytango#340 is fixed
msg = "requested move of {} is outside of limits ({})".format(
element.name, str(e)
)
raise ValueError(msg) from e
for new_position, element in zip(new_positions, user_elements):
element.calculate_motion(new_position, items=items,
calculated=calculated)
return items
[docs] def start_move(self, new_position):
self._in_start_move = True
try:
return self._start_move(new_position)
finally:
self._in_start_move = False
def _start_move(self, new_positions):
self._stopped = False
self._aborted = False
self._released = False
items = self.calculate_motion(new_positions)
timestamp = time.time()
for item, position_info in list(items.items()):
item.set_write_position(position_info[0], timestamp=timestamp,
propagate=0)
if not self._simulation_mode:
self.motion.run(items=items)