Source code for sardana.pool.poolbasechannel

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool library. It defines the base classes
for experiment channels"""

__all__ = ["Value", "PoolBaseChannel"]

__docformat__ = 'restructuredtext'


from typing import Sequence, Any, Optional
import weakref

import sardana
from sardana.sardanadefs import AttrQuality, ElementType
from sardana.sardanaattribute import SardanaAttribute
from sardana.sardanabuffer import SardanaBuffer
from sardana.pool.poolelement import PoolElement
from sardana.pool.poolacquisition import PoolAcquisitionSoftware,\
    get_timerable_items
from sardana.pool.poolmeasurementgroup import ChannelConfiguration,\
    ControllerConfiguration
from sardana.sardanaevent import EventType
from sardana.pool import AcqSynch, AcqMode


class ValueBuffer(SardanaBuffer):
    """Value buffer aware of the pseudo elements that consume its data."""

    def is_value_required(self, idx: int) -> bool:
        """Check whether any of the pseudo elements still requires
        this value.

        The value is reported as required as soon as one pseudo element's
        own value buffer has a ``next_idx`` lower than or equal to ``idx``.

        :param idx: value's index
        :return: whether value is required or can be freely removed
        """
        pseudo_refs = self.obj.get_pseudo_elements()
        # every entry is called to obtain the pseudo element itself
        return any(ref().get_value_buffer().next_idx <= idx
                   for ref in pseudo_refs)


class Value(SardanaAttribute):
    """Attribute that refreshes itself by reading its owner object."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def update(self, cache=True, propagate=1):
        """Refresh the attribute from ``self.obj.read_value()`` when needed.

        Nothing happens when ``cache`` is true and a value is already
        stored; otherwise a fresh value is read and stored.

        :param cache: whether an already stored value may be reused
        :param propagate: forwarded unchanged to :meth:`set_value`
        """
        if cache and self.has_value():
            return
        fresh = self.obj.read_value()
        self.set_value(fresh, propagate=propagate)


class ValueRefBuffer(SardanaBuffer):
    """Buffer of value references; adds nothing to :class:`SardanaBuffer`.

    .. note::
        ValueRefBuffer is part of Sardana on a provisional basis only.
        The core developers may change it in a backwards incompatible way,
        or remove the class altogether, if they deem it necessary.
    """


class ValueRef(SardanaAttribute):
    """Value reference attribute; adds nothing to :class:`SardanaAttribute`.

    .. note::
        ValueRef is part of Sardana on a provisional basis only.
        The core developers may change it in a backwards incompatible way,
        or remove the class altogether, if they deem it necessary.
    """


