Source code for sardana.pool.poolioregister

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool libray. It defines the base classes
for """

__all__ = ["PoolIORegister"]

__docformat__ = 'restructuredtext'

import time
from typing import Any

import sardana
from sardana import ElementType
from sardana.sardanaattribute import SardanaAttribute
from sardana.pool.poolelement import PoolElement
from sardana.pool.poolacquisition import PoolIORAcquisition


class Value(SardanaAttribute):

    def __init__(self, *args, **kwargs):
        super(Value, self).__init__(*args, **kwargs)

    def update(self, cache=True, propagate=1):
        if not cache or not self.has_value():
            value = self.obj.read_value()
            self.set_value(value, propagate=propagate)


[docs] class PoolIORegister(PoolElement): def __init__(self, **kwargs): kwargs['elem_type'] = ElementType.IORegister PoolElement.__init__(self, **kwargs) self._value = Value(self, listeners=self.on_change) self._config = None acq_name = "%s.Acquisition" % self._name self.set_action_cache(PoolIORAcquisition(self.pool, name=acq_name))
[docs] def get_value_attribute(self) -> sardana.sardanaattribute.SardanaAttribute: """Returns the value attribute object for this IO register :return: the value attribute """ return self._value
# ------------------------------------------------------------------------- # Event forwarding # -------------------------------------------------------------------------
[docs] def on_change(self, evt_src, evt_type, evt_value): # forward all events coming from attributes to the listeners self.fire_event(evt_type, evt_value)
# ------------------------------------------------------------------------- # default acquisition channel # -------------------------------------------------------------------------
[docs] def get_default_attribute(self): return self.get_value_attribute()
# ------------------------------------------------------------------------- # value # -------------------------------------------------------------------------
[docs] def read_value(self) -> sardana.sardanavalue.SardanaValue: """Reads the IO register value from hardware. :return: a :class:`~sardana.sardanavalue.SardanaValue` containing the IO register value """ return self.get_action_cache().read_value()[self]
[docs] def put_value(self, value: sardana.sardanavalue.SardanaValue, propagate: int = 1) -> Value: """Sets a value. :param value: the new value :param propagate: 0 for not propagating, 1 to propagate, 2 propagate with priority """ val_attr = self._value val_attr.set_value(value, propagate=propagate) return val_attr
[docs] def get_value(self, cache=True, propagate=1): value = self.get_value_attribute() value.update(cache=cache, propagate=propagate) return value
[docs] def set_value(self, value, timestamp=None): self.write_register(value, timestamp=timestamp)
[docs] def set_write_value(self, w_value: float, timestamp: Any = None, propagate: int = 1) -> None: """Sets a new write value for the IO registere :param w_value: the new write value for IO register :param propagate: 0 for not propagating, 1 to propagate, 2 propagate with priority """ self._value.set_write_value(w_value, timestamp=timestamp, propagate=propagate)
value = property(get_value, set_value, doc="ioregister value")
[docs] def write_register(self, value, timestamp=None): self._aborted = False self._stopped = False if not self._simulation_mode: if timestamp is None: timestamp = time.time() self.set_write_value(value, timestamp=timestamp, propagate=0) self.controller.write_one(self.axis, value)