#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
""" """
# Names exported by a star-import of this module.
__all__ = ["MotorGroup", "MotorGroupClass"]
# Markup language used by the docstrings of this module.
__docformat__ = 'restructuredtext'
import sys
import time
from PyTango import Except, DevState, DevDouble, AttrQuality, READ_WRITE, \
SPECTRUM
from taurus.core.util.log import DebugIt
from sardana import State, SardanaServer
from sardana.sardanaattribute import SardanaAttribute
from sardana.pool.poolexception import PoolException
from sardana.tango.core.util import exception_str, throw_sardana_exception
from sardana.tango.pool.PoolDevice import PoolGroupDevice, PoolGroupDeviceClass
# Tango device wrapping a Sardana pool motor group.
class MotorGroup(PoolGroupDevice):
    """Tango device exposing a pool motor group.

    The core motor group object is stored in the ``element`` attribute and
    is reachable through the :attr:`motor_group` property. Events fired by
    that object are received by :meth:`on_motor_group_changed` and forwarded
    to the matching Tango attribute through ``set_attribute``.
    """

    def __init__(self, dclass, name):
        # Flag raised for the whole duration of write_Position().
        self.in_write_position = False
        PoolGroupDevice.__init__(self, dclass, name)

    def init(self, name):
        """Delegate to :meth:`PoolGroupDevice.init`."""
        PoolGroupDevice.init(self, name)

    def _is_allowed(self, req_type):
        """Delegate the access check to :class:`PoolGroupDevice`."""
        return PoolGroupDevice._is_allowed(self, req_type)

    def get_motor_group(self):
        """Return the object currently stored in ``self.element``."""
        return self.element

    def set_motor_group(self, motor_group):
        """Store *motor_group* in ``self.element``."""
        self.element = motor_group

    motor_group = property(get_motor_group, set_motor_group)

    @DebugIt()
    def delete_device(self):
        """Run the base class cleanup and detach from the motor group.

        The event listener registered by :meth:`init_device` is removed and
        the reference to the motor group is dropped.
        """
        PoolGroupDevice.delete_device(self)
        motor_group = self.motor_group
        if motor_group is not None:
            motor_group.remove_listener(self.on_motor_group_changed)
            # NOTE(review): the original nesting was lost; resetting inside
            # the ``if`` gives the same final state as resetting always.
            self.motor_group = None

    @DebugIt()
    def init_device(self):
        """Initialize the device and bind it to its motor group.

        Configures the change events, creates the motor group in the pool
        when the device has none yet, registers
        :meth:`on_motor_group_changed` as its listener and sets the device
        state to ``ON``.
        """
        PoolGroupDevice.init_device(self)

        # Trailing commas are intentional: both arguments are 1-tuples.
        detect_evts = "position",
        non_detect_evts = "elementlist",
        self.set_change_events(detect_evts, non_detect_evts)

        if self.pool.use_numeric_element_ids:
            self.Elements = list(map(int, self.Elements))

        motor_group = self.motor_group
        if motor_group is None:
            full_name = self.get_full_name()
            name = self.alias or full_name
            motor_group = self.pool.create_motor_group(
                name=name, id=self.Id, full_name=full_name,
                user_elements=self.Elements)
            self.motor_group = motor_group
        # NOTE(review): the original nesting was lost; the listener is
        # registered whether or not the group was just created.
        motor_group.add_listener(self.on_motor_group_changed)
        self.set_state(DevState.ON)

    def on_motor_group_changed(self, event_source, event_type, event_value):
        """Listener registered on the motor group.

        Wraps :meth:`_on_motor_group_changed` so that a failure while
        handling an event is logged instead of being raised to the caller.
        """
        try:
            self._on_motor_group_changed(event_source, event_type, event_value)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that SystemExit
            # and KeyboardInterrupt are not swallowed here.
            msg = 'Error occurred "on_motor_group_changed(%s.%s): %s"'
            exc_info = sys.exc_info()
            self.error(msg, self.motor_group.name, event_type.name,
                       exception_str(*exc_info[:2]))
            self.debug("Details", exc_info=exc_info)

    def _on_motor_group_changed(self, event_source, event_type, event_value):
        """Translate a motor group event into a Tango attribute update.

        The lower-cased event type name selects the Tango attribute.
        ``state`` and ``status`` values are converted with
        ``calculate_tango_state`` / ``calculate_tango_status``; any other
        value is used as is, unpacking it first when it is a
        :class:`SardanaAttribute`.
        """
        # during server startup and shutdown avoid processing element
        # creation events
        if SardanaServer.server_state != State.Running:
            return

        timestamp = time.time()
        name = event_type.name.lower()
        attr = self.get_attribute_by_name(name)
        quality = AttrQuality.ATTR_VALID
        priority = event_type.priority
        value, w_value, error = None, None, None

        if name == "state":
            value = self.calculate_tango_state(event_value)
        elif name == "status":
            value = self.calculate_tango_status(event_value)
        else:
            if isinstance(event_value, SardanaAttribute):
                if event_value.error:
                    error = Except.to_dev_failed(*event_value.exc_info)
                else:
                    value = event_value.value
                # NOTE(review): the original nesting was lost; the attribute
                # timestamp is used for both the value and the error case.
                timestamp = event_value.timestamp
            else:
                value = event_value

            state = self.motor_group.get_state(propagate=0)

            if name == "position":
                w_value = event_value.w_value
                # Flag the position as changing while the group is moving.
                if state == State.Moving:
                    quality = AttrQuality.ATTR_CHANGING

        self.set_attribute(attr, value=value, w_value=w_value,
                           timestamp=timestamp, quality=quality,
                           priority=priority, error=error, synch=False)

    def always_executed_hook(self):
        """Do nothing."""
        pass

    def read_attr_hardware(self, data):
        """Do nothing."""
        pass

    def _iter_valid_positions(self, pos):
        """Yield ``pos[elem]`` for every user element of the motor group.

        A position reporting ``in_error()`` is not yielded; its stored
        ``exc_info`` is passed to ``Except.throw_python_exception`` instead.
        """
        for elem in self.motor_group.get_user_elements():
            position = pos[elem]
            if position.in_error():
                Except.throw_python_exception(*position.exc_info)
            yield position

    def _to_motor_positions(self, pos):
        """Return the ``value`` of each user element position in *pos*.

        :param pos: mapping indexed by the motor group's user elements
        :return: list of values, in user element order
        """
        return [position.value
                for position in self._iter_valid_positions(pos)]

    def _to_motor_write_positions(self, pos):
        """Return the ``w_value`` of each user element position in *pos*.

        :param pos: mapping indexed by the motor group's user elements
        :return: list of write values, in user element order
        """
        return [position.w_value
                for position in self._iter_valid_positions(pos)]

    def read_Position(self, attr):
        """Read handler of the ``Position`` attribute.

        Fetches position and state from the motor group and publishes them
        through ``set_attribute``; the quality is ``ATTR_CHANGING`` while
        the group is moving.
        """
        # if motors are moving their position is already being updated with a
        # high frequency so don't bother overloading and just get the cached
        # values
        motor_group = self.motor_group
        use_cache = motor_group.is_in_operation() and not self.Force_HW_Read
        position = motor_group.get_position(cache=use_cache, propagate=0)
        if position.error:
            Except.throw_python_exception(*position.exc_info)
        state = motor_group.get_state(cache=use_cache, propagate=0)
        quality = None
        if state == State.Moving:
            quality = AttrQuality.ATTR_CHANGING
        self.set_attribute(attr, value=position.value,
                           w_value=position.w_value,
                           quality=quality, priority=0,
                           timestamp=position.timestamp)

    def write_Position(self, attr):
        """Write handler of the ``Position`` attribute.

        Assigns the written value to the motor group's ``position``.
        ``in_write_position`` is ``True`` while the handler runs.

        :raises Exception: when ``wait_for_operation`` fails
        """
        self.in_write_position = True
        try:
            position = attr.get_write_value()
            self.debug("write_Position(%s)", position)
            try:
                self.wait_for_operation()
            except Exception:
                # Narrowed from a bare ``except`` so that interpreter exit
                # requests are not turned into a motion error.
                raise Exception("Cannot move: already in motion")
            try:
                self.motor_group.position = position
            except PoolException as pe:
                throw_sardana_exception(pe)
        finally:
            self.in_write_position = False

    is_Position_allowed = _is_allowed
