#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""Generic Tango Pool Device base classes"""
# Names exported by ``from <this module> import *``.
# NOTE(review): PoolTimerableDevice and PoolTimerableDeviceClass are defined
# in this module but are not listed here - confirm that this is intentional.
__all__ = [
    "PoolDevice",
    "PoolDeviceClass",
    "PoolElementDevice",
    "PoolElementDeviceClass",
    "PoolExpChannelDevice",
    "PoolExpChannelDeviceClass",
    "PoolGroupDevice",
    "PoolGroupDeviceClass",
]

# Markup language used by the docstrings of this module.
__docformat__ = 'restructuredtext'
import time
from copy import deepcopy
from typing import Any, Sequence, Union, Tuple
import PyTango
from PyTango import Util, DevVoid, DevLong64, DevBoolean, DevString,\
DevDouble, DevEncoded, DevVarStringArray, DispLevel, DevState, SCALAR, \
SPECTRUM, IMAGE, READ_WRITE, READ, AttrData, CmdArgType, DevFailed,\
seqStr_2_obj, Except, ErrSeverity
import taurus
from taurus.core.util.containers import CaselessDict
from taurus.core.util.codecs import CodecFactory
import sardana
from sardana import InvalidId, InvalidAxis, ElementType
from sardana import sardanacustomsettings
from sardana.pool.poolmetacontroller import DataInfo
from sardana.tango.core.SardanaDevice import SardanaDevice, SardanaDeviceClass
from sardana.tango.core.util import GenericScalarAttr, GenericSpectrumAttr, \
GenericImageAttr, to_tango_attr_info
[docs]
class PoolDevice(SardanaDevice):
    """Base Tango Pool device class.

    Associates a Tango device with a sardana pool element (see
    :attr:`element`) and implements the generic ``Abort``, ``Stop``,
    ``Release`` and ``Restore`` commands, the state/status read hooks and the
    creation/removal of dynamic attributes.
    """

    #: list of extreme error states
    ExtremeErrorStates = DevState.FAULT, DevState.UNKNOWN

    #: list of busy states
    BusyStates = DevState.MOVING, DevState.RUNNING

    #: Maximum number of retries in a busy state
    BusyRetries = 3

    def __init__(self, dclass, name):
        """Constructor"""
        SardanaDevice.__init__(self, dclass, name)

    def init(self, name: str):
        """initialize the device once in the object lifetime. Override when
        necessary but **always** call the method from your super class

        :param name: device name"""
        SardanaDevice.init(self, name)
        util = Util.instance()
        # the first device of class "Pool" registered in this server
        self._pool_device = util.get_device_list_by_class("Pool")[0]
        self._element = None

    def get_id(self) -> Union[str, int]:
        """Returns the element id from database
        (name or numeric id).

        :return: ``self.Id`` when the pool uses numeric element ids,
                 ``self.alias`` otherwise
        """
        if self.pool.use_numeric_element_ids:
            return self.Id
        return self.alias

    def get_ctrl_id(self) -> Union[str, int]:
        """Returns the controller id from database
        (name or numeric id).

        :return: ``self.Ctrl_id`` converted to :class:`int` when the pool
                 uses numeric element ids, ``self.Ctrl_id`` as is otherwise
        """
        if self.pool.use_numeric_element_ids:
            return int(self.Ctrl_id)
        return self.Ctrl_id

    def get_instrument_id(self) -> Union[str, int]:
        """Returns the instrument id from database
        (name or numeric id).

        :return: ``self.Instrument_id`` converted to :class:`int` when the
                 pool uses numeric element ids, ``self.Instrument_id`` as is
                 otherwise
        """
        if self.pool.use_numeric_element_ids:
            return int(self.Instrument_id)
        return self.Instrument_id

    @property
    def pool_device(self):
        """The tango pool device"""
        return self._pool_device

    @property
    def pool(self):
        """The sardana pool object"""
        return self.pool_device.pool

    def get_element(self) -> sardana.pool.poolelement.PoolElement:
        """Returns the underlying pool element object

        :return: the underlying pool element object
        """
        return self._element

    def set_element(self,
                    element: sardana.pool.poolelement.PoolElement) -> None:
        """Associates this device with the sardana element

        :param element: the sardana element
        """
        self._element = element

    element = property(get_element, set_element,
                       doc="The underlying sardana element")

    def init_device(self):
        """Initialize the device. Called during startup after :meth:`init` and
        every time the tango ``Init`` command is executed.
        Override when necessary but **always** call the method from your super
        class"""
        SardanaDevice.init_device(self)

    def delete_device(self):
        """Clean the device. Called during shutdown and every time the tango
        ``Init`` command is executed.
        Override when necessary but **always** call the method from your super
        class"""
        SardanaDevice.delete_device(self)

    def Abort(self):
        """The tango abort command. Aborts the active operation and then
        re-reads the element state bypassing the cache. A failure of that
        state read is only logged (warning level)."""
        self.element.abort()
        try:
            self.element.get_state(cache=False, propagate=2)
        except Exception:
            self.warning("Abort: failed to read state")

    def is_Abort_allowed(self) -> bool:
        """Returns True if it is allowed to execute the tango abort command

        :return: True if it is allowed to execute the tango abort command or
                 False otherwise (i.e. the device state is ``UNKNOWN``)
        """
        return self.get_state() != DevState.UNKNOWN

    def Stop(self):
        """The tango stop command. Stops the active operation and then
        re-reads the element state bypassing the cache. A failure of that
        state read is only logged (info level)."""
        self.element.stop()
        try:
            self.element.get_state(cache=False, propagate=2)
        except Exception:
            self.info("Stop: failed to read state")

    def is_Stop_allowed(self) -> bool:
        """Returns True if it is allowed to execute the tango stop command

        :return: True if it is allowed to execute the tango stop command or
                 False otherwise (i.e. the device state is ``UNKNOWN``)
        """
        return self.get_state() != DevState.UNKNOWN

    def Release(self):
        """The tango release command. Release the active operation"""
        self.element.release()

    def is_Release_allowed(self) -> bool:
        """Returns True if it is allowed to execute the tango release command

        :return: True if it is allowed to execute the tango release command or
                 False otherwise (i.e. the device state is ``UNKNOWN``)
        """
        return self.get_state() != DevState.UNKNOWN

    def _is_allowed(self, req_type):
        """Generic is_allowed.

        State based filtering (rejecting every request while in one of
        :attr:`ExtremeErrorStates` and write requests while in one of
        :attr:`BusyStates`) is currently disabled: every request is accepted.

        :param req_type: request type (unused)
        :return: always True
        """
        return True

    def get_dynamic_attributes(self) -> Tuple[taurus.core.util.CaselessDict,
                                              taurus.core.util.CaselessDict]:
        """Returns the standard dynamic and fully dynamic attributes for this
        device. The return is a tuple of two dictionaries:

        - standard attributes: caseless dictionary with key being the attribute
          name and value is a tuple of attribute name(str), tango information,
          attribute information
        - dynamic attributes: caseless dictionary with key being the attribute
          name and value is a tuple of attribute name(str), tango information,
          attribute information

        **tango information**
            seq< :class:`~PyTango.CmdArgType`, :class:`~PyTango.AttrDataFormat`, :class:`~PyTango.AttrWriteType` >

        **attribute information**
            attribute information as returned by the sardana controller

        This base implementation returns two empty dictionaries.

        :return: the standard dynamic and fully dynamic attributes
        """
        return CaselessDict(), CaselessDict()

    def initialize_dynamic_attributes(self):
        """Initializes this device dynamic attributes.

        Attributes left over from a previous device creation are removed
        first, then the standard attributes are added, followed by the fully
        dynamic ones.

        :return: caseless dictionary whose keys are the names of the created
                 attributes (all values are None); also stored in
                 ``self._attributes``
        """
        self._attributes = attrs = CaselessDict()
        std_attrs, dyn_attrs = self.get_dynamic_attributes()
        self.remove_unwanted_dynamic_attributes(std_attrs, dyn_attrs)

        read = self._read_DynamicAttribute
        write = self._write_DynamicAttribute
        is_allowed = self._is_DynamicAttribute_allowed

        groups = ((std_attrs, self.add_standard_attribute),
                  (dyn_attrs, self.add_dynamic_attribute))
        for group, add_attr in groups:
            if group is None:
                continue
            # iterate over a snapshot of the (name, tango info, attr info)
            # tuples
            for attr_name, data_info, attr_info in list(group.values()):
                attr = add_attr(attr_name, data_info, attr_info,
                                read, write, is_allowed)
                attrs[attr.get_name()] = None
        return attrs

    def remove_unwanted_dynamic_attributes(self, new_std_attrs, new_dyn_attrs):
        """Removes unwanted dynamic attributes from previous device creation.

        First every non-static attribute of the device is removed (unless the
        device is being restarted and the attribute is about to be created
        again). Then the class level attribute list is inspected: an entry
        whose type, format or access differs from the new definition is
        removed so that it can be added again with the new definition.

        Failures are logged and do not interrupt the clean-up.

        :param new_std_attrs: standard attributes about to be created
        :param new_dyn_attrs: dynamic attributes about to be created
        """
        dev_class = self.get_device_class()
        multi_attr = self.get_device_attr()
        multi_class_attr = dev_class.get_class_attr()

        # statically declared attributes (plus state and status) are never
        # touched
        static_attr_names = {name.lower() for name in dev_class.attr_list}
        static_attr_names.update(('state', 'status'))

        new_attrs = CaselessDict(new_std_attrs)
        new_attrs.update(new_dyn_attrs)

        # snapshot of the names taken before anything gets removed
        device_attr_names = [multi_attr.get_attr_by_ind(i).get_name()
                             for i in range(multi_attr.get_attr_nb())]

        # in case of calling DevRestart() on the admin device
        # we don't want to remove attribute configuration from the DB
        util = Util.instance()
        # TODO: remove this try..except whenever pytango#541
        try:
            is_device_restarting = util.is_device_restarting(self.get_name())
        except Exception:
            self.warning(
                "Could not verify if device is being restarted. "
                "This may lead to undesired removal of attribute configuration."
            )
            is_device_restarting = False

        for attr_name in device_attr_names:
            attr_name_lower = attr_name.lower()
            if attr_name_lower in static_attr_names:
                continue
            # TODO: when pytango#540 gets implemented always remove
            # the attr and to keep the attr conf use clean_db=False
            if is_device_restarting and attr_name in new_attrs:
                continue
            try:
                self.remove_attribute(attr_name)
            except Exception:
                self.warning("Error removing dynamic attribute %s",
                             attr_name_lower)
                self.debug("Details:", exc_info=1)

        klass_attrs = multi_class_attr.get_attr_list()
        # index based access kept on purpose: the container comes from the
        # tango binding
        klass_attr_names = [klass_attrs[ind].get_name()
                            for ind in range(len(klass_attrs))]

        for attr_name in klass_attr_names:
            attr_name_lower = attr_name.lower()
            if attr_name_lower in static_attr_names:
                continue
            # if new dynamic attribute is in class attribute then delete it
            # from class attribute to be later on added again (eventually
            # with different data type or data format)
            if attr_name_lower not in new_attrs:
                continue
            try:
                attr = multi_class_attr.get_attr(attr_name)
                old_type = CmdArgType(attr.get_type())
                old_format = attr.get_format()
                old_access = attr.get_writable()

                new_attr = new_attrs[attr_name]
                new_type, new_format, new_access = new_attr[1][0][:3]
                differ = (new_type != old_type
                          or new_format != old_format
                          or new_access != old_access)
                # NOTE(review): the removal is assumed to apply only when the
                # definitions differ - confirm against the upstream sources
                if differ:
                    self.info("Replacing dynamic attribute %s", attr_name)
                    self.debug("old type: %s, new type: %s",
                               old_type, new_type)
                    self.debug("old format: %s, new format: %s",
                               old_format, new_format)
                    self.debug("old access: %s, new access: %s",
                               old_access, new_access)
                    multi_class_attr.remove_attr(attr.get_name(),
                                                 attr.get_cl_name())
            except Exception:
                self.warning("Error removing dynamic attribute %s from "
                             " device class", attr_name)
                self.debug("Details:", exc_info=1)

    def add_dynamic_attribute(
            self, attr_name: str,
            data_info: Tuple[PyTango.CmdArgType, PyTango.AttrDataFormat,
                             PyTango.AttrWriteType],
            attr_info: Any, read: Any,
            write: Any, is_allowed: Any) -> PyTango.Attr:
        """Adds a single dynamic attribute.

        The attribute is created with expert display level. Scalar read-write
        attributes whose ``attr_info.memorized`` is ``true`` or
        ``true_without_hard_applied`` are flagged as memorized without the
        tango memorized initialization (sardana restores the values itself).

        :param attr_name: the attribute name
        :param data_info: tango attribute information; its first item is
                          unpacked into (data type, data format, access)
        :param attr_info: attribute information
        :param read: read method for the attribute
        :param write: write method for the attribute (ignored for read only
                      attributes)
        :param is_allowed: is allowed method
        :return: the result of ``self.add_attribute``
        :raises: :exc:`ValueError` if the data format is not scalar,
                 spectrum or image"""
        tg_type, tg_format, tg_access = data_info[0]

        if tg_access == READ:
            write = None

        if tg_format == SCALAR:
            attr = GenericScalarAttr(attr_name, tg_type, tg_access)
        elif tg_format == SPECTRUM:
            dim_x = attr_info.maxdimsize[0]
            attr = GenericSpectrumAttr(attr_name, tg_type, tg_access,
                                       dim_x=dim_x)
        elif tg_format == IMAGE:
            dim_x, dim_y = attr_info.maxdimsize
            attr = GenericImageAttr(attr_name, tg_type, tg_access,
                                    dim_x=dim_x, dim_y=dim_y)
        else:
            # previously this case failed later on with an UnboundLocalError
            raise ValueError("Unsupported data format %s for dynamic "
                             "attribute %s" % (tg_format, attr_name))

        if tg_access == READ_WRITE and tg_format == SCALAR:
            memorized = attr_info.memorized.lower()
            # sardana takes care of restoring memorized values
            if memorized in ('true', 'true_without_hard_applied'):
                attr.set_memorized()
                attr.set_memorized_init(False)

        attr.set_disp_level(DispLevel.EXPERT)
        return self.add_attribute(attr, read, write, is_allowed)

    def add_standard_attribute(
            self, attr_name: str,
            data_info: Tuple[PyTango.CmdArgType, PyTango.AttrDataFormat,
                             PyTango.AttrWriteType],
            attr_info: Any, read: Any,
            write: Any, is_allowed: Any) -> PyTango.Attr:
        """Adds a single standard dynamic attribute

        :param attr_name: the attribute name
        :param data_info: tango attribute information
        :param attr_info: attribute information (unused here)
        :param read: read method for the attribute
        :param write: write method for the attribute
        :param is_allowed: is allowed method
        :return: the result of ``self.add_attribute``"""
        dev_class = self.get_device_class()
        attr_data = AttrData(attr_name, dev_class.get_name(), data_info)
        return self.add_attribute(attr_data, read, write, is_allowed)

    def read_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
        """Generic read dynamic attribute.
        Default implementation raises :exc:`NotImplementedError`

        :param attr: attribute to be read
        :raises: :exc:`NotImplementedError`"""
        raise NotImplementedError

    def write_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
        """Generic write dynamic attribute.
        Default implementation raises :exc:`NotImplementedError`

        :param attr: attribute to be written
        :raises: :exc:`NotImplementedError`"""
        raise NotImplementedError

    def is_DynamicAttribute_allowed(self, req_type: Any) -> bool:
        """Generic is dynamic attribute allowed.
        Default implementation calls :meth:`_is_allowed`

        :param req_type: request type
        :return: the result of :meth:`_is_allowed`
        """
        return self._is_allowed(req_type)

    def _read_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
        """Generic internal read dynamic attribute.
        Checks if this object has a 'read_'+<attr_name> method and calls it.
        If not calls :meth:`read_DynamicAttribute`.

        :param attr: attribute to be read
        """
        read_name = "read_" + attr.get_name()
        if hasattr(self, read_name):
            return getattr(self, read_name)(attr)
        return self.read_DynamicAttribute(attr)

    def _write_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
        """Generic internal write dynamic attribute.
        Checks if this object has a 'write_'+<attr_name> method and calls it.
        If not calls :meth:`write_DynamicAttribute`.

        :param attr: attribute to be written
        """
        write_name = "write_" + attr.get_name()
        if hasattr(self, write_name):
            return getattr(self, write_name)(attr)
        return self.write_DynamicAttribute(attr)

    def _is_DynamicAttribute_allowed(self, req_type: Any) -> bool:
        """Generic is dynamic attribute allowed.
        Default implementation calls :meth:`is_DynamicAttribute_allowed`

        :param req_type: request type
        """
        return self.is_DynamicAttribute_allowed(req_type)

    def dev_state(self) -> PyTango.DevState:
        """Calculates and returns the device state. Called by Tango on a read
        state request.

        The cached element state is used only while the element is in
        operation and the ``Force_HW_Read`` property is not set.

        :return: the device state, or ``DevState.FAULT`` if it could not be
                 obtained
        """
        element = self.element
        try:
            use_cache = element.is_in_operation() and not self.Force_HW_Read
            ctrl_state = element.get_state(cache=use_cache, propagate=0)
            return self.calculate_tango_state(ctrl_state)
        except Exception:
            self.error("Exception trying to return state")
            self.debug("Details:", exc_info=1)
            return DevState.FAULT

    def dev_status(self) -> str:
        """
        Calculates and returns the device status. Called by Tango on a read
        status request.

        The cached element status is used only while the element is in
        operation and the ``Force_HW_Read`` property is not set.

        :return: the device status, or the error message if it could not be
                 obtained
        """
        element = self.element
        try:
            use_cache = element.is_in_operation() and not self.Force_HW_Read
            ctrl_status = element.get_status(cache=use_cache, propagate=0)
            return self.calculate_tango_status(ctrl_status)
        except Exception as e:
            msg = "Exception trying to return status: %s" % str(e)
            self.error(msg)
            self.debug("Details:", exc_info=1)
            return msg

    def wait_for_operation(self):
        """Waits for an operation to finish. It uses the maximum number of
        retries (:attr:`BusyRetries`). Sleeps 0.01s between retries.

        :raises: :exc:`Exception` in case of a timeout"""
        element, n = self.element, self.BusyRetries
        while element.is_in_operation():
            if n == 0:
                raise Exception("Wait for operation timedout")
            time.sleep(0.01)
            self.warning("waited for operation")
            n = n - 1

    def Restore(self):
        """Restore tango command. Restores the attributes to their former glory.
        This applies to memorized writable attributes which have a set point
        stored in the database.

        Attributes without a stored ``__value`` property are skipped;
        attributes without a ``write_<attribute name>`` method are skipped
        with a warning."""
        restore_attributes, db_values = self.get_restore_data()
        multi_attribute = self.get_device_attr()
        for attr_name in restore_attributes:
            props = db_values[attr_name]
            if props is None or "__value" not in props:
                continue
            attribute = multi_attribute.get_w_attr_by_name(attr_name)
            write_meth_name = "write_" + attr_name
            write_meth = getattr(self, write_meth_name, None)
            if write_meth is None:
                self.warning("Could not recover %s: %s does not exist",
                             attr_name, write_meth_name)
                continue
            self.restore_attribute(attribute, write_meth, props['__value'])

    def get_restore_data(self):
        """Collects the information needed by :meth:`Restore`.

        :return: a tuple with the names of the attributes to restore and the
                 attribute properties read from the tango database for them
        """
        restore_attributes = self.get_attributes_to_restore()
        db = Util.instance().get_database()
        db_values = db.get_device_attribute_property(self.get_name(),
                                                     restore_attributes)
        return restore_attributes, db_values

    def get_attributes_to_restore(self):
        """Returns the names of the standard and dynamic attributes which can
        be obtained as writable attributes (standard attributes first).

        Attributes for which ``get_w_attr_by_name`` raises
        :exc:`~PyTango.DevFailed` are left out.

        :return: list of attribute names
        """
        std_attrs, dyn_attrs = self.get_dynamic_attributes()
        multi_attribute = self.get_device_attr()
        restore = []
        for attr_names in (std_attrs, dyn_attrs):
            for attr_name in attr_names:
                try:
                    attribute = multi_attribute.get_w_attr_by_name(attr_name)
                except DevFailed:
                    continue
                restore.append(attribute.get_name())
        return restore

    def _get_attribute_value_from_db_value(self, attribute, db_value):
        """Converts a value stored in the database into an attribute value
        using the data type and data format of the given attribute.

        :param attribute: the tango attribute
        :param db_value: the value as stored in the database
        :return: the converted value
        """
        return seqStr_2_obj(db_value, attribute.get_data_type(),
                            attribute.get_data_format())

    def restore_attribute(self, attribute, write_meth, db_value):
        """Restores a single attribute: sets the converted database value as
        the attribute write value and calls the write method.

        A failure while setting or writing the value is logged and not
        propagated. The conversion of the database value happens before that
        and is not guarded.

        :param attribute: the tango (writable) attribute
        :param write_meth: method to be called with the attribute
        :param db_value: the value as stored in the database
        """
        value = self._get_attribute_value_from_db_value(attribute, db_value)
        attr_name = attribute.get_name()
        try:
            attribute.set_write_value(value)
            self.info("Restoring %s", attr_name)
            write_meth(attribute)
        except Exception:
            self.warning("Could not recover %s: Error in write", attr_name)
            self.debug("Details:", exc_info=1)