class PoolBaseChannel(PoolElement):
    """Base class for Pool experimental channel

    .. todo::
        Value ref API should be exposed only on the channels which
        belongs to *referable* controllers.
    """

    # classes instantiated by _create_attrs_and_buffers(); subclasses may
    # override them to plug in specialized attributes/buffers
    ValueAttributeClass = Value
    ValueBufferClass = ValueBuffer
    ValueRefAttributeClass = ValueRef
    ValueRefBufferClass = ValueRefBuffer

    # class of the action stored in the action cache on init; set to None
    # to skip the creation of the acquisition action
    AcquisitionClass = PoolAcquisitionSoftware

    def __init__(self, **kwargs):
        """Initialize the channel.

        :param kwargs: forwarded to :class:`PoolElement`. The optional
            ``_create_attrs_and_buffers_on_init`` key (default ``True``)
            controls whether the value/value ref attributes and buffers
            are created here.
        """
        PoolElement.__init__(self, **kwargs)
        if kwargs.get("_create_attrs_and_buffers_on_init", True):
            self._create_attrs_and_buffers()
        self._value_ref_pattern = None
        self._value_ref_enabled = None
        # weak references to the pseudo elements using this channel
        self._pseudo_elements = []
        if self.AcquisitionClass is not None:
            acq_name = "%s.Acquisition" % self._name
            self.set_action_cache(self.AcquisitionClass(self, name=acq_name))
        self._integration_time = 0
        # cached shape, filled on the first get_shape() call
        self._shape = None

    def _create_attrs_and_buffers(self):
        """Create the value and value ref attributes and buffers.

        All of them get :meth:`on_change` as listener.
        """
        self._value = self.ValueAttributeClass(self, listeners=self.on_change)
        self._value_buffer = self.ValueBufferClass(
            self, listeners=self.on_change)
        self._value_ref = self.ValueRefAttributeClass(
            self, listeners=self.on_change)
        self._value_ref_buffer = self.ValueRefBufferClass(
            self, listeners=self.on_change)

    def has_pseudo_elements(self) -> bool:
        """Informs whether this channel forms part of any pseudo
        element e.g. pseudo counter.

        :return: has pseudo elements
        """
        return len(self._pseudo_elements) > 0

    def get_pseudo_elements(self) -> Sequence[weakref.ref]:
        """Returns list of pseudo elements e.g. pseudo counters
        that this channel belongs to.

        :return: weak references to pseudo elements
        """
        return self._pseudo_elements

    def add_pseudo_element(
            self,
            element: "sardana.pool.poolpseudocounter.PoolPseudoCounter"
    ) -> None:
        """Adds pseudo element e.g. pseudo counter that this channel
        belongs to.

        When the first pseudo element is added the value buffer is
        switched to persistent.

        :param element: pseudo element
        """
        if not self.has_pseudo_elements():
            self.get_value_buffer().persistent = True
        # only a weak reference is kept
        self._pseudo_elements.append(weakref.ref(element))

    def remove_pseudo_element(
            self,
            element: "sardana.pool.poolpseudocounter.PoolPseudoCounter"
    ) -> None:
        """Removes pseudo element e.g. pseudo counters that this channel
        belongs to.

        When the last pseudo element is removed the value buffer stops
        being persistent.

        :param element: pseudo element
        :raises ValueError: if element is not a pseudo element of this
            channel
        """
        for pseudo_element in self._pseudo_elements:
            if pseudo_element() == element:
                # removing while iterating is safe here: the loop is left
                # right after the removal
                self._pseudo_elements.remove(pseudo_element)
                if not self.has_pseudo_elements():
                    self.get_value_buffer().persistent = False
                break
        else:
            raise ValueError(
                "{} is not a pseudo element of {}".format(
                    element.name, self.name)
            )

    def get_value_attribute(self) -> SardanaAttribute:
        """Returns the value attribute object for this experiment channel

        :return: the value attribute
        """
        return self._value

    def get_value_buffer(self) -> SardanaBuffer:
        """Returns the value buffer object for this experiment channel

        :return: the value buffer
        """
        return self._value_buffer

    def get_value_ref_attribute(self) -> SardanaAttribute:
        """Returns the value ref attribute object for this experiment
        channel

        .. note::
            The get_value_ref_attribute method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :return: the value ref attribute
        """
        return self._value_ref

    def get_value_ref_buffer(self) -> SardanaBuffer:
        """Returns the value ref buffer object for this experiment channel

        .. note::
            The get_value_ref_buffer method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :return: the value ref buffer
        """
        return self._value_ref_buffer

    # --------------------------------------------------------------------------
    # Event forwarding
    # --------------------------------------------------------------------------

    def on_change(self, evt_src, evt_type, evt_value):
        """Listener attached to the attributes and buffers of the channel.

        :param evt_src: event source (not used)
        :param evt_type: event type, forwarded as is
        :param evt_value: event value, forwarded as is
        """
        # forward all events coming from attributes to the listeners
        self.fire_event(evt_type, evt_value)

    # --------------------------------------------------------------------------
    # default acquisition channel
    # --------------------------------------------------------------------------

    def get_default_attribute(self):
        """Return the default attribute, which is the value attribute."""
        return self.get_value_attribute()

    # --------------------------------------------------------------------------
    # acquisition
    # --------------------------------------------------------------------------

    def get_acquisition(self):
        """Return the acquisition object, i.e. the action cache."""
        return self.get_action_cache()

    acquisition = property(get_acquisition, doc="acquisition object")

    # --------------------------------------------------------------------------
    # value
    # --------------------------------------------------------------------------

    def read_value(self) -> sardana.sardanavalue.SardanaValue:
        """Reads the channel value from hardware.

        :return:
            a :class:`~sardana.sardanavalue.SardanaValue` containing the
            channel value
        :rtype:
            :class:`~sardana.sardanavalue.SardanaValue`"""
        # the acquisition returns a mapping indexed by element
        return self.acquisition.read_value(serial=True)[self]

    def put_value(self, value: sardana.sardanavalue.SardanaValue,
                  quality: AttrQuality = AttrQuality.Valid,
                  propagate: int = 1) -> Any:
        """Sets a value.

        :param value: the new value
        :param quality: the new quality
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return: the value attribute, after being updated
        """
        val_attr = self._value
        # quality goes first so it is already in place when the value is set
        val_attr.set_quality(quality)
        val_attr.set_value(value, propagate=propagate)
        return val_attr

    def get_value(self, cache: bool = True, propagate: int = 1
                  ) -> sardana.sardanaattribute.SardanaAttribute:
        """Returns the channel value.

        :param cache:
            if ``True`` (default) return value in cache, otherwise read value
            from hardware
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return: the channel value (as the value attribute object)
        """
        return self._get_value(cache=cache, propagate=propagate)

    def _get_value(self, cache=True, propagate=1):
        """Update the value attribute and return it (see get_value)."""
        value = self.get_value_attribute()
        value.update(cache=cache, propagate=propagate)
        return value

    def set_value(self, value: float) -> None:
        """Starts an acquisition on this channel

        :param value: the value to count
        """
        return self._set_value(value)

    def _set_value(self, value):
        """Start the acquisition with the given value (see set_value)."""
        # NOTE(review): start_acquisition() as defined in this class accepts
        # no argument besides self, so this call raises TypeError unless a
        # subclass overrides it with a compatible signature - confirm the
        # intended signature
        self.start_acquisition(value)

    value = property(get_value, set_value, doc="channel value")

    # ------------------------------------------------------------------------
    # value buffer
    # ------------------------------------------------------------------------

    def extend_value_buffer(
            self,
            values: Sequence[sardana.sardanavalue.SardanaValue],
            idx: Optional[int] = None,
            propagate: int = 1
    ) -> Optional[ValueBuffer]:
        """Extend value buffer with new values assigning them consecutive
        indexes starting with idx. If idx is omitted, then the new values will
        be added right after the last value in the buffer.
        Also update the read value of the attribute with the last element
        of values.

        :param values: values to be added to the buffer
        :param idx: index of the first of the new values
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return: the value buffer or ``None`` if values is empty
        """
        # explicit length check (instead of truthiness) so that any sized
        # container is supported
        if len(values) == 0:
            return None
        # fill value buffer
        val_buffer = self._value_buffer
        val_buffer.extend(values, idx)
        # update value attribute
        val_attr = self._value
        val_attr.set_value(values[-1], propagate=propagate)
        return val_buffer

    def append_value_buffer(self, value: sardana.sardanavalue.SardanaValue,
                            idx: Optional[int] = None,
                            propagate: int = 1) -> ValueBuffer:
        """Append new value to the value buffer at the idx position.
        If idx is omitted, then the new value will be added right after
        the last value in the buffer. Also update the read value.

        :param value: value to be added to the buffer
        :param idx: index at which to append the value
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return: the value buffer
        """
        # fill value buffer
        val_buffer = self._value_buffer
        val_buffer.append(value, idx)
        # update value attribute
        val_attr = self._value
        val_attr.set_value(value, propagate=propagate)
        return val_buffer

    def clear_value_buffer(self):
        """Clear value buffer."""
        val_buffer = self._value_buffer
        val_buffer.clear()

    # ------------------------------------------------------------------------
    # value ref
    # ------------------------------------------------------------------------

    def read_value_ref(self) -> sardana.sardanavalue.SardanaValue:
        """Reads the channel value ref from hardware.

        .. note::
            The read_value_ref method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :return:
            a *sardana value* containing the channel value ref
        :rtype:
            :class:`~sardana.sardanavalue.SardanaValue`"""
        # the acquisition returns a mapping indexed by element
        return self.acquisition.read_value_ref()[self]

    def put_value_ref(self, value_ref: sardana.sardanavalue.SardanaValue,
                      propagate: int = 1) -> ValueRef:
        """Sets a value ref.

        .. note::
            The put_value_ref method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :param value_ref: the new value ref
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return: the value ref attribute, after being updated
        """
        val_ref_attr = self._value_ref
        val_ref_attr.set_value(value_ref, propagate=propagate)
        return val_ref_attr

    def get_value_ref(self) -> sardana.SardanaValue:
        """Returns the channel value ref.

        .. note::
            The get_value_ref method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :return: the channel value ref
        :raises Exception: if the value ref attribute has no value yet
        """
        value_ref_attr = self.get_value_ref_attribute()
        if not value_ref_attr.has_value():
            raise Exception("ValueRef not available: no successful"
                            " acquisition done so far!")
        return value_ref_attr.value_obj

    value_ref = property(get_value_ref, doc="channel value ref")

    # ------------------------------------------------------------------------
    # value ref buffer
    # ------------------------------------------------------------------------

    def extend_value_ref_buffer(
            self,
            value_refs: Sequence[sardana.sardanavalue.SardanaValue],
            idx: Optional[int] = None,
            propagate: int = 1
    ) -> Optional[ValueRefBuffer]:
        """Extend value ref buffer with new values assigning them consecutive
        indexes starting with idx. If idx is omitted, then the new values will
        be added right after the last value ref in the buffer.
        Also update the read value ref of the attribute with the last element
        of values.

        .. note::
            The extend_value_ref_buffer method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :param value_refs: values to be added to the buffer
        :param idx: index at which to append the value_refs
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return: the value ref buffer or ``None`` if value_refs is empty
        """
        # explicit length check (instead of truthiness) so that any sized
        # container is supported
        if len(value_refs) == 0:
            return None
        # fill value ref buffer
        val_ref_buffer = self._value_ref_buffer
        val_ref_buffer.extend(value_refs, idx)
        # update value ref attribute
        val_ref_attr = self._value_ref
        val_ref_attr.set_value(value_refs[-1], propagate=propagate)
        return val_ref_buffer

    def append_value_ref_buffer(
            self,
            value_ref: sardana.sardanavalue.SardanaValue,
            idx: Optional[int] = None,
            propagate: int = 1
    ) -> ValueRefBuffer:
        """Append new value ref to the value ref buffer at the idx position.
        If idx is omitted, then the new value ref will be added right after
        the last value ref in the buffer. Also update the read value ref.

        .. note::
            The append_value_ref_buffer method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :param value_ref: value ref to be added to the buffer
        :param idx: index at which to append the value_ref
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return: the value ref buffer
        """
        # fill value ref buffer
        val_ref_buffer = self._value_ref_buffer
        val_ref_buffer.append(value_ref, idx)
        # update value ref attribute
        val_ref_attr = self._value_ref
        val_ref_attr.set_value(value_ref, propagate=propagate)
        return val_ref_buffer

    def clear_value_ref_buffer(self):
        """Clear value ref buffer.

        .. note::
            The clear_value_ref_buffer method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.
        """
        val_ref_buffer = self._value_ref_buffer
        val_ref_buffer.clear()

    # ------------------------------------------------------------------------
    # value ref pattern
    # ------------------------------------------------------------------------

    def get_value_ref_pattern(self) -> str:
        """Returns the channel value reference pattern.

        .. note::
            The get_value_ref_pattern method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :return: the channel value reference pattern (``None`` until set)
        """
        return self._value_ref_pattern

    def set_value_ref_pattern(self, value_ref_pattern: str,
                              propagate: int = 1) -> None:
        """Set a value reference pattern in the kernel (not in the hardware).

        .. note::
            The set_value_ref_pattern method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :param value_ref_pattern: the new value reference pattern
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        """
        self._set_value_ref_pattern(value_ref_pattern, propagate=propagate)

    def _set_value_ref_pattern(self, value_ref_pattern, propagate=1):
        """Store the pattern and, unless propagate is 0, fire the
        "value_ref_pattern" event."""
        self._value_ref_pattern = value_ref_pattern
        if not propagate:
            return
        self.fire_event(
            EventType("value_ref_pattern", priority=propagate),
            value_ref_pattern)

    value_ref_pattern = property(get_value_ref_pattern,
                                 set_value_ref_pattern,
                                 doc="channel value reference pattern")

    # ------------------------------------------------------------------------
    # value ref enabled
    # ------------------------------------------------------------------------

    def is_value_ref_enabled(self) -> bool:
        """Check if value reference is enabled.

        .. note::
            The is_value_ref_enabled method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :return: the value reference enabled flag (``None`` until set)
        """
        return self._value_ref_enabled

    def set_value_ref_enabled(self, value_ref_enabled: bool,
                              propagate: int = 1) -> None:
        """Set value reference enabled flag in the kernel
        (not in the hardware).

        .. note::
            The set_value_ref_enabled method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.

        :param value_ref_enabled: the new value reference enabled
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        """
        self._set_value_ref_enabled(value_ref_enabled, propagate=propagate)

    def _set_value_ref_enabled(self, value_ref_enabled, propagate=1):
        """Store the flag and, unless propagate is 0, fire the
        "value_ref_enabled" event."""
        self._value_ref_enabled = value_ref_enabled
        if not propagate:
            return
        self.fire_event(
            EventType("value_ref_enabled", priority=propagate),
            value_ref_enabled)

    value_ref_enabled = property(is_value_ref_enabled,
                                 set_value_ref_enabled,
                                 doc="channel value reference enabled")

    def is_referable(self):
        """Check if channel has referable capability.

        The answer is delegated to the controller of the channel.

        .. note::
            The is_referable method has been included in Sardana
            on a provisional basis. Backwards incompatible changes (up to
            and including removal of the method) may occur if deemed
            necessary by the core developers.
        """
        return self.controller.is_referable()

    # --------------------------------------------------------------------------
    # integration time
    # --------------------------------------------------------------------------

    def get_integration_time(self) -> float:
        """Return the integration time for this object.

        :return: the current integration time
        """
        return self._integration_time

    def set_integration_time(self, integration_time: float,
                             propagate: int = 1) -> None:
        """Set the integration time for this object.

        :param integration_time: integration time in seconds to set
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        """
        if integration_time == self._integration_time:
            # integration time is not changed. Do nothing
            return
        self._integration_time = integration_time
        if not propagate:
            return
        self.fire_event(EventType("integration_time", priority=propagate),
                        integration_time)

    integration_time = property(get_integration_time, set_integration_time,
                                doc="channel integration time")

    # -------------------------------------------------------------------------
    # shape
    # -------------------------------------------------------------------------

    def _get_shape_from_max_dim_size(self):
        """Return the ``MaxDimSize`` entry of the "Value" axis attribute
        information provided by the controller.

        :return: the shape taken from ``MaxDimSize``
        :raises RuntimeError: if the information can not be obtained
        """
        # MaxDimSize could actually become a standard fallback
        # in case the shape controller parameters is not implemented
        # reconsider it whenever backwards compatibility with reading the value
        # is about to be removed
        try:
            from sardana.pool.controller import MaxDimSize
            controller = self.controller
            axis_attr_info = controller.get_axis_attributes(self.axis)
            value_info = axis_attr_info["Value"]
            shape = value_info[MaxDimSize]
        except Exception as e:
            # mind the trailing space: adjacent literals are concatenated
            raise RuntimeError(
                "can not provide backwards compatibility, you must "
                "implement shape axis parameter") from e
        return shape

    def get_shape(self, cache=True, propagate=1):
        """Return the shape of the channel value.

        :param cache:
            if ``True`` (default) return the cached shape (reading it only
            when nothing is cached yet), otherwise always read it
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :return: the shape
        """
        if not cache or self._shape is None:
            shape = self.read_shape()
            self._set_shape(shape, propagate=propagate)
        return self._shape

    def _set_shape(self, shape, propagate=1):
        """Store the shape and, unless propagate is 0, fire the "shape"
        event."""
        self._shape = shape
        if not propagate:
            return
        self.fire_event(
            EventType("shape", priority=propagate), shape)

    def read_shape(self):
        """Read the shape from the controller ("shape" axis parameter).

        When the controller fails to provide it (or provides ``None``):
        scalar channels get ``[]``, while 1D and 2D channels get the shape
        of the current value or, as the last resort, the ``MaxDimSize``
        based one.

        :return: the shape
        """
        try:
            shape = self.controller.get_axis_par(self.axis, "shape")
        except Exception:
            # deliberately best-effort: fall back to the logic below
            shape = None
        if shape is None:
            # backwards compatibility for controllers not implementing
            # shape axis par
            if self.get_type() in (ElementType.OneDExpChannel,
                                   ElementType.TwoDExpChannel):
                self.warning(
                    "not implementing shape axis parameter in 1D and 2D "
                    "controllers is deprecated since 3.1.0")
                value = self.value.value
                if value is None:
                    self.debug("could not get shape from value")
                    shape = self._get_shape_from_max_dim_size()
                else:
                    import numpy
                    try:
                        shape = numpy.shape(value)
                    except Exception:
                        self.debug("could not get shape from value")
                        shape = self._get_shape_from_max_dim_size()
            # scalar channel
            else:
                shape = []
        return shape

    shape = property(get_shape, doc="channel value shape")

    def _prepare(self):
        """Pass the value ref configuration to the controller.

        Nothing is done for channels which are not referable. The pattern
        is passed only when the value ref is enabled.
        """
        # TODO: think of implementing the preparation in the software
        # acquisition action, similarly as it is done for the global
        # acquisition action
        if self.is_referable():
            self.controller.set_axis_par(self.axis, "value_ref_enabled",
                                         self.value_ref_enabled)
            if self.value_ref_enabled:
                self.controller.set_axis_par(self.axis, "value_ref_pattern",
                                             self.value_ref_pattern)

    def start_acquisition(self):
        """Start an independent acquisition on this channel.

        :raises NotImplementedError: always, in this base class
        """
        msg = "{0} does not support independent acquisition".format(
            self.__class__.__name__)
        raise NotImplementedError(msg)
