Source code for sardana.tango.core.SardanaDevice

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""Generic Sardana Tango device module"""



__all__ = ["SardanaDevice", "SardanaDeviceClass"]

__docformat__ = 'restructuredtext'

import time
import threading
from typing import Sequence, List, Any, Union, Optional, Dict

import PyTango.constants
from PyTango import LatestDeviceImpl, DeviceClass, Util, DevState, \
    AttrQuality, TimeVal, ArgType, ApiUtil, DevFailed, WAttribute, \
    AutoTangoMonitor

import taurus
from taurus.core.util.threadpool import ThreadPool
from taurus.core.util.log import Logger

import sardana
from sardana.tango.core.util import to_tango_state, NO_DB_MAP, \
    _is_maybe_memorized_attribute, _from_memorized_value, get_full_name


__thread_pool_lock = threading.Lock()
__thread_pool = None


def get_thread_pool() -> ThreadPool:
    """Returns the global pool of threads for Sardana

    :return: the global pool of threads object
    """

    global __thread_pool

    if __thread_pool:
        return __thread_pool

    global __thread_pool_lock
    with __thread_pool_lock:
        if __thread_pool is None:
            __thread_pool = ThreadPool(name="EventTH", Psize=1, Qsize=1000)
        return __thread_pool


[docs] class SardanaDevice(LatestDeviceImpl, Logger): """SardanaDevice represents the base class for all Sardana :class:`PyTango.DeviceImpl` classes""" def __init__(self, dclass, name): """Constructor""" self.in_constructor = True try: LatestDeviceImpl.__init__(self, dclass, name) self.init(name) Logger.__init__(self, name) self._state = DevState.INIT self._status = 'Waiting to be initialized...' # access to some tango API (like MultiAttribute and Attribute) is # still not thread safe so we have this lock to protect # Wa can't always use methods which use internally the # C++ AutoTangoMonitor because it blocks the entire tango device. self.tango_lock = threading.RLock() self._event_thread_pool = get_thread_pool() self.init_device() finally: self.in_constructor = False
[docs] def init(self, name: str): """initialize the device once in the object lifetime. Override when necessary but **always** call the method from your super class :param name: device name""" db = self.get_database() if db is None: self._alias = self._get_nodb_device_info()[0] else: try: self._alias = db.get_alias(name) if self._alias.lower() == 'nada': self._alias = None except: self._alias = None
[docs] def get_alias(self) -> str: """Returns this device alias name :return: this device alias """ return self._alias
alias = property(get_alias, doc="the device alias name")
[docs] def get_full_name(self) -> str: """Compose full name from the TANGO_HOST information and device name. Full name is of format "tango://dbhost:dbport/<domain>/<family>/<member>" where dbhost is always FQDN. :return: this device full name """ return get_full_name(self.get_name())
[docs] def init_device(self): """Initialize the device. Called during startup after :meth:`init` and every time the tango ``Init`` command is executed. Override when necessary but **always** call the method from your super class""" self.set_state(self._state) db = self.get_database() if db is None: self.init_device_nodb() else: self.get_device_properties(self.get_device_class()) detect_evts = "state", "status" non_detect_evts = () self.set_change_events(detect_evts, non_detect_evts)
[docs] def sardana_init_hook(self): """Hook that is called before the server event loop. The idea behind this hook is to be equivalent to server_init_hook from Tango. Similar behaviour can be archived using post_init_callback. """ pass
def _get_nodb_device_info(self): """Internal method. Returns the device info when tango database is not being used (example: in demos)""" name = self.get_name() tango_class = self.get_device_class().get_name() devices = NO_DB_MAP.get(tango_class, ()) for dev_info in devices: if dev_info[1] == name: return dev_info
[docs] def init_device_nodb(self): """Internal method. Initialize the device when tango database is not being used (example: in demos)""" _, _, props = self._get_nodb_device_info() for prop_name, prop_value in list(props.items()): setattr(self, prop_name, prop_value)
[docs] def delete_device(self): """Clean the device. Called during shutdown and every time the tango ``Init`` command is executed. Override when necessary but **always** call the method from your super class""" pass
[docs] def set_change_events(self, evts_checked: Sequence[str], evts_not_checked: Sequence[str]): """Helper method to set change events on attributes :param evts_checked: list of attribute names to activate change events programatically with tango filter active :param evts_not_checked: list of attribute names to activate change events programatically with tango filter inactive. Use this with care! Attributes configured with no change event filter may potentially generated a lot of events! """ for evt in evts_checked: self.set_change_event(evt, True, True) for evt in evts_not_checked: self.set_change_event(evt, True, False)
[docs] def initialize_dynamic_attributes(self): """Initialize dynamic attributes. Default implementation does nothing. Override when necessary.""" pass
[docs] def initialize_attribute_values(self): """Initialize attributes values. Default implementation does nothing. Override when necessary.""" pass
[docs] def get_memorized_values(self) -> Dict[str, Any]: """Get memorized values from database :return: map with attribute names and memorized values """ maybe_memorized_attrs = [] multi_attr = self.get_device_attr() for i in range(multi_attr.get_attr_nb()): attr = multi_attr.get_attr_by_ind(i) if not _is_maybe_memorized_attribute(attr): continue maybe_memorized_attrs.append(attr.get_name()) if not maybe_memorized_attrs: return {} memorized_values = {} db = self.get_database() dev_name = self.get_name() properties = db.get_device_attribute_property( dev_name, maybe_memorized_attrs ) for attr_name, attr_properties in properties.items(): if "__value" not in attr_properties: continue attr = multi_attr.get_attr_by_name(attr_name) mem_value = attr_properties["__value"][0] value = _from_memorized_value(attr, mem_value) memorized_values[attr_name] = value return memorized_values
[docs] def get_event_thread_pool(self) -> ThreadPool: """Return the :class:`~taurus.core.util.ThreadPool` used by sardana to send tango events. :return: the sardana :class:`~taurus.core.util.ThreadPool` """ return self._event_thread_pool
[docs] def get_attribute_by_name(self, attr_name: str) -> PyTango.Attribute: """Gets the attribute for the given name. :param attr_name: attribute name :return: the attribute object """ return self.get_device_attr().get_attr_by_name(attr_name)
[docs] def get_wattribute_by_name(self, attr_name: str) -> PyTango.WAttribute: """Gets the writable attribute for the given name. :param attr_name: attribute name :return: the attribute object """ return self.get_device_attr().get_w_attr_by_name(attr_name)
[docs] def get_database(self) -> PyTango.Database: """Helper method to return a reference to the current tango database :return: the Tango database """ return Util.instance().get_database()
[docs] def set_write_attribute(self, attr, w_value): try: with AutoTangoMonitor(self): attr.set_write_value(w_value) except DevFailed as df: df0 = df.args[0] reason = df0.reason # if outside limit prefix the description with the device name if reason == PyTango.constants.API_WAttrOutsideLimit: desc = self.alias + ": " + df0.desc _df = DevFailed(*df.args[1:]) PyTango.Except.re_throw_exception( _df, df0.reason, desc, df0.origin) raise df
[docs] def set_attribute(self, attr: PyTango.Attribute, value: Any = None, w_value: Any = None, timestamp: Union[float, PyTango.TimeVal] = None, quality: Optional[PyTango.AttrQuality] = None, error: Optional[PyTango.DevFailed] = None, priority: int = 1, synch: bool = True) -> None: """Sets the given attribute value. If timestamp is not given, *now* is used as timestamp. If quality is not given VALID is assigned. If error is given an error event is sent (with no value and quality INVALID). If priority is > 1, the event filter is temporarily disabled so the event is sent for sure. If synch is set to True, wait for fire event to finish :param attr: the tango attribute :param value: the value to be set (not mandatory if setting an error) [default: None] :param w_value: the write value to be set (not mandatory) [default: None, meaning maintain current write value] :param timestamp: the timestamp associated with the operation [default: None, meaning use *now* as timestamp] :param quality: attribute quality [default: None, meaning VALID] :param error: a tango DevFailed error or None if not an error [default: None] :param priority: event priority [default: 1, meaning *normal* priority]. If priority is > 1, the event filter is temporarily disabled so the event is sent for sure. The event filter is restored to the previous value :param synch: If synch is set to True, wait for fire event to finish. If False, a job is sent to the sardana thread pool and the method returns immediately [default: True] """ set_attr = self.set_attribute_push if synch: set_attr(attr, value=value, w_value=w_value, timestamp=timestamp, quality=quality, error=error, priority=priority, synch=synch) else: th_pool = self.get_event_thread_pool() th_pool.add(set_attr, None, attr, value=value, w_value=w_value, timestamp=timestamp, quality=quality, error=error, priority=priority, synch=synch)
[docs] def set_attribute_push(self, attr, value=None, w_value=None, timestamp=None, quality=None, error=None, priority=1, synch=True): """Synchronous internal implementation of :meth:`set_attribute` (synch is passed to this method because it might need to know if it is being executed in a synchronous or asynchronous context).""" if priority > 0 and not synch: with self.tango_lock: return self._set_attribute_push(attr, value=value, w_value=w_value, timestamp=timestamp, quality=quality, error=error, priority=priority) else: return self._set_attribute_push(attr, value=value, w_value=w_value, timestamp=timestamp, quality=quality, error=error, priority=priority)
def _set_attribute_push(self, attr, value=None, w_value=None, timestamp=None, quality=None, error=None, priority=1): """Internal method.""" fire_event = priority > 0 recover = False if priority > 1 and attr.is_check_change_criteria(): attr.set_change_event(True, False) recover = True attr_name = attr.get_name().lower() if value is None and error is None: raise Exception( "Cannot set value of attribute '%s' with None" % (attr_name,)) try: if error is not None and fire_event: self.push_change_event(attr_name, error) return # some versions of Tango have a memory leak if you do # push_change_event(attr_name, value [, ...]) on state or status. # This solves the problem. if attr_name == "state": self.set_state(value) if fire_event: attempts = 0 # MeasurementGroup Start() command may take some time # and still hold TangoMonitor when we try pushing State event. # This may happen only when your client increased # the CORBA Transient timeout to a value > 3 s. # Simply re-try to avoid this problem as it was suggested on: # https://www.tango-controls.org/community/forum/c/general/development/how-to-increase-tango-serialization-monitor-timeout # Default TangoMonitor timeout is 3.2 s, # so we will wait 6.4 s MAX_ATTEMPTS = 2 while attempts < MAX_ATTEMPTS: try: self.push_change_event(attr_name) except DevFailed as df: attempts += 1 error = df.args[0] reason = error.reason if reason == "API_CommandTimedOut" \ and self.get_device_class().get_name() == "MeasurementGroup": if attempts == MAX_ATTEMPTS: raise df msg = ("Unable to push change event due to " "TangoMonitor timeout. Retrying #{} ...".format(attempts)) self.warning(msg) self.debug('Details:', exc_info=1) else: raise df else: break return elif attr_name == "status": self.set_status(value) if fire_event: self.push_change_event(attr_name) return if timestamp is None: timestamp = time.time() elif isinstance(timestamp, TimeVal): timestamp = TimeVal.totime(timestamp) if quality is None: quality = AttrQuality.ATTR_VALID data_type = attr.get_data_type() with AutoTangoMonitor(self): if w_value is not None and isinstance(attr, WAttribute): # The following try/except workarounds bug-238: "Not possible # to read motor's position when it's out of limits" # (http://sourceforge.net/p/sardana/tickets/238) # In the condition of position attribute out of range, its # w_value will not be updated during readouts or when pushing # events. # The workaround does not affect the drift correction feature # of the pseudomotors, but affects pending operation of the # Taurus write widgets of the position attribute or any other # feature (not known at the moment of applying this workaround) # or anyone trusting the w_value. # # TODO: Remove the try/except protection whenever Sardana # feature-286 has been implemented and bug-54 has been fixed. # The lack of the feature and the bug can lead to the situation # when motor's position write value is out of range. # # feature-286: "Solve inconsistencies between user position # limits and dial position limits" # (http://sourceforge.net/p/sardana/tickets/286) # # bug-54: "Software limits problems between motors and # pseudomotors" # (http://sourceforge.net/p/sardana/tickets/54) ############################################################### try: attr.set_write_value(w_value) except DevFailed as df: error = df.args[0] reason = error.reason if reason == PyTango.constants.API_WAttrOutsideLimit and\ attr_name == 'position': msg = ('Unable to update "w_value" because it is' + ' out of range (w_value=%f)' % w_value) self.warning(msg) self.debug('Details:', exc_info=1) else: raise df ############################################################### if fire_event: if data_type == ArgType.DevEncoded: fmt, data = value args = attr_name, fmt, data, timestamp, quality else: args = attr_name, value, timestamp, quality self.push_change_event(*args) else: if data_type == ArgType.DevEncoded: fmt, data = value attr.set_value_date_quality(fmt, data, timestamp, quality) else: attr.set_value_date_quality(value, timestamp, quality) finally: if recover: attr.set_change_event(True, True)
[docs] def calculate_tango_state(self, ctrl_state: sardana.sardanadefs.State, update: bool = False) -> PyTango.DevState: """Calculate tango state based on the controller state. :param ctrl_state: the state returned by the controller :param update: if True, set the state of this device with the calculated tango state [default: False: :return: the corresponding tango state """ self._state = state = to_tango_state(ctrl_state) if update: self.set_state(state) return state
[docs] def calculate_tango_status(self, ctrl_status: str, update: bool = False) -> str: """ Calculate tango status based on the controller status. :param ctrl_status: the status returned by the controller :param update: if True, set the state of this device with the calculated tango state (by default is False) :return: the corresponding tango state """ self._status = status = ctrl_status if update: self.set_status(status) return status
[docs] class SardanaDeviceClass(DeviceClass): """SardanaDeviceClass represents the base class for all Sardana :class:`PyTango.DeviceClass` classes""" #: #: Sardana device class properties definition #: #: .. seealso:: :ref:`server` #: class_property_list = { } #: #: Sardana device properties definition #: #: .. seealso:: :ref:`server` #: device_property_list = { } #: #: Sardana device command definition #: #: .. seealso:: :ref:`server` #: cmd_list = { } #: #: Sardana device attribute definition #: #: .. seealso:: :ref:`server` #: attr_list = { } def __init__(self, name): DeviceClass.__init__(self, name) self.set_type(name) def _get_class_properties(self): """Internal method""" return dict(ProjectTitle="Sardana", Description="Generic description", doc_url="http://sardana-controls.org/", __icon=self.get_name().lower() + ".png", InheritedFrom=["Device_5Impl"])
[docs] def write_class_property(self): """Write class properties ``ProjectTitle``, ``Description``, ``doc_url``, ``InheritedFrom`` and ``__icon``""" db = self.get_database() if db is None: return db.put_class_property(self.get_name(), self._get_class_properties())
[docs] def dyn_attr(self, dev_list: PyTango.DeviceImpl) -> None: """Invoked to create dynamic attributes for the given devices. Default implementation calls :meth:`SardanaDevice.initialize_dynamic_attributes` for each device :param dev_list: list of devices """ for dev in dev_list: try: dev.initialize_dynamic_attributes() except: dev.warning("Failed to initialize dynamic attributes") dev.debug("Details:", exc_info=1) try: dev.initialize_attribute_values() except: dev.warning("Failed to initialize attribute values") dev.debug("Details:", exc_info=1)
[docs] def device_name_factory(self, dev_name_list: List[str]): """Builds list of device names to use when no Database is being used :param dev_name_list: list to be filled with device names """ tango_class = self.get_name() devices = NO_DB_MAP.get(tango_class, ()) for dev_info in devices: dev_name_list.append(dev_info[1])