[docs]
class PoolDeviceClass(SardanaDeviceClass):
    """Base Tango Pool Device Class class.

    Declares the properties, commands and attributes of the generic pool
    device and merges in the definitions of :class:`SardanaDeviceClass`.
    In every merged dictionary the :class:`SardanaDeviceClass` entries are
    applied last, so they take precedence on a key clash.
    """

    #:
    #: Sardana device class properties definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    class_property_list = SardanaDeviceClass.class_property_list

    #:
    #: Sardana device properties definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    # NOTE(review): the Force_HW_Read description lacks its closing
    # parenthesis; left untouched because it is a runtime string.
    device_property_list = {
        'Id': [DevLong64, "Internal ID", InvalidId],
        'Force_HW_Read': [
            DevBoolean,
            "Force a hardware read of value even when in operation "
            "(motion/acquisition",
            False,
        ],
        **SardanaDeviceClass.device_property_list,
    }

    #:
    #: Sardana device command definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    cmd_list = {
        'Stop': [[DevVoid, ""], [DevVoid, ""]],
        'Abort': [[DevVoid, ""], [DevVoid, ""]],
        'Release': [[DevVoid, ""], [DevVoid, ""]],
        'Restore': [[DevVoid, ""], [DevVoid, ""]],
        **SardanaDeviceClass.cmd_list,
    }

    #:
    #: Sardana device attribute definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    attr_list = dict(SardanaDeviceClass.attr_list)

    #: standard attributes definition (none at this level)
    standard_attr_list = {}

    def _get_class_properties(self):
        """Returns the class properties of the super class with the
        description replaced and ``SardanaDevice`` prepended to the
        ``InheritedFrom`` list."""
        properties = SardanaDeviceClass._get_class_properties(self)
        properties['Description'] = "Generic Pool device class"
        properties['InheritedFrom'].insert(0, 'SardanaDevice')
        return properties