class PoolTimerableChannel(PoolBaseChannel):
    """Experiment channel whose independent acquisition relies on a timer.

    The timer is identified by name; ``"__self"`` and ``"__default"`` are
    special names handled by :meth:`_configure_timer`.
    """

    def __init__(self, **kwargs):
        PoolBaseChannel.__init__(self, **kwargs)
        # TODO: part of the configuration could be moved to the base class
        # so other experimental channels could reuse it
        self._conf_channel = ChannelConfiguration(self)
        self._conf_ctrl = ControllerConfiguration(self.controller)
        self._conf_ctrl.add_channel(self._conf_channel)
        self._timer = "__default"
        # timer configuration is created lazily, see _prepare()
        self._conf_timer = None

    # ------------------------------------------------------------------------
    # timer
    # ------------------------------------------------------------------------

    def get_timer(self) -> str:
        """Return the timer for this object.

        :return: the current timer
        """
        return self._timer

    def set_timer(self, timer: str, propagate: int = 1) -> None:
        """Set timer for this object.

        Any timer configuration created for the previous timer is dropped.

        :param timer: new timer to set
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        """
        if timer == self._timer:
            return
        previous_conf = self._conf_timer
        if previous_conf is not None:
            previous_elem = previous_conf.element
            # the channel itself stays in the acquisition and configuration
            if previous_elem is not self:
                self.acquisition.remove_element(previous_elem)
                self._conf_ctrl.remove_channel(previous_conf)
        self._timer = timer
        self._conf_timer = None
        if propagate:
            self.fire_event(EventType("timer", priority=propagate), timer)

    timer = property(get_timer, set_timer,
                     doc="timer for the timerable channel")

    def _configure_timer(self):
        """Find the timer element and register its configuration.

        ``"__self"`` selects this channel, ``"__default"`` the element at
        the controller's default timer axis; any other name is looked up
        in the pool.

        :raises ValueError: if the default timer is requested but the
            controller does not define it
        """
        timer_name = self.timer
        timer_elem = self
        if timer_name != "__self":
            ctrl = self.get_controller()
            if timer_name == "__default":
                axis = ctrl.get_default_timer()
                if axis is None:
                    raise ValueError(
                        "default_timer not defined in controller")
                timer_elem = ctrl.get_element(axis=axis)
            else:
                timer_elem = self.pool.get_element_by_name(timer_name)
        if timer_elem is self:
            # reuse the configuration created on init
            timer_conf = self._conf_channel
        else:
            self.acquisition.add_element(timer_elem)
            timer_conf = ChannelConfiguration(timer_elem)
            self._conf_ctrl.add_channel(timer_conf)
        self._conf_ctrl.timer = timer_conf
        self._conf_timer = timer_conf

    def start_acquisition(self):
        """Start software triggered acquisition"""
        if self._simulation_mode:
            return
        if self.timer is None:
            raise RuntimeError(
                "no timer configured - acquisition is not possible")
        self._prepare()
        conf_ctrls = [self._conf_ctrl]
        ctrls, master = get_timerable_items(
            conf_ctrls, self._conf_timer, AcqMode.Timer)
        self.acquisition.run(ctrls, self.integration_time, master, None)

    def _prepare(self):
        """Prepare the controller for a single software triggered
        acquisition, configuring the timer first if necessary."""
        # TODO: think of implementing the preparation in the software
        # acquisition action, similarly as it is done for the global
        # acquisition action
        PoolBaseChannel._prepare(self)
        if self._conf_timer is None:
            self._configure_timer()
        timer_axis = self._conf_timer.axis
        ctrl = self.controller
        ctrl.set_ctrl_par("synchronization", AcqSynch.SoftwareTrigger)
        # positional arguments after the integration time:
        # repetitions=1, latency=0, nb_starts=1
        self.controller.ctrl.PrepareOne(
            timer_axis, self.integration_time, 1, 0, 1)