# Tango device class holding the static definitions of MotorGroup.
class MotorGroupClass(PoolGroupDeviceClass):
    """Static Tango definitions (properties, commands, attributes) of the
    motor group device.

    Everything declared by :class:`PoolGroupDeviceClass` is inherited; the
    only addition made here is the ``Position`` attribute.
    """

    # Class Properties (none declared at this level)
    class_property_list = {}

    # Device Properties: a copy of the ones declared by the base class
    device_property_list = dict(PoolGroupDeviceClass.device_property_list)

    # Command definitions: a copy of the ones declared by the base class
    cmd_list = dict(PoolGroupDeviceClass.cmd_list)

    # Attribute definitions: ``Position`` plus the base class attributes.
    # Base class entries are applied last, as in a plain ``update``.
    attr_list = {
        'Position': [[DevDouble, SPECTRUM, READ_WRITE, 4096]],
    }
    attr_list.update(PoolGroupDeviceClass.attr_list)

    def _get_class_properties(self):
        """Return the base class properties with this class' description
        set and ``'PoolGroupDevice'`` inserted first in ``InheritedFrom``.
        """
        props = PoolGroupDeviceClass._get_class_properties(self)
        props['Description'] = "Motor group device class"
        props['InheritedFrom'].insert(0, 'PoolGroupDevice')
        return props