Source code for sardana.pool.poolmotorgroup

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool library. It defines the base classes
for motor groups"""

__all__ = ["PoolMotorGroup"]

__docformat__ = 'restructuredtext'

import time
import collections
from typing import Sequence, Any

import sardana
from sardana import ElementType
from sardana.sardanaattribute import SardanaAttribute
from sardana.pool.poolgroupelement import PoolGroupElement
from sardana.pool.poolmotion import PoolMotion

from PyTango import AttributeProxy


class Position(SardanaAttribute):
    """Position attribute of a motor group.

    The values reported by this attribute are assembled on demand from the
    position attributes of the owner's elements, as returned by
    ``self.obj.get_physical_position_attribute_iterator()``.
    """

    def __init__(self, *args, **kwargs):
        # Mapping element -> last requested write value (None until the
        # first write).
        self._w_value_map = None
        super().__init__(*args, **kwargs)
        # Listen to every element position so that a change in any of them
        # is re-fired as a read event of this attribute (see on_change).
        for pos_attr in self.obj.get_physical_position_attribute_iterator():
            pos_attr.add_listener(self.on_change)

    def _has_value(self):
        """Return True only if every element position attribute has a value."""
        return all(
            pos_attr.has_value()
            for pos_attr in self.obj.get_physical_position_attribute_iterator()
        )

    def _in_error(self):
        """Return True if at least one element position attribute is in error."""
        return any(
            pos_attr.in_error()
            for pos_attr in self.obj.get_physical_position_attribute_iterator()
        )

    def get_elements(self):
        """Return the user elements of the owner object."""
        return self.obj.get_user_elements()

    def get_element_nb(self):
        """Return the number of user elements of the owner object."""
        # Use this class' own accessor: the attribute itself has no
        # ``get_user_elements`` method, only the owner object does.
        return len(self.get_elements())

    def _get_exc_info(self):
        """Return the exception information of the first element position
        attribute found in error, or None if none is in error."""
        for position_attr in self.obj.get_physical_position_attribute_iterator():
            if position_attr.error:
                return position_attr.get_exc_info()
        return None

    def _get_timestamp(self):
        """Return the most recent timestamp among the element position
        attributes."""
        return max(
            pos_attr.timestamp
            for pos_attr in self.obj.get_physical_position_attribute_iterator()
        )

    def _set_value(self, value, exc_info=None, timestamp=None, propagate=1):
        """Always raise: the read value is derived from the elements and
        cannot be set directly.

        :raises Exception: unconditionally
        """
        raise Exception(
            "Cannot set position value for motor group %s" % self.obj.name)

    def _get_value(self):
        """Return the list of the element position values."""
        return [
            position.value
            for position in self.obj.get_physical_position_attribute_iterator()
        ]

    def _get_write_value(self):
        """Return the list of the element position write values."""
        return [
            position.w_value
            for position in self.obj.get_physical_position_attribute_iterator()
        ]

    def _set_write_value(self, w_value, timestamp=None, propagate=1):
        """Store a new write value.

        :param w_value: either a sequence with one value per user element
                        (in user element order) or a mapping indexed by
                        user element
        :param timestamp: forwarded to the base class
        :param propagate: forwarded to the base class
        """
        assert len(w_value) == self.get_element_nb()
        elements = self.get_elements()
        if isinstance(w_value, collections.abc.Sequence):
            w_value_map = dict(zip(elements, w_value))
        else:
            # Not a sequence: treat it as a mapping and flatten it into a
            # list following the user element order.
            w_value_map = w_value
            w_value = [w_value_map[elem] for elem in elements]
        # Keep the per-element mapping (not the flat list) in the attribute
        # that is named after it.
        self._w_value_map = w_value_map
        # Call the base-class *hook*, not the public set_write_value():
        # PoolMotorGroup.set_write_position() enters through the public
        # method, which has to dispatch to this overridden hook, so calling
        # the public method from here would re-enter this method endlessly.
        super()._set_write_value(w_value, timestamp=timestamp,
                                 propagate=propagate)

    def on_change(self, evt_src, evt_type, evt_value):
        """Listener of the element position attributes: re-fire the change
        as a read event of this attribute, using the priority of the
        incoming event type as propagation level."""
        self.fire_read_event(propagate=evt_type.priority)

    def update(self, cache=True, propagate=1):
        """Refresh the dial positions from the motion object when needed.

        :param cache: if True, the hardware is read only when at least one
                      low level physical position attribute has no value;
                      if False the hardware is always read
        :param propagate: forwarded to ``put_dial_position`` of each object
                          returned by the read
        """
        if cache:
            low_level_attrs = \
                self.obj.get_low_level_physical_position_attribute_iterator()
            # The cache is usable only if every low level attribute has a
            # value.
            cache = all(attr.has_value() for attr in low_level_attrs)
        if not cache:
            dial_position_values = self.obj.motion.read_dial_position(
                serial=True)
            for motion_obj, position_value in dial_position_values.items():
                motion_obj.put_dial_position(
                    position_value, propagate=propagate)


class PoolMotorGroup(PoolGroupElement):
    """Pool group element made of motors and pseudo motors.

    It exposes a single position attribute built from the positions of its
    user elements and starts the motion of all of them through its motion
    action.
    """

    def __init__(self, **kwargs):
        self._physical_elements = []
        # True only while start_move() is executing.
        self._in_start_move = False
        kwargs['elem_type'] = ElementType.MotorGroup
        PoolGroupElement.__init__(self, **kwargs)
        on_change = self.on_change
        self._position = Position(self, listeners=on_change)

    def _create_action_cache(self):
        """Create the motion action used by this group."""
        motion_name = "%s.Motion" % self._name
        return PoolMotion(self, motion_name)

    # --------------------------------------------------------------------------
    # Event forwarding
    # --------------------------------------------------------------------------

    def on_change(self, evt_src, evt_type, evt_value):
        # forward all events coming from attributes to the listeners
        self.fire_event(evt_type, evt_value)

    def on_element_changed(self, evt_src, evt_type, evt_value):
        """Recalculate the group state and status when the state or the
        position of an element changes.

        Only a 'state' event is propagated (with the priority of the event
        type); a 'position' event updates state and status silently.
        """
        name = evt_type.name.lower()
        if name in ('state', 'position'):
            state, status = self._calculate_states()
            propagate_state = evt_type.priority if name == 'state' else 0
            self.set_state(state, propagate=propagate_state)
            self.set_status(status, propagate=propagate_state)

    def add_user_element(self, element, index=None):
        """Add a motor or pseudo motor to the group.

        :param element: the element to add
        :param index: forwarded to the base class implementation
        :raises Exception: if the element is neither a motor nor a
                           pseudo motor
        """
        elem_type = element.get_type()
        # TODO: make this happen (pseudo motors are accepted here without
        # any specific handling)
        if elem_type not in (ElementType.Motor, ElementType.PseudoMotor):
            raise Exception("element %s is not a motor" % element.name)
        PoolGroupElement.add_user_element(self, element, index=index)

    # --------------------------------------------------------------------------
    # position
    # --------------------------------------------------------------------------

    def get_position_attribute(self):
        """Return the position attribute object of this group."""
        return self._position

    def get_low_level_physical_position_attribute_iterator(self):
        return self.get_physical_elements_attribute_iterator()

    def get_physical_position_attribute_iterator(self):
        return self.get_user_elements_attribute_iterator()

    def get_physical_positions_attribute_sequence(self):
        return self.get_user_elements_attribute_sequence()

    def get_physical_positions_attribute_map(self):
        return self.get_user_elements_attribute_map()

    def get_position(self, cache: bool = True, propagate: int = 1) -> sardana.sardanaattribute.SardanaAttribute:
        """Returns the user position.

        :param cache:
            if ``True`` (default) return value in cache, otherwise read value
            from hardware
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return:
            the user position
        """
        position = self._position
        position.update(cache=cache, propagate=propagate)
        return position

    def set_position(self, positions: Sequence[float]) -> None:
        """Moves the motor group to the specified user positions

        :param positions:
            the user positions to move to
        """
        self.start_move(positions)

    def set_write_position(self, w_position: Sequence[float],
                           timestamp: Any = None,
                           propagate: int = 1) -> None:
        """Sets a new write value for the user position.

        :param w_position:
            the new write value for user position
        :param timestamp:
            forwarded to the position attribute together with the write value
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        """
        self._position.set_write_value(w_position, timestamp=timestamp,
                                       propagate=propagate)

    position = property(get_position, set_position,
                        doc="motor group positions")

    # --------------------------------------------------------------------------
    # default acquisition channel
    # --------------------------------------------------------------------------

    def get_default_attribute(self):
        return self.get_position_attribute()

    # --------------------------------------------------------------------------
    # motion
    # --------------------------------------------------------------------------

    def get_motion(self):
        return self.get_action_cache()

    motion = property(get_motion, doc="motion object")

    # --------------------------------------------------------------------------
    # motion calculation
    # --------------------------------------------------------------------------

    def calculate_motion(self, new_positions, items=None):
        """Validate the requested positions and let every user element
        calculate its part of the motion.

        :param new_positions: requested positions, paired in order with the
                              user elements of the group
        :param items: dictionary passed to each element's
                      ``calculate_motion``; a new one is created when None
        :return: the ``items`` dictionary
        :raises ValueError: if a requested position is rejected by the
                            attribute range check of its element
        """
        # TODO: use Sardana attribute configuration and
        # get rid of accessing tango - see sardana-org/sardana#663
        # Imported locally (once, outside the loop) so that the module does
        # not depend on the tango layer at import time.
        from sardana.tango.core.util import _check_attr_range

        user_elements = self.get_user_elements()
        if items is None:
            items = {}
        calculated = {}
        # First pass: check every requested position before anything is
        # calculated.
        for new_position, element in zip(new_positions, user_elements):
            calculated[element] = new_position
            try:
                _check_attr_range(element.name, "position", new_position)
            except ValueError as e:
                # TODO: don't concatenate exception message whenever
                # tango-controls/pytango#340 is fixed
                msg = "requested move of {} is outside of limits ({})".format(
                    element.name, str(e)
                )
                raise ValueError(msg) from e

        # Second pass: every element receives the complete map of requested
        # positions.
        for new_position, element in zip(new_positions, user_elements):
            element.calculate_motion(new_position, items=items,
                                     calculated=calculated)
        return items

    def start_move(self, new_position):
        """Start moving the group to the given positions.

        ``_in_start_move`` is set for the duration of the call and always
        reset, even if starting the motion fails.
        """
        self._in_start_move = True
        try:
            return self._start_move(new_position)
        finally:
            self._in_start_move = False

    def _start_move(self, new_positions):
        self._stopped = False
        self._aborted = False
        self._released = False
        items = self.calculate_motion(new_positions)
        # Use one common timestamp for the write positions of all items.
        timestamp = time.time()
        for item, position_info in items.items():
            item.set_write_position(position_info[0], timestamp=timestamp,
                                    propagate=0)
        if not self._simulation_mode:
            self.motion.run(items=items)