#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool libray. It defines the base classes
for"""
__all__ = ["PoolMotor"]
__docformat__ = 'restructuredtext'
import time
import math
from typing import Any, Optional
import sardana
from sardana import EpsilonError, State, ElementType
from sardana.sardanaattribute import SardanaAttribute, ScalarNumberAttribute, \
SardanaSoftwareAttribute
from sardana.sardanaevent import EventType
from sardana.sardanautils import assert_type, is_number, py2_round
from sardana.pool.poolelement import PoolElement
from sardana.pool.poolmotion import PoolMotion, MotionState
class Position(SardanaAttribute):
def __init__(self, *args, **kwargs):
super(Position, self).__init__(*args, **kwargs)
self.get_offset().add_listener(self.on_change)
self.get_sign().add_listener(self.on_change)
self.get_dial().add_listener(self.on_change)
def get_dial(self):
return self.obj.get_dial_position_attribute()
def get_offset(self):
return self.obj.get_offset_attribute()
def get_sign(self):
return self.obj.get_sign_attribute()
def _in_error(self):
return self.get_dial().in_error()
def _has_value(self):
return self.get_dial().has_value()
def _has_write_value(self):
return self.get_dial().has_write_value()
def _get_value(self):
return self.calc_position()
def _get_write_value(self):
dial = self.get_dial().get_write_value()
return self.calc_position(dial=dial)
def _set_value(self, value, exc_info=None, timestamp=None, propagate=1):
raise Exception("Cannot set position value for %s" % self.obj.name)
def _set_write_value(self, w_value, timestamp=None, propagate=1):
# let the write value be stored by dial using the current offset and
# sign. This way, retrieving the write value is done in reverse applying
# the offset and sign in use at that time
w_dial = self.calc_dial_position(w_value)
self.get_dial().set_write_value(w_dial, timestamp=timestamp, propagate=0)
self.fire_write_event(propagate=propagate)
def _get_exc_info(self):
return self.get_dial().get_exc_info()
def _get_timestamp(self):
return self.get_dial().get_timestamp()
def _get_write_timestamp(self):
return self.get_dial().get_write_timestamp()
def calc_position(self, dial: Optional[float] = None) -> Any:
"""Returns the computed position from last the dial position from the
given parameter or (if None), the last dial position obtained from
hardware read.
:param dial: the new dial position [default: None, meaning use the
current dial position.
:return: the computed user position
:raises:
:exc:`Exception` if dial_position is None and no read value has
been set yet"""
obj = self.obj
if dial is None:
dial_attr = obj.dial_position
if dial_attr.in_error():
raise dial_attr.exc_info[1]
dial = dial_attr.value
if not is_number(dial):
raise Exception("Controller returns not a number %s" % dial)
sign, offset = obj.sign.value, obj.offset.value
return sign * dial + offset
def calc_dial_position(self, position: Optional[float] = None) -> Any:
"""Returns the dial position for the given position. If position is
not given (or is None) it uses this object's *write* value.
:param position:
the position to be converted to dial [default: None meaning use the
this attribute's *write* value
:return: the computed dial position
"""
obj = self.obj
if position is None:
position = self.w_value
sign, offset = obj.sign.value, obj.offset.value
return (position - offset) / sign
def calc_motion(self, new_position):
"""Calculate the motor position, dial position, backlash for the
given final position."""
obj = self.obj
ctrl = obj.controller
step_per_unit = obj._step_per_unit
backlash = obj._backlash
# compute dial position
new_dial = self.calc_dial_position(new_position)
# add backlash if necessary
do_backlash = False
if obj.has_backlash() and not ctrl.has_backlash():
dial_attr = self.get_dial()
if dial_attr.in_error():
raise dial_attr.get_exc_info()[1]
old_dial = dial_attr.value
displacement = new_dial - old_dial
if math.fabs(displacement) > EpsilonError:
positive_displacement = displacement > 0
positive_backlash = self.is_backlash_positive()
do_backlash = positive_backlash != positive_displacement
if do_backlash:
new_dial = new_dial - backlash / step_per_unit
# compute a rounding value if necessary
if ctrl.wants_rounding():
# TODO: check if round would be fine
nb_step = py2_round(new_dial * step_per_unit)
new_dial = nb_step / step_per_unit
backlash_position = new_dial
if do_backlash:
backlash_position = new_dial + backlash / step_per_unit
return new_position, new_dial, do_backlash, backlash_position
def on_change(self, evt_src, evt_type, evt_value):
self.fire_read_event(propagate=evt_type.priority)
def update(self, cache=True, propagate=1):
self.get_dial().update(cache=cache, propagate=propagate)
class DialPosition(ScalarNumberAttribute):
def update(self, cache=True, propagate=1):
if not cache or not self.has_value():
dial_position_value = self.obj.read_dial_position()
self.set_value(dial_position_value, propagate=propagate)
class LimitSwitches(ScalarNumberAttribute):
pass
class Offset(SardanaSoftwareAttribute):
pass
class Sign(SardanaSoftwareAttribute):
pass
[docs]
class PoolMotor(PoolElement):
"""An internal Motor object. **NOT** part of the official API. Accessing
this object from a controller plug-in may lead to undetermined behavior
like infinite recursion."""
def __init__(self, **kwargs):
kwargs['elem_type'] = ElementType.Motor
PoolElement.__init__(self, **kwargs)
on_change = self.on_change
self._offset = Offset(self, initial_value=0, listeners=on_change)
self._sign = Sign(self, initial_value=1, listeners=on_change)
self._dial_position = DialPosition(self, listeners=on_change)
self._position = Position(self, listeners=on_change)
self._backlash = 0
self._step_per_unit = 1.0
self._limit_switches = LimitSwitches(self, name="Limit_switches",
initial_value=3 * (False,),
listeners=on_change)
self._acceleration = None
self._deceleration = None
self._velocity = None
self._base_rate = None
self._instability_time = None
self._in_start_move = False
motion_name = "%s.Motion" % self._name
self.set_action_cache(PoolMotion(self, motion_name))
# -------------------------------------------------------------------------
# Event forwarding
# -------------------------------------------------------------------------
[docs]
def on_change(self, evt_src, evt_type, evt_value):
# forward all events coming from attributes to the listeners
self.fire_event(evt_type, evt_value)
# -------------------------------------------------------------------------
# state information
# -------------------------------------------------------------------------
def _from_ctrl_state_info(self, state_info):
state_info, _ = state_info
if len(state_info) > 2:
state, status, ls = state_info[:3]
else:
state, other = state_info[:2]
if is_number(other):
ls, status = other, None
else:
ls, status = 0, other
state, ls = int(state), tuple(map(bool, (ls & 1, ls & 2, ls & 4)))
return state, status, ls
def _set_state_info(self, state_info, propagate=1):
PoolElement._set_state_info(self, state_info, propagate=propagate)
ls = state_info[-1]
if self._sign.value < 0:
ls = ls[0], ls[2], ls[1]
self._set_limit_switches(ls, propagate=propagate)
# -------------------------------------------------------------------------
# state information
# -------------------------------------------------------------------------
_STD_STATUS = "{name} is {state}{limit_switches}"
[docs]
def calculate_state_info(self, state_info=None):
if state_info is None:
state = self._state
status = self._status
ls = self._limit_switches.value
else:
state, status, ls = state_info
if state == State.On:
state_str = "Stopped"
elif state == State.Moving:
state_str = "Moving"
motion = self.get_operation()
if motion is None:
state_str += " (external)"
else:
motion_state = motion._motion_info[self].motion_state
if motion_state == MotionState.MovingBacklash:
state_str += " (backlash)"
elif motion_state == MotionState.MovingInstability:
state_str += " (instability)"
else:
state_str = "in " + State[state]
limit_switches = ""
if ls[0]:
limit_switches += ". Hit home switch"
if ls[1]:
limit_switches += ". Hit upper switch"
if ls[2]:
limit_switches += ". Hit lower switch"
new_status = self._STD_STATUS.format(name=self.name, state=state_str,
limit_switches=limit_switches)
if status is not None and len(status) > 0:
new_status += "\n{}".format(status) # append ctrl status
state, new_status = self._calculate_init_attr_state_info(
state, new_status
)
return state, new_status, ls
# -------------------------------------------------------------------------
# limit switches
# -------------------------------------------------------------------------
[docs]
def inspect_limit_switches(self):
"""returns the current (cached value of the limit switches
:return: the current limit switches flags"""
return self._limit_switches
[docs]
def get_limit_switches(self, cache: bool = True, propagate: int = 1) -> sardana.sardanaattribute.SardanaAttribute:
"""Returns the motor limit switches state.
:param cache:
if ``True`` (default) return value in cache, otherwise read value
from hardware
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:return:
the motor limit switches state
"""
self.get_state(cache=cache, propagate=propagate)
return self._limit_switches
[docs]
def set_limit_switches(self, ls, propagate=1):
self._set_limit_switches(ls, propagate=propagate)
[docs]
def put_limit_switches(self, ls, propagate=1):
self._limit_switches = tuple(ls)
def _set_limit_switches(self, ls, propagate=1):
self._limit_switches.set_value(tuple(ls), propagate=propagate)
limit_switches = property(get_limit_switches, set_limit_switches,
doc="motor limit switches")
# -------------------------------------------------------------------------
# instability time
# -------------------------------------------------------------------------
[docs]
def has_instability_time(self, cache=True):
it = self._instability_time
return it is not None and it > 0.0
[docs]
def get_instability_time(self, cache=True):
return self._instability_time
[docs]
def set_instability_time(self, instability_time, propagate=1):
self._instability_time = instability_time
if propagate > 0:
self.fire_event(EventType("instability_time", priority=propagate),
instability_time)
instability_time = property(get_instability_time, set_instability_time,
doc="motor instability time")
# -------------------------------------------------------------------------
# backlash
# -------------------------------------------------------------------------
[docs]
def has_backlash(self, cache=True):
return self._backlash != 0
[docs]
def is_backlash_positive(self, cache=True):
return self._backlash > 0
[docs]
def is_backlash_negative(self, cache=True):
return self._backlash < 0
[docs]
def get_backlash(self, cache=True):
return self._backlash
[docs]
def set_backlash(self, backlash, propagate=1):
self._backlash = backlash
if propagate > 0:
self.fire_event(
EventType("backlash", priority=propagate), backlash)
backlash = property(get_backlash, set_backlash, doc="motor backlash")
# -------------------------------------------------------------------------
# offset
# -------------------------------------------------------------------------
[docs]
def get_offset_attribute(self):
return self._offset
[docs]
def get_offset(self, cache=True):
return self._offset
[docs]
def set_offset(self, offset, propagate=1):
self._offset.set_value(offset, propagate=propagate)
self.debug("Set attribute offset = {}".format(offset))
offset = property(get_offset, set_offset, doc="motor offset")
# -------------------------------------------------------------------------
# sign
# -------------------------------------------------------------------------
[docs]
def get_sign_attribute(self):
return self._sign
[docs]
def get_sign(self, cache=True):
return self._sign
[docs]
def set_sign(self, sign, propagate=1):
assert sign in (-1, 1), \
"sign must be either -1 or 1 (not {})".format(sign)
old_sign = self._sign.value
self._sign.set_value(sign, propagate=propagate)
# invert lower with upper limit switches and send event in case of
# change
ls = self._limit_switches
if old_sign != sign and ls.has_value():
value = ls.value
value = value[0], value[2], value[1]
self._set_limit_switches(value, propagate=propagate)
self.debug("Set attribute sign = {}".format(sign))
sign = property(get_sign, set_sign, doc="motor sign")
# -------------------------------------------------------------------------
# step per unit
# -------------------------------------------------------------------------
[docs]
def get_step_per_unit(self, cache=True, propagate=1):
if not cache or self._step_per_unit is None:
step_per_unit = self.read_step_per_unit()
self._set_step_per_unit(step_per_unit, propagate=propagate)
return self._step_per_unit
[docs]
def set_step_per_unit(self, step_per_unit, propagate=1):
if step_per_unit <= 0.0:
raise Exception("Step per unit must be > 0.0")
self.controller.set_axis_par(self.axis, "step_per_unit", step_per_unit)
self._set_step_per_unit(step_per_unit, propagate=propagate)
self.debug("Set attribute step_per_unit = {}".format(step_per_unit))
def _set_step_per_unit(self, step_per_unit, propagate=1):
self._step_per_unit = step_per_unit
if propagate:
self.fire_event(EventType("step_per_unit",
priority=propagate), step_per_unit)
# force ask controller for new position to send priority event
self.get_position(cache=False, propagate=2)
[docs]
def read_step_per_unit(self):
step_per_unit = self.controller.get_axis_par(
self.axis, "step_per_unit")
assert_type(float, step_per_unit)
return step_per_unit
step_per_unit = property(get_step_per_unit, set_step_per_unit,
doc="motor steps per unit")
# -------------------------------------------------------------------------
# acceleration
# -------------------------------------------------------------------------
[docs]
def get_acceleration(self, cache=True, propagate=1):
if not cache or self._acceleration is None:
acceleration = self.read_acceleration()
self._set_acceleration(acceleration, propagate=propagate)
return self._acceleration
[docs]
def set_acceleration(self, acceleration, propagate=1):
self.controller.set_axis_par(self.axis, "acceleration", acceleration)
self._set_acceleration(acceleration, propagate=propagate)
def _set_acceleration(self, acceleration, propagate=1):
self._acceleration = acceleration
if not propagate:
return
self.fire_event(
EventType("acceleration", priority=propagate), acceleration)
[docs]
def read_acceleration(self):
acceleration = self.controller.get_axis_par(self.axis, "acceleration")
assert_type(float, acceleration)
return acceleration
acceleration = property(get_acceleration, set_acceleration,
doc="motor acceleration")
# -------------------------------------------------------------------------
# deceleration
# -------------------------------------------------------------------------
[docs]
def get_deceleration(self, cache=True, propagate=1):
if not cache or self._deceleration is None:
deceleration = self.read_deceleration()
self._set_deceleration(deceleration, propagate=propagate)
return self._deceleration
[docs]
def set_deceleration(self, deceleration, propagate=1):
self.controller.set_axis_par(self.axis, "deceleration", deceleration)
self._set_deceleration(deceleration, propagate=propagate)
def _set_deceleration(self, deceleration, propagate=1):
self._deceleration = deceleration
if not propagate:
return
self.fire_event(
EventType("deceleration", priority=propagate), deceleration)
[docs]
def read_deceleration(self):
deceleration = self.controller.get_axis_par(self.axis, "deceleration")
assert_type(float, deceleration)
return deceleration
deceleration = property(get_deceleration, set_deceleration,
doc="motor deceleration")
# -------------------------------------------------------------------------
# base_rate
# -------------------------------------------------------------------------
[docs]
def get_base_rate(self, cache=True, propagate=1):
if not cache or self._base_rate is None:
base_rate = self.read_base_rate()
self._set_base_rate(base_rate, propagate=propagate)
return self._base_rate
[docs]
def set_base_rate(self, base_rate, propagate=1):
self.controller.set_axis_par(self.axis, "base_rate", base_rate)
self._set_base_rate(base_rate, propagate=propagate)
def _set_base_rate(self, base_rate, propagate=1):
self._base_rate = base_rate
if not propagate:
return
self.fire_event(EventType("base_rate", priority=propagate), base_rate)
[docs]
def read_base_rate(self):
base_rate = self.controller.get_axis_par(self.axis, "base_rate")
assert_type(float, base_rate)
return base_rate
base_rate = property(get_base_rate, set_base_rate,
doc="motor base rate")
# -------------------------------------------------------------------------
# velocity
# -------------------------------------------------------------------------
[docs]
def get_velocity(self, cache=True, propagate=1):
if not cache or self._velocity is None:
velocity = self.read_velocity()
self._set_velocity(velocity, propagate=propagate)
return self._velocity
[docs]
def set_velocity(self, velocity, propagate=1):
self.controller.set_axis_par(self.axis, "velocity", velocity)
self._set_velocity(velocity, propagate=propagate)
def _set_velocity(self, velocity, propagate=1):
self._velocity = velocity
if not propagate:
return
self.fire_event(EventType("velocity", priority=propagate), velocity)
[docs]
def read_velocity(self):
velocity = self.controller.get_axis_par(self.axis, "velocity")
assert_type(float, velocity)
return velocity
velocity = property(get_velocity, set_velocity,
doc="motor velocity")
# -------------------------------------------------------------------------
# position & dial position
# -------------------------------------------------------------------------
[docs]
def define_position(self, position):
dial = self.get_position_attribute().calc_dial_position(position)
self.controller.define_position(self.axis, dial)
# force an event with the new position
self.get_position(cache=False, propagate=2)
[docs]
def get_position_attribute(self) -> sardana.sardanaattribute.SardanaAttribute:
"""Returns the position attribute object for this motor
:return: the position attribute
"""
return self._position
[docs]
def get_position(self, cache: bool = True, propagate: int = 1) -> sardana.sardanaattribute.SardanaAttribute:
"""Returns the user position.
:param cache:
if ``True`` (default) return value in cache, otherwise read value
from hardware
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:return:
the user position
"""
position = self._position
position.update(cache=cache, propagate=propagate)
return position
[docs]
def set_position(self, position: float) -> None:
"""Moves the motor to the specified user position
:param position:
the user position to move to
"""
self.start_move(position)
[docs]
def set_write_position(self, w_position: float, timestamp: Any = None, propagate: int = 1) -> None:
"""Sets a new write value for the user position.
:param w_position:
the new write value for user position
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
"""
self._position.set_write_value(w_position, timestamp=timestamp,
propagate=propagate)
[docs]
def read_dial_position(self) -> sardana.sardanavalue.SardanaValue:
"""Reads the dial position from hardware.
:return:
a :class:`~sardana.sardanavalue.SardanaValue` containing the dial
position
"""
return self.motion.read_dial_position(serial=True)[self]
[docs]
def put_dial_position(self, dial_position_value: sardana.sardanavalue.SardanaValue, propagate: int = 1) -> DialPosition:
"""Sets a new dial position.
:param dial_position_value:
the new dial position value
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
"""
dp = self._dial_position
dp.set_value(dial_position_value, propagate=propagate)
return dp
[docs]
def get_dial_position_attribute(self) -> sardana.sardanaattribute.SardanaAttribute:
"""Returns the dial position attribute object for this motor
:return: the dial position attribute
"""
return self._dial_position
[docs]
def get_dial_position(self, cache: bool = True, propagate: int = 1) -> sardana.sardanaattribute.SardanaAttribute:
"""Returns the dial position.
:param cache:
if ``True`` (default) return value in cache, otherwise read value
from hardware
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:return:
the dial position
"""
dp = self._dial_position
dp.update(cache=cache, propagate=propagate)
return dp
position = property(get_position, set_position, doc="motor user position")
dial_position = property(get_dial_position, doc="motor dial position")
# -------------------------------------------------------------------------
# default acquisition channel
# -------------------------------------------------------------------------
[docs]
def get_default_attribute(self):
return self.get_position_attribute()
# -------------------------------------------------------------------------
# motion
# -------------------------------------------------------------------------
[docs]
def get_motion(self):
return self.get_action_cache()
motion = property(get_motion, doc="motion object")
# -------------------------------------------------------------------------
# motion calculation
# -------------------------------------------------------------------------
[docs]
def calculate_motion(self, new_position, items=None, calculated=None):
"""Calculate the motor position, dial position, backlash for the
given final position. Items specifies the where to put the calculated
values, calculated is not used by physical motors"""
step_per_unit = self._step_per_unit
backlash = self._backlash
ctrl = self.controller
pos_attr = self.get_position_attribute()
# compute dial position
new_dial = pos_attr.calc_dial_position(new_position)
# add backlash if necessary
do_backlash = False
if self.has_backlash() and not ctrl.has_backlash():
dial_attr = self.get_dial_position_attribute()
if dial_attr.in_error():
raise dial_attr.get_exc_info()[1]
dial_attr.update()
old_dial = dial_attr.value
displacement = new_dial - old_dial
if math.fabs(displacement) > EpsilonError:
positive_displacement = displacement > 0
positive_backlash = self.is_backlash_positive()
do_backlash = positive_backlash != positive_displacement
if do_backlash:
new_dial = new_dial - backlash / step_per_unit
# compute a rounding value if necessary
if ctrl.wants_rounding():
# TODO: check if round would be fine
nb_step = py2_round(new_dial * step_per_unit)
new_dial = nb_step / step_per_unit
backlash_position = new_dial
if do_backlash:
backlash_position = new_dial + backlash / step_per_unit
if items is None:
items = {}
items[self] = new_position, new_dial, do_backlash, backlash_position
return items
[docs]
def start_move(self, new_position):
self._in_start_move = True
try:
return self._start_move(new_position)
finally:
self._in_start_move = False
def _start_move(self, new_position):
if not self._simulation_mode:
# update the dial value from the controller in case motor has been
# moved outside sardana.
# TODO: also update step_per_unit
self.get_dial_position_attribute().update(cache=False, propagate=1)
# calculate the motion (dial position, backlash, etc)
items = self.calculate_motion(new_position)
self.debug("Start motion user=%f, dial=%f, backlash? %s, "
"dial_backlash=%f", *items[self])
timestamp = time.time()
# update the write position
self.set_write_position(
items[self][0], timestamp=timestamp, propagate=0)
# move!
self.motion.run(items=items)