[docs]
class PoolElementDevice(PoolDevice):
    """Base Tango Pool Element Device class.

    Adds to :class:`PoolDevice` the relation of the element with its
    controller and its instrument, the ``Instrument`` and ``SimulationMode``
    attributes and the dynamic attributes provided by the controller axis.
    """

    #: names of the memorized attributes which are discarded before the
    #: memorized values are passed to the element on initialization
    ignore_memorized_attrs_during_initialization = tuple()

    def init_device(self):
        """Initialize the device. Called during startup after :meth:`init` and
        every time the tango ``Init`` command is executed.
        Override when necessary but **always** call the method from your super
        class"""
        PoolDevice.init_device(self)

        detect_evts = ()
        non_detect_evts = "instrument",
        self.set_change_events(detect_evts, non_detect_evts)

        self.instrument = None
        self.ctrl = None

        # the instrument is optional, the controller is always looked up
        instrument_id = self.get_instrument_id()
        if instrument_id != InvalidId:
            self.instrument = self.pool.get_element_by_id(instrument_id)
        ctrl_id = self.get_ctrl_id()
        self.ctrl = self.pool.get_element_by_id(ctrl_id)

    def initialize_attribute_values(self):
        """Initialize attribute values: passes the memorized values (without
        the ones listed in
        :attr:`ignore_memorized_attrs_during_initialization`) to the
        element."""
        memorized_values = self.get_memorized_values()
        for attr in self.ignore_memorized_attrs_during_initialization:
            memorized_values.pop(attr, None)
        self.element.init_attribute_values(memorized_values)

    def delete_device(self):
        """Clean the device: unless the element is already marked as deleted
        it is removed from its controller, then the super class clean-up is
        executed."""
        element = self.element
        if not element.deleted:
            ctrl = element.controller
            ctrl.remove_element(element, propagate=0)
        PoolDevice.delete_device(self)

    def read_Instrument(self, attr: PyTango.Attribute) -> None:
        """Read the value of the ``Instrument`` tango attribute.
        Returns the instrument full name or empty string if this element doesn't
        belong to any instrument

        :param attr: tango instrument attribute
        """
        instrument = self.element.instrument
        if instrument is None:
            attr.set_value('')
        else:
            attr.set_value(instrument.full_name)

    def write_Instrument(self, attr: PyTango.Attribute) -> None:
        """Write the value of the ``Instrument`` tango attribute.
        Sets a new instrument full name or empty string if this element doesn't
        belong to any instrument.
        The instrument **must** have been previously created.

        The ``Instrument_id`` device property is updated in the tango
        database accordingly.

        :param attr: tango instrument attribute
        :raises: :exc:`Exception` if the given name does not belong to an
                 instrument
        """
        name = attr.get_write_value()
        instrument = None
        if name:
            instrument = self.pool.get_element(full_name=name)
            if instrument.get_type() != ElementType.Instrument:
                raise Exception("%s is not an instrument" % name)
        self.element.instrument = instrument

        # An empty name detaches the element from any instrument. In that
        # case there is no instrument object to take the id from, so the
        # property is reset to InvalidId (the default declared for
        # Instrument_id in PoolElementDeviceClass).
        # NOTE(review): confirm InvalidId is the expected stored value when
        # the pool does not use numeric element ids.
        instrument_id = InvalidId if instrument is None else instrument.id
        db = Util.instance().get_database()
        db.put_device_property(
            self.get_name(), {"Instrument_id": instrument_id})

    def get_dynamic_attributes(self) -> Tuple[taurus.core.util.CaselessDict,
                                              taurus.core.util.CaselessDict]:
        """Override of :class:`PoolDevice.get_dynamic_attributes`.
        Returns the standard dynamic and fully dynamic attributes for this
        device. The return is a tuple of two dictionaries:

        - standard attributes: caseless dictionary with key being the attribute
          name and value is a tuple of attribute name(str), tango information,
          attribute information
        - dynamic attributes: caseless dictionary with key being the attribute
          name and value is a tuple of attribute name(str), tango information,
          attribute information

        **tango information**
            seq< :class:`~PyTango.CmdArgType`, :class:`~PyTango.AttrDataFormat`, :class:`~PyTango.AttrWriteType` >

        **attribute information**
            attribute information as returned by the sardana controller

        The result is calculated on the first call only and cached on the
        instance afterwards (this includes the empty result obtained when the
        controller is missing or offline).

        :return: the standard dynamic and fully dynamic attributes
        """
        if hasattr(self, "_dynamic_attributes_cache"):
            return self._standard_attributes_cache, \
                self._dynamic_attributes_cache
        std_attrs, dyn_attrs = self._get_dynamic_attributes()
        self._standard_attributes_cache = std_attrs
        self._dynamic_attributes_cache = dyn_attrs
        return std_attrs, dyn_attrs

    def _get_dynamic_attributes(self):
        """Builds the standard and dynamic attributes out of the axis
        attributes reported by the controller.

        An axis attribute whose name matches (case insensitive) an entry of
        the device class ``standard_attr_list`` becomes a standard attribute
        and uses a copy of the tango information declared there; any other
        axis attribute becomes a fully dynamic attribute.

        :return: tuple of two caseless dictionaries (standard, dynamic); both
                 are empty if there is no controller or it is offline
        """
        ctrl = self.ctrl
        if ctrl is None:
            self.warning("no controller: dynamic attributes NOT created")
            return PoolDevice.get_dynamic_attributes(self)
        if not ctrl.is_online():
            self.warning("controller offline: dynamic attributes NOT created")
            return PoolDevice.get_dynamic_attributes(self)

        dyn_attrs = CaselessDict()
        std_attrs = CaselessDict()
        dev_class = self.get_device_class()
        axis_attrs = ctrl.get_axis_attributes(self.element.axis)

        # lower case name -> name as declared in standard_attr_list, so that
        # the look-up below works whatever casing the controller uses
        # (indexing with the controller name raised KeyError on a mismatch)
        std_attr_keys = {key.lower(): key
                         for key in dev_class.standard_attr_list}

        for attr_name, attr_info in list(axis_attrs.items()):
            data_info = DataInfo.toDataInfo(attr_name, attr_info)
            std_key = std_attr_keys.get(attr_name.lower())
            if std_key is not None:
                # copy in order to leave the class attributes untouched
                # the downstream code can append MaxDimSize to the attr. info
                tg_info = deepcopy(dev_class.standard_attr_list[std_key])
                std_attrs[attr_name] = attr_name, tg_info, data_info
            else:
                name, tg_info = to_tango_attr_info(attr_name, data_info)
                dyn_attrs[attr_name] = name, tg_info, data_info
        return std_attrs, dyn_attrs

    def read_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
        """Read a generic dynamic attribute. Calls the controller of this
        element to get the dynamic attribute value

        :param attr: tango attribute
        :raises: :exc:`Exception` if there is no controller,
                 :exc:`TypeError` if the controller returns None
        """
        name = attr.get_name()
        ctrl = self.ctrl
        if ctrl is None:
            raise Exception("Cannot read %s. Controller not build!" % name)
        v = ctrl.get_axis_attr(self.element.axis, name)
        if v is None:
            raise TypeError(
                "Cannot read %s. Controller returns None" % (name,))
        attr.set_value(v)

    def write_DynamicAttribute(self, attr: PyTango.Attribute) -> None:
        """Write a generic dynamic attribute. Calls the controller of this
        element to set the dynamic attribute value

        :param attr: tango attribute
        :raises: :exc:`Exception` if there is no controller
        """
        name = attr.get_name()
        value = attr.get_write_value()
        self.debug("writing dynamic attribute %s with value %s", name, value)
        ctrl = self.ctrl
        if ctrl is None:
            raise Exception("Cannot write %s. Controller not build!" % name)
        ctrl.set_axis_attr(self.element.axis, name, value)

    def read_SimulationMode(self, attr: PyTango.Attribute) -> None:
        """Read the current simulation mode.

        :param attr: tango attribute
        """
        attr.set_value(self.element.simulation_mode)

    def write_SimulationMode(self, attr: PyTango.Attribute) -> None:
        """Sets the simulation mode.

        :param attr: tango attribute
        """
        self.element.simulation_mode = attr.get_write_value()
