#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the base classes
for ZeroDExpChannel"""
__all__ = ["Pool0DExpChannel"]
__docformat__ = 'restructuredtext'
import numpy
import time
from typing import Any
import sardana
from sardana import ElementType
from sardana.sardanaevent import EventType
from sardana.sardanaattribute import SardanaAttribute
from sardana.sardanavalue import SardanaValue
from sardana.pool.poolbasechannel import PoolBaseChannel
from sardana.pool.poolacquisition import Pool0DAcquisition
class BaseAccumulation(object):
    """Store acquired (value, timestamp) samples and expose the last one.

    Samples are kept in ``buffer``, a ``(2, capacity)`` float64 array: row 0
    holds the values and row 1 the timestamps. Only the first ``nb_points``
    columns are meaningful. ``value`` and ``timestamp`` hold the result of
    the accumulation; for this base class that is simply the last appended
    sample. Subclasses customize the result by overriding
    :meth:`update_value`.
    """

    def __init__(self):
        # initial capacity; append() enlarges the buffer when it fills up
        self.buffer = numpy.zeros(shape=(2, 16384), dtype=numpy.float64)
        self.clear()

    def clear(self):
        """Forget all samples and reset the accumulated value/timestamp."""
        self.nb_points = 0
        self.value = None
        self.timestamp = None

    def get_value_buffer(self):
        """Return a view of the values appended since the last clear."""
        return self.buffer[0][:self.nb_points]

    def get_time_buffer(self):
        """Return a view of the timestamps appended since the last clear."""
        return self.buffer[1][:self.nb_points]

    def append(self, value, timestamp=None):
        """Store a new sample and refresh the accumulated value.

        :param value: the acquired sample
        :param timestamp: time of the sample; defaults to ``time.time()``
        """
        if timestamp is None:
            timestamp = time.time()
        idx = self.nb_points
        if idx >= self.buffer.shape[1]:
            # buffer full: enlarge instead of failing with IndexError
            self._grow_buffer()
        self.buffer[0][idx] = value
        self.buffer[1][idx] = timestamp
        # count the sample only once it has been stored, but before
        # update_value(), which may inspect nb_points
        self.nb_points = idx + 1
        self.update_value(value, timestamp)

    def update_value(self, value, timestamp):
        """Recompute the accumulated result from the newly appended sample."""
        self.value = value
        self.timestamp = timestamp

    def _grow_buffer(self):
        """Double the buffer capacity, keeping the samples stored so far."""
        capacity = self.buffer.shape[1]
        bigger = numpy.zeros(shape=(2, max(1, 2 * capacity)),
                             dtype=self.buffer.dtype)
        bigger[:, :capacity] = self.buffer
        self.buffer = bigger


# "Last" accumulation reports the last sample, which is what the base does
LastAccumulation = BaseAccumulation
class SumAccumulation(BaseAccumulation):
    """Accumulation whose result is the running sum of the samples.

    ``None`` samples are not added to the sum.
    """

    def clear(self):
        """Reset the base state and start the sum again from zero."""
        super(SumAccumulation, self).clear()
        self.sum = 0.0

    def update_value(self, value, timestamp):
        """Add the new sample to the running sum and publish the sum."""
        # the base implementation records the raw sample and its timestamp
        super(SumAccumulation, self).update_value(value, timestamp)
        if value is None:
            # nothing to add; the value stored by the base is left as is
            return
        self.sum += value
        # NOTE(review): indentation was lost in the copy this was restored
        # from; the assignment is assumed to belong to the non-None branch.
        self.value = self.sum
class AverageAccumulation(SumAccumulation):
    """Accumulation whose result is the mean of the non-None samples."""

    def clear(self):
        """Reset the sum and the count of valid (non-None) samples."""
        super(AverageAccumulation, self).clear()
        self.nb_valid_points = 0

    def update_value(self, value, timestamp):
        """Account for the new sample and publish the updated mean."""
        # the parent keeps the running sum up to date
        super(AverageAccumulation, self).update_value(value, timestamp)
        if value is None:
            # None samples do not count; also avoids dividing by zero when
            # no valid sample has been seen yet
            return
        self.nb_valid_points += 1
        self.value = self.sum / self.nb_valid_points
class IntegralAccumulation(BaseAccumulation):
    """Accumulation whose result is the time-weighted mean of the samples.

    The signal is integrated with the trapezoidal rule between consecutive
    samples and the integral is divided by the time elapsed since the first
    sample. With a single sample the result is that sample.
    """

    def clear(self):
        """Reset the base state and the integration bookkeeping."""
        BaseAccumulation.clear(self)
        self.sum = 0.0
        # (value, timestamp) of the previous valid sample, None if none yet
        self.last_value = None
        # timestamp of the first valid sample
        self.start_time = None

    def update_value(self, value, timestamp):
        """Integrate the new sample and publish the time-weighted mean.

        :param value: the newly appended sample; ``None`` samples are
            ignored, as in the sum and average accumulations
        :param timestamp: time of the sample
        """
        # keep the accumulation timestamp in sync with the latest sample;
        # this override does not call the base implementation
        self.timestamp = timestamp
        if value is None:
            # cannot be integrated; leave the accumulated result untouched
            return
        if self.last_value is None:
            # first valid sample: nothing to integrate yet
            self.start_time = timestamp
            self.value = value
        else:
            last_value, last_timestamp = self.last_value
            dt = timestamp - last_timestamp
            # trapezoid between the previous and the current sample
            self.sum += dt * (last_value + value) / 2
            total_dt = timestamp - self.start_time
            if total_dt != 0:
                self.value = self.sum / total_dt
            # else: no time elapsed since the first sample, so the mean is
            # undefined; keep the previous result rather than divide by zero
        self.last_value = value, timestamp
def get_accumulation_class(ctype):
    """Return the accumulation class for the given accumulation type name.

    The class is looked up in this module's globals under the name
    ``<ctype>Accumulation``.

    :param ctype: accumulation type name (e.g. "Average")
    :return: the object bound to ``<ctype>Accumulation`` in this module
    :raises KeyError: if this module defines no such name
    """
    name = ctype + "Accumulation"
    try:
        return globals()[name]
    except KeyError:
        suffix = "Accumulation"
        # list what can be requested so the error is actionable
        known = sorted(key[:-len(suffix)] for key in globals()
                       if key.endswith(suffix) and key != suffix)
        raise KeyError("Unknown accumulation type %r (available: %s)"
                       % (ctype, ", ".join(known))) from None
class CurrentValue(SardanaAttribute):
    """Attribute holding the current (instantaneous) value of the channel."""

    def update(self, cache=True, propagate=1):
        """Refresh the attribute from its owner when needed.

        A new value is read from the owner object when ``cache`` is false
        or when the attribute holds no value yet.

        :param cache: when true, an already stored value is kept
        :param propagate: passed on to ``set_value``
        """
        if cache and self.has_value():
            # the stored value is acceptable, nothing to read
            return
        fresh = self.obj.read_current_value()
        self.set_value(fresh, propagate=propagate)
class Value(SardanaAttribute):
    """Accumulated value attribute of a 0D experimental channel.

    The attribute owns an accumulation object (see
    :func:`get_accumulation_class`) which stores the acquired samples and
    computes the accumulated value exposed by this attribute.
    """

    # accumulation type used when none is given to the constructor
    DefaultAccumulationType = "Average"

    def __init__(self, *args, **kwargs):
        """Build the attribute.

        The optional ``accumulation_type`` keyword selects the accumulation
        type; all other arguments go to the superclass constructor.
        """
        ctype = kwargs.pop('accumulation_type', self.DefaultAccumulationType)
        super(Value, self).__init__(*args, **kwargs)
        self.set_accumulation_type(ctype)

    def get_val(self):
        """Return the value attribute of the owner object."""
        owner = self.obj
        return owner.get_value_attribute()

    def set_accumulation_type(self, ctype):
        """Replace the accumulation by a new one of the given type.

        :param ctype: accumulation type name (e.g. "Average")
        """
        self._accumulation = get_accumulation_class(ctype)()

    def get_accumulation_type(self):
        """Return the accumulation class name without its "Accumulation"
        suffix."""
        name = type(self._accumulation).__name__
        suffix_start = name.index("Accumulation")
        return name[:suffix_start]

    def get_accumulation(self):
        """Return the accumulation object."""
        return self._accumulation

    accumulation = property(get_accumulation)

    def _get_value(self):
        """Return the accumulated value.

        :raises Exception: if the accumulation holds no value
        """
        accumulated = self._accumulation.value
        if accumulated is not None:
            return accumulated
        raise Exception("Value not available: no successful acquisition"
                        " done so far!")

    def _get_value_obj(self):
        '''Override superclass method and compose a SardanaValue object from
        the present values.
        '''
        accumulation = self._accumulation
        # the accumulation is stamped with the time of the last acquired
        # sample
        last_sample_time = accumulation.get_time_buffer()[-1]
        return SardanaValue(value=accumulation.value,
                            timestamp=last_sample_time)

    def _in_error(self):
        """Tell whether the attribute is in error; always False."""
        # A 0D is assumed never to be in error for now. This could be
        # improved by looking for readout errors in the accumulation buffer.
        return False

    def _has_value(self):
        """Tell whether the accumulation holds a value."""
        return self.accumulation.value is not None

    def _get_timestamp(self):
        """Return the timestamp stored by the accumulation."""
        return self.accumulation.timestamp

    def get_accumulation_buffer(self):
        """Return the buffer of accumulated sample values."""
        return self.accumulation.get_value_buffer()

    def get_time_buffer(self):
        """Return the buffer of accumulated sample timestamps."""
        return self.accumulation.get_time_buffer()

    def clear_buffer(self):
        """Discard everything accumulated so far."""
        self.accumulation.clear()

    def append_buffer(self, value, propagate=1):
        """Add a sample to the accumulation and optionally notify listeners.

        :param value: object providing ``value`` and ``timestamp``
        :param propagate: when greater than 0 an event is fired using this
            number as priority
        """
        self.accumulation.append(value.value, value.timestamp)
        if propagate > 0:
            self.fire_event(EventType(self.name, priority=propagate), self)

    def update(self, cache=True, propagate=1):
        """Always raise: only Pool0DAcquisition is allowed to update."""
        raise Exception("0D Value can not be updated from outside"
                        " of acquisition")
class Pool0DExpChannel(PoolBaseChannel):
    """0D experimental channel of the pool.

    Besides the accumulated value attribute (``ValueAttributeClass``) the
    channel keeps a separate attribute with the current value.
    """

    ValueAttributeClass = Value
    AcquisitionClass = Pool0DAcquisition

    def __init__(self, **kwargs):
        kwargs['elem_type'] = ElementType.ZeroDExpChannel
        PoolBaseChannel.__init__(self, **kwargs)
        self._current_value = CurrentValue(self, listeners=self.on_change)

    # -------------------------------------------------------------------------
    # Accumulation
    # -------------------------------------------------------------------------

    def get_accumulation_type(self):
        """Return the accumulation type name of the value attribute."""
        return self.get_value_attribute().get_accumulation_type()

    def get_accumulation(self):
        """Return the accumulation object of the value attribute."""
        return self.get_value_attribute().get_accumulation()

    def set_accumulation_type(self, ctype):
        """Set the accumulation type of the value attribute.

        :param ctype: accumulation type name (e.g. "Average")
        """
        return self.get_value_attribute().set_accumulation_type(ctype)

    accumulation = property(get_accumulation)
    accumulation_type = property(get_accumulation_type, set_accumulation_type)

    # -------------------------------------------------------------------------
    # value
    # -------------------------------------------------------------------------

    def get_accumulated_value_attribute(self) -> sardana.sardanaattribute.SardanaAttribute:
        """Returns the accumulated value attribute object for this 0D.

        :return: the accumulated value attribute
        """
        return self.get_value_attribute()

    def get_current_value_attribute(self) -> sardana.sardanaattribute.SardanaAttribute:
        """Returns the current value attribute object for this 0D.

        :return: the current value attribute
        """
        return self._current_value

    def get_accumulated_value(self) -> sardana.sardanaattribute.SardanaAttribute:
        """Gets the accumulated value attribute for this 0D.

        This returns the attribute object itself, i.e. the same object as
        :meth:`get_accumulated_value_attribute`.

        :return: the accumulated value attribute
        """
        return self.get_accumulated_value_attribute()

    def read_current_value(self) -> sardana.sardanavalue.SardanaValue:
        """Reads the current 0D value through the acquisition action.

        :return:
            the entry for this channel in the result of the acquisition's
            ``read_value()``
        """
        return self.acquisition.read_value()[self]

    def put_current_value(self, value: sardana.sardanavalue.SardanaValue, propagate: int = 1) -> None:
        """Put a current value.

        The value is stored in the current value attribute and, while the
        channel is in operation, also appended to the accumulation buffer.

        :param value:
            the new value
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        """
        curr_val_attr = self.get_current_value_attribute()
        curr_val_attr.set_value(value, propagate=propagate)
        if self.is_in_operation():
            acc_val_attr = self.get_accumulated_value_attribute()
            acc_val_attr.append_buffer(value, propagate=propagate)

    def get_current_value(self, cache: Any = True, propagate: Any = 1) -> sardana.sardanaattribute.SardanaAttribute:
        """Returns the current value attribute after updating it.

        :param cache: passed to the attribute's ``update``
        :param propagate: passed to the attribute's ``update``
        :return:
            the current value attribute
        """
        curr_val_attr = self.get_current_value_attribute()
        curr_val_attr.update(cache=cache, propagate=propagate)
        return curr_val_attr

    current_value = property(get_current_value, doc="0D value")
    accumulated_value = property(get_accumulated_value, doc="0D value")

    def clear_buffer(self):
        """Clear the accumulation buffer of the value attribute."""
        self.get_accumulated_value_attribute().clear_buffer()

    # -------------------------------------------------------------------------
    # accumulation buffer
    # -------------------------------------------------------------------------

    def get_accumulation_buffer(self):
        """Return the buffer of accumulated sample values."""
        return self.get_accumulated_value_attribute().get_accumulation_buffer()

    accumulation_buffer = property(get_accumulation_buffer)

    # -------------------------------------------------------------------------
    # time buffer
    # -------------------------------------------------------------------------

    def get_time_buffer(self):
        """Return the buffer of accumulated sample timestamps."""
        return self.get_accumulated_value_attribute().get_time_buffer()

    time_buffer = property(get_time_buffer)

    def start_acquisition(self, value=None):
        """Start an acquisition on this channel.

        The aborted flag and the accumulation buffer are reset first. Unless
        the channel is in simulation mode the acquisition action is run.

        :param value: the integration time; must not be None
        :raises Exception: if ``value`` is None
        """
        self._aborted = False
        self.clear_buffer()
        if value is None:
            raise Exception(
                "Invalid integration_time '%s'. Hint set a new value for 'value' first" % value)
        if not self._simulation_mode:
            self.acquisition.run()