#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the base classes
for"""
__all__ = ["PoolMotorGroup"]
__docformat__ = 'restructuredtext'
import time
import collections
from typing import Sequence, Any
import sardana
from sardana import ElementType
from sardana.sardanaattribute import SardanaAttribute
from sardana.pool.poolgroupelement import PoolGroupElement
from sardana.pool.poolmotion import PoolMotion
from PyTango import AttributeProxy
class Position(SardanaAttribute):
def __init__(self, *args, **kwargs):
self._w_value_map = None
super(Position, self).__init__(*args, **kwargs)
for pos_attr in self.obj.get_physical_position_attribute_iterator():
pos_attr.add_listener(self.on_change)
def _has_value(self):
for pos_attr in self.obj.get_physical_position_attribute_iterator():
if not pos_attr.has_value():
return False
return True
def _in_error(self):
for pos_attr in self.obj.get_physical_position_attribute_iterator():
if pos_attr.in_error():
return True
return False
def get_elements(self):
return self.obj.get_user_elements()
def get_element_nb(self):
return len(self.get_user_elements())
def _get_exc_info(self):
for position_attr in self.obj.get_physical_position_attribute_iterator():
if position_attr.error:
return position_attr.get_exc_info()
def _get_timestamp(self):
return max([pos_attr.timestamp for pos_attr in self.obj.get_physical_position_attribute_iterator()])
def _set_value(self, value, exc_info=None, timestamp=None, propagate=1):
raise Exception(
"Cannot set position value for motor group %s" % self.obj.name)
def _get_value(self):
return [position.value for position in self.obj.get_physical_position_attribute_iterator()]
def _get_write_value(self):
return [position.w_value for position in self.obj.get_physical_position_attribute_iterator()]
def _set_write_value(self, w_value, timestamp=None, propagate=1):
assert len(w_value) == self.get_element_nb()
if isinstance(w_value, collections.abc.Sequence):
w_value_map = {}
for v, elem in zip(w_value, self.get_elements()):
w_value_map[elem] = v
else:
w_value_map = w_value
w_value = []
for elem in self.get_elements():
w_value.append(w_value_map[elem])
self._w_value_map = w_value
super(Position, self).set_write_value(w_value, timestamp=timestamp,
propagate=propagate)
def on_change(self, evt_src, evt_type, evt_value):
self.fire_read_event(propagate=evt_type.priority)
def update(self, cache=True, propagate=1):
if cache:
for phy_elem_pos in self.obj.get_low_level_physical_position_attribute_iterator():
if not phy_elem_pos.has_value():
cache = False
break
if not cache:
dial_position_values = self.obj.motion.read_dial_position(
serial=True)
for motion_obj, position_value in \
list(dial_position_values.items()):
motion_obj.put_dial_position(
position_value, propagate=propagate)
[docs]
class PoolMotorGroup(PoolGroupElement):
def __init__(self, **kwargs):
self._physical_elements = []
self._in_start_move = False
kwargs['elem_type'] = ElementType.MotorGroup
PoolGroupElement.__init__(self, **kwargs)
on_change = self.on_change
self._position = Position(self, listeners=on_change)
def _create_action_cache(self):
motion_name = "%s.Motion" % self._name
return PoolMotion(self, motion_name)
# --------------------------------------------------------------------------
# Event forwarding
# --------------------------------------------------------------------------
[docs]
def on_change(self, evt_src, evt_type, evt_value):
# forward all events coming from attributes to the listeners
self.fire_event(evt_type, evt_value)
[docs]
def on_element_changed(self, evt_src, evt_type, evt_value):
name = evt_type.name.lower()
if name in ('state', 'position'):
state, status = self._calculate_states()
if name == 'state':
propagate_state = evt_type.priority
else:
propagate_state = 0
self.set_state(state, propagate=propagate_state)
self.set_status(status, propagate=propagate_state)
[docs]
def add_user_element(self, element, index=None):
elem_type = element.get_type()
if elem_type == ElementType.Motor:
pass
elif elem_type == ElementType.PseudoMotor:
# TODO: make this happen
pass
else:
raise Exception("element %s is not a motor" % element.name)
PoolGroupElement.add_user_element(self, element, index=index)
# --------------------------------------------------------------------------
# position
# --------------------------------------------------------------------------
[docs]
def get_position_attribute(self):
return self._position
[docs]
def get_low_level_physical_position_attribute_iterator(self):
return self.get_physical_elements_attribute_iterator()
[docs]
def get_physical_position_attribute_iterator(self):
return self.get_user_elements_attribute_iterator()
[docs]
def get_physical_positions_attribute_sequence(self):
return self.get_user_elements_attribute_sequence()
[docs]
def get_physical_positions_attribute_map(self):
return self.get_user_elements_attribute_map()
[docs]
def get_position(self, cache: bool = True, propagate: int = 1) -> sardana.sardanaattribute.SardanaAttribute:
"""Returns the user position.
:param cache:
if ``True`` (default) return value in cache, otherwise read value
from hardware
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:return:
the user position
"""
position = self._position
position.update(cache=cache, propagate=propagate)
return position
[docs]
def set_position(self, positions: Sequence[float]) -> None:
"""Moves the motor group to the specified user positions
:param positions:
the user positions to move to
"""
self.start_move(positions)
[docs]
def set_write_position(self, w_position: Sequence[float], timestamp: Any = None, propagate: int = 1) -> None:
"""Sets a new write value for the user position.
:param w_position:
the new write value for user position
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
"""
self._position.set_write_value(w_position, timestamp=timestamp,
propagate=propagate)
position = property(get_position, set_position,
doc="motor group positions")
# --------------------------------------------------------------------------
# default acquisition channel
# --------------------------------------------------------------------------
[docs]
def get_default_attribute(self):
return self.get_position_attribute()
# --------------------------------------------------------------------------
# motion
# --------------------------------------------------------------------------
[docs]
def get_motion(self):
return self.get_action_cache()
motion = property(get_motion, doc="motion object")
# --------------------------------------------------------------------------
# motion calculation
# --------------------------------------------------------------------------
[docs]
def calculate_motion(self, new_positions, items=None):
user_elements = self.get_user_elements()
if items is None:
items = {}
calculated = {}
for new_position, element in zip(new_positions, user_elements):
calculated[element] = new_position
# TODO: use Sardana attribute configuration and
# get rid of accessing tango - see sardana-org/sardana#663
from sardana.tango.core.util import _check_attr_range
try:
_check_attr_range(element.name, "position", new_position)
except ValueError as e:
# TODO: don't concatenate exception message whenever
# tango-controls/pytango#340 is fixed
msg = "requested move of {} is outside of limits ({})".format(
element.name, str(e)
)
raise ValueError(msg) from e
for new_position, element in zip(new_positions, user_elements):
element.calculate_motion(new_position, items=items,
calculated=calculated)
return items
[docs]
def start_move(self, new_position):
self._in_start_move = True
try:
return self._start_move(new_position)
finally:
self._in_start_move = False
def _start_move(self, new_positions):
self._stopped = False
self._aborted = False
self._released = False
items = self.calculate_motion(new_positions)
timestamp = time.time()
for item, position_info in list(items.items()):
item.set_write_position(position_info[0], timestamp=timestamp,
propagate=0)
if not self._simulation_mode:
self.motion.run(items=items)