[docs]
class PoolElementDeviceClass(PoolDeviceClass):
    """Base Tango Pool Element Device Class class.

    Extends the definitions of :class:`PoolDeviceClass`. In every merged
    dictionary the :class:`PoolDeviceClass` entries are applied last, so they
    take precedence on a key clash.
    """

    #:
    #: Sardana device properties definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    device_property_list = {
        "Axis": [DevLong64, "Axis in the controller", [InvalidAxis]],
        "Ctrl_id": [DevString, "Controller ID", [InvalidId]],
        "Instrument_id": [DevString, "Instrument ID", [InvalidId]],
        **PoolDeviceClass.device_property_list,
    }

    #:
    #: Sardana device attribute definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    attr_list = {
        'Instrument': [
            [DevString, SCALAR, READ_WRITE],
            {'label': "Instrument", 'Display level': DispLevel.EXPERT},
        ],
        'SimulationMode': [
            [DevBoolean, SCALAR, READ_WRITE],
            {'label': "Simulation mode"},
        ],
        **PoolDeviceClass.attr_list,
    }

    #: Sardana device command definition (only the inherited commands)
    cmd_list = dict(PoolDeviceClass.cmd_list)

    def get_standard_attr_info(self, attr: str):
        """Returns information about the standard attribute

        :param attr: attribute name (key of ``standard_attr_list``)
        :return: a sequence of tango data_type, data format"""
        info = self.standard_attr_list[attr]
        return info

    def _get_class_properties(self):
        """Returns the class properties of the super class with the
        description replaced and ``PoolDevice`` prepended to the
        ``InheritedFrom`` list."""
        properties = PoolDeviceClass._get_class_properties(self)
        properties['Description'] = "Generic Pool element device class"
        properties['InheritedFrom'].insert(0, 'PoolDevice')
        return properties
