#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool libray. It defines the class for a
motion"""
__all__ = ["MotionState", "MotionMap", "PoolMotion", "PoolMotionItem"]
__docformat__ = 'restructuredtext'
import time
import functools
from taurus.core.util.log import DebugIt
from taurus.core.util.enumeration import Enumeration
from sardana import State
from sardana.pool.poolaction import ActionContext, PoolActionItem, PoolAction
#: enumeration representing possible motion states
MotionState = Enumeration("MotionSate", (
"Stopped",
# "StoppedOnError",
# "StoppedOnAbort",
"Moving",
"MovingBacklash",
"MovingInstability",
"Invalid"))
MS = MotionState
MovingStates = MS.Moving, MS.MovingBacklash, MS.MovingInstability
StoppedStates = MS.Stopped, # MS.StoppedOnError, MS.StoppedOnAbort
# MotionAction = Enumeration("MotionAction", ( \
# "StartMotion",
# "Finish",
# "Abort",
# "NoAction",
# "Invalid") )
#MA = MotionAction
MotionMap = {
# MS.Stopped : State.On,
MS.Moving: State.Moving,
MS.MovingBacklash: State.Moving,
MS.MovingInstability: State.Moving,
MS.Invalid: State.Invalid,
}
[docs]
class PoolMotionItem(PoolActionItem):
"""An item involved in the motion. Maps directly to a motor object"""
def __init__(self, moveable, position, dial_position, do_backlash,
backlash, instability_time=None):
PoolActionItem.__init__(self, moveable)
self.position = position
self.interrupted = False
self.dial_position = dial_position
self.do_backlash = do_backlash
self.backlash = backlash
self.instability_time = instability_time
self.old_motion_state = MS.Invalid
self.motion_state = MS.Stopped
self.start_time = None
self.stop_time = None
self.stop_final_time = None
self.old_state_info = State.Invalid, "Uninitialized", \
(False, False, False)
self.state_info = State.On, "Uninitialized", (False, False, False)
[docs]
def has_instability_time(self):
return self.instability_time is not None
[docs]
def in_motion(self):
return self.motion_state in MovingStates
[docs]
def get_moveable(self):
return self.element
moveable = property(fget=get_moveable)
[docs]
def get_state_info(self):
si = self.state_info
return MotionMap.get(self.motion_state, si[0]), si[1], si[2]
[docs]
def start(self, new_state):
self.old_state_info = self.state_info
self.state_info = new_state, self.state_info[1], self.state_info[2]
[docs]
def stopped(self, timestamp):
self.stop_time = timestamp
if self.instability_time is None:
self.stop_final_time = timestamp
new_ms = MS.Stopped
else:
new_ms = MS.MovingInstability
return new_ms
[docs]
def handle_instability(self, timestamp):
new_ms = MS.MovingInstability
dt = timestamp - self.stop_time
if dt >= self.instability_time:
self.stop_final_time = timestamp
new_ms = MS.Stopped
return new_ms
[docs]
def on_state_switch(self, state_info, timestamp=None):
if timestamp is None:
timestamp = time.time()
self.old_state_info = self.state_info
self.state_info = state_info
old_state = self.old_state_info[0]
state = state_info[0]
new_ms = ms = self.motion_state
moveable = self.moveable
self.interrupted = moveable.was_interrupted()
if self.interrupted:
if ms == MS.MovingInstability:
new_ms = self.handle_instability(timestamp)
elif state == State.Moving:
new_ms = MS.Moving
elif old_state == State.Moving:
new_ms = self.stopped(timestamp)
elif ms == MS.Stopped:
if state == State.Moving:
self.start_time = timestamp
new_ms = MS.Moving
elif ms == MS.Moving:
if state != State.Moving:
if self.do_backlash and state == State.On:
new_ms = MS.MovingBacklash
else:
new_ms = self.stopped(timestamp)
elif ms == MS.MovingBacklash:
if state != State.Moving:
new_ms = self.stopped(timestamp)
elif ms == MS.MovingInstability:
new_ms = self.handle_instability(timestamp)
self.old_motion_state, self.motion_state = ms, new_ms
return ms, new_ms
_NON_ERROR_STATES = State.On, State.Moving, State.Running
[docs]
class PoolMotion(PoolAction):
"""This class manages motion actions"""
def __init__(self, main_element, name="GlobalMotion"):
PoolAction.__init__(self, main_element, name)
self._motion_info = None
self._motion_sleep_time = None
self._nb_states_per_position = None
def _recover_start_error(self, ctrl, meth_name, read_state=False):
self.error("%s throws exception on %s. Stopping...", ctrl, meth_name)
self.debug("Details:", exc_info=1)
self.emergency_break()
if read_state:
states = {}
self.read_state_info(ret=states)
for moveable, state_info in list(states.items()):
state_info = moveable._from_ctrl_state_info(state_info)
moveable._set_state_info(state_info)
[docs]
def pre_start_all(self, pool_ctrls):
# PreStartAll on all controllers
for pool_ctrl in pool_ctrls:
try:
pool_ctrl.ctrl.PreStartAll()
except:
self._recover_start_error(pool_ctrl, "PreStartAll")
raise
[docs]
def pre_start_one(self, moveables, items):
# PreStartOne on all elements
for moveable in moveables:
pool_ctrl = moveable.controller
ctrl, axis = pool_ctrl.ctrl, moveable.axis
dial = items[moveable][1]
ret = ctrl.PreStartOne(axis, dial)
if not ret:
try:
msg = "%s.PreStartOne(%s(%d), %f) returns False" \
% (pool_ctrl.name, moveable.name, axis, dial)
raise Exception(msg)
except:
self._recover_start_error(pool_ctrl, "PreStartOne")
raise
[docs]
def start_one(self, moveables, motion_info):
# StartOne on all elements
for moveable in moveables:
pool_ctrl = moveable.controller
ctrl = pool_ctrl.ctrl
axis = moveable.axis
dial_position = motion_info[moveable].dial_position
try:
ctrl.StartOne(axis, dial_position)
except:
self._recover_start_error(pool_ctrl, "StartOne")
raise
[docs]
def start_all(self, pool_ctrls, moveables, motion_info):
# Change the state to Moving
for moveable in moveables:
moveable_info = motion_info[moveable]
state_info = moveable._from_ctrl_state_info(((State.Moving, None), None))
moveable.set_state_info(state_info, propagate=2)
state_info = moveable.inspect_state(), \
moveable.inspect_status(), \
moveable.inspect_limit_switches()
moveable_info.on_state_switch(state_info)
# StartAll on all controllers
for pool_ctrl in pool_ctrls:
try:
pool_ctrl.ctrl.StartAll()
except:
self._recover_start_error(pool_ctrl, "StartAll",
read_state=True)
raise
[docs]
def start_action(self, *args, **kwargs):
"""kwargs['items'] is a dict<moveable, (pos, dial, do_backlash, backlash)
"""
items = kwargs.pop("items")
pool = self.pool
# prepare data structures
self._aborted = False
self._stopped = False
self._motion_sleep_time = kwargs.pop("motion_sleep_time",
pool.motion_loop_sleep_time)
self._nb_states_per_position = \
kwargs.pop("nb_states_per_position",
pool.motion_loop_states_per_position)
self._motion_info = motion_info = {}
for moveable, motion_data in list(items.items()):
it = moveable.instability_time
motion_info[moveable] = PoolMotionItem(moveable, *motion_data,
instability_time=it)
pool_ctrls = self.get_pool_controller_list()
moveables = self.get_elements()
with ActionContext(self):
self.pre_start_all(pool_ctrls)
self.pre_start_one(moveables, items)
self.start_one(moveables, motion_info)
self.start_all(pool_ctrls, moveables, motion_info)
[docs]
def backlash_item(self, motion_item):
moveable = motion_item.moveable
controller = moveable.controller
axis = moveable.axis
position = motion_item.backlash
try:
controller.move({axis: position})
except:
self.warning("could not start backlash on %s", moveable.name,
exc_info=1)
[docs]
@DebugIt()
def action_loop(self):
i = 0
states, positions = {}, {}
for k in self.get_elements():
states[k] = None
positions[k] = None
nap = self._motion_sleep_time
nb_states_per_pos = self._nb_states_per_position
motion_info = self._motion_info
emergency_stop = set()
# read positions to send a first event when starting to move
# with ActionContext(self) as context:
# positions = self.raw_read_dial_position()
# position_error_occured = self._position_error_occured(positions)
# if position_error_occured:
# self.error("Pre-loop read position error")
# self.emergency_break()
# for moveable, position_info in positions.items():
# if position_info[1] is not None:
# self.error("%s error reading position",moveable.name)
# self.debug("Details:", exc_info=position_info[1])
#
# # send position
# for moveable, position_info in positions.items():
# moveable.put_dial_position(position_info, propagate=2)
# position = moveable.get_position(propagate=0)
# if position.in_error():
# emergency_stop.add(moveable)
#
# # send state
# self.read_state_info(ret=states)
# for moveable, state_info in states.items():
# state_info = moveable._from_ctrl_state_info(state_info)
# state_info = list(state_info)
# state_info[0] = State.Fault
# moveable.set_state_info(state_info, propagate=2)
# return
# else:
# for moveable, position_info in positions.items():
# moveable.put_dial_position(position_info)
while True:
self.read_state_info(ret=states)
state_error_occured = self._state_error_occured(states)
timestamp = time.time()
in_motion = False
for moveable, state_info in list(states.items()):
motion_item = motion_info[moveable]
state_info = moveable._from_ctrl_state_info(state_info)
state, status, limit_switches = state_info
old_motion_state, motion_state = \
motion_item.on_state_switch(state_info,
timestamp=timestamp)
real_state_info = motion_item.get_state_info()
moving = motion_item.in_motion()
# Something wrong happened: Stop all motors and report
if state_error_occured:
in_motion = False
emergency_stop.add(moveable)
start_backlash = motion_state == MS.MovingBacklash and \
old_motion_state != MS.MovingBacklash
start_instability = motion_state == MS.MovingInstability and \
old_motion_state != MS.MovingInstability
stopped_now = not moving and old_motion_state in MovingStates
# make sure the state is placed in the motor
moveable.put_state_info(real_state_info)
if emergency_stop:
continue
# if motor stopped 'well' and there is a backlash to do...
if start_backlash:
moveable.debug("Starting backlash")
# make sure the last position after the first motion is
# sent before starting the backlash motion
moveable.get_position(cache=False, propagate=2)
self.backlash_item(motion_item)
moving = motion_item.in_motion()
elif start_instability:
moveable.debug("Starting to wait for instability")
# make sure the last position after the first motion is
# sent before starting the backlash motion
moveable.get_position(cache=False, propagate=2)
elif stopped_now:
moveable.debug("Stopped")
# try to read a last position to force an event
moveable.get_position(cache=False, propagate=2)
# free the motor from the OperationContext so we can send
# a state event. Otherwise we may be asked to move the
# motor again which would result in an exception saying
# that the motor is already involved in an operation
# ... but before protect the motor so that the monitor
# doesn't come in between the two instructions below and
# send a state event on it's own
set_state_info = functools.partial(moveable.set_state_info,
state_info,
propagate=2,
safe=True)
self.add_finish_hook(set_state_info, False)
# Then update the state
if not stopped_now:
moveable.set_state_info(real_state_info, propagate=1)
if moving:
in_motion = True
if not in_motion:
if state_error_occured:
self.error("Loop read state error")
self.emergency_break()
# send positions
self.read_dial_position(ret=positions)
position_error_occured = self._position_error_occured(
positions)
if position_error_occured:
self.error("Loop final read position error. "
"Retrying...")
self.read_dial_position(ret=positions)
position_error_occured = self._position_error_occured(
positions)
if position_error_occured:
self.error("Loop final read position error 2. "
"Cannot send final position event!!!")
for moveable, position_info in list(positions.items()):
moveable.put_dial_position(position_info, propagate=2)
# send state
for moveable in states:
if moveable not in emergency_stop:
moveable.get_state(cache=False, propagate=2)
else:
motion_item = motion_info[moveable]
real_state_info = motion_item.get_state_info()
moveable.set_state_info(real_state_info,
propagate=2)
break
# read position every n times
if not i % nb_states_per_pos:
self.read_dial_position(ret=positions)
# send position
for moveable, position_value in list(positions.items()):
if position_value.error:
self.error("Loop read position error for %s" %
moveable.name)
moveable.put_dial_position(position_value)
i += 1
time.sleep(nap)
def _state_error_occured(self, d):
for _, (state_info, exc_info) in list(d.items()):
state = state_info[0]
if exc_info is not None or state not in _NON_ERROR_STATES:
return True
return False
def _position_error_occured(self, positions):
for _, value in list(positions.items()):
if value.error:
return True
def _recover_moving_error(self, location, emergency_stop):
emergency_names = [moveable.name for moveable in emergency_stop]
self.error("%s: error on %s", location, emergency_names)
# stop everything
self.emergency_break()
def _recover_position_moving_error(self, location, emergency_stop,
positions):
self._recover_moving_error(location, emergency_stop)
# send state
states = {}
self.read_state_info(ret=states)
motion_info = self._motion_info
for moveable in states:
if moveable not in emergency_stop:
moveable.get_state(cache=False, propagate=2)
else:
motion_item = motion_info[moveable]
real_state_info = motion_item.get_state_info()
moveable.set_state_info(real_state_info, propagate=2)
def _recover_state_moving_error(self, location, emergency_stop, states):
self._recover_moving_error(location, emergency_stop)
# send positions
positions = {}
self.read_dial_position(ret=positions)
for moveable, position_info in list(positions.items()):
moveable.put_dial_position(position_info, propagate=2)
motion_info = self._motion_info
# send state
for moveable in states:
if moveable not in emergency_stop:
moveable.get_state(cache=False, propagate=2)
else:
motion_item = motion_info[moveable]
real_state_info = motion_item.get_state_info()
moveable.set_state_info(real_state_info, propagate=2)
[docs]
def read_dial_position(self, ret=None, serial=False):
return self.read_value(ret=ret, serial=serial)
[docs]
def raw_read_dial_position(self, ret=None, serial=False):
return self.raw_read_value(ret=ret, serial=serial)