#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the
PoolPseudoCounter class"""
__all__ = ["PoolPseudoCounter"]
__docformat__ = 'restructuredtext'
import sys
import time
from typing import Iterator, Dict, Optional
import sardana
from sardana import State, ElementType, TYPE_PHYSICAL_ELEMENTS
from sardana.sardanaattribute import SardanaAttribute
from sardana.sardanabuffer import EarlyValueException, LateValueException
from sardana.sardanaexception import SardanaException
from sardana.sardanavalue import SardanaValue
from sardana.pool.poolexception import PoolException
from sardana.pool.poolbasechannel import PoolBaseChannel
from sardana.pool.poolbasechannel import ValueBuffer as ValueBuffer_
from sardana.pool.poolbasegroup import PoolBaseGroup
from sardana.pool.poolacquisition import PoolCTAcquisition
from sardana.pool.poolelement import PoolElement
class ValueBuffer(ValueBuffer_):
def __init__(self, *args, **kwargs):
super(ValueBuffer, self).__init__(*args, **kwargs)
# The Pool will not be able to start with a hierarchy of pseudo counters
# and when the upper level pseudo counter is initialized before the lower
# level pseudo counter. It seems like the order of initialization is alphabetical.
# In this case postpone the listener configuration until
# PoolPseudoCounter.calculate_state_info().
self._listeners_configured = False
try:
for value_buffer_attr in self.obj.get_physical_value_buffer_iterator():
value_buffer_attr.add_listener(self.on_change)
self._listeners_configured = True
except KeyError:
pass
def on_change(self, evt_src, evt_type, evt_value):
for idx in evt_value.keys():
physical_values = []
for value_buf in self.obj.get_physical_value_buffer_iterator():
try:
value = value_buf.get_value(idx)
except EarlyValueException:
return
except LateValueException:
self.remove_physical_values(idx)
break
physical_values.append(value)
else:
# loop collected all values so we can proceed to calculate
value = self.obj.calc(physical_values)
self.append(value, idx)
self.remove_physical_values(idx)
def remove_physical_values(self, idx, force=False):
for value_buf in self.obj.get_physical_value_buffer_iterator():
if force or not value_buf.is_value_required(idx):
try:
value_buf.remove(idx)
except KeyError:
# probably another pseudo counter on the same level
# already removed it
pass
class Value(SardanaAttribute):
def __init__(self, *args, **kwargs):
self._exc_info = None
super(Value, self).__init__(*args, **kwargs)
# The Pool will not be able to start with a hierarchy of pseudo counters
# and when the upper level pseudo counter is initialized before the lower
# level pseudo counter. It seems like the order of initialization is alphabetical.
# In this case postpone the listener configuration until
# PoolPseudoCounter.calculate_state_info().
self._listeners_configured = False
try:
for value_attr in self.obj.get_physical_value_attribute_iterator():
value_attr.add_listener(self.on_change)
self._listeners_configured = True
except KeyError:
pass
def _in_error(self):
for value_attr in self.obj.get_physical_value_attribute_iterator():
if value_attr.error:
return True
return self._exc_info is not None
def _has_value(self):
for value_attr in self.obj.get_physical_value_attribute_iterator():
if not value_attr.has_value():
return False
return True
def _get_value(self):
return self.calc().value
def _set_value(self, value, exc_info=None, timestamp=None, propagate=1):
raise Exception("Cannot set value for %s" % self.obj.name)
def _get_write_value(self):
w_values = self.get_physical_write_values()
return self.calc_pseudo(physical_values=w_values).value
def _set_write_value(self, w_value, timestamp=None, propagate=1):
raise Exception("Cannot set write value for %s" % self.obj.name)
def _get_exc_info(self):
exc_info = self._exc_info
if exc_info is None:
for value_attr in self.obj.get_physical_value_attribute_iterator():
if value_attr.error:
return value_attr.get_exc_info()
return exc_info
def _get_timestamp(self):
timestamps = [
value_attr.timestamp for value_attr in self.obj.get_physical_value_attribute_iterator()]
if not len(timestamps):
timestamps = self._local_timestamp,
return max(timestamps)
def get_physical_write_values(self):
ret = []
for value_attr in self.obj.get_physical_value_attribute_iterator():
if value_attr.has_write_value():
value = value_attr.w_value
else:
if not value_attr.has_value():
# if underlying counter doesn't have value yet, it is
# because of a cold start
value_attr.update(propagate=0)
if value_attr.in_error():
raise PoolException("Cannot get '%s' value" % value_attr.obj.name,
exc_info=value_attr.exc_info)
value = value_attr.value
ret.append(value)
return ret
def get_physical_values(self):
ret = []
for value_attr in self.obj.get_physical_value_attribute_iterator():
# if underlying channel doesn't have value yet, it is because
# of a cold start
if not value_attr.has_value():
value_attr.update(propagate=0)
if value_attr.in_error():
raise PoolException("Cannot get '%s' value" % value_attr.obj.name,
exc_info=value_attr.exc_info)
ret.append(value_attr.value)
return ret
def calc(self, physical_values=None):
self._exc_info = None
try:
obj = self.obj
if physical_values is None:
physical_values = self.get_physical_values()
else:
l_v, l_u = len(physical_values), len(obj.get_user_elements())
if l_v != l_u:
raise IndexError("CalcPseudo(%s): must give %d physical "
"values (you gave %d)" % (obj.name, l_u, l_v))
ctrl, axis = obj.controller, obj.axis
result = ctrl.calc(axis, physical_values)
if result.error:
self._exc_info = result.exc_info
except SardanaException as se:
self._exc_info = se.exc_info
result = SardanaValue(exc_info=se.exc_info)
except:
exc_info = sys.exc_info()
result = SardanaValue(exc_info=exc_info)
self._exc_info = exc_info
return result
def calc_all(self, physical_values=None):
self._exc_info = None
try:
obj = self.obj
if physical_values is None:
physical_values = self.get_physical_values()
else:
l_v, l_u = len(physical_values), len(obj.get_user_elements())
if l_v != l_u:
raise IndexError("CalcAllPseudo(%s): must give %d physical "
"values (you gave %d)" % (obj.name, l_u, l_v))
ctrl, axis = obj.controller, obj.axis
result = ctrl.calc_all(axis, physical_values)
except SardanaException as se:
self._exc_info = se.exc_info
result = SardanaValue(exc_info=se.exc_info)
except:
self._exc_info = sys.exc_info()
result = SardanaValue(exc_info=sys.exc_info())
return result
def on_change(self, evt_src, evt_type, evt_value):
self.fire_read_event(propagate=evt_type.priority)
def update(self, cache=True, propagate=1):
if cache:
for phy_elem_val in self.obj.get_low_level_physical_value_attribute_iterator():
if not phy_elem_val.has_value():
cache = False
break
if not cache:
values = self.obj.acquisition.read_value(serial=True)
if not len(values):
self._local_timestamp = time.time()
for acq_obj, value in list(values.items()):
acq_obj.put_value(value, propagate=propagate)
[docs]
class PoolPseudoCounter(PoolBaseGroup, PoolBaseChannel):
"""A class representing a Pseudo Counter in the Sardana Device Pool"""
ValueAttributeClass = Value
ValueBufferClass = ValueBuffer
AcquisitionClass = None
def __init__(self, **kwargs):
self._siblings = None
kwargs['elem_type'] = ElementType.PseudoCounter
kwargs['_create_attrs_and_buffers_on_init'] = False
PoolBaseChannel.__init__(self, **kwargs)
user_elements = kwargs.pop(
'user_elements',
self.controller.counter_ids
)
PoolBaseGroup.__init__(self, user_elements=user_elements,
pool=kwargs['pool'])
PoolBaseChannel._create_attrs_and_buffers(self)
[docs]
def serialize(self, *args, **kwargs):
kwargs = PoolBaseChannel.serialize(self, *args, **kwargs)
elements = [elem.name for elem in self.get_user_elements()]
physical_elements = []
for elem_list in list(self.get_physical_elements().values()):
for elem in elem_list:
physical_elements.append(elem.name)
cl_name = self.__class__.__name__
cl_name = cl_name[4:]
kwargs['elements'] = elements
kwargs['physical_elements'] = physical_elements
return kwargs
[docs]
def add_user_element(self, element, index=None):
index = PoolBaseGroup.add_user_element(self, element, index)
element.add_pseudo_element(self)
return index
[docs]
def on_element_changed(self, evt_src, evt_type, evt_value):
name = evt_type.name.lower()
# always calculate state.
status_info = self._calculate_states()
state, status = self.calculate_state_info(status_info=status_info)
state_propagate = 0
status_propagate = 0
if name == 'state':
state_propagate = evt_type.priority
elif name == 'status':
status_propagate = evt_type.priority
self.set_state(state, propagate=state_propagate)
self.set_status(status, propagate=status_propagate)
def _create_action_cache(self):
acq_name = "%s.Acquisition" % self._name
return PoolCTAcquisition(self, acq_name)
[docs]
def get_action_cache(self):
return self._get_action_cache()
[docs]
def set_action_cache(self, action_cache):
self._set_action_cache(action_cache)
[docs]
def get_siblings(self):
if self._siblings is None:
self._siblings = siblings = set()
for axis, sibling in \
list(self.controller.get_element_axis().items()):
if axis == self.axis:
continue
siblings.add(sibling)
return self._siblings
siblings = property(fget=get_siblings,
doc="the siblings for this pseudo counter")
# ------------------------------------------------------------------------
# value
# ------------------------------------------------------------------------
[docs]
def calc(self, physical_values=None):
return self.get_value_attribute().calc(physical_values=physical_values)
[docs]
def calc_all(self, physical_values=None):
return self.get_value_attribute().calc_all(physical_values=physical_values)
[docs]
def get_low_level_physical_value_attribute_iterator(self):
return self.get_physical_elements_attribute_iterator()
[docs]
def get_physical_value_attribute_iterator(self):
return self.get_user_elements_attribute_iterator()
[docs]
def get_physical_values_attribute_sequence(self):
return self.get_user_elements_attribute_sequence()
[docs]
def get_physical_values_attribute_map(self):
return self.get_user_elements_attribute_map()
[docs]
def get_physical_value_buffer_iterator(self) -> Iterator[sardana.sardanabuffer.SardanaBuffer]:
"""Returns an iterator over the value buffer of each user element.
:return: an iterator over the value buffer of each user element.
"""
for element in self.get_user_elements():
yield element.get_value_buffer()
[docs]
def get_physical_values(self, cache: bool = True, propagate: int = 1) -> Dict[PoolElement, sardana.sardanaattribute.SardanaAttribute]:
"""Get value for underlying elements.
:param cache:
if ``True`` (default) return value in cache, otherwise read value
from hardware
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:return:
the physical value
"""
self._value.update(cache=cache, propagate=propagate)
return self.get_physical_values_attribute_map()
[docs]
def get_siblings_values(self, use: Optional[Dict[PoolElement, sardana.sardanavalue.SardanaValue]] = None) -> Dict[PoolElement, float]:
"""Get the last values for all siblings.
:param use: the already calculated values. If a sibling is in this
dictionary, the value stored here is used instead
:return: a dictionary with siblings values
"""
values = {}
for sibling in self.siblings:
value_attr = sibling.get_value_attribute()
if use and sibling in use:
pos = use[sibling]
else:
pos = value_attr.value
values[sibling] = pos
return values
[docs]
def get_value(self, cache: bool = True, propagate: int = 1) -> sardana.sardanaattribute.SardanaAttribute:
"""Returns the pseudo counter value.
:param cache:
if ``True`` (default) return value in cache, otherwise read value
from hardware
:param propagate:
0 for not propagating, 1 to propagate, 2 propagate with priority
:return:
the pseudo counter value
"""
value_attr = self._value
value_attr.update(cache=cache, propagate=propagate)
return value_attr
[docs]
def set_value(self, value, propagate=1):
raise Exception("Not possible to set_value of a pseudo counter")
value = property(get_value, doc="pseudo counter value")
# --------------------------------------------------------------------------
# state information
# --------------------------------------------------------------------------
_STD_STATUS = "{name} is {state}\n{ctrl_status}"
[docs]
def calculate_state_info(self, status_info=None):
# Refer to Value.__init__() for an explanation on this 'hack'
if not self._value._listeners_configured:
for value_attr in self.get_physical_value_attribute_iterator():
value_attr.add_listener(self._value.on_change)
self._value._listeners_configured = True
# Refer to ValueBuffer.__init__() for an explanation on this 'hack'
if not self._value_buffer._listeners_configured:
for value_buffer_attr in self.get_physical_value_buffer_iterator():
value_buffer_attr.add_listener(self._value_buffer.on_change)
self._value_buffer._listeners_configured = True
if status_info is None:
status_info = self._state, self._status
state, status = status_info
if state == State.On:
state_str = "Stopped"
else:
state_str = "in " + State[state]
new_status = self._STD_STATUS.format(name=self.name, state=state_str,
ctrl_status=status)
state, new_status = self._calculate_init_attr_state_info(
state, new_status
)
return state, new_status
[docs]
def read_state_info(self, state_info=None):
if state_info is None:
state_info = {}
action_cache = self.get_action_cache()
ctrl_state_infos = action_cache.read_state_info(serial=True)
for obj, ctrl_state_info in list(ctrl_state_infos.items()):
state_info = obj._from_ctrl_state_info(ctrl_state_info)
obj.put_state_info(state_info)
for user_element in self.get_user_elements():
if user_element.get_type() not in TYPE_PHYSICAL_ELEMENTS:
state_info = user_element._calculate_states()
user_element.put_state_info(state_info)
ret = self._calculate_states()
return ret