[docs]
class PoolGroupDevice(PoolDevice):
    """Base Tango Pool Group Device class"""

    def read_ElementList(self, attr: PyTango.Attribute) -> None:
        """Read the element list.

        :param attr: tango attribute
        """
        names = self.get_element_names()
        attr.set_value(names)

    def get_element_names(self):
        """Returns the list of element names.

        :return: the names of the user elements of the group element"""
        names = []
        for member in self.element.get_user_elements():
            names.append(member.name)
        return names

    def elements_changed(self, evt_src, evt_type, evt_value):
        """Callback for when the elements of this group changed. Pushes a
        change event on ``ElementList`` with the current element names; the
        event arguments are not used."""
        names = self.get_element_names()
        self.push_change_event("ElementList", names)
[docs]
class PoolGroupDeviceClass(PoolDeviceClass):
    """Base Tango Pool Group Device Class class.

    Extends the definitions of :class:`PoolDeviceClass`. In every merged
    dictionary the :class:`PoolDeviceClass` entries are applied last, so they
    take precedence on a key clash.
    """

    #:
    #: Sardana device properties definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    device_property_list = {
        "Elements": [DevVarStringArray, "elements in the group", []],
        **PoolDeviceClass.device_property_list,
    }

    #:
    #: Sardana device command definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    cmd_list = dict(PoolDeviceClass.cmd_list)

    #:
    #: Sardana device attribute definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    attr_list = {
        'ElementList': [[DevString, SPECTRUM, READ, 4096]],
        **PoolDeviceClass.attr_list,
    }

    def _get_class_properties(self):
        """Returns the class properties of the super class with the
        description replaced and ``PoolDevice`` prepended to the
        ``InheritedFrom`` list."""
        properties = PoolDeviceClass._get_class_properties(self)
        properties['Description'] = "Generic Pool group device class"
        properties['InheritedFrom'].insert(0, 'PoolDevice')
        return properties
