#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""The device pool submodule.
It contains specific part of sardana device pool"""
__all__ = ["InterruptException", "StopException", "AbortException",
"ReleaseException",
"BaseElement", "ControllerClass", "ControllerLibrary",
"PoolElement", "Controller", "ComChannel", "ExpChannel",
"CTExpChannel", "ZeroDExpChannel", "OneDExpChannel",
"TwoDExpChannel", "PseudoCounter", "Motor", "PseudoMotor",
"MotorGroup", "TriggerGate",
"MeasurementGroup", "IORegister", "Instrument", "Pool",
"registerExtensions", "getChannelConfigs"]
__docformat__ = 'restructuredtext'
import copy
import operator
import os
import sys
import time
import traceback
import weakref
import json
from datetime import datetime
import threading
import collections
from typing import Optional, Sequence, List, Any, Dict, Tuple, Union, Callable
import numpy
import PyTango
from PyTango import DevState, AttrDataFormat, AttrQuality, DevFailed, \
DeviceProxy, AttributeProxy
import taurus
from taurus import Factory, Device
from taurus.core.taurusbasetypes import TaurusEventType
from taurus.core.tango.tangovalidator import TangoAttributeNameValidator, \
TangoDeviceNameValidator
from taurus.core.util.log import Logger
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.containers import CaselessDict
from taurus.core.util.event import EventGenerator, AttributeEventWait, \
AttributeEventIterator
from taurus.core.tango import TangoDevice, FROM_TANGO_TO_STR_TYPE
import sardana
from sardana import sardanacustomsettings
from .sardana import BaseSardanaElementContainer, BaseSardanaElement
from .motion import Moveable, MoveableSource
from sardana.pool import AcqSynchType
from sardana.pool.poolsynchronization import SynchDescription
from sardana.taurus.core.tango.sardana import PlotType
Ready = Standby = DevState.ON
Counting = Acquiring = Moving = DevState.MOVING
Alarm = DevState.ALARM
Fault = DevState.FAULT
CHANGE_EVT_TYPES = TaurusEventType.Change, TaurusEventType.Periodic
MOVEABLE_TYPES = 'Motor', 'PseudoMotor', 'MotorGroup'
QUALITY = {
AttrQuality.ATTR_VALID: 'VALID',
AttrQuality.ATTR_INVALID: 'INVALID',
AttrQuality.ATTR_CHANGING: 'CHANGING',
AttrQuality.ATTR_WARNING: 'WARNING',
AttrQuality.ATTR_ALARM: 'ALARM',
None: 'UNKNOWN'
}
def _is_referable(channel):
# Equivalent to ExpChannel.isReferable.
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
if isinstance(channel, str):
channel = DeviceProxy(channel)
try:
attr_list = channel.get_attribute_list()
except DevFailed:
return False
return "valueref" in list(map(str.lower, attr_list))
[docs]
class InterruptException(Exception):
pass
[docs]
class StopException(InterruptException):
pass
[docs]
class AbortException(InterruptException):
pass
class ReleaseException(InterruptException):
pass
[docs]
class BaseElement(object):
""" The base class for elements in the Pool (Pool itself, Motor,
ControllerClass, ExpChannel all should inherit from this class directly or
indirectly)
"""
def __repr__(self):
pd = self.getPoolData()
return "{0}({1})".format(pd['type'], pd['full_name'])
def __str__(self):
return self.getName()
[docs]
def serialize(self):
return self.getPoolData()
[docs]
def str(self, n=0):
"""Returns a sequence of strings representing the object in
'consistent' way.
Default is to return <name>, <controller name>, <axis>
:param n: the number of elements in the tuple."""
if n == 0:
return CodecFactory.encode(('json'), self.serialize())
return self._str_tuple[:n]
def __lt__(self, o):
return self.getPoolData()['full_name'] < o.getPoolData()['full_name']
[docs]
def getName(self):
return self.getPoolData()['name']
[docs]
def getPoolObj(self):
"""Get reference to this object's Pool."""
return self._pool_obj
[docs]
def getPoolData(self):
"""Get reference to this object's Pool data."""
return self._pool_data
[docs]
class ControllerClass(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
self.path, self.f_name = os.path.split(self.file_name)
self.lib_name, self.ext = os.path.splitext(self.f_name)
def __repr__(self):
pd = self.getPoolData()
return "ControllerClass({0})".format(pd['full_name'])
[docs]
def getSimpleFileName(self):
return self.f_name
[docs]
def getFileName(self):
return self.file_name
[docs]
def getClassName(self):
return self.getName()
[docs]
def getType(self):
return self.getTypes()[0]
[docs]
def getTypes(self):
return self.types
[docs]
def getLib(self):
return self.f_name
[docs]
def getGender(self):
return self.gender
[docs]
def getModel(self):
return self.model
[docs]
def getOrganization(self):
return self.organization
def __lt__(self, o):
if self.getType() != o.getType():
return self.getType() < o.getType()
if self.getGender() != o.getGender():
return self.getGender() < o.getGender()
return self.getClassName() < o.getClassName()
class ControllerLibrary(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
def getType(self):
return self.getTypes()[0]
def getTypes(self):
return self.type
class TangoAttributeEG(Logger, EventGenerator):
"""An event generator for a 'State' attribute"""
def __init__(self, attr):
self._attr = attr
self.call__init__(Logger, 'EG', attr)
event_name = '%s EG' % (attr.getParentObj().getNormalName())
self.call__init__(EventGenerator, event_name)
self._attr.addListener(self)
def getAttribute(self):
return self._attr
def eventReceived(self, evt_src, evt_type, evt_value):
"""Event handler from Taurus"""
if evt_type not in CHANGE_EVT_TYPES:
return
if evt_value is None:
v = None
else:
v = evt_value.rvalue
if hasattr(v, "magnitude"):
v = v.magnitude
EventGenerator.fireEvent(self, v)
def read(self, force=False):
try:
last_val = self._attr.read(cache=not force).rvalue
if hasattr(last_val, "magnitude"):
last_val = last_val.magnitude
self.last_val = last_val
except:
self.error("Read error")
self.debug("Details:", exc_info=1)
self.last_val = None
return EventGenerator.read(self)
def readValue(self, force=False):
r = self.read(force=force)
if r is None:
# do a retry
r = self.read(force=force)
return r
def write(self, value):
self._attr.write(value, with_read=False)
def __getattr__(self, name):
return getattr(self._attr, name)
def reservedOperation(fn):
def new_fn(*args, **kwargs):
self = args[0]
wr = self.getReservedWR()
if wr is not None:
if wr().isStopped():
raise StopException("stopped before calling %s" % fn.__name__)
elif wr().isAborted():
raise AbortException("aborted before calling %s" % fn.__name__)
try:
return fn(*args, **kwargs)
except AbortException:
self._clearEventWait()
raise
except:
self.debug("Exception occurred in reserved operation:"
" clearing events...")
self._clearEventWait()
raise
return new_fn
def get_pool_for_device(db, device):
server_devs = db.get_device_class_list(device.info().server_id)
for dev_name, klass_name in zip(server_devs[0::2], server_devs[1::2]):
if klass_name == "Pool":
return Device(dev_name)
[docs]
class PoolElement(BaseElement, TangoDevice):
"""Base class for a Pool element device."""
def __init__(self, name, **kwargs):
"""PoolElement initialization."""
self._reserved = None
self._evt_wait = None
self.__go_start_time = 0
self.__go_end_time = 0
self.__go_time = 0
self._total_go_time = 0
self.call__init__(TangoDevice, name, **kwargs)
# dict<string, TangoAttributeEG>
# key : the attribute name
# value : the corresponding TangoAttributeEG
self._attrEG = CaselessDict()
# subscribing to events while the device is being
# deleted may lead to a segmentation fault - see issue#1896
device_proxy = self.getDeviceProxy()
if device_proxy is None:
raise RuntimeError("Could not create PoolElement "
"(corresponding DeviceProxy could not be created, "
"maybe the device does not exist?)")
try:
device_proxy.ping()
except DevFailed as df:
raise RuntimeError("Could not create PoolElement "
"(device does not respond to ping)") from df
# force the creation of a state and status attributes
self.getStateEG()
self.getStatusEG()
[docs]
def reconfig(self):
"""Reconfigure (initialize) element in the server
This includes:
- recreation of the device and its attributes (Tango)
- writing of the memorized attributes (last set values)
Usefull when there was a problem with the hardware.
.. note::
Calling this method on the `~sardana.taurus.core.tango.sardana.pool.Controller`
will usually also require to call it on the controller's elements.
"""
self.getPoolObj().reconfigObj(self)
def _find_pool_obj(self):
pool = get_pool_for_device(self.getParentObj(), self.getDeviceProxy())
return pool
def _find_pool_data(self):
pool = self._find_pool_obj()
return pool.getElementInfo(self.getFullName())._data
# Override BaseElement.getPoolObj because the reference to pool object may
# not be filled. This reference is filled when the element is obtained
# using Pool.getObject. If one obtain the element directly using Taurus
# e.g. mot = taurus.Device(<mot_name>) it won't be filled. In this case
# look for the pool object using the database information.
[docs]
def getPoolObj(self):
try:
return self._pool_obj
except AttributeError:
self._pool_obj = self._find_pool_obj()
return self._pool_obj
# Override BaseElement.getPoolData because the reference to pool data may
# not be filled. This reference is filled when the element is obtained
# using Pool.getPoolData. If one obtain the element directly using Taurus
# e.g. mot = taurus.Device(<mot_name>) it won't be filled. In this case
# look for the pool object and its data using the database information.
[docs]
def getPoolData(self):
try:
return self._pool_data
except AttributeError:
self._pool_data = self._find_pool_data()
return self._pool_data
[docs]
def cleanUp(self):
TangoDevice.cleanUp(self)
self._reserved = None
f = self.factory()
attr_map = self._attrEG
for attr_name in list(attr_map.keys()):
attrEG = attr_map.pop(attr_name)
attr = attrEG.getAttribute()
attrEG = None
f.removeExistingAttribute(attr)
[docs]
def reserve(self, obj):
if obj is None:
self._reserved = None
return
self._reserved = weakref.ref(obj, self._unreserveCB)
def _unreserveCB(self, obj):
self.unreserve()
[docs]
def unreserve(self):
self._reserved = None
[docs]
def isReserved(self, obj=None):
if obj is None:
return self._reserved is not None
else:
o = self._reserved()
return o == obj
[docs]
def getReservedWR(self):
return self._reserved
[docs]
def getReserved(self):
if self._reserved is None:
return None
return self._reserved()
[docs]
def dump_attributes(self):
attr_names = self.get_attribute_list()
req_id = self.read_attributes_asynch(attr_names)
return self.read_attributes_reply(req_id, 2000)
def _getAttrValue(self, name, force=False):
attrEG = self._getAttrEG(name)
if attrEG is None:
return None
return attrEG.readValue(force=force)
def _getAttrEG(self, name):
attrEG = self.getAttrEG(name)
if attrEG is None:
attrEG = self._createAttribute(name)
return attrEG
def _createAttribute(self, name):
attrObj = self.getAttribute(name)
if attrObj is None:
self.warning("Unable to create attribute %s" % name)
return None, None
attrEG = TangoAttributeEG(attrObj)
self._attrEG[name] = attrEG
return attrEG
def _getEventWait(self):
if self._evt_wait is None:
# create an object that waits for attribute events.
# each time we use it we have to connect and disconnect to an
# attribute
self._evt_wait = AttributeEventWait()
return self._evt_wait
def _clearEventWait(self):
self._evt_wait = None
[docs]
def getStateEG(self):
return self._getAttrEG('state')
[docs]
def getStatusEG(self) -> EventGenerator:
"""Get Status attribute event generator.
First call to this method will create the attribute event generator
object and store it for future references. Further calls will return
the same object.
"""
return self._getAttrEG('status')
[docs]
def getControllerName(self):
return self.getControllerObj().name
[docs]
def getControllerObj(self):
full_ctrl_name = self.getPoolData()['controller']
return self.getPoolObj().getObj(full_ctrl_name, "Controller")
[docs]
def getAxis(self):
return self.getPoolData()['axis']
[docs]
def getType(self):
return self.getPoolData()['type']
[docs]
def waitReady(self, timeout=None):
return self.getStateEG().waitEvent(Moving, equal=False,
timeout=timeout)
[docs]
def getAttrEG(self, name):
"""Returns the TangoAttributeEG object"""
return self._attrEG.get(name)
[docs]
def removeAttrEG(self, name):
"""Remove the TangoAttributeEG object"""
self._attrEG.pop(name)
[docs]
def getAttrObj(self, name):
"""Returns the taurus.core.tangoattribute.TangoAttribute object"""
attrEG = self._attrEG.get(name)
if attrEG is None:
return None
return attrEG.getAttribute()
[docs]
def getInstrumentObj(self):
return self._getAttrEG('instrument')
[docs]
def getInstrumentName(self, force=False):
instr_name = self._getAttrValue('instrument', force=force)
if not instr_name:
return ''
# instr_name = instr_name[:instr_name.index('(')]
return instr_name
[docs]
def setInstrumentName(self, instr_name):
self.getInstrumentObj().write(instr_name)
[docs]
def getInstrument(self):
instr_name = self.getInstrumentName()
if not instr_name:
return None
return self.getPoolObj().getObj("Instrument", instr_name)
[docs]
@reservedOperation
def start(self, *args, **kwargs):
evt_wait = self._getEventWait()
state_attr = self.getAttribute("state")
evt_wait.connect(state_attr)
try:
if not evt_wait.waitForEvent((Ready, Alarm),
timeout=3, reactivity=.1):
state = state_attr.read(cache=False).rvalue
# state is DevState enumeration (IntEnum)
state_str = state.name.capitalize()
if state not in (Ready, Alarm):
state_event = evt_wait.getLastRecordedEvent()
state_event_str = state_event.name.capitalize()
raise RuntimeError(
"{} state is {} (last event: {}).".format(
self.name,
state_str,
state_event_str
)
+ " Can not proceed to start."
)
# Clear event set to not confuse the value coming from the
# connection with the event of of end of the operation
# in the next wait event. This was observed on Windows where
# the time stamp resolution is very poor.
evt_wait.clearEventSet()
self.__go_time = 0
self.__go_start_time = ts1 = time.time()
self._start(*args, **kwargs)
ts2 = time.time()
evt_wait.waitForEvent((DevState.MOVING, ), after=ts1)
except Exception as e:
evt_wait.disconnect()
raise e
ts2 = evt_wait.getRecordedEvents().get(DevState.MOVING, ts2)
return (ts2,)
[docs]
def waitFinish(self, timeout: Optional[float] = None, id: Optional[Sequence[float]] = None):
"""Wait for the operation to finish
:param timeout: optional timeout (seconds)
:param id: id of the opertation returned by start
"""
if id is not None:
id = id[0]
evt_wait = self._getEventWait()
reactivity = 0.1
if timeout is not None and timeout <= reactivity:
reactivity = None # no reactivity, just wait timeout
try:
evt_wait.waitForEvent((DevState.MOVING, ), after=id, equal=False,
timeout=timeout, reactivity=reactivity)
finally:
self.__go_end_time = time.time()
self.__go_time = self.__go_end_time - self.__go_start_time
evt_wait.disconnect()
[docs]
@reservedOperation
def go(self, *args, **kwargs):
self._total_go_time = 0
start_time = time.time()
eid = self.start(*args, **kwargs)
timeout = kwargs.get('timeout')
self.waitFinish(id=eid, timeout=timeout)
self._total_go_time = time.time() - start_time
[docs]
def getLastGoTime(self):
"""Returns the time it took for last go operation"""
return self.__go_time
[docs]
def getTotalLastGoTime(self):
"""Returns the time it took for last go operation, including dead time
to prepare, wait for events, etc"""
return self._total_go_time
[docs]
def abort(self, synch: bool = True, timeout: Optional[float] = None, **kwargs) -> None:
"""Executes the "Abort" command on the element
:param synch: If True, waits the Abort process to finish.
:param timeout: Wait timeout (in seconds). Only used if synch=True.
"""
if 'wait_ready' in kwargs:
self.warning('Deprecation warning: since version 3.5.0 you should use '
'"synch" instead of "wait_ready"')
synch = kwargs['wait_ready']
if not synch:
self.command_inout("Abort")
return
state = self.getStateEG()
state.lock()
try:
self.command_inout("Abort")
self.waitReady(timeout=timeout)
finally:
state.unlock()
[docs]
def stop(self, synch=True, timeout=None, **kwargs):
"""Executes the "Stop" command on the element
:param synch: If True, waits the Stop process to finish.
:param timeout: Wait timeout (in seconds). Only used if synch=True.
"""
if 'wait_ready' in kwargs:
self.warning('Deprecation warning: since version 3.5.0 you should use '
'"synch" instead of "wait_ready"')
synch = kwargs['wait_ready']
if not synch:
self.command_inout("Stop")
return
state = self.getStateEG()
state.lock()
try:
self.command_inout("Stop")
self.waitReady(timeout=timeout)
finally:
state.unlock()
[docs]
def release(self):
"""Release hung element from its operation and make cleanup.
Before releasing you should try stopping/aborting the element.
"""
# Remove state event generator to force subscription
# and synchronous state readout on the next start.
# This allows to reuse the element if the cause of the hung
# was fixed.
self.removeAttrEG("State")
self.command_inout("Release")
def _information(self, tab=' '):
indent = "\n" + tab + 10 * ' '
msg = [self.getName() + ":"]
try:
t = time.time()
state_time = datetime.fromtimestamp(t).strftime("%H:%M:%S.%f")
# TODO: use expiration_period=float("inf") to always use event
# value (taurus-org/taurus#1105)
state = self.stateObj.read()
state_time = state.time.strftime("%H:%M:%S.%f")
# state_value is DevState enumeration (IntEnum)
state = state.rvalue.name.capitalize()
except DevFailed as df:
if len(df.args):
state = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
state = traceback.format_exception_only(*e_info)[0].rstrip()
except Exception:
e_info = sys.exc_info()[:2]
state = traceback.format_exception_only(*e_info)[0].rstrip()
msg.append(tab + " State: " + state + " ({})".format(state_time))
try:
t = time.time()
status_time = datetime.fromtimestamp(t).strftime("%H:%M:%S.%f")
status = self.getStatusEG().getAttribute().read()
status_time = status.time.strftime("%H:%M:%S.%f")
status = status.value.replace('\n', indent)
except DevFailed as df:
if len(df.args):
status = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
status = traceback.format_exception_only(*e_info)[0].rstrip()
except Exception:
e_info = sys.exc_info()[:2]
status = traceback.format_exception_only(*e_info)[0].rstrip()
msg.append(tab + " Status: " + status + " ({})".format(status_time))
return msg
[docs]
class Controller(PoolElement):
""" Class encapsulating Controller functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
[docs]
def getModuleName(self):
return self.getPoolData()['module']
[docs]
def getClassName(self):
return self.getPoolData()['klass']
[docs]
def getTypes(self):
return self.getPoolData()['types']
[docs]
def getMainType(self):
return self.getPoolData()['main_type']
[docs]
def addElement(self, elem):
axis = elem.getAxis()
self._elems[axis] = elem
self._last_axis = max(self._last_axis, axis)
[docs]
def removeElement(self, elem):
axis = elem.getAxis()
del self._elems[elem.getAxis()]
if axis == self._last_axis:
self._last_axis = max(self._elems)
[docs]
def getElementByAxis(self, axis):
pool = self.getPoolObj()
for _, elem in \
list(pool.getElementsOfType(self.getMainType()).items()):
if (elem.controller != self.getFullName() or
elem.getAxis() != axis):
continue
return elem
[docs]
def getElementByName(self, name):
pool = self.getPoolObj()
for _, elem in \
list(pool.getElementsOfType(self.getMainType()).items()):
if (elem.controller != self.getFullName() or
elem.getName() != name):
continue
return elem
[docs]
def getUsedAxes(self) -> List[int]:
"""Return axes in use by this controller
:return: list of axes
"""
pool = self.getPoolObj()
axes = []
for _, elem in \
list(pool.getElementsOfType(self.getMainType()).items()):
if elem.controller != self.getFullName():
continue
axes.append(elem.getAxis())
return sorted(axes)
[docs]
def getLastUsedAxis(self) -> Optional[int]:
"""Return the last used axis (the highest axis) in this controller
:return: last used axis
"""
used_axes = self.getUsedAxes()
if len(used_axes) == 0:
return None
return max(used_axes)
def __lt__(self, o):
return self.getName() < o.getName()
class ComChannel(PoolElement):
""" Class encapsulating CommunicationChannel functionality."""
pass
[docs]
class ExpChannel(PoolElement):
""" Class encapsulating ExpChannel functionality."""
def __init__(self, name, **kw):
"""ExpChannel initialization."""
self.call__init__(PoolElement, name, **kw)
self._last_integ_time = None
self._last_value_ref_pattern = None
self._last_value_ref_enabled = None
self._value_buffer = {}
self._value_buffer_cb = None
codec_name = getattr(sardanacustomsettings, "VALUE_BUFFER_CODEC")
self._value_buffer_codec = CodecFactory().getCodec(codec_name)
self._value_ref_buffer = {}
self._value_ref_buffer_cb = None
codec_name = getattr(sardanacustomsettings, "VALUE_REF_BUFFER_CODEC")
self._value_ref_buffer_codec = CodecFactory().getCodec(codec_name)
[docs]
def isReferable(self):
if "valueref" in list(map(str.lower, self.get_attribute_list())):
return True
return False
[docs]
def getIntegrationTime(self):
return self._getAttrValue('IntegrationTime')
[docs]
def getIntegrationTimeObj(self):
return self._getAttrEG('IntegrationTime')
[docs]
def setIntegrationTime(self, integ_time):
self.getIntegrationTimeObj().write(integ_time)
[docs]
def putIntegrationTime(self, integ_time):
if self._last_integ_time == integ_time:
return
self._last_integ_time = integ_time
self.getIntegrationTimeObj().write(integ_time)
[docs]
def getValueObj_(self) -> TangoAttributeEG:
"""Retrurns Value attribute event generator object.
:return: Value attribute event generator
..todo:: When support to Taurus 3 will be dropped provide getValueObj.
Taurus 3 TaurusDevice class already uses this name.
"""
return self._getAttrEG('value')
[docs]
def getValue(self, force=False):
return self._getAttrValue('value', force=force)
[docs]
def getValueBufferObj(self):
return self._getAttrEG('valuebuffer')
[docs]
def getValueBuffer(self):
return self._value_buffer
[docs]
def valueBufferChanged(self, value_buffer):
if value_buffer is None:
return
_, value_buffer = self._value_buffer_codec.decode(value_buffer)
indexes = value_buffer["index"]
values = value_buffer["value"]
for index, value in zip(indexes, values):
self._value_buffer[index] = value
[docs]
def getValueRefObj(self) -> TangoAttributeEG:
"""Return ValueRef attribute event generator object.
:return: ValueRef attribute event generator
"""
return self._getAttrEG('value')
[docs]
def getValueRef(self, force=False):
return self._getAttrValue('valueref', force=force)
[docs]
def getValueRefBufferObj(self):
return self._getAttrEG('valuerefbuffer')
[docs]
def getValueRefBuffer(self):
return self._value_ref_buffer
[docs]
def valueBufferRefChanged(self, value_ref_buffer):
if value_ref_buffer is None:
return
_, value_ref_buffer = self._value_ref_buffercodec.decode(
value_ref_buffer)
indexes = value_ref_buffer["index"]
value_refs = value_ref_buffer["value_ref"]
for index, value_ref in zip(indexes, value_refs):
self._value_ref_buffer[index] = value_ref
[docs]
def getValueRefPattern(self):
return self._getAttrValue('ValueRefPattern')
[docs]
def getValueRefPatternObj(self):
return self._getAttrEG('ValueRefPattern')
[docs]
def setValueRefPattern(self, value_ref_pattern):
self.getValueRefPatternObj().write(value_ref_pattern)
[docs]
def putValueRefPattern(self, value_ref_pattern):
if self._last_value_ref_pattern == value_ref_pattern:
return
self._last_value_ref_pattern = value_ref_pattern
self.getValueRefPatternObj().write(value_ref_pattern)
[docs]
def isValueRefEnabled(self):
return self._getAttrValue('ValueRefEnabled')
[docs]
def getValueRefEnabledObj(self):
return self._getAttrEG('ValueRefEnabled')
[docs]
def setValueRefEnabled(self, value_ref_enabled):
self.getValueRefEnabledObj().write(value_ref_enabled)
[docs]
def putValueRefEnabled(self, value_ref_enabled):
if self._last_value_ref_enabled == value_ref_enabled:
return
self._last_value_ref_enabled = value_ref_enabled
self.getValueRefEnabledObj().write(value_ref_enabled)
def _start(self, *args, **kwargs):
self.Start()
[docs]
def go(self, *args: Any, **kwargs: Any) -> tuple:
"""Count and report count result.
Configuration measurement, then start and wait until finish.
.. note::
The count (go) method API is partially experimental (value
references may be changed to values whenever possible in the
future). Backwards incompatible changes may occur if deemed
necessary by the core developers.
:return: state and value (or value reference - experimental)
"""
start_time = time.time()
integration_time = args[0]
self.putIntegrationTime(integration_time)
PoolElement.go(self)
state = self.getStateEG().readValue()
if self.isReferable() and self.isValueRefEnabled():
result = self.getValueRef()
else:
result = self.getValue()
ret = state, result
self._total_go_time = time.time() - start_time
return ret
startCount = PoolElement.start
waitCount = PoolElement.waitFinish
count = go
stopCount = PoolElement.abort
stop = PoolElement.stop
class TimerableExpChannel(ExpChannel):
def getTimer(self):
return self._getAttrValue('Timer')
def getTimerObj(self):
return self._getAttrEG('Timer')
def setTimer(self, timer):
self.getTimerObj().write(timer)
[docs]
class CTExpChannel(TimerableExpChannel):
""" Class encapsulating CTExpChannel functionality."""
pass
[docs]
class ZeroDExpChannel(ExpChannel):
""" Class encapsulating ZeroDExpChannel functionality."""
pass
[docs]
class OneDExpChannel(TimerableExpChannel):
""" Class encapsulating OneDExpChannel functionality."""
pass
[docs]
class TwoDExpChannel(TimerableExpChannel):
""" Class encapsulating TwoDExpChannel functionality."""
pass
[docs]
class PseudoCounter(ExpChannel):
""" Class encapsulating PseudoCounter functionality."""
pass
[docs]
class TriggerGate(PoolElement):
""" Class encapsulating TriggerGate functionality."""
pass
[docs]
class Motor(PoolElement, Moveable):
""" Class encapsulating Motor functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
[docs]
def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs]
def getDialPosition(self, force=False):
return self._getAttrValue('dialposition', force=force)
[docs]
def getVelocity(self, force=False):
return self._getAttrValue('velocity', force=force)
[docs]
def getAcceleration(self, force=False):
return self._getAttrValue('acceleration', force=force)
[docs]
def getDeceleration(self, force=False):
return self._getAttrValue('deceleration', force=force)
[docs]
def getBaseRate(self, force=False):
return self._getAttrValue('base_rate', force=force)
[docs]
def getBacklash(self, force=False):
return self._getAttrValue('backlash', force=force)
[docs]
def getLimitSwitches(self, force=False):
return self._getAttrValue('limit_switches', force=force)
[docs]
def getOffset(self, force=False):
return self._getAttrValue('offset', force=force)
[docs]
def getStepPerUnit(self, force=False):
return self._getAttrValue('step_per_unit', force=force)
[docs]
def getSign(self, force=False):
return self._getAttrValue('Sign', force=force)
[docs]
def getSimulationMode(self, force=False):
return self._getAttrValue('SimulationMode', force=force)
[docs]
def getPositionObj(self):
return self._getAttrEG('position')
[docs]
def getDialPositionObj(self):
return self._getAttrEG('dialposition')
[docs]
def getVelocityObj(self):
return self._getAttrEG('velocity')
[docs]
def getAccelerationObj(self):
return self._getAttrEG('acceleration')
[docs]
def getDecelerationObj(self):
return self._getAttrEG('deceleration')
[docs]
def getBaseRateObj(self):
return self._getAttrEG('base_rate')
[docs]
def getBacklashObj(self):
return self._getAttrEG('backlash')
[docs]
def getLimitSwitchesObj(self):
return self._getAttrEG('limit_switches')
[docs]
def getOffsetObj(self):
return self._getAttrEG('offset')
[docs]
def getStepPerUnitObj(self):
return self._getAttrEG('step_per_unit')
[docs]
def getSimulationModeObj(self):
return self._getAttrEG('step_per_unit')
[docs]
def setVelocity(self, value):
return self.getVelocityObj().write(value)
[docs]
def setAcceleration(self, value):
return self.getAccelerationObj().write(value)
[docs]
def setDeceleration(self, value):
return self.getDecelerationObj().write(value)
[docs]
def setBaseRate(self, value):
return self.getBaseRateObj().write(value)
[docs]
def setBacklash(self, value):
return self.getBacklashObj().write(value)
[docs]
def setOffset(self, value):
return self.getOffsetObj().write(value)
[docs]
def setStepPerUnit(self, value):
return self.getStepPerUnitObj().write(value)
[docs]
def setSign(self, value):
return self.getSignObj().write(value)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
if isinstance(new_pos, collections.abc.Sequence):
new_pos = new_pos[0]
try:
self.write_attribute('position', new_pos)
except DevFailed as df:
for err in df.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs]
def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs]
@reservedOperation
def iterMove(self, new_pos, timeout=None):
if isinstance(new_pos, collections.abc.Sequence):
new_pos = new_pos[0]
state, pos = self.getAttribute("state"), self.getAttribute("position")
evt_wait = self._getEventWait()
evt_wait.connect(state)
evt_wait.lock()
try:
# evt_wait.waitForEvent((DevState.MOVING, ), equal=False)
time_stamp = time.time()
try:
self.getPositionObj().write(new_pos)
except DevFailed as err_traceback:
for err in err_traceback.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
# putting timeout=0.1 and retries=1 is a patch for the case when
# the initial moving event doesn't arrive do to an unknown
# tango/pytango error at the time
evt_wait.waitForEvent((DevState.MOVING, ), time_stamp,
timeout=0.2, reactivity=0.1)
finally:
evt_wait.unlock()
evt_wait.disconnect()
evt_iter_wait = AttributeEventIterator(state, pos)
evt_iter_wait.lock()
try:
for evt_data in evt_iter_wait.events():
src, value = evt_data
if src == state and value != DevState.MOVING:
raise StopIteration
yield value
finally:
evt_iter_wait.unlock()
evt_iter_wait.disconnect()
[docs]
def readPosition(self, force=False):
return [self.getPosition(force=force)]
[docs]
def getMoveableSource(self):
return self.getPoolObj()
[docs]
def getSize(self):
return 1
[docs]
def getIndex(self, name):
if name.lower() == self.getName().lower():
return 0
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed as df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
[docs]
class PseudoMotor(PoolElement, Moveable):
""" Class encapsulating PseudoMotor functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
[docs]
def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs]
def getDialPosition(self, force=False):
return self.getPosition(force=force)
[docs]
def getPositionObj(self):
return self._getAttrEG('position')
[docs]
def getDialPositionObj(self):
return self.getPositionObj()
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
if isinstance(new_pos, collections.abc.Sequence):
new_pos = new_pos[0]
try:
self.write_attribute('position', new_pos)
except DevFailed as df:
for err in df.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs]
def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs]
def readPosition(self, force=False):
return [self.getPosition(force=force)]
[docs]
def getMoveableSource(self):
return self.getPoolObj()
[docs]
def getSize(self):
return 1
[docs]
def getIndex(self, name):
if name.lower() == self.getName().lower():
return 0
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed as df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
[docs]
class MotorGroup(PoolElement, Moveable):
""" Class encapsulating MotorGroup functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
def _create_str_tuple(self):
return 3 * ["TODO"]
[docs]
def getMotorNames(self):
return self.getPoolData()['elements']
[docs]
def hasMotor(self, name):
motor_names = list(map(str.lower, self.getMotorNames()))
return name.lower() in motor_names
[docs]
def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs]
def getPositionObj(self):
return self._getAttrEG('position')
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
try:
self.write_attribute('position', new_pos)
except DevFailed as df:
for err in df.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs]
def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs]
def readPosition(self, force=False):
return self.getPosition(force=force)
[docs]
def getMoveableSource(self):
return self.getPoolObj()
[docs]
def getSize(self):
return len(self.getMotorNames())
[docs]
def getIndex(self, name):
try:
motor_names = list(map(str.lower, self.getMotorNames()))
return motor_names.index(name.lower())
except:
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed as df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
class BaseChannelInfo(object):
def __init__(self, data):
# dict<str, obj>
# channel data
self.raw_data = data
self.__dict__.update(data)
class TangoChannelInfo(BaseChannelInfo):
def __init__(self, data, info):
BaseChannelInfo.__init__(self, data)
# PyTango.AttributeInfoEx
self.set_info(info)
def has_info(self):
return self.raw_info is not None
def set_info(self, info):
self.raw_info = info
if info is None:
return
data = self.raw_data
if 'data_type' not in data:
data_type = info.data_type
try:
self.data_type = FROM_TANGO_TO_STR_TYPE[data_type]
except KeyError as e:
# For backwards compatibility:
# starting from Taurus 4.3.0 DevVoid was added to the dict
if data_type == PyTango.DevVoid:
self.data_type = None
else:
raise e
if 'shape' not in data:
shape = ()
if info.data_format == AttrDataFormat.SPECTRUM:
shape = (info.max_dim_x,)
elif info.data_format == AttrDataFormat.IMAGE:
shape = (info.max_dim_x, info.max_dim_y)
self.shape = shape
else:
shape = self.shape
self.shape = list(shape)
def __getattr__(self, name):
if self.has_info():
return getattr(self.raw_info, name)
cls_name = self.__class__.__name__
raise AttributeError("'%s' has no attribute '%s'" % (cls_name, name))
def getChannelConfigs(mgconfig, ctrls=None, sort=True):
'''
gets a list of channel configurations of the controllers of the given
measurement group configuration. It optionally filters to those channels
matching given lists of controller.
:param ctrls: (seq<str> or None) a sequence of strings to filter the
controllers. If None given, all controllers will be used
:param sort: (bool) If True (default) the returned list will be sorted
according to channel index (if given in channeldata) and
then by channelname.
:return: (list<tuple>) A list of channelname,channeldata pairs.
'''
chconfigs = []
if not mgconfig:
return []
for ctrl_name, ctrl_data in list(mgconfig['controllers'].items()):
if ctrls is None or ctrl_name in ctrls:
for ch_name, ch_data in list(ctrl_data['channels'].items()):
ch_data.update({'_controller_name': ctrl_name})
chconfigs.append((ch_name, ch_data))
if sort:
# sort the channel configs by index (primary sort) and then by channel
# name.
# sort by channel_name
chconfigs = sorted(chconfigs, key=lambda c: c[0])
# sort by index (give a very large index for those which don't have it)
chconfigs = sorted(chconfigs, key=lambda c: c[1].get('index', 1e16))
return chconfigs
class MGConfiguration(object):
def __init__(self, mg, data):
self._mg = weakref.ref(mg)
self._raw_data = None
self._pending_event_data = None
self._local_changes = False
self.set_data(data)
def set_data(self, data, force=False):
# dict<str, list[DeviceProxy, CaselessDict<str, dict>]>
# where key is a device name and value is a list with two elements:
# - A device proxy or None if there was an error building it
# - A dict where keys are attribute names and value is a reference to
# a dict representing channel data as received in raw data
self.tango_dev_channels = None
# Number of elements in tango_dev_channels in error (could not build
# DeviceProxy, probably)
self.tango_dev_channels_in_error = 0
# dict<str, tuple<str, str, TangoChannelInfo>>
# where key is a channel name and value is a tuple of three elements:
# - device name
# - attribute name
# - attribute information or None if there was an error trying to get
# the information
self.tango_channels_info = None
# Number of elements in tango_channels_info_in_error in error
# (could not build attribute info, probably)
self.tango_channels_info_in_error = 0
# dict<str, dict>
# where key is a channel name and data is a reference to a dict
# representing channel data as received in raw data
self.non_tango_channels = None
# object each time
if isinstance(data, str):
data = CodecFactory().decode(('json', data))
if not force:
if self._raw_data == data:
# The new data received on the on_change_event was generated by
# this object.
return
elif self._local_changes:
self._pending_event_data = data
return
self._pending_event_data = None
self._local_changes = False
self._raw_data = data
self.__dict__.update(data)
# dict<str, dict>
# where key is the channel name and value is the channel data in form
# of a dict as received by the MG configuration attribute
self.channels = channels = CaselessDict()
self.channels_names = channels_names = CaselessDict()
self.channels_labels = channels_labels = CaselessDict()
self.controllers_names = controllers_names = CaselessDict()
self.controllers_channels = controllers_channels = CaselessDict()
self.controllers_alias = CaselessDict()
# TODO private controllers attr
for ctrl_name, ctrl_data in list(self.controllers.items()):
try:
if ctrl_name != '__tango__':
proxy = DeviceProxy(ctrl_name)
ctrl_full_name = ctrl_name
ctrl_name = proxy.alias()
self.controllers_alias[ctrl_full_name] = ctrl_name
controllers_names[ctrl_name] = ctrl_data
controllers_channels[ctrl_name] = []
except Exception:
pass
for channel_name, channel_data in \
list(ctrl_data['channels'].items()):
channels[channel_name] = channel_data
name = channel_data['name']
channels_names[name] = channel_data
label = channel_data['label']
channels_labels[name] = channel_data
index = channel_data['index']
ch_data = {'fullname': channel_name,
'label': label,
'name': name,
'index': index}
controllers_channels[ctrl_name].append(ch_data)
#####################
# @todo: the for-loops above could be replaced by something like:
# self.channels = channels = \
# CaselessDict(getChannelConfigs(data, sort=False))
#####################
# Create ordered list by channel index in the MG as cache
# channel_list: seq<dict> each element is the channel data in form
# of a dict as received by the MG configuration attribute.
# channel_list_name: seq<str>
# controller_list_names: seg<str>
self.channel_list = len(channels) * [None]
self.channel_list_name = len(channels) * [None]
self.controller_list_name = []
for channel, channel_data in channels.items():
idx = channel_data['index']
self.channel_list[idx] = channel_data
self.channel_list_name[idx] = channel
for channel_name in self.channel_list_name:
ctrl = self._get_ctrl_for_element(channel_name)
if ctrl not in self.controller_list_name:
self.controller_list_name.append(ctrl)
def _build(self):
# internal channel structure that groups channels by tango device so
# they can be read as a group minimizing this way the network requests
self.tango_dev_channels = tg_dev_chs = CaselessDict()
self.tango_dev_channels_in_error = 0
self.tango_channels_info = tg_chs_info = CaselessDict()
self.tango_channels_info_in_error = 0
self.non_tango_channels = n_tg_chs = CaselessDict()
self.cache = cache = {}
tg_attr_validator = TangoAttributeNameValidator()
for channel_name, channel_data in list(self.channels.items()):
if not channel_data.get("enabled", True):
continue
cache[channel_name] = None
data_source = channel_data['source']
params = tg_attr_validator.getUriGroups(data_source)
if params is None:
# Handle NON tango channel
n_tg_chs[channel_name] = channel_data
else:
# Handle tango channel
dev_name = params['devname'].lower()
attr_name = params['_shortattrname'].lower()
host, port = params.get('host'), params.get('port')
if host is not None and port is not None:
dev_name = "tango://{0}:{1}/{2}".format(host, port,
dev_name)
dev_data = tg_dev_chs.get(dev_name)
# technical debt: read Value or ValueRef attribute
# ideally the source configuration should include this info
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
# channel = Device(dev_name)
# if (isinstance(channel, ExpChannel)
# and channel.isReferable()
# and channel_data.get("value_ref_enabled", False)):
if (_is_referable(dev_name)
and channel_data.get("value_ref_enabled", False)):
attr_name += "Ref"
if dev_data is None:
# Build tango device
dev = None
try:
dev = DeviceProxy(dev_name)
except:
self.tango_dev_channels_in_error += 1
tg_dev_chs[dev_name] = dev_data = [dev, CaselessDict()]
dev, attr_data = dev_data
attr_data[attr_name] = channel_data
# get attribute configuration
attr_info = None
if dev is None:
self.tango_channels_info_in_error += 1
else:
try:
tg_attr_info = dev.get_attribute_config_ex(attr_name)[
0]
except:
tg_attr_info = \
self._build_empty_tango_attr_info(channel_data)
self.tango_channels_info_in_error += 1
attr_info = TangoChannelInfo(channel_data, tg_attr_info)
tg_chs_info[channel_name] = dev_name, attr_name, attr_info
def _build_empty_tango_attr_info(self, channel_data):
ret = PyTango.AttributeInfoEx()
ret.name = channel_data['name']
ret.label = channel_data['label']
return ret
def prepare(self):
# first time? build everything
if self.tango_dev_channels is None:
return self._build()
# prepare missing tango devices
if self.tango_dev_channels_in_error > 0:
for dev_name, dev_data in list(self.tango_dev_channels.items()):
if dev_data[0] is None:
try:
dev_data[0] = DeviceProxy(dev_name)
self.tango_dev_channels_in_error -= 1
except:
pass
# prepare missing tango attribute configuration
if self.tango_channels_info_in_error > 0:
for _, attr_data in list(self.tango_channels_info.items()):
dev_name, attr_name, attr_info = attr_data
if attr_info.has_info():
continue
dev = self.tango_dev_channels[dev_name]
if dev is None:
continue
try:
tg_attr_info = dev.get_attribute_config_ex(attr_name)[0]
attr_info.set_info(tg_attr_info)
self.tango_channels_info_in_error -= 1
except:
pass
def getChannels(self):
return self.channel_list
def getChannelInfo(self, channel_name):
try:
return self.tango_channels_info[channel_name]
except Exception:
channel_name = channel_name.lower()
for d_name, a_name, ch_info in \
list(self.tango_channels_info.values()):
if ch_info.name.lower() == channel_name:
return d_name, a_name, ch_info
def getChannelsInfo(self, only_enabled: bool = False) -> Dict[str, Tuple[str, str, TangoChannelInfo]]:
"""Returns information about the channels present in the measurement
group in a form of dictionary, where key is a channel name and value is
a tuple of three elements:
- device name
- attribute name
- attribute information or None if there was an error trying to get
the information
:param only_enabled: flag to filter out disabled channels
:return: dictionary with channels info
"""
self.prepare()
ret = CaselessDict(self.tango_channels_info)
ret.update(self.non_tango_channels)
for ch_name, (_, _, ch_info) in list(ret.items()):
if only_enabled and not ch_info.enabled:
ret.pop(ch_name)
return ret
def getChannelsInfoList(self, only_enabled: bool = False) -> List[TangoChannelInfo]:
"""Returns information about the channels present in the measurement
group in a form of ordered, based on the channel index, list.
:param only_enabled: flag to filter out disabled channels
:return: list with channels info
"""
channels_info = self.getChannelsInfo(only_enabled=only_enabled)
ret = []
for _, (_, _, ch_info) in list(channels_info.items()):
ret.append(ch_info)
ret = sorted(ret, key=lambda x: x.index)
return ret
def getCountersInfoList(self):
msg = ("getCountersInfoList() is deprecated since release 3.3.3 "
"Use getChannelsInfoList() instead.")
self._mg.warning(msg)
return self.getChannelsInfoList()
def getTangoDevChannels(self, only_enabled: bool = False) -> Dict[str, Tuple[PyTango.DeviceProxy, Dict[str, Dict]]]:
"""Returns Tango channels (attributes) that could be used to read
measurement group results in a form of dict where key is a device name
and value is a list with two elements:
- A device proxy or None if there was an error building it
- A dict where keys are attribute names and value is a reference to
a dict representing channel data as received in raw data
:param only_enabled: flag to filter out disabled channels
:return: dict with Tango channels
"""
if not only_enabled:
return self.tango_dev_channels
tango_dev_channels = {}
for dev_name, dev_data in list(self.tango_dev_channels.items()):
dev_proxy, attrs = dev_data[0], copy.deepcopy(dev_data[1])
for attr_name, channel_data in list(attrs.items()):
if not channel_data["enabled"]:
attrs.pop(attr_name)
tango_dev_channels[dev_name] = (dev_proxy, attrs)
return tango_dev_channels
def read(self, parallel=True):
if parallel:
return self._read_parallel()
return self._read()
def _read_parallel(self):
self.prepare()
ret = CaselessDict(self.cache)
dev_replies = {}
# deposit read requests
tango_dev_channels = self.getTangoDevChannels(only_enabled=True)
for _, dev_data in list(tango_dev_channels.items()):
dev, attrs = dev_data
if dev is None:
continue
try:
dev_replies[dev] = dev.read_attributes_asynch(
list(attrs.keys())), attrs
except Exception:
dev_replies[dev] = None, attrs
# gather all replies
for dev, reply_data in list(dev_replies.items()):
reply, attrs = reply_data
try:
data = dev.read_attributes_reply(reply, 0)
for data_item in data:
channel_data = attrs[data_item.name]
if data_item.has_failed:
value = None
else:
value = data_item.value
ret[channel_data['full_name']] = value
except Exception:
for _, channel_data in list(attrs.items()):
ret[channel_data['full_name']] = None
return ret
def _read(self):
self.prepare()
ret = CaselessDict(self.cache)
tango_dev_channels = self.getTangoDevChannels(only_enabled=True)
for _, dev_data in list(tango_dev_channels.items()):
dev, attrs = dev_data
try:
data = dev.read_attributes(list(attrs.keys()))
for data_item in data:
channel_data = attrs[data_item.name]
if data_item.has_failed:
value = None
else:
value = data_item.value
ret[channel_data['full_name']] = value
except Exception:
for _, channel_data in list(attrs.items()):
ret[channel_data['full_name']] = None
return ret
def _get_channel_data(self, channel_name):
if channel_name in self.channels_names:
return self.channels_names[channel_name]
elif channel_name in self.channels_labels:
return self.channels_labels[channel_name]
elif channel_name in self.channels:
return self.channels[channel_name]
v = TangoDeviceNameValidator()
names = v.getNames(channel_name)
msg = 'element "{}" is not in {}'.format(channel_name, self.label)
if names is None:
v = TangoAttributeNameValidator()
names = v.getNames(channel_name)
if names is None:
raise KeyError(msg)
full_name = names[0]
data = self.channels.get(full_name)
if data is None:
raise KeyError(msg)
return data
def _get_ctrl_data(self, ctrl_name):
if ctrl_name in self.controllers_names:
return self.controllers_names[ctrl_name]
elif ctrl_name in self.controllers:
return self.controllers[ctrl_name]
v = TangoDeviceNameValidator()
names = v.getNames(ctrl_name)
msg = 'element "{}" is not in {}'.format(ctrl_name, self.label)
if names is None:
raise KeyError(msg)
full_name = names[0]
data = self.controllers.get(full_name)
if data is None:
raise KeyError(msg)
return data
def _set_channels_key(self, key, value, channels_names=None,
apply_cfg=True):
self._local_changes = True
if channels_names is None:
channels_names = self.channels.keys()
# Protections:
if key in ['enabled', 'output']:
if type(value) != bool:
raise ValueError('The value must be a boolean')
for channel_name in channels_names:
channel = self._get_channel_data(channel_name)
channel[key] = value
if apply_cfg:
self.applyConfiguration()
def _get_channels_key(self, key, channels_names=None, use_fullname=False):
"""
Helper method to return the value for one channel configuration key,
if the key does not exist the value will be None.
"""
result = collections.OrderedDict({})
if channels_names is None:
channels_names = self.channel_list_name
for channel_name in channels_names:
channel = self._get_channel_data(channel_name)
if use_fullname:
label = channel['full_name']
else:
label = channel['label']
try:
value = channel[key]
except KeyError:
result[label] = None
continue
if key == 'plot_axes':
res = []
for v in value:
if v not in ['<mov>', '<idx>']:
v = self.channels[v]['label']
res.append(v)
value = res
result[label] = value
return result
def _set_ctrls_key(self, key, value, ctrls_names=None, apply_cfg=True):
self._local_changes = True
if ctrls_names is None:
ctrls_names = self.controllers.keys()
for ctrl_name in ctrls_names:
# if ctrl_name == '__tango__':
# continue
ctrl = self._get_ctrl_data(ctrl_name)
ctrl[key] = value
if apply_cfg:
self.applyConfiguration()
def _get_ctrls_key(self, key, ctrls_names=None, use_fullname=False):
"""
Helper method to return the value for one controller configuration key,
if the key does not exist the value will be None.
"""
result = collections.OrderedDict({})
if ctrls_names is None:
ctrls_names = self.controller_list_name
for ctrl_name in ctrls_names:
if ctrl_name == '__tango__':
result[ctrl_name] = None
continue
ctrl = self._get_ctrl_data(ctrl_name)
if use_fullname:
label = ctrl_name
else:
label = DeviceProxy(ctrl_name).alias()
try:
value = ctrl[key]
except KeyError:
result[label] = None
continue
if key in ['timer', 'monitor']:
value = self.channels[value]['label']
elif key == 'synchronizer' and value != 'software':
value = DeviceProxy(value).alias()
result[label] = value
return result
def _get_ctrl_for_channel(self, channels_names, unique=False):
result = collections.OrderedDict({})
if channels_names is None:
channels_names = self.channel_list_name
for channel_name in channels_names:
channel = self._get_channel_data(channel_name)
try:
ctrl = channel['_controller_name']
except KeyError:
ctrl = '__tango__'
if unique and ctrl in result.values():
raise KeyError('There are more than one channel of the same '
'controller')
result[channel['full_name']] = ctrl
return result
def _get_ctrl_channels(self, ctrl, use_fullname=False):
idx_channel = {}
if ctrl not in self.controllers_channels:
ctrl = self.controllers_alias[ctrl]
channels_datas = self.controllers_channels[ctrl]
for channel_data in channels_datas:
if use_fullname:
name = channel_data['fullname']
else:
name = channel_data['label']
idx = channel_data['index']
idx_channel[idx] = name
channels = []
for idx in sorted(idx_channel):
channels.append(idx_channel[idx])
return channels
def _get_channels_for_element(self, element, use_fullname=False):
channels = []
if element in self.controllers_channels:
channels += self._get_ctrl_channels(element, use_fullname)
else:
channels += [element]
return channels
def _get_ctrl_for_element(self, element):
if element in self.controllers_channels:
ctrl = element
else:
# TODO: find more elegant way
channel_ctrl = self._get_ctrl_for_channel([element])
ctrl = list(channel_ctrl.values())[0]
return ctrl
def applyConfiguration(self, timeout=3):
if not self._local_changes:
return
if self._pending_event_data is not None:
self.set_data(self._pending_event_data, force=True)
raise RuntimeError('The configuration changed on the server '
'during your changes.')
mg = self._mg()
try:
mg.setConfiguration(self._raw_data)
except Exception as e:
self._local_changes = False
self._pending_event_data = None
data = mg.getConfigurationAttrEG().readValue(force=True)
self.set_data(data, force=True)
raise e
self._local_changes = False
self._pending_event_data = None
if not mg._flg_event.wait(timeout):
raise RuntimeError('timeout on applying configuration')
def _getValueRefEnabledChannels(self, channels=None, use_fullname=False):
"""get acquisition Enabled channels.
:param channels: (seq<str>) a list of channels names to get the
Enabled info
:param use_fullname: (bool) returns a full name instead sardana
element name
:return a OrderedDict where the key are the channels and value the
Enabled state
"""
return self._get_channels_key('value_ref_enabled', channels,
use_fullname)
def _setValueRefEnabledChannels(self, state, channels=None,
apply_cfg=True):
"""Enable acquisition of the indicated channels.
:param state: <bool> The state of the channels to be set.
:param channels: (seq<str>) a sequence of strings indicating
channel names
"""
self._set_channels_key('value_ref_enabled', state, channels, apply_cfg)
def _getValueRefPatternChannels(self, channels=None, use_fullname=False):
"""get acquisition Enabled channels.
:param channels: (seq<str>) a list of channels names to get the
Enabled info
:param use_fullname: (bool) returns a full name instead sardana
element name
:return a OrderedDict where the key are the channels and value the
Enabled state
"""
return self._get_channels_key('value_ref_pattern', channels,
use_fullname)
def _setValueRefPatternChannels(self, pattern, channels=None,
apply_cfg=True):
"""Enable acquisition of the indicated channels.
:param pattern: <str> The state of the channels to be set.
:param channels: (seq<str>) a sequence of strings indicating
channel names
"""
self._set_channels_key('value_ref_pattern', pattern, channels,
apply_cfg)
def _getEnabledChannels(self, channels=None, use_fullname=False):
"""get acquisition Enabled channels.
:param channels: (seq<str>) a list of channels names to get the
Enabled info
:param use_fullname: (bool) returns a full name instead sardana
element name
:return a OrderedDict where the key are the channels and value the
Enabled state
"""
return self._get_channels_key('enabled', channels, use_fullname)
def _setEnabledChannels(self, state, channels=None, apply_cfg=True):
"""Enable acquisition of the indicated channels.
:param state: <bool> The state of the channels to be set.
:param channels: (seq<str>) a sequence of strings indicating
channel names
"""
self._set_channels_key('enabled', state, channels, apply_cfg)
def _getOutputChannels(self, channels=None, use_fullname=False):
"""get the output State of the channels.
:param channels: (list<str>) a string indicating the channel name,
in case of None, it will return all the Outputs Info
:param use_fullname: (bool) returns a full name instead sardana
element name
:return a OrderedDict where keys are channel names and
value the Outputs configuration
"""
return self._get_channels_key('output', channels, use_fullname)
def _setOutputChannels(self, state, channels=None, apply_cfg=True):
"""Set the Output state of the indicated channels.
:param state: (bool) Indicate the state of the output.
:param channels: (seq<str>) a sequence of strings indicating
channel names
"""
self._set_channels_key('output', state, channels, apply_cfg)
def _getPlotTypeChannels(self, channels=None, use_fullname=False):
"""get the Plot Type for the channel indicated. In case of empty
channel value it will return all the Plot Type Info
:param channels: (list<str>) Indicate the channel to return the
Plot Type Info
:param use_fullname: (bool) returns a full name instead sardana
element name
:return a OrderedDict where keys are channel names and
value the plot axes info
"""
# TODO: Change to return enum value SEP12
return self._get_channels_key('plot_type', channels, use_fullname)
def _setPlotTypeChannels(self, ptype, channels=None, apply_cfg=True):
"""Set the Plot Type for the indicated channels.
:param ptype: <str> string indicating the type name
:param channels: (seq<str>) a list of strings indicating the channels
to apply the PlotType
"""
msg_error = 'Wrong value! PlotType allowed: ' \
'{0}'.format(PlotType.keys())
if type(ptype) == str:
if ptype.lower() not in map(str.lower, PlotType.keys()):
raise ValueError(msg_error)
for value in PlotType.keys():
if value.lower() == ptype.lower():
ptype = PlotType[value]
break
elif type(ptype) == int:
try:
PlotType[ptype]
except Exception:
raise ValueError(msg_error)
else:
raise ValueError()
self._set_channels_key('plot_type', ptype, channels, apply_cfg)
def _getPlotAxesChannels(self, channels=None, use_fullname=False):
"""get the PlotAxes for the channel indicated. In case of empty channel
value it will return all the PlotAxes Info
:param channels: (list<str>) Indicate the channel to return the
PlotAxes Info
:param use_fullname: (bool) returns a full name instead sardana
element name
:return a OrderedDict where keys are channel names and
value the plot axes info
"""
return self._get_channels_key('plot_axes', channels, use_fullname)
def _setPlotAxesChannels(self, axes, channels_names=None, apply_cfg=True):
"""Set the PlotAxes for the indicated channels.
:param axes: <seq(str)> string indicating the axis name
:param channels_names: (seq<str>) a list of strings indicating the
channels to apply the PlotAxes
"""
# Validate axes values
for i, value in enumerate(axes):
if value in ['<idx>', '<mov>']:
continue
else:
axes[i] = self._get_channel_data(value)["full_name"]
if channels_names is None:
channels_names = self.channels.keys()
for channel_name in channels_names:
channel_data = self._get_channel_data(channel_name)
# Check the current channel plot type
plot_type = PlotType[PlotType[channel_data['plot_type']]]
if plot_type == PlotType.No:
raise RuntimeError('You must set firs the PlotType')
elif plot_type == PlotType.Spectrum:
if len(axes) != 1:
raise ValueError('The Spectrum Type only allows one axis')
elif plot_type == PlotType.Image:
if len(axes) != 2:
raise ValueError('The Image Type only allows two axis')
self._set_channels_key('plot_axes', axes, [channel_name], apply_cfg=False)
if apply_cfg:
self.applyConfiguration()
def _getCtrlsTimer(self, ctrls=None, use_fullname=False):
"""get the acquisition Timer.
:param ctrls: <list(str)> list of Controllers names to get the timer
info
:param use_fullname: <bool> returns a full name instead sardana
element name
:return a OrderedDict where keys are controller names and
value the Timer Info
"""
return self._get_ctrls_key('timer', ctrls, use_fullname)
def _setCtrlsTimer(self, timers, apply_cfg=True):
"""Set the acquisition Timer to the controllers compatibles,
it finds the controller comptible with this timer and set it
.
:param timer_name: <str> strings indicating the timer name
"""
result = self._get_ctrl_for_channel(timers, unique=True)
for timer, ctrl in result.items():
self._local_changes = True
self._set_ctrls_key('timer', timer, [ctrl], apply_cfg)
def _getCtrlsMonitor(self, ctrls=None, use_fullname=False):
"""get the Monitor for the channel indicated. In case of empty channel
value it will return all the Monitor Info
:param ctrls: <str> Indicate the controllers to return the Monitor Info
:param use_fullname: <bool> returns a full name instead sardana
element name
:return a OrderedDict where keys are channel names and
value the Monitor Info
"""
return self._get_ctrls_key('monitor', ctrls, use_fullname)
def _setCtrlsMonitor(self, monitors, apply_cfg=True):
"""Set the Monitor for to the controllers compatibles,
it finds the controller comptible with this timer and set it
:param monitors: (seq<str>) a list of strings indicating the channels
to apply the monitor
:param monitor: <str> string indicating the monitor name
"""
result = self._get_ctrl_for_channel(monitors, unique=True)
for monitor, ctrl in result.items():
self._local_changes = True
self._set_ctrls_key('monitor', monitor, [ctrl], apply_cfg)
def _getCtrlsSynchronization(self, ctrls=None, use_fullname=False):
"""get the Synchronization for the channel indicated. In case of empty
ctrl value it will return all the Synchronization Info
:param ctrl: <str> Indicate the controllers to return the
Synchronization Info
:param use_fullname: <bool> returns a full name instead sardana
element name
:return a OrderedDict where keys are controllers names and
value the Synchronization Info
"""
return self._get_ctrls_key('synchronization', ctrls, use_fullname)
def _setCtrlsSynchronization(self, synchronization, ctrls=None,
apply_cfg=True):
"""Set the Synchronization to the indicated controllers.
:param synchronization: <str> string indicating the synchronization
:param ctrls: (seq<str>) a list of strings indicating the channels
to apply the Synchronization
name
"""
msg_error = 'Wrong value! Synchronization allowed: ' \
'{0}'.format(AcqSynchType.keys())
if type(synchronization) == str:
if synchronization.lower() not in map(str.lower,
AcqSynchType.keys()):
raise ValueError(msg_error)
for value in AcqSynchType.keys():
if value.lower() == synchronization.lower():
synchronization = AcqSynchType[value]
break
elif type(synchronization) == int:
try:
AcqSynchType[synchronization]
except Exception:
raise ValueError(msg_error)
else:
raise ValueError()
self._set_ctrls_key('synchronization', synchronization, ctrls,
apply_cfg)
def _getCtrlsSynchronizer(self, ctrls=None, use_fullname=False):
"""get the synchronizer for the channel indicated. In case of empty
channel value it will return all the Synchronizers Info
:param ctrls: <str> Indicate the controllers to return the
Synchronizer Info
:param use_fullname: <bool> returns a full name instead sardana
element name
:return a OrderedDict where keys are controllers names and
value the synchronizer info
"""
return self._get_ctrls_key('synchronizer', ctrls, use_fullname)
def _setCtrlsSynchronizer(self, synchronizer, ctrls=None, apply_cfg=True):
"""Set the synchronizer for the indicated controollers. In case of
empty ctrls value it will be applied to all the controllers
:param syncronizer: <str> string indicating the synchronizer name
:param ctrls: (seq<str>) a list of strings indicating the
controllers to apply the synchronizer
"""
if synchronizer == 'software':
pass
else:
# TODO: Improve how to check if the element is a trigger_gate
sync = Device(synchronizer)
if 'triggergate' not in sync.fullname:
raise ValueError('The "{0}" is not a '
'triggergate'.format(synchronizer))
synchronizer = sync.fullname
self._set_ctrls_key('synchronizer', synchronizer, ctrls, apply_cfg)
def getValues(self, parallel=True):
return self.read(parallel=parallel)
def _getCounters(self):
return self.getChannels()
def _getChannelNames(self):
return [ch['name'] for ch in self.getChannels()]
def _getCounterNames(self):
return [ch['name'] for ch in self.getCounters()]
def _getChannelLabels(self):
return [ch['label'] for ch in self.getChannels()]
def _getCounterLabels(self):
return [ch['label'] for ch in self.getCounters()]
def _getChannel(self, name):
return self.channels[name]
def getChannelsEnabledInfo(self) -> List[TangoChannelInfo]:
"""
Returns information about **only enabled** channels present in the
measurement group in a form of ordered, based on the channel index,
list.
:return: list with channels info
"""
return self.getChannelsInfoList(only_enabled=True)
def getCountersInfo(self):
return self.getCountersInfoList()
def __repr__(self):
return json.dumps(self._raw_data, indent=4, sort_keys=True)
[docs]
class MeasurementGroup(PoolElement):
"""MeasurementGroup Sardana-Taurus extension.
Setting configuration parameters using e.g.,
`~sardana.taurus.core.tango.sardana.pool.MeasurementGroup.setEnabled` or
`~sardana.taurus.core.tango.sardana.pool.MeasurementGroup.setTimer`, etc.
by default applies changes on the server. Since setting the configuration
means passing to the server all the configuration parameters of
the measurement group at once this behavior can be changed with the
``apply=False``. Then the configuration changes are kept locally.
This is useful when changing more then one parameter. In this case only
setting of the last parameter should use ``apply=True`` or use
`~sardana.taurus.core.tango.sardana.pool.MeasurementGroup.applyConfiguration`
afterwards::
# or in a macro use: meas_grp = self.getMeasurementGroup("mntgrp01")
meas_grp = taurus.Device("mntgrp01")
meas_grp.setEnabled(False, apply=False)
meas_grp.setEnabled(True, "ct01", "ct02")
"""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self._configuration = None
self._channels = None
self.call__init__(PoolElement, name, **kw)
self._flg_event = threading.Event()
self.__cfg_attr = self.getAttribute('configuration')
self.__cfg_attr.addListener(self.on_configuration_changed)
self._value_buffer_cb = None
self._value_buffer_channels = None
codec_name = getattr(sardanacustomsettings, "VALUE_BUFFER_CODEC")
self._value_buffer_codec = CodecFactory().getCodec(codec_name)
self._value_ref_buffer_cb = None
self._value_ref_buffer_channels = None
codec_name = getattr(sardanacustomsettings, "VALUE_REF_BUFFER_CODEC")
self._value_ref_buffer_codec = CodecFactory().getCodec(codec_name)
[docs]
def cleanUp(self):
PoolElement.cleanUp(self)
f = self.factory()
f.removeExistingAttribute(self.__cfg_attr)
def _create_str_tuple(self):
channel_names = ", ".join(self.getChannelNames())
return self.getName(), channel_names
[docs]
def getConfigurationAttrEG(self):
return self._getAttrEG('Configuration')
[docs]
def setConfiguration(self, configuration):
self._flg_event.clear()
codec = CodecFactory().getCodec('json')
f, data = codec.encode(('', configuration))
self.write_attribute('configuration', data)
def _setConfiguration(self, data):
if self._configuration is None:
self._configuration = MGConfiguration(self, data)
else:
self._configuration.set_data(data)
[docs]
def getConfiguration(self, force=False):
if force or self._configuration is None:
data = self.getConfigurationAttrEG().readValue(force=True)
self._setConfiguration(data)
return self._configuration
[docs]
def on_configuration_changed(self, evt_src, evt_type, evt_value):
if evt_type not in CHANGE_EVT_TYPES:
return
self.debug("Configuration changed")
self._setConfiguration(evt_value.rvalue)
self._flg_event.set()
[docs]
def getValueBuffers(self):
value_buffers = []
for channel_info in self.getChannels():
channel = Device(channel_info["full_name"])
value_buffers.append(channel.getValueBuffer())
return value_buffers
[docs]
def getIntegrationTime(self):
return self._getAttrValue('IntegrationTime')
[docs]
def getIntegrationTimeObj(self):
return self._getAttrEG('IntegrationTime')
[docs]
def setIntegrationTime(self, ctime):
self.getIntegrationTimeObj().write(ctime)
[docs]
def putIntegrationTime(self, ctime):
synch_description = self.getSynchDescription()
if (self.getIntegrationTime() == ctime
and synch_description.delay_time == 0
and synch_description.repetitions == 1):
return
self.getIntegrationTimeObj().write(ctime)
[docs]
def getAcquisitionModeObj(self):
return self._getAttrEG('AcquisitionMode')
[docs]
def getAcquisitionMode(self):
return self._getAttrValue('AcquisitionMode')
[docs]
def setAcquisitionMode(self, acqMode):
self.getAcquisitionModeObj().write(acqMode)
[docs]
def getSynchDescriptionObj(self):
return self._getAttrEG('SynchDescription')
[docs]
def getSynchDescription(self):
synch_description_json = self._getAttrValue('SynchDescription')
synch_description = SynchDescription.from_json(synch_description_json)
return synch_description
[docs]
def setSynchDescription(self, synch_description):
codec = CodecFactory().getCodec('json')
_, data = codec.encode(('', synch_description))
self.getSynchDescriptionObj().write(data)
def _get_channels_for_elements(self, elements):
if not elements:
return None
config = self.getConfiguration()
channels = []
for element in elements:
channels += config._get_channels_for_element(element)
return channels
def _get_ctrl_for_elements(self, elements):
if not elements:
return None
ctrls = []
config = self.getConfiguration()
for element in elements:
ctrl = config._get_ctrl_for_element(element)
if ctrl in ctrls:
continue
ctrls.append(ctrl)
return ctrls
[docs]
def setOutput(self, output: bool, *elements: str, apply: bool = True) -> None:
"""Set the output configuration for the given elements.
Channels and controllers are accepted as elements. Setting the output
on the controller means setting it to all channels of this controller
present in this measurement group.
:param output: `True` - output enabled, `False` - output disabled
:param elements: sequence of element names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
config._setOutputChannels(output, channels, apply_cfg=apply)
[docs]
def getOutput(self, *elements: str, ret_full_name: bool = False) -> Dict[str, bool]:
"""Get the output configuration of the given elements.
Channels and controllers are accepted as elements. Getting the output
from the controller means getting it from all channels of this
controller present in this measurement group.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their output
configurations. Note that even if the *elements* contained
controllers, the returned configuration will always contain
only channels.
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
return config._getOutputChannels(channels, use_fullname=ret_full_name)
[docs]
def setEnabled(self, enabled: bool, *elements: str, apply: bool = True) -> None:
"""Set the enabled configuration for the given elements.
Channels and controllers are accepted as elements. Setting the enabled
on the controller means setting it to all channels of this controller
present in this measurement group.
:param enabled: `True` - element enabled, `False` - element disabled
:param elements: sequence of element names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
config._setEnabledChannels(enabled, channels, apply_cfg=apply)
[docs]
def getEnabled(self, *elements: str, ret_full_name: bool = False) -> Dict[str, bool]:
"""Get the output configuration of the given elements.
Channels and controllers are accepted as elements. Getting the enabled
from the controller means getting it from all channels of this
controller present in this measurement group.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their output
configurations. Note that even if the *elements* contained
controllers, the returned configuration will always contain
only channels.
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
return config._getEnabledChannels(channels, use_fullname=ret_full_name)
[docs]
def setPlotType(self, plot_type: Union[str, int], *elements: str, apply: bool = True) -> None:
"""Set the enabled configuration for the given elements.
Channels and controllers are accepted as elements. Setting the plot
type on the controller means setting it to all channels of this
controller present in this measurement group.
:param plot_type: 'No'/0 , 'Spectrum'/1, 'Image'/2
:param elements: sequence of element names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
config._setPlotTypeChannels(plot_type, channels, apply_cfg=apply)
[docs]
def getPlotType(self, *elements: str, ret_full_name: bool = False) -> Dict[str, int]:
"""Get the output configuration of the given elements.
Channels and controllers are accepted as elements. Getting the plot
type from the controller means getting it from all channels of this
controller present in this measurement group.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their output
configurations. Note that even if the *elements* contained
controllers, the returned configuration will always contain
only channels.
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
# TODO Change the documentation when _getPlotTypeChannels return enum
# value
return config._getPlotTypeChannels(channels,
use_fullname=ret_full_name)
[docs]
def setPlotAxes(self, plot_axes: List[str], *elements: str, apply: bool = True) -> None:
"""Set the enabled configuration for the given elements.
Channels and controllers are accepted as elements. Setting the plot
axes on the controller means setting it to all channels of this
controller present in this measurement group.
:param plot_axes: ['<mov>'] / ['<mov>', '<idx>']
:param elements: sequence of element names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
config._setPlotAxesChannels(plot_axes, channels, apply_cfg=apply)
[docs]
def getPlotAxes(self, *elements: str, ret_full_name: bool = False) -> Dict[str, str]:
"""Get the output configuration of the given elements.
Channels and controllers are accepted as elements. Getting the plot
axes from the controller means getting it from all channels of this
controller present in this measurement group.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their output
configurations. Note that even if the *elements* contained
controllers, the returned configuration will always contain
only channels.
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
return config._getPlotAxesChannels(channels,
use_fullname=ret_full_name)
[docs]
def setValueRefEnabled(self, value_ref_enabled: bool, *elements: str, apply: bool = True) -> None:
"""Set the output configuration for the given elements.
Channels and controllers are accepted as elements. Setting the value
reference enabled on the controller means setting it to all channels
of this controller present in this measurement group.
:param value_ref_enabled: `True` - enabled, `False` - disabled
:param elements: sequence of element names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
config._setValueRefEnabledChannels(value_ref_enabled, channels,
apply_cfg=apply)
[docs]
def getValueRefEnabled(self, *elements: str, ret_full_name: bool = False) -> Dict[str, bool]:
"""Get the value reference enabled configuration of the given elements.
Channels and controllers are accepted as elements. Getting the value
from the controller means getting it from all channels of this
controller present in this measurement group.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their output
configurations. Note that even if the *elements* contained
controllers, the returned configuration will always contain
only channels.
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
return config._getValueRefEnabledChannels(channels,
use_fullname=ret_full_name)
[docs]
def setValueRefPattern(self, value_ref_pattern: str, *elements: str, apply: bool = True) -> None:
"""Set the output configuration for the given elements.
Channels and controllers are accepted as elements. Setting the value
reference pattern on the controller means setting it to all channels
of this controller present in this measurement group.
:param value_ref_pattern: `/path/file{index:03d}.txt`
:param elements: sequence of element names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
config._setValueRefPatternChannels(value_ref_pattern, channels,
apply_cfg=apply)
[docs]
def getValueRefPattern(self, *elements: str, ret_full_name: bool = False) -> Dict[str, str]:
"""Get the value reference enabled configuration of the given elements.
Channels and controllers are accepted as elements. Getting the value
from the controller means getting it from all channels of this
controller present in this measurement group.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their output
configurations. Note that even if the *elements* contained
controllers, the returned configuration will always contain
only channels.
"""
channels = self._get_channels_for_elements(elements)
config = self.getConfiguration()
return config._getValueRefPatternChannels(channels,
use_fullname=ret_full_name)
def _get_value_per_channel(self, config, ctrls_values, use_fullname=False):
channels_values = collections.OrderedDict({})
for ctrl, value in ctrls_values.items():
for channel in config._get_ctrl_channels(ctrl, use_fullname):
channels_values[channel] = value
return channels_values
[docs]
def setTimer(self, timer: str, *elements: str, apply: bool = True) -> None:
"""Set the timer configuration for the given channels of the same
controller.
.. note:: Currently the controller's timer must be unique. Hence this
method will set it for the whole controller regardless of the
``elements`` argument.
:param timer: channel use as timer
:param elements: sequence of channels names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
config = self.getConfiguration()
# TODO: Implement solution to set the timer per channel when it is
# allowed.
config._setCtrlsTimer([timer], apply_cfg=apply)
[docs]
def getTimer(self, *elements: str, ret_full_name: bool = False, ret_by_ctrl: bool = False) -> Dict[str, str]:
"""Get the timer configuration of the given elements.
Channels and controllers are accepted as elements. Getting the output
from the controller means getting it from all channels of this
controller present in this measurement group, unless
`ret_by_ctrl=True`.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:param ret_by_ctrl: whether keys in the returned dictionary are
controllers or channels (default: `False` means return channels)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their timer
configurations
"""
# TODO: Implement solution to set the timer per channel when it is
# allowed.
ctrls = self._get_ctrl_for_elements(elements)
config = self.getConfiguration()
ctrls_timers = config._getCtrlsTimer(ctrls, use_fullname=ret_full_name)
if ret_by_ctrl:
return ctrls_timers
else:
return self._get_value_per_channel(config, ctrls_timers,
use_fullname=ret_full_name)
[docs]
def setMonitor(self, monitor: str, *elements: str, apply: bool = True) -> None:
"""Set the monitor configuration for the given channels of the same
controller.
.. note:: Currently the controller's monitor must be unique.
Hence this method will set it for the whole controller regardless of
the ``elements`` argument.
:param monitor: channel use as monitor
:param elements: sequence of channels names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
config = self.getConfiguration()
# TODO: Implement solution to set the moniotor per channel when it is
# allowed.
config._setCtrlsMonitor([monitor], apply_cfg=apply)
[docs]
def getMonitor(self, *elements: str, ret_full_name: bool = False, ret_by_ctrl: bool = False) -> Dict[str, str]:
"""Get the monitor configuration of the given elements.
Channels and controllers are accepted as elements. Getting the output
from the controller means getting it from all channels of this
controller present in this measurement group, unless
`ret_by_ctrl=True`.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:param ret_by_ctrl: whether keys in the returned dictionary are
controllers or channels (default: `False` means return channels)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their monitor
configurations
"""
# TODO: Implement solution to set the timer per channel when it is
# allowed.
ctrls = self._get_ctrl_for_elements(elements)
config = self.getConfiguration()
ctrls_monitor = config._getCtrlsMonitor(ctrls,
use_fullname=ret_full_name)
if ret_by_ctrl:
return ctrls_monitor
else:
return self._get_value_per_channel(config, ctrls_monitor,
use_fullname=ret_full_name)
[docs]
def setSynchronizer(self, synchronizer: str, *elements: str, apply: bool = True) -> None:
"""Set the synchronizer configuration for the given channels or
controller.
.. note:: Currently the controller's synchronizer must be unique.
Hence this method will set it for the whole controller regardless of
the ``elements`` argument.
:param synchronizer: triger/gate element name or software
:param elements: sequence of channels names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
config = self.getConfiguration()
# TODO: Implement solution to set the timer per channel when it is
# allowed.
ctrls = self._get_ctrl_for_elements(elements)
config._setCtrlsSynchronizer(synchronizer, ctrls, apply_cfg=apply)
[docs]
def getSynchronizer(self, *elements: str, ret_full_name: bool = False,
ret_by_ctrl: bool = False) -> Dict[str, str]:
"""Get the synchronizer configuration of the given elements.
Channels and controllers are accepted as elements. Getting the output
from the controller means getting it from all channels of this
controller present in this measurement group, unless
`ret_by_ctrl=True`.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:param ret_by_ctrl: whether keys in the returned dictionary are
controllers or channels (default: `False` means return channels)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their synchronizer
configurations
"""
# TODO: Implement solution to set the synchronizer per channel when it
# is allowed.
ctrls = self._get_ctrl_for_elements(elements)
config = self.getConfiguration()
ctrls_sync = config._getCtrlsSynchronizer(ctrls,
use_fullname=ret_full_name)
if ret_by_ctrl:
return ctrls_sync
else:
return self._get_value_per_channel(config, ctrls_sync,
use_fullname=ret_full_name)
[docs]
def setSynchronization(self, synchronization: sardana.pool.AcqSynchType, *elements: str, apply: bool = True) -> None:
"""Set the synchronization configuration for the given channels or
controller.
.. note:: Currently the controller's synchronization must be unique.
Hence this method will set it for the whole controller regardless of
the ``elements`` argument.
:param synchronization: synchronization type e.g. Trigger, Gate or
Start
:param elements: sequence of channels names or full names, no elements
means set to all
:param apply: `True` - apply on the server, `False` - do not apply yet
on the server and keep locally (default: `True`)
"""
config = self.getConfiguration()
# TODO: Implement solution to set the synchronization per channel when
# it is allowed.
ctrls = self._get_ctrl_for_elements(elements)
config._setCtrlsSynchronization(synchronization, ctrls,
apply_cfg=apply)
[docs]
def getSynchronization(self, *elements: str, ret_full_name: bool = False,
ret_by_ctrl: bool = False) -> Dict[str, sardana.pool.AcqSynchType]:
"""Get the synchronization configuration of the given elements.
Channels and controllers are accepted as elements. Getting the output
from the controller means getting it from all channels of this
controller present in this measurement group, unless
`ret_by_ctrl=True`.
:param elements: sequence of element names or full names, no elements
means get from all
:param ret_full_name: whether keys in the returned dictionary are
full names or names (default: `False` means return names)
:param ret_by_ctrl: whether keys in the returned dictionary are
controllers or channels (default: `False` means return channels)
:return: ordered dictionary where keys are **channel** names (or full
names if `ret_full_name=True`) and values are their
synchronization configurations
"""
# TODO: Implement solution to set the synchronization per channel
# when it is allowed.
ctrls = self._get_ctrl_for_elements(elements)
config = self.getConfiguration()
ctrls_sync = \
config._getCtrlsSynchronization(ctrls, use_fullname=ret_full_name)
if ret_by_ctrl:
return ctrls_sync
else:
return self._get_value_per_channel(config, ctrls_sync,
use_fullname=ret_full_name)
[docs]
def applyConfiguration(self):
"""Apply configuration changes kept locally on the server.
Setting configuration parameters using e.g.,
`~sardana.taurus.core.tango.sardana.pool.MeasurementGroup.setEnabled`
or
`~sardana.taurus.core.tango.sardana.pool.MeasurementGroup.setTimer`,
etc.
with ``apply=False`` keeps the changes locally. Use this method to
apply them on the server afterwards.
"""
self.getConfiguration().applyConfiguration()
#########################################################################
# TODO: review the following API
[docs]
def getChannelsEnabledInfo(self) -> List[TangoChannelInfo]:
"""Returns information about **only enabled** channels present in the
measurement group in a form of ordered, based on the channel index,
list.
:return: list with channels info
"""
return self.getConfiguration().getChannelsInfoList(only_enabled=True)
[docs]
def getCountersInfo(self):
return self.getConfiguration().getCountersInfoList()
[docs]
def getValues(self, parallel=True):
return self.getConfiguration().getValues(parallel)
[docs]
def getChannels(self):
return self.getConfiguration().getChannels()
[docs]
def getCounters(self):
return self.getConfiguration()._getCounters()
[docs]
def getChannelNames(self):
return self.getConfiguration()._getChannelNames()
[docs]
def getCounterNames(self):
return self.getConfiguration()._getCounterNames()
[docs]
def getChannelLabels(self):
return self.getConfiguration()._getChannelLabels()
[docs]
def getCounterLabels(self):
return self.getConfiguration()._getCounterLabels()
[docs]
def getChannel(self, name):
return self.getConfiguration()._getChannel(name)
[docs]
def getChannelInfo(self, name):
return self.getConfiguration().getChannelInfo(name)
#########################################################################
[docs]
def getChannelsInfo(self):
"""DEPRECATED"""
self.warning('Deprecation warning: you should use '
'"getChannelsInfoList" instead of "getChannelsInfo"')
return self.getConfiguration().getChannelsInfoList()
[docs]
def enableChannels(self, channels):
'''DEPRECATED: Enable acquisition of the indicated channels.
:param channels: (seq<str>) a sequence of strings indicating
channel names
'''
self.warning("enableChannels() in deprecated since 3.0.3. "
"Use setEnabled() instead.")
self.setEnabled(True, *channels)
[docs]
def disableChannels(self, channels):
'''DEPRECATED: Disable acquisition of the indicated channels.
:param channels: (seq<str>) a sequence of strings indicating
channel names
'''
self.warning("enableChannels() in deprecated since 3.0.3. "
"Use setEnabled() instead.")
self.setEnabled(False, *channels)
# NbStarts Methods
[docs]
def getNbStartsObj(self):
return self._getAttrEG('NbStarts')
[docs]
def setNbStarts(self, starts):
self.getNbStartsObj().write(starts)
[docs]
def getNbStarts(self):
return self._getAttrValue('NbStarts')
[docs]
def getMoveableObj(self):
return self._getAttrEG('Moveable')
[docs]
def getMoveable(self):
return self._getAttrValue('Moveable')
[docs]
def getLatencyTimeObj(self):
return self._getAttrEG('LatencyTime')
[docs]
def getLatencyTime(self):
return self._getAttrValue('LatencyTime')
[docs]
def setMoveable(self, moveable=None):
if moveable is None:
moveable = 'None' # Tango attribute is of type DevString
self.getMoveableObj().write(moveable)
[docs]
def valueBufferChanged(self, channel: ExpChannel, value_buffer: str) -> None:
"""Receive value buffer updates, pre-process them, and call
the subscribed callback.
:param channel: channel that reports value buffer update
:param value_buffer: json encoded value buffer update, it contains
at least values and indexes
"""
if value_buffer is None:
return
_, value_buffer = self._value_buffer_codec.decode(value_buffer)
values = value_buffer["value"]
if isinstance(values[0], list):
np_values = list(map(numpy.array, values))
value_buffer["value"] = np_values
self._value_buffer_cb(channel, value_buffer)
[docs]
def subscribeValueBuffer(self, cb: Optional[Callable] = None) -> None:
"""Subscribe to channels' value buffer update events. If no
callback is passed, the default channel's callback is subscribed which
will store the data in the channel's value_buffer attribute.
:param cb: callback to be subscribed, None means subscribe the default
channel's callback
"""
self._value_buffer_channels = []
for channel_info in self.getChannels():
enabled = channel_info.get("enabled", False)
if not enabled:
continue
full_name = channel_info["full_name"]
value_ref_enabled = channel_info.get("value_ref_enabled", False)
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
if _is_referable(full_name) and value_ref_enabled:
continue
channel = Device(full_name)
value_buffer_obj = channel.getValueBufferObj()
if cb is not None:
self._value_buffer_cb = cb
value_buffer_obj.subscribeEvent(self.valueBufferChanged,
channel, False)
else:
value_buffer_obj.subscribeEvent(channel.valueBufferChanged,
with_first_event=False)
self._value_buffer_channels.append(channel)
[docs]
def unsubscribeValueBuffer(self, cb: Optional[Callable] = None) -> None:
"""Unsubscribe from channels' value buffer events. If no callback is
passed, unsubscribe the channel's default callback.
:param cb: callback to be unsubscribed, None means unsubscribe the
default channel's callback
"""
for channel in self._value_buffer_channels:
value_buffer_obj = channel.getValueBufferObj()
if cb is not None:
value_buffer_obj.unsubscribeEvent(self.valueBufferChanged,
channel)
self._value_buffer_cb = None
else:
value_buffer_obj.unsubscribeEvent(channel.valueBufferChanged)
self._value_buffer_channels = None
[docs]
def valueRefBufferChanged(self, channel: ExpChannel, value_ref_buffer: str) -> None:
"""Receive value ref buffer updates, pre-process them, and call
the subscribed callback.
:param channel: channel that reports value ref buffer update
:param value_ref_buffer: json encoded value ref buffer update,
it contains at least value refs and indexes
"""
if value_ref_buffer is None:
return
_, value_ref_buffer = self._value_ref_buffer_codec.decode(
value_ref_buffer)
self._value_ref_buffer_cb(channel, value_ref_buffer)
[docs]
def subscribeValueRefBuffer(self, cb: Optional[Callable] = None) -> None:
"""Subscribe to channels' value ref buffer update events. If no
callback is passed, the default channel's callback is subscribed which
will store the data in the channel's value_buffer attribute.
:param cb: callback to be subscribed, None means subscribe the default
channel's callback
"""
self._value_ref_buffer_channels = []
for channel_info in self.getChannels():
enabled = channel_info.get("enabled", False)
if not enabled:
continue
full_name = channel_info["full_name"]
value_ref_enabled = channel_info.get("value_ref_enabled", False)
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
if not _is_referable(full_name):
continue
if not value_ref_enabled:
continue
channel = Device(full_name)
value_ref_buffer_obj = channel.getValueRefBufferObj()
if cb is not None:
self._value_ref_buffer_cb = cb
value_ref_buffer_obj.subscribeEvent(
self.valueRefBufferChanged, channel, False)
else:
value_ref_buffer_obj.subscribeEvent(
channel.valueRefBufferChanged, with_first_event=False)
self._value_ref_buffer_channels.append(channel)
[docs]
def unsubscribeValueRefBuffer(self, cb: Optional[Callable] = None) -> None:
"""Unsubscribe from channels' value ref buffer events. If no
callback is passed, unsubscribe the channel's default callback.
:param cb: callback to be unsubscribed, None means unsubscribe the
default channel's callback
"""
for channel in self._value_ref_buffer_channels:
value_ref_buffer_obj = channel.getValueRefBufferObj()
if cb is not None:
value_ref_buffer_obj.unsubscribeEvent(
self.valueRefBufferChanged, channel)
self._value_ref_buffer_cb = None
else:
value_ref_buffer_obj.unsubscribeEvent(
channel.valueRefBufferChanged)
self._value_ref_buffer_channels = None
def _start(self, *args, **kwargs):
try:
self.Start()
except DevFailed as e:
# TODO: Workaround for CORBA timeout on measurement group start
# remove it whenever sardana-org/sardana#93 gets implemented
if e.args[-1].reason == "API_DeviceTimedOut":
self.error("start timed out, trying to stop")
self.stop()
self.debug("stopped")
raise e
[docs]
def prepare(self):
self.command_inout("Prepare")
[docs]
def count_raw(self, start_time: Optional[float] = None) -> Dict[str, Any]:
"""Raw count and report count values.
Simply start and wait until finish, no configuration nor preparation.
.. note::
The count_raw method API is partially experimental (value
references may be changed to values whenever possible in the
future). Backwards incompatible changes may occur if deemed
necessary by the core developers.
:param start_time: start time of the whole count operation, if not
passed a current timestamp will be used
:return: dictionary with channel names as keys and values (or value references - experimental) as values
"""
if start_time is None:
start_time = time.time()
PoolElement.go(self)
state = self.getStateEG().readValue()
if state == Fault:
msg = "Measurement group ended acquisition with Fault state"
raise Exception(msg)
values = self.getValues()
ret = state, values
self._total_go_time = time.time() - start_time
return ret
[docs]
def go(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
"""Count and report count values.
Configuration and prepare for measurement, then start and wait until
finish.
.. note::
The count (go) method API is partially experimental (value
references may be changed to values whenever possible in the
future). Backwards incompatible changes may occur if deemed
necessary by the core developers.
:return: dictionary with channel names as keys and values (or value references - experimental) as values
"""
start_time = time.time()
cfg = self.getConfiguration()
cfg.prepare()
integration_time = args[0]
if integration_time is None or integration_time == 0:
return self.getStateEG().readValue(), self.getValues()
self.putIntegrationTime(integration_time)
self.setMoveable(None)
self.setNbStarts(1)
self.prepare()
return self.count_raw(start_time)
[docs]
def count_continuous(self, synch_description: List, value_buffer_cb: Optional[Callable] = None,
value_ref_buffer_cb: Optional[Callable] = None) -> Tuple[List[DevState], List]:
"""Execute measurement process according to the given synchronization
description.
:param synch_description: synchronization description
synchronizations
:param value_buffer_cb: callback on value buffer updates
:param value_ref_buffer_cb: callback on value reference
buffer updates
:return: state and eventually value buffers if no callback was passed
.. todo:: Think of unifying measure with count.
.. note:: The measure method has been included in MeasurementGroup
class on a provisional basis. Backwards incompatible changes
(up to and including removal of the method) may occur if
deemed necessary by the core developers.
"""
start_time = time.time()
cfg = self.getConfiguration()
cfg.prepare()
self.setSynchDescription(synch_description)
self.prepare()
self.subscribeValueBuffer(value_buffer_cb)
self.subscribeValueRefBuffer(value_ref_buffer_cb)
try:
self.count_raw(start_time)
finally:
self.unsubscribeValueBuffer(value_buffer_cb)
self.unsubscribeValueRefBuffer(value_ref_buffer_cb)
state = self.getStateEG().readValue()
if state == Fault:
msg = "Measurement group ended acquisition with Fault state"
raise Exception(msg)
if value_buffer_cb is None:
value_buffers = self.getValueBuffers()
else:
value_buffers = None
ret = state, value_buffers
self._total_go_time = time.time() - start_time
return ret
startCount = PoolElement.start
waitCount = PoolElement.waitFinish
count = go
stopCount = PoolElement.abort
stop = PoolElement.stop
[docs]
class IORegister(PoolElement):
""" Class encapsulating IORegister functionality."""
def __init__(self, name, **kw):
"""IORegister initialization."""
self.call__init__(PoolElement, name, **kw)
[docs]
def getValueObj(self):
return self._getAttrEG('value')
[docs]
def readValue(self, force=False):
return self._getAttrValue('value', force=force)
[docs]
def startWriteValue(self, new_value, timeout=None):
try:
self.getValueObj().write(new_value)
self.final_val = new_value
except DevFailed as err_traceback:
for err in err_traceback.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already chaging' % self)
else:
raise
[docs]
def waitWriteValue(self, timeout=None):
pass
[docs]
def writeValue(self, new_value, timeout=None):
self.startWriteValue(new_value, timeout=timeout)
self.waitWriteValue(timeout=timeout)
return self.getStateEG().readValue(), self.readValue()
writeIORegister = writeIOR = writeValue
readIORegister = readIOR = getValue = readValue
[docs]
class Instrument(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
[docs]
def getFullName(self):
return self.full_name
[docs]
def getParentInstrument(self):
return self.getPoolObj().getObj(self.parent_instrument)
[docs]
def getParentInstrumentName(self):
return self.parent_instrument
[docs]
def getChildrenInstruments(self):
raise NotImplementedError
return self._children
[docs]
def getElements(self):
raise NotImplementedError
return self._elements
[docs]
def getType(self):
return self.klass
[docs]
class Pool(TangoDevice, MoveableSource):
""" Class encapsulating device Pool functionality."""
def __init__(self, name, **kw):
self.call__init__(TangoDevice, name, **kw)
self.call__init__(MoveableSource)
self._elements = BaseSardanaElementContainer()
self.__elements_attr = self.getAttribute("Elements")
self.__elements_attr.addListener(self.on_elements_changed)
self._adm_device = None
[docs]
def cleanUp(self):
TangoDevice.cleanUp(self)
f = self.factory()
f.removeExistingAttribute(self.__elements_attr)
[docs]
def getObject(self, element_info):
elem_type = element_info.getType()
data = element_info._data
if elem_type in ('ControllerClass', 'ControllerLibrary', 'Instrument'):
klass = globals()[elem_type]
kwargs = dict(data)
kwargs['_pool_data'] = data
kwargs['_pool_obj'] = self
return klass(**kwargs)
obj = Factory().getDevice(element_info.full_name, _pool_obj=self,
_pool_data=data)
return obj
def _delObject(self, element):
elem_type = element.getType()
if elem_type in ('ControllerClass', 'ControllerLibrary', 'Instrument'):
return
obj = element.getObj()
obj.factory().removeExistingDevice(obj)
[docs]
def on_elements_changed(self, evt_src, evt_type, evt_value):
if evt_type == TaurusEventType.Error:
msg = evt_value
if isinstance(msg, DevFailed):
d = msg.args[0]
# skip configuration errors
if d.reason == "API_BadConfigurationProperty":
return
if d.reason in ("API_DeviceNotExported",
"API_CantConnectToDevice"):
msg = "Pool was shutdown or is inaccessible"
else:
msg = "{0}: {1}".format(d.reason, d.desc)
self.warning("Received elements error event %s", msg)
self.debug(evt_value)
return
elif evt_type not in CHANGE_EVT_TYPES:
return
try:
elems = CodecFactory().decode(evt_value.rvalue)
except:
self.error("Could not decode element info")
self.info("value: '%s'", evt_value.rvalue)
self.debug("Details:", exc_info=1)
return
elements = self.getElementsInfo()
for element_data in elems.get('new', ()):
element_data['manager'] = self
element = BaseSardanaElement(**element_data)
elements.addElement(element)
for element_data in elems.get('del', ()):
element = self.getElementInfo(element_data['full_name'])
try:
elements.removeElement(element)
# Ideally this object should disappear without the need to
# call _delObject() - there should be no references to it.
# Most probably due to the complex circular references
# between the element and its attributes, as it is in the case
# of the measurement group, we need to explicitly remove the
# object so it does not remain on the client side (Taurus
# factory)
# See more details in #145, #1528.
self._delObject(element)
except:
self.warning("Failed to remove %s", element_data)
for element_data in elems.get('change', ()):
# TODO: element is assigned but not used!! (check)
element = self._removeElement(element_data)
element = self._addElement(element_data)
return elems
def _addElement(self, element_data):
element_data['manager'] = self
element = BaseSardanaElement(**element_data)
self.getElementsInfo().addElement(element)
return element
def _removeElement(self, element_data):
name = element_data['full_name']
element = self.getElementInfo(name)
self.getElementsInfo().removeElement(element)
return element
[docs]
def getElementsInfo(self):
return self._elements
[docs]
def getElements(self):
return self.getElementsInfo().getElements()
[docs]
def getElementInfo(self, name):
return self.getElementsInfo().getElement(name)
[docs]
def getElementNamesOfType(self, elem_type):
return self.getElementsInfo().getElementNamesOfType(elem_type)
[docs]
def getElementsOfType(self, elem_type):
return self.getElementsInfo().getElementsOfType(elem_type)
[docs]
def getElementsWithInterface(self, interface):
return self.getElementsInfo().getElementsWithInterface(interface)
[docs]
def getElementWithInterface(self, elem_name, interface):
return self.getElementsInfo().getElementWithInterface(elem_name,
interface)
[docs]
def getObj(self, name, elem_type=None):
if elem_type is None:
return self.getElementInfo(name)
elif isinstance(elem_type, str):
elem_types = elem_type,
else:
elem_types = elem_type
name = name.lower()
for e_type in elem_types:
elems = self.getElementsOfType(e_type)
for elem in list(elems.values()):
if elem.name.lower() == name:
return elem
elem = elems.get(name)
if elem is not None:
return elem
[docs]
def reconfigObj(self, obj):
"""Reconfigure (initialize) `obj` in the server
This includes:
- recreation of the device and its attributes (Tango)
- writing of the memorized attributes (last set values)
Usefull when there was a problem with the hardware.
.. note::
If `obj` is a `~sardana.taurus.core.tango.sardana.pool.Controller`
this will usually also require to call it with the controller's elements.
"""
if self._adm_device is None:
self._adm_device = Device(self.adm_name())
original_timeout = self._adm_device.get_timeout_millis()
# TODO: allow parametrizing timeout
self._adm_device.set_timeout_millis(12000) # 12 s
try:
self._adm_device.DevRestart(obj.getNormalName())
except PyTango.DevFailed as e:
raise RuntimeError(
"error during reconfig of {}".format(obj.getName())
) from e
finally:
self._adm_device.set_timeout_millis(original_timeout)
def __repr__(self):
return self.getNormalName()
def __str__(self):
return repr(self)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# MoveableSource interface
#
[docs]
def getMoveable(self, names):
"""getMoveable(seq<string> names) -> Moveable
Returns a moveable object that handles all the moveable items given in
names."""
# if simple motor just return it (if the pool has it)
if isinstance(names, str):
names = names,
if len(names) == 1:
name = names[0]
return self.getObj(name, elem_type=MOVEABLE_TYPES)
# find a motor group that contains elements
moveable = self.__findMotorGroupWithElems(names)
# if none exists create one
if moveable is None:
mgs = self.getElementsOfType('MotorGroup')
i = 1
pid = os.getpid()
while True:
name = "_mg_ms_{0}_{1}".format(pid, i)
exists = False
for mg in list(mgs.values()):
if mg.name == name:
exists = True
break
if not exists:
break
i += 1
moveable = self.createMotorGroup(name, names)
return moveable
def __findMotorGroupWithElems(self, names):
names_lower = list(map(str.lower, names))
len_names = len(names)
mgs = self.getElementsOfType('MotorGroup')
for mg in list(mgs.values()):
mg_elems = mg.elements
if len(mg_elems) != len_names:
continue
for mg_elem, name in zip(mg_elems, names_lower):
if mg_elem.lower() != name:
break
else:
return mg
#
# End of MoveableSource interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _wait_for_element_in_container(self, container, elem_name, timeout=1.5,
contains=True):
start = time.time()
cond = True
nap = 0.01
if timeout:
nap = timeout / 10
while cond:
elem = container.getElement(elem_name)
if contains:
if elem is not None:
return elem
else:
if elem is None:
return True
if timeout:
dt = time.time() - start
if dt > timeout:
self.info("Timed out waiting for '%s' in container",
elem_name)
return
time.sleep(nap)
[docs]
def createMotorGroup(self, mg_name, elements):
params = [mg_name, ] + list(map(str, elements))
self.debug('trying to create motor group for elements: %s', params)
self.command_inout('CreateMotorGroup', params)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, mg_name)
[docs]
def createMeasurementGroup(self, mg_name, elements):
params = [mg_name, ] + list(map(str, elements))
self.debug('trying to create measurement group: %s', params)
self.command_inout('CreateMeasurementGroup', params)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, mg_name)
[docs]
def deleteMeasurementGroup(self, name):
return self.deleteElement(name)
[docs]
def createElement(self, name, ctrl, axis=None):
ctrl_type = ctrl.types[0]
if axis is None:
last_axis = ctrl.getLastUsedAxis()
if last_axis is None:
axis = str(1)
else:
axis = str(last_axis + 1)
else:
axis = str(axis)
cmd = "CreateElement"
pars = ctrl_type, ctrl.name, axis, name
self.command_inout(cmd, pars)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name)
[docs]
def renameElement(self, old_name, new_name):
self.debug('trying to rename element: %s to: %s', old_name, new_name)
self.command_inout('RenameElement', [old_name, new_name])
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, new_name,
contains=True)
[docs]
def deleteElement(self, name):
self.debug('trying to delete element: %s', name)
self.command_inout('DeleteElement', name)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name,
contains=False)
[docs]
def createController(self, class_name, name, *props):
ctrl_class = self.getObj(class_name, elem_type='ControllerClass')
if ctrl_class is None:
raise Exception("Controller class %s not found" % class_name)
cmd = "CreateController"
pars = [ctrl_class.types[0], ctrl_class.file_name, class_name, name]
pars.extend(list(map(str, props)))
self.command_inout(cmd, pars)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name)
[docs]
def deleteController(self, name):
return self.deleteElement(name)
[docs]
def createInstrument(self, full_name, class_name):
self.command_inout("CreateInstrument", [full_name, class_name])
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, full_name)
[docs]
def registerExtensions():
factory = Factory()
factory.registerDeviceClass("Pool", Pool)
hw_type_names = [
'Controller',
'ComChannel', 'Motor', 'PseudoMotor', 'TriggerGate',
'CTExpChannel', 'ZeroDExpChannel', 'OneDExpChannel', 'TwoDExpChannel',
'PseudoCounter', 'IORegister', 'MotorGroup', 'MeasurementGroup']
hw_type_map = [(name, globals()[name]) for name in hw_type_names]
for klass_name, klass in hw_type_map:
factory.registerDeviceClass(klass_name, klass)
[docs]
def unregisterExtensions():
factory = Factory()
factory.unregisterDeviceClass("Pool")
hw_type_names = [
'Controller',
'ComChannel', 'Motor', 'PseudoMotor', 'TriggerGate',
'CTExpChannel', 'ZeroDExpChannel', 'OneDExpChannel', 'TwoDExpChannel',
'PseudoCounter', 'IORegister', 'MotorGroup', 'MeasurementGroup']
for klass_name in hw_type_names:
factory.unregisterDeviceClass(klass_name)