class PoolExpChannelDevice(PoolElementDevice):
    """Base Tango device for the pool experimental channels.

    Adds the encoding of the value (reference) buffer chunks and the
    ``IntegrationTime`` and ``Shape`` attributes.
    """

    def __init__(self, dclass, name):
        """Constructor. Obtains the codecs named by the
        ``VALUE_BUFFER_CODEC`` and ``VALUE_REF_BUFFER_CODEC`` sardana custom
        settings."""
        PoolElementDevice.__init__(self, dclass, name)
        value_codec_name = sardanacustomsettings.VALUE_BUFFER_CODEC
        self._value_buffer_codec = CodecFactory().getCodec(value_codec_name)
        ref_codec_name = sardanacustomsettings.VALUE_REF_BUFFER_CODEC
        self._value_ref_buffer_codec = CodecFactory().getCodec(ref_codec_name)

    def _encode_value_chunk(self, value_chunk: Sequence[sardana.sardanavalue.SardanaValue]) -> str:
        """Prepare value chunk to be passed via communication channel.

        The chunk is traversed with ``.items()``: the keys are collected
        under ``index`` and the ``value`` member of the items under
        ``value``.

        :param value_chunk: value chunk
        :return: the chunk encoded with the value buffer codec
        """
        pairs = list(value_chunk.items())
        payload = {
            "index": [idx for idx, _ in pairs],
            "value": [sdn_value.value for _, sdn_value in pairs],
        }
        return self._value_buffer_codec.encode(('', payload))

    def _encode_value_ref_chunk(self, value_ref_chunk: Any) -> str:
        """Prepare value ref chunk to be passed via communication channel.

        The chunk is traversed with ``.items()``: the keys are collected
        under ``index`` and the ``value`` member of the items under
        ``value_ref``.

        :param value_ref_chunk: value ref chunk
        :return: the chunk encoded with the value reference buffer codec
        """
        pairs = list(value_ref_chunk.items())
        payload = {
            "index": [idx for idx, _ in pairs],
            "value_ref": [sdn_value.value for _, sdn_value in pairs],
        }
        return self._value_ref_buffer_codec.encode(('', payload))

    def initialize_dynamic_attributes(self):
        """Initializes the dynamic attributes and enables the change events
        of ``integrationtime`` and ``shape`` when they were created.

        :return: the attributes returned by the super class
        """
        attrs = PoolElementDevice.initialize_dynamic_attributes(self)
        for evt_attr_name in ("integrationtime", "shape"):
            if evt_attr_name not in attrs:
                continue
            self.set_change_event(evt_attr_name, True, False)
        return attrs

    def read_ValueBuffer(self, _):
        """Reading ``ValueBuffer`` is not supported: always throws an
        ``UnsupportedFeature`` tango exception of warning severity."""
        Except.throw_exception(
            "UnsupportedFeature",
            "ValueBuffer attribute is not foreseen for reading. It is used "
            "only as the communication channel for the continuous "
            "acquisitions.",
            "PoolExpChannelDevice.read_ValueBuffer",
            ErrSeverity.WARN)

    def read_ValueRefBuffer(self, _):
        """Reading ``ValueRefBuffer`` is not supported: always throws an
        ``UnsupportedFeature`` tango exception of warning severity."""
        Except.throw_exception(
            "UnsupportedFeature",
            "ValueRefBuffer attribute is not foreseen for reading. It is "
            "used only as the communication channel for the continuous "
            "acquisitions.",
            "PoolExpChannelDevice.read_ValueRefBuffer",
            ErrSeverity.WARN)

    def read_IntegrationTime(self, attr: PyTango.Attribute) -> None:
        """Reads the integration time.

        :param attr: tango attribute
        """
        integration_time = self.element.integration_time
        attr.set_value(integration_time)

    def write_IntegrationTime(self, attr: PyTango.Attribute) -> None:
        """Sets the integration time.

        :param attr: tango attribute
        """
        integration_time = attr.get_write_value()
        self.element.integration_time = integration_time

    def read_Shape(self, attr: PyTango.Attribute) -> None:
        """Reads the shape (the element is asked with ``cache=False``).

        :param attr: tango attribute
        """
        shape = self.element.get_shape(cache=False)
        attr.set_value(shape)
class PoolExpChannelDeviceClass(PoolElementDeviceClass):
    """Tango device class of the pool experimental channels.

    Extends the definitions of :class:`PoolElementDeviceClass`. In every
    merged dictionary the :class:`PoolElementDeviceClass` entries are applied
    last, so they take precedence on a key clash.
    """

    #:
    #: Sardana device attribute definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    attr_list = {
        'IntegrationTime': [[DevDouble, SCALAR, READ_WRITE]],
        **PoolElementDeviceClass.attr_list,
    }

    #: standard attributes definition
    standard_attr_list = {
        'ValueBuffer': [[DevEncoded, SCALAR, READ]],
        'Shape': [
            [DevLong64, SPECTRUM, READ, 2],
            {
                'label': "Shape (X,Y)",
                'description': (
                    "Shape of the value. It is an array with \n"
                    "at most 2 elements: X and Y dimensions. \n"
                    "0-element array - scalar\n"
                    "1-element array (X) - spectrum\n"
                    "2-element array (X, Y) - image"
                ),
            },
        ],
        **PoolElementDeviceClass.standard_attr_list,
    }
class PoolTimerableDevice(PoolExpChannelDevice):
    """Base Tango device for the experimental channels which have a timer.

    The absence of a timer is represented on the tango side by the string
    ``'None'``.
    """

    def __init__(self, dclass, name):
        """Constructor"""
        PoolExpChannelDevice.__init__(self, dclass, name)

    def initialize_dynamic_attributes(self):
        """Initializes the dynamic attributes and enables the change event of
        ``timer`` when it was created.

        :return: the attributes returned by the super class
        """
        attrs = PoolExpChannelDevice.initialize_dynamic_attributes(self)
        for evt_attr_name in ("timer",):
            if evt_attr_name not in attrs:
                continue
            self.set_change_event(evt_attr_name, True, True)
        return attrs

    def read_Timer(self, attr: PyTango.Attribute) -> None:
        """Reads the timer for this channel.

        :param attr: tango attribute
        """
        timer = self.element.timer
        attr.set_value('None' if timer is None else timer)

    def write_Timer(self, attr: PyTango.Attribute) -> None:
        """Sets the timer for this channel.

        :param attr: tango attribute
        """
        timer = attr.get_write_value()
        self.element.timer = None if timer == 'None' else timer
class PoolTimerableDeviceClass(PoolExpChannelDeviceClass):
    """Tango device class of the experimental channels which have a timer.

    Extends the definitions of :class:`PoolExpChannelDeviceClass`. In every
    merged dictionary the :class:`PoolExpChannelDeviceClass` entries are
    applied last, so they take precedence on a key clash.
    """

    #:
    #: Sardana device attribute definition
    #:
    #: .. seealso:: :ref:`server`
    #:
    attr_list = {
        'Timer': [
            [DevString, SCALAR, READ_WRITE],
            {'Memorized': "true_without_hard_applied"},
        ],
        **PoolExpChannelDeviceClass.attr_list,
    }

    #: standard attributes definition (only the inherited ones)
    standard_attr_list = dict(PoolExpChannelDeviceClass.standard_attr_list)