#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module contains the definition of the Controller base classes"""
# Names exported by a star-import of this module.  ``Memorize`` is exported
# together with its three possible values (``Memorized``, ``MemorizedNoInit``
# and ``NotMemorized``) because it is the key under which those values are
# stored in an attribute definition.
__all__ = ["DataAccess", "SardanaValue", "Type", "Access", "Description",
           "DefaultValue", "FGet", "FSet", "Memorize",
           "Memorized", "MemorizedNoInit", "NotMemorized", "MaxDimSize",
           "Controller", "Readable", "Startable", "Stopable", "Loadable",
           "Referable", "Synchronizer",
           "MotorController", "CounterTimerController", "ZeroDController",
           "OneDController", "TwoDController", "TriggerGateController",
           "PseudoMotorController", "PseudoCounterController",
           "IORegisterController"]

# Markup language used by the docstrings of this module.
__docformat__ = 'restructuredtext'
import copy
from collections import OrderedDict
from taurus import getLogLevel
from taurus.core.util.log import Logger
from sardana import DataAccess
from sardana.sardanavalue import SardanaValue
from sardana.pool.pooldefs import ControllerAPI, AcqSynch, AcqMode
#: Key holding the data type of an entry of
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
Type = "type"

#: Key holding the data access (read / read-write) of an entry of
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
Access = "r/w type"

#: Key holding the textual description of an entry of
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
Description = "description"

#: Key holding the default value of an entry of
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
DefaultValue = "defaultvalue"

#: Key holding the name of the getter function of an entry of
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
FGet = "fget"

#: Key holding the name of the setter function of an entry of
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
FSet = "fset"

#: Key holding the memorization policy of an entry of
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`.
#: The value stored under this key is one of :obj:`Memorized`,
#: :obj:`MemorizedNoInit` or :obj:`NotMemorized`
Memorize = "memorized"

#: *Value* for the :obj:`Memorize` key of
#: :attr:`~Controller.axis_attributes` or
#: :attr:`~Controller.ctrl_attributes`: the attribute is memorized
Memorized = "true"

#: *Value* for the :obj:`Memorize` key of
#: :attr:`~Controller.axis_attributes` or
#: :attr:`~Controller.ctrl_attributes`: the attribute is memorized but not
#: written at initialization
MemorizedNoInit = "true_without_hard_applied"

#: *Value* for the :obj:`Memorize` key of
#: :attr:`~Controller.axis_attributes` or
#: :attr:`~Controller.ctrl_attributes`: the attribute is not memorized
NotMemorized = "false"

#: Key holding the maximum dimension size of an entry of
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
MaxDimSize = "maxdimsize"
class Controller(object):
    """Base controller class. Do **NOT** inherit from this class directly.

    The constructor stores the instance name and the extra positional and
    keyword arguments, creates a logger named ``Controller.<inst>`` and sets
    every item of *props* as an attribute of the new instance.

    :param str inst: controller instance name
    :param dict props: a dictionary containing pairs of property name,
                       property value
    :arg args: extra positional arguments (kept in ``_args``)
    :keyword kwargs: extra keyword arguments (kept in ``_kwargs``); when a
                     ``pool_controller`` entry is present it is called
                     without arguments to obtain the pool controller object
    """

    #: A sequence of :obj:`str` representing the controller features
    ctrl_features = []

    #: A :class:`dict` containing controller properties where:
    #:
    #: - key : (:obj:`str`) controller property name
    #: - value : :class:`dict` with three :obj:`str` keys ("type",
    #:   "description" and "defaultvalue" case insensitive):
    #:
    #:     - for :obj:`Type`, value is one of the values described in
    #:       :ref:`sardana-controller-data-type`
    #:
    #:     - for :obj:`Description`, value is a :obj:`str` description of the
    #:       property.
    #:       If it is not given it defaults to an empty string.
    #:
    #:     - for :obj:`DefaultValue`, value is a python object or None if no
    #:       default value exists for the property.
    #:
    #: Example::
    #:
    #:     from sardana.pool.controller import MotorController, \
    #:         Type, Description, DefaultValue
    #:
    #:     class MyCtrl(MotorController):
    #:
    #:         ctrl_properties = \
    #:         {
    #:             'host' : { Type : str,
    #:                        Description : "host name" },
    #:             'port' : { Type : int,
    #:                        Description : "port number",
    #:                        DefaultValue: 5000 }
    #:         }
    #:
    ctrl_properties = {}

    #: A :class:`dict` containing controller extra attributes where:
    #:
    #: - key : (:obj:`str`) controller attribute name
    #: - value : :class:`dict` with :obj:`str` possible keys: "type",
    #:   "r/w type", "description", "fget", "fset" and "maxdimsize"
    #:   (case insensitive):
    #:
    #:     - for :obj:`Type`, value is one of the values described in
    #:       :ref:`sardana-controller-data-type`
    #:
    #:     - for :obj:`Access`, value is one of
    #:       :obj:`~sardana.sardanadefs.DataAccess` ("read" or "read_write"
    #:       (case insensitive) strings are also accepted)
    #:       [default: ReadWrite]
    #:
    #:     - for :obj:`Description`, value is a :obj:`str` description of the
    #:       attribute [default: "" (empty string)]
    #:
    #:     - for :obj:`FGet`, value is a :obj:`str` with the method name for
    #:       the attribute getter [default: "get"<controller attribute name>]
    #:
    #:     - for :obj:`FSet`, value is a :obj:`str` with the method name for
    #:       the attribute setter. [default, if :obj:`Access` = "read_write":
    #:       "set"<controller attribute name>]
    #:
    #:     - for :obj:`DefaultValue`, value is a python object or None if no
    #:       default value exists for the attribute. If given, the attribute
    #:       is set when the controller is first created.
    #:
    #:     - for :obj:`Memorize`, value is a :obj:`str` with possible values:
    #:       :obj:`Memorized`, :obj:`MemorizedNoInit` and
    #:       :obj:`NotMemorized` [default: :obj:`Memorized`]
    #:
    #:       .. versionadded:: 1.1
    #:
    #:     - for :obj:`MaxDimSize`, value is a :obj:`tuple` with possible
    #:       values:
    #:
    #:         - for scalar **must** be an empty tuple ( () or [] )
    #:           [default: ()]
    #:         - for 1D arrays a sequence with one value (example: (1024,))
    #:           [default: (2048,)]
    #:         - for 2D arrays a sequence with two values
    #:           (example: (1024, 1024)) [default: (2048, 2048)]
    #:
    #:       .. versionadded:: 1.1
    #:
    #: .. versionadded:: 1.0
    #:
    #: Example::
    #:
    #:     from sardana.pool.controller import PseudoMotorController, \
    #:         Type, Description, DefaultValue, DataAccess
    #:
    #:     class HKLCtrl(PseudoMotorController):
    #:
    #:         ctrl_attributes = \
    #:         {
    #:             'ReflectionMatrix' : { Type : ( (float,), ),
    #:                                    Description : "The reflection matrix",
    #:                                    Access : DataAccess.ReadOnly,
    #:                                    FGet : 'getReflectionMatrix', },
    #:         }
    #:
    #:         def getReflectionMatrix(self):
    #:             return ( (1.0, 0.0), (0.0, 1.0) )
    ctrl_attributes = {}

    #: A :class:`dict` containing controller extra attributes for each axis
    #: where:
    #:
    #: - key : (:obj:`str`) axis attribute name
    #: - value : :class:`dict` with :obj:`str` keys
    #:   ("type", "r/w type", "description", ... case insensitive):
    #:
    #:     - for :obj:`Type`, value is one of the values described in
    #:       :ref:`sardana-controller-data-type`
    #:
    #:     - for :obj:`Access`, value is one of
    #:       :obj:`~sardana.sardanadefs.DataAccess` ("read" or "read_write"
    #:       (case insensitive) strings are also accepted)
    #:
    #:     - for :obj:`Description`, value is a :obj:`str` description of the
    #:       attribute
    #:
    #:     - for :obj:`DefaultValue`, value is a python object or None if no
    #:       default value exists for the attribute. If given, the attribute
    #:       is set when the axis is first created.
    #:
    #:     - for :obj:`Memorize`, value is a :obj:`str` with possible values:
    #:       :obj:`Memorized`, :obj:`MemorizedNoInit` and
    #:       :obj:`NotMemorized` [default: :obj:`Memorized`]
    #:
    #:       .. versionadded:: 1.1
    #:
    #:     - for :obj:`MaxDimSize`, value is a :obj:`tuple` with possible
    #:       values:
    #:
    #:         - for scalar **must** be an empty tuple ( () or [] )
    #:           [default: ()]
    #:         - for 1D arrays a sequence with one value (example: (1024,))
    #:           [default: (2048,)]
    #:         - for 2D arrays a sequence with two values
    #:           (example: (1024, 1024)) [default: (2048, 2048)]
    #:
    #:       .. versionadded:: 1.1
    #:
    #: .. versionadded:: 1.0
    #:
    #: Example::
    #:
    #:     from sardana.pool.controller import MotorController, \
    #:         Type, Description, DefaultValue, DataAccess
    #:
    #:     class MyMCtrl(MotorController):
    #:
    #:         axis_attributes = \
    #:         {
    #:             'EncoderSource' : { Type : str,
    #:                                 Description : 'motor encoder source', },
    #:         }
    #:
    #:         def getAxisExtraPar(self, axis, name):
    #:             name = name.lower()
    #:             if name == 'encodersource':
    #:                 return self._encodersource[axis]
    #:
    #:         def setAxisExtraPar(self, axis, name, value):
    #:             name = name.lower()
    #:             if name == 'encodersource':
    #:                 self._encodersource[axis] = value
    axis_attributes = {}

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {}

    #: A :obj:`str` representing the controller gender
    gender = None

    #: A :obj:`str` representing the controller model name
    model = 'Generic'

    #: A :obj:`str` representing the controller organization
    organization = 'Sardana team'

    #: A :obj:`str` containing the path to the image file
    image = None

    #: A :obj:`str` containing the path to the image logo file
    logo = None

    def __init__(self, inst, props, *args, **kwargs):
        self._inst_name = inst
        # per-instance logger, set to the log level reported by taurus
        self._log = Logger("Controller.%s" % inst)
        self._log.log_obj.setLevel(getLogLevel())
        self._args = args
        self._kwargs = kwargs
        self._api_version = self._findAPIVersion()
        # expose every property as an attribute of the controller; the
        # items are snapshotted before the first attribute is set
        property_items = list(props.items())
        for prop_name, prop_value in property_items:
            setattr(self, prop_name, prop_value)

    def _findAPIVersion(self):
        """*Internal*. By default return the Pool Controller API
        version of the pool where the controller is running"""
        return ControllerAPI

    def _getPoolController(self):
        """*Internal*. Return the object obtained by calling the
        ``pool_controller`` keyword argument given at construction time.

        :return: the result of the call, or None when the keyword arguments
                 are missing, are None or have no ``pool_controller`` entry
        """
        keyword_args = getattr(self, '_kwargs', None)
        if keyword_args is None:
            return None
        pool_ctrl_ref = keyword_args.get('pool_controller')
        if pool_ctrl_ref is None:
            return None
        return pool_ctrl_ref()

    def AddDevice(self, axis):
        """**Controller API**. Override if necessary. Default implementation
        does nothing.

        :param int axis: axis number"""

    def DeleteDevice(self, axis):
        """**Controller API**. Override if necessary. Default implementation
        does nothing.

        :param int axis: axis number"""

    def GetName(self):
        """**Controller API**. The controller instance name.

        :return: the controller instance name
        :rtype: :obj:`str`

        .. versionadded:: 1.0"""
        return self._inst_name

    def GetAxisName(self, axis):
        """**Controller API**. The axis name.

        The name is taken from the element that the pool controller reports
        for *axis*. When no pool controller is available the axis number
        converted to :obj:`str` is returned instead.

        :param int axis: axis number
        :return: the axis name
        :rtype: :obj:`str`

        .. versionadded:: 1.0"""
        pool_ctrl = self._getPoolController()
        if pool_ctrl is None:
            return str(axis)
        return pool_ctrl.get_element(axis=axis).name

    def PreStateAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare a read of the state of all axis.
        Default implementation does nothing."""

    def PreStateOne(self, axis):
        """**Controller API**. Override if necessary.
        Called to prepare a read of the state of a single axis.
        Default implementation does nothing.

        :param int axis: axis number"""

    def StateAll(self):
        """**Controller API**. Override if necessary.
        Called to read the state of all selected axis.
        Default implementation does nothing."""

    def StateOne(self, axis):
        """**Controller API**. Override is MANDATORY.
        Called to read the state of one axis.
        Default implementation raises :exc:`NotImplementedError`.

        :param int axis: axis number"""
        raise NotImplementedError("StateOne must be defined in the controller")

    def SetCtrlPar(self, parameter, value):
        """**Controller API**. Override if necessary.
        Called to set a parameter with a value. Default implementation sets
        this object member named '_'+parameter with the given value.

        :param str parameter: parameter name
        :param value: new parameter value

        .. versionadded:: 1.0"""
        setattr(self, '_' + parameter, value)

    def GetCtrlPar(self, parameter):
        """**Controller API**. Override if necessary.
        Called to get the value of a parameter. Default implementation
        returns the value contained in this object's member named
        '_'+parameter.

        :param str parameter: parameter name
        :return: the parameter value

        .. versionadded:: 1.0"""
        return getattr(self, '_' + parameter)

    def SetAxisPar(self, axis, parameter, value):
        """**Controller API**. Override is MANDATORY.
        Called to set a parameter with a value on the given axis. Default
        implementation raises :exc:`NotImplementedError`.

        .. versionadded:: 1.0"""
        raise NotImplementedError("SetAxisPar must be defined in the "
                                  "controller")

    def GetAxisPar(self, axis, parameter):
        """**Controller API**. Override is MANDATORY.
        Called to get a parameter value on the given axis. Default
        implementation raises :exc:`NotImplementedError`.

        .. versionadded:: 1.0"""
        raise NotImplementedError("GetAxisPar must be defined in the "
                                  "controller")

    def GetAxisAttributes(self, axis):
        """**Controller API**. Override if necessary.
        Returns a dictionary of all attributes per axis.
        Default implementation returns a new :class:`dict` built from deep
        copies of the standard attributes, of the referable attributes (only
        when the controller is a :class:`Referable`) and finally of the
        :attr:`~Controller.axis_attributes`. Later sources overwrite earlier
        ones on equal keys.

        :param int axis: axis number
        :return: a dict containing attribute information as defined in
                 :attr:`~Controller.axis_attributes`

        .. versionadded:: 1.0"""
        attributes = copy.deepcopy(self.standard_axis_attributes)
        if isinstance(self, Referable):
            attributes.update(copy.deepcopy(self.referable_axis_attributes))
        attributes.update(copy.deepcopy(self.axis_attributes))
        return attributes

    def SendToCtrl(self, stream):
        """**Controller API**. Override if necessary.
        Sends a string to the controller.
        Default implementation raises :exc:`NotImplementedError`.

        :param str stream: stream to be sent
        :return: any relevant information e.g. response of the controller
        :rtype: :obj:`str`"""
        raise NotImplementedError("SendToCtrl not implemented")
class Startable(object):
    """A Startable interface. A controller whose axes are 'startable'
    (like a motor, for example) should implement this interface.

    .. note:: Do not inherit directly from :class:`Startable`."""

    def PreStartAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare a start of all axis (whatever pre-start means).
        Default implementation does nothing."""

    def PreStartOne(self, axis, value):
        """**Controller API**. Override if necessary.
        Called to prepare a start of the given axis (whatever pre-start
        means).
        Default implementation returns True.

        :param int axis: axis number
        :param float value: new value
        :return: True means a successful pre-start or False for a failure
        :rtype: bool"""
        return True

    def StartOne(self, axis, value):
        """**Controller API**.
        Called to do a start of the given axis (whatever start means).
        Default implementation raises :exc:`NotImplementedError`, so a
        subclass has to provide its own implementation before this method
        can be called.

        :param int axis: axis number
        :param float value: new value"""
        raise NotImplementedError("StartOne must be defined in the controller")

    def StartAll(self):
        """**Controller API**.
        Default implementation does nothing."""
class Stopable(object):
    """A Stopable interface. A controller whose axes are 'stoppable'
    (like a motor, for example) should implement this interface.

    .. note:: Do not inherit directly from :class:`Stopable`."""

    def PreAbortAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare an abort of all axis (whatever pre-abort means).
        Default implementation does nothing."""

    def PreAbortOne(self, axis):
        """**Controller API**. Override if necessary.
        Called to prepare an abort of the given axis (whatever pre-abort
        means).
        Default implementation returns True.

        :param int axis: axis number
        :return: True means a successful pre-abort or False for a failure
        :rtype: bool"""
        return True

    def AbortOne(self, axis):
        """**Controller API**. Override is MANDATORY!
        Aborts one of the axis.
        Default implementation raises :exc:`NotImplementedError`.

        :param int axis: axis number"""
        raise NotImplementedError("AbortOne must be defined in the controller")

    def AbortAll(self):
        """**Controller API**. Override if necessary.
        Aborts all active axis of this controller.
        Default implementation does nothing."""

    def PreStopAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare a stop of all axis (whatever pre-stop means).
        Default implementation does nothing."""

    def PreStopOne(self, axis):
        """**Controller API**. Override if necessary.
        Called to prepare a stop of the given axis (whatever pre-stop means).
        Default implementation returns True.

        :param int axis: axis number
        :return: True means a successful pre-stop or False for a failure
        :rtype: bool"""
        return True

    def StopOne(self, axis):
        """**Controller API**. Override if necessary.
        Stops one of the axis.
        *This method is reserved for future implementation.*
        Default implementation calls :meth:`AbortOne` with the same axis and
        discards its result.

        :param int axis: axis number

        .. versionadded:: 1.0"""
        self.AbortOne(axis)

    def StopAll(self):
        """**Controller API**. Override if necessary.
        Stops all active axis of this controller.
        Default implementation calls :meth:`AbortAll` and discards its
        result."""
        self.AbortAll()
class Readable(object):
    """A Readable interface. A controller whose axes are 'readable'
    (like a motor, counter or 1D for example) should implement this
    interface.

    .. note:: Do not inherit directly from Readable."""

    def PreReadAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare a read of the value of all axis.
        Default implementation does nothing."""

    def PreReadOne(self, axis):
        """**Controller API**. Override if necessary.
        Called to prepare a read of the value of a single axis.
        Default implementation does nothing.

        :param int axis: axis number"""

    def ReadAll(self):
        """**Controller API**. Override if necessary.
        Called to read the value of all selected axis.
        Default implementation does nothing."""

    def ReadOne(self, axis):
        """**Controller API**. Override is MANDATORY!
        Default implementation raises :exc:`NotImplementedError`.

        :param int axis: axis number
        :return: the axis value
        :rtype: object
        """
        raise NotImplementedError("ReadOne must be defined in the controller")
class Loadable(object):
    """A Loadable interface. A controller whose axes are 'loadable'
    (like a counter, 1D or 2D for example) should implement this interface.

    .. note:: Do not inherit directly from Loadable."""

    #: axis of the default timer
    default_timer = None

    def PrepareOne(self, axis, value, repetitions, latency, nb_starts):
        """**Controller API**. Override if necessary.
        Called to prepare the master channel axis with the measurement
        parameters.
        Default implementation does nothing.

        :param int axis: axis number
        :param float value: integration time / monitor count
        :param int repetitions: number of repetitions
        :param float latency: latency time
        :param int nb_starts: number of starts
        """

    def PreLoadAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare loading the integration time / monitor value.
        Default implementation does nothing."""

    def PreLoadOne(self, axis, value, repetitions, latency):
        """**Controller API**. Override if necessary.
        Called to prepare loading the master channel axis with the
        acquisition parameters.
        Default implementation returns True.

        :param int axis: axis number
        :param float value: integration time / monitor value
        :param int repetitions: number of repetitions
        :param float latency: latency time
        :return: True means a successful PreLoadOne or False for a failure
        :rtype: bool"""
        return True

    def LoadAll(self):
        """**Controller API**. Override if necessary.
        Called to load the integration time / monitor value.
        Default implementation does nothing."""

    def LoadOne(self, axis, value, repetitions, latency):
        """**Controller API**. Override is MANDATORY!
        Called to load the integration time / monitor value.
        Default implementation raises :exc:`NotImplementedError`.

        :param int axis: axis number
        :param float value: integration time / monitor value
        :param int repetitions: number of repetitions
        :param float latency: latency time"""
        raise NotImplementedError("LoadOne must be defined in the controller")
class Referable(object):
    """A Referable interface. A controller whose axes can report data
    references (like a 1D or 2D for example) should implement this
    interface.

    .. note:: Inherit from Referable together with either OneDController or
        TwoDController

    .. note::
        The Referable class has been included in Sardana on a provisional
        basis. Backwards incompatible changes (up to and including removal
        of the class) may occur if deemed necessary by the core developers.
    """

    #: A :class:`dict` containing the referable attributes present on each
    #: axis device
    referable_axis_attributes = OrderedDict((
        ("ValueRef", {
            Type: str,
            Access: DataAccess.ReadOnly,
            Description: "Value reference",
        }),
        # TODO: in case of Tango ValueBuffer type is overridden
        # by DevEncoded
        ("ValueRefBuffer", {
            Type: str,
            Access: DataAccess.ReadOnly,
            Description: "Value reference buffer",
        }),
        ("ValueRefEnabled", {
            Type: bool,
            Access: DataAccess.ReadWrite,
            Description: "Value reference enabled",
        }),
        ("ValueRefPattern", {
            Type: str,
            Access: DataAccess.ReadWrite,
            Description: "Value reference template",
        }),
    ))

    def RefOne(self, axis):
        """**Controller API**. Override is MANDATORY!
        Default implementation raises :exc:`NotImplementedError`.

        :param int axis: axis number
        :return: the axis value
        :rtype: object
        """
        raise NotImplementedError("RefOne must be defined in the controller")
class Synchronizer(object):
    """A Synchronizer interface. A controller whose axes are 'Able to
    Synchronize' should implement this interface.

    .. note:: Do not inherit directly from Synchronizer."""

    def PreSynchAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare loading the synchronization description.
        Default implementation does nothing."""

    def PreSynchOne(self, axis, description):
        """**Controller API**. Override if necessary.
        Called to prepare loading the axis with the synchronization
        description.
        Default implementation returns True.

        :param int axis: axis number
        :param list<dict> description: synchronization description
        :return: True means a successful PreSynchOne or False for a failure
        :rtype: bool"""
        return True

    def SynchAll(self):
        """**Controller API**. Override if necessary.
        Called to load the synchronization description.
        Default implementation does nothing."""

    def SynchOne(self, axis, description):
        """**Controller API**. Override is MANDATORY!
        Called to load the axis with the synchronization description.
        Default implementation raises :exc:`NotImplementedError`.

        :param int axis: axis number
        :param list<dict> description: synchronization description"""
        raise NotImplementedError("SynchOne must be defined in the controller")
class MotorController(Controller, Startable, Stopable, Readable):
    """Base class for a motor controller. Inherit from this class to implement
    your own motor controller for the device pool.

    A motor controller should support these axis parameters:

        - acceleration
        - deceleration
        - velocity
        - base_rate
        - step_per_unit

    These parameters are configured through the
    :meth:`~Controller.GetAxisPar`/:meth:`~Controller.SetAxisPar` API.
    """

    #: A constant representing no active switch.
    NoLimitSwitch = 0

    #: A constant representing an active *home* switch.
    #: You can *OR* two or more switches together. For example, to say both
    #: home and lower limit switches are active::
    #:
    #:    limit_switches = self.HomeLimitSwitch | self.LowerLimitSwitch
    HomeLimitSwitch = 1

    #: A constant representing an active *upper limit* switch.
    #: You can *OR* two or more switches together. For example, to say both
    #: upper and lower limit switches are active::
    #:
    #:    limit_switches = self.UpperLimitSwitch | self.LowerLimitSwitch
    UpperLimitSwitch = 2

    #: A constant representing an active *lower limit* switch.
    #: You can *OR* two or more switches together. For example, to say both
    #: upper and lower limit switches are active::
    #:
    #:    limit_switches = self.UpperLimitSwitch | self.LowerLimitSwitch
    LowerLimitSwitch = 4

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = OrderedDict((
        ('Position', {
            'type': float,
            'description': 'Position',
        }),
        ('DialPosition', {
            'type': float,
            'description': 'Dial Position',
        }),
        ('Offset', {
            'type': float,
            'description': 'Offset',
        }),
        ('Sign', {
            'type': float,
            'description': 'Sign',
        }),
        ('Step_per_unit', {
            'type': float,
            'description': 'Steps per unit',
        }),
        ('Base_rate', {
            'type': float,
            'description': 'Base rate',
        }),
        ('Velocity', {
            'type': float,
            'description': 'Velocity',
        }),
        ('Acceleration', {
            'type': float,
            'description': 'Acceleration time (s)',
        }),
        ('Deceleration', {
            'type': float,
            'description': 'Deceleration time (s)',
        }),
        ('Backlash', {
            'type': float,
            'description': 'Backlash',
        }),
        ('Limit_switches', {
            'type': (bool,),
            'description': "This attribute is the motor "
                           "limit switches state. It's an array with 3 \n"
                           "elements which are:\n"
                           "0 - The home switch\n"
                           "1 - The upper limit switch\n"
                           "2 - The lower limit switch\n"
                           "False means not active. True means active",
        }),
    ))
    # merge in whatever the base class declares as standard attributes
    standard_axis_attributes.update(Controller.standard_axis_attributes)

    #: A :obj:`str` representing the controller gender
    gender = 'Motor controller'

    def GetAxisAttributes(self, axis):
        """**Motor Controller API**. Override if necessary.
        Returns a dictionary of all attributes per axis.
        Default implementation delegates to
        :meth:`Controller.GetAxisAttributes`; with the standard attributes
        declared by this class the result contains:

            - Position
            - DialPosition
            - Offset
            - Sign
            - Step_per_unit
            - Acceleration
            - Deceleration
            - Base_rate
            - Velocity
            - Backlash
            - Limit_switches

        plus all attributes contained in :attr:`~Controller.axis_attributes`

        .. note::
            Normally you don't need to Override this method. You just
            implement the class member :attr:`~Controller.axis_attributes`.
            Typically, you will need to Override this method in two cases:

                - certain axes contain a different set of extra attributes
                  which cannot be simply defined in
                  :attr:`~Controller.axis_attributes`
                - some axes (or all) don't implement a set of standard
                  moveable parameters (ex.: if a motor controller is created
                  to control a power supply, it may have a position (current)
                  and a velocity (ramp speed) but it may not have
                  acceleration)

        :param int axis: axis number
        :return: a dict containing attribute information as defined in
                 :attr:`~Controller.axis_attributes`

        .. versionadded:: 1.0"""
        attributes = Controller.GetAxisAttributes(self, axis)
        return attributes

    def DefinePosition(self, axis, position):
        """**Motor Controller API**. Override is recommended!
        This method is called to load a new motor position.
        Default implementation does nothing.

        :param int axis: axis number
        :param position: new motor position
        """
class CounterTimerController(Controller, Readable, Startable, Stopable,
                             Loadable):
    """Base class for a counter/timer controller. Inherit from this class to
    implement your own counter/timer controller for the device pool.

    A counter timer controller should support these controller parameters:

        - timer
        - monitor
    """

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'IntegrationTime': {
            'type': float,
            'description': 'Integration time used in '
                           'independent acquisition',
        },
        'Timer': {
            'type': str,
            'description': 'Timer used in independent acquisition',
        },
        'Value': {
            'type': float,
            'description': 'Value',
        },
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        'ValueBuffer': {
            'type': str,
            'description': 'Value buffer',
        },
        'Shape': {
            'type': (int,),
            'description': 'Shape of the value, it is an empty array',
        },
    }
    # merge in whatever the base class declares as standard attributes
    standard_axis_attributes.update(Controller.standard_axis_attributes)

    #: A :obj:`str` representing the controller gender
    gender = 'Counter/Timer controller'

    def __init__(self, inst, props, *args, **kwargs):
        Controller.__init__(self, inst, props, *args, **kwargs)
        # initial values of the members backing the controller parameters
        # (see Controller.SetCtrlPar / Controller.GetCtrlPar)
        self._timer = None
        self._monitor = None
        self._master = None
        self._latency_time = 0
        self._synchronization = AcqSynch.SoftwareTrigger

    def StartOne(self, axis, value):
        """**Controller API**. Override if necessary.
        Called to do a start of the given axis (whatever start means).
        Default implementation does nothing.

        :param int axis: axis number
        :param float value: new value"""
class TriggerGateController(Controller, Synchronizer, Stopable, Startable):
    """Base class for a trigger/gate controller. Inherit from this class to
    implement your own trigger/gate controller for the device pool.
    """

    #: A :obj:`str` representing the controller gender
    gender = 'Trigger/Gate controller'

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'MoveableOnInput': {
            'type': str,
            'description': 'Moveable',
        },
    }

    def __init__(self, inst, props, *args, **kwargs):
        # nothing specific to set up: just run the base class constructor
        Controller.__init__(self, inst, props, *args, **kwargs)

    # TODO: Implement a Preparable interface and move this method
    # and the Loadable.PrepareOne() there.
    def PrepareOne(self, axis, nb_starts):
        """**Controller API**. Override if necessary.
        Called to prepare the trigger/gate axis with the measurement
        parameters.
        Default implementation does nothing.

        :param int axis: axis
        :param int nb_starts: number of starts
        """
class ZeroDController(Controller, Readable, Stopable):
    """Base class for a 0D controller. Inherit from this class to
    implement your own 0D controller for the device pool."""

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'IntegrationTime': {
            'type': float,
            'description': 'Integration time used in '
                           'independent acquisition',
        },
        'Value': {
            'type': float,
            'description': 'Value',
        },
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        'ValueBuffer': {
            'type': str,
            'description': 'Value buffer',
        },
        'Shape': {
            'type': (int,),
            'description': 'Shape of the value, it is an empty array',
        },
    }
    # merge in whatever the base class declares as standard attributes
    standard_axis_attributes.update(Controller.standard_axis_attributes)

    #: A :obj:`str` representing the controller gender
    gender = '0D controller'

    def AbortOne(self, axis):
        """This method is not executed by the system.
        Default implementation does nothing (it replaces
        :meth:`Stopable.AbortOne`, which raises
        :exc:`NotImplementedError`).

        :param int axis: axis number"""
class OneDController(Controller, Readable, Startable, Stopable, Loadable):
    """Base class for a 1D controller. Inherit from this class to
    implement your own 1D controller for the device pool.

    .. versionadded:: 1.2"""

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'IntegrationTime': {'type': float,
                            'description': 'Integration time used in '
                                           'independent acquisition'},
        'Timer': {'type': str,
                  'description': 'Timer used in independent acquisition'},
        'Value': {'type': (float,),
                  'description': 'Value',
                  'maxdimsize': (16 * 1024,)},
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        'ValueBuffer': {'type': str,
                        'description': 'Value buffer', },
        'Shape': {'type': (int,),
                  'description': 'Shape of the value, it is an array with '
                                 '1 element - X dimension'}
    }
    # merge in whatever the base class declares as standard attributes
    standard_axis_attributes.update(Controller.standard_axis_attributes)

    #: A :obj:`str` representing the controller gender
    gender = '1D controller'

    def __init__(self, inst, props, *args, **kwargs):
        Controller.__init__(self, inst, props, *args, **kwargs)
        self._latency_time = 0
        self._synchronization = AcqSynch.SoftwareTrigger

    def GetAxisPar(self, axis, parameter):
        """**Controller API**. Override is MANDATORY.
        Called to get a parameter value on the given axis.

        ``GetAxisPar`` with 'data_source' parameter is deprecated since 2.8.0.
        Inherit from :class:`~Referable` class in order to report value
        references.

        Default implementation returns None for the 'data_source' parameter
        (case insensitive). For any other parameter it calls the deprecated
        ``GetPar`` method when the controller defines one; otherwise it falls
        back to :meth:`Controller.GetAxisPar`, which raises
        :exc:`NotImplementedError`.

        :param int axis: axis number
        :param str parameter: parameter name
        :return: the parameter value
        :raises NotImplementedError: when the parameter is not 'data_source'
            and the controller does not define ``GetPar``

        .. versionadded:: 1.2"""
        if parameter.lower() == 'data_source':
            return None
        # ``GetPar`` is not defined by Controller nor by any of the interface
        # classes this class inherits from, so calling it unconditionally
        # would raise AttributeError instead of the documented
        # NotImplementedError.
        legacy_getter = getattr(self, 'GetPar', None)
        if legacy_getter is None:
            return Controller.GetAxisPar(self, axis, parameter)
        return legacy_getter(axis, parameter)
class TwoDController(Controller, Readable, Startable, Stopable, Loadable):
    """Base class for 2D controllers.

    Derive from this class to write your own 2D controller for the
    device pool."""

    #: A :class:`dict` with the standard attributes available on every axis
    #: device
    standard_axis_attributes = {
        'IntegrationTime': {
            'type': float,
            'description': 'Integration time used in independent acquisition',
        },
        'Timer': {
            'type': str,
            'description': 'Timer used in independent acquisition',
        },
        'Value': {
            'type': ((float,),),
            'description': 'Value',
            'maxdimsize': (4 * 1024, 4 * 1024),
        },
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        'ValueBuffer': {
            'type': str,
            'description': 'Value buffer',
        },
        'Shape': {
            'type': (int,),
            'description': ('Shape of the value, it is an array '
                            'with 2 elements: X and Y dimensions'),
        },
        # merged last: on a key clash the base controller entry wins
        **Controller.standard_axis_attributes,
    }

    #: A :obj:`str` representing the controller gender
    gender = '2D controller'

    def __init__(self, inst, props, *args, **kwargs):
        """Initialize the base controller, then set the default latency
        time (0)."""
        Controller.__init__(self, inst, props, *args, **kwargs)
        self._latency_time = 0

    def GetAxisPar(self, axis, parameter):
        """**Controller API**. Override is MANDATORY.

        Called to get a parameter value on the given axis.

        Asking for the 'data_source' parameter through ``GetAxisPar`` is
        deprecated since 2.8.0 and yields ``None`` here; inherit from
        :class:`~Referable` in order to report value references.

        For any other parameter the default implementation delegates to
        the deprecated :meth:`~Controller.GetPar` which, by default,
        raises :exc:`NotImplementedError`.

        :param int axis: axis number
        :param str parameter: parameter name (compared case-insensitively
                              against 'data_source')
        :return: the parameter value

        .. versionadded:: 1.2"""
        if parameter.lower() != 'data_source':
            return self.GetPar(axis, parameter)
        # deprecated 'data_source' request: nothing to report
        return None
class PseudoController(Controller):
    """Common base class of every pseudo controller.

    .. note:: Do not inherit directly from :class:`PseudoController`."""

    def _getElem(self, index_or_role, roles, local_cache, ids):
        """*Internal*.

        Return the pool element matching a role, looking it up in the pool
        only when it is not yet present in *local_cache*.

        :param index_or_role: position inside *roles* (:obj:`int`) or a
                              role name
        :param roles: sequence of role names
        :param dict local_cache: cache updated in place; the element is
                                 stored under both its index and its role
        :param ids: sequence of element ids, indexed like *roles*
        :return: the element obtained from the pool (or from the cache)"""
        cached = local_cache.get(index_or_role)
        if cached is not None:
            return cached

        pool = self._getPoolController().pool
        if isinstance(index_or_role, int):
            index, role = index_or_role, roles[index_or_role]
        else:
            role, index = index_or_role, roles.index(index_or_role)

        elem = pool.get_element_by_id(ids[index])
        # register under both keys so either form of lookup hits the cache
        local_cache[index] = local_cache[role] = elem
        return elem
class PseudoMotorController(PseudoController):
    """Base class for a pseudo motor controller. Inherit from this class to
    implement your own pseudo motor controller for the device pool.

    Every Pseudo Motor implementation must be a subclass of this class.
    Current procedure for a correct implementation of a Pseudo Motor class:

    - mandatory:
        - define the class level attributes
          :attr:`~PseudoMotorController.pseudo_motor_roles`,
          :attr:`~PseudoMotorController.motor_roles`
        - write :meth:`~PseudoMotorController.CalcPseudo` method
        - write :meth:`~PseudoMotorController.CalcPhysical` method.
    - optional:
        - write :meth:`~PseudoMotorController.CalcAllPseudo` and
          :meth:`~PseudoMotorController.CalcAllPhysical` if great performance
          gain can be achieved"""

    #: a sequence of strings describing the role of each pseudo motor axis in
    #: this controller
    pseudo_motor_roles = ()

    #: a sequence of strings describing the role of each motor in this
    #: controller
    motor_roles = ()

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'Position': {'type': float,
                     'description': 'Position', },
    }

    #: A :obj:`str` representing the controller gender
    gender = 'Pseudo motor controller'

    def __init__(self, inst, props, *args, **kwargs):
        """Create the per-instance element caches and initialize the base
        class.

        The caches are used by :meth:`GetMotor` and :meth:`GetPseudoMotor`
        and are created before delegating to the base constructor."""
        self.__motor_role_elements = {}
        self.__pseudo_motor_role_elements = {}
        PseudoController.__init__(self, inst, props, *args, **kwargs)

    def CalcAllPseudo(self, physical_pos, curr_pseudo_pos):
        """**Pseudo Motor Controller API**. Override if necessary.

        Calculates the positions of all pseudo motors that belong to the
        pseudo motor system from the positions of the physical motors.

        Default implementation calls
        :meth:`~PseudoMotorController.CalcPseudo` once for each pseudo
        motor role, with axis numbers starting at 1.

        :param sequence<float> physical_pos: a sequence containing physical
                                             motor positions
        :param sequence<float> curr_pseudo_pos: a sequence containing the
                                                current pseudo motor
                                                positions
        :return: a sequence of pseudo motor positions (one for each pseudo
                 motor role)
        :rtype: sequence<float>

        .. versionadded:: 1.0"""
        nb_roles = len(self.pseudo_motor_roles)
        return [self.CalcPseudo(axis, physical_pos, curr_pseudo_pos)
                for axis in range(1, nb_roles + 1)]

    def CalcAllPhysical(self, pseudo_pos, curr_physical_pos):
        """**Pseudo Motor Controller API**. Override if necessary.

        Calculates the positions of all motors that belong to the pseudo
        motor system from the positions of the pseudo motors.

        Default implementation calls
        :meth:`~PseudoMotorController.CalcPhysical` once for each motor
        role, with axis numbers starting at 1.

        :param sequence<float> pseudo_pos: a sequence containing pseudo motor
                                           positions
        :param sequence<float> curr_physical_pos: a sequence containing the
                                                  current physical motor
                                                  positions
        :return: a sequence of motor positions (one for each motor role)
        :rtype: sequence<float>

        .. versionadded:: 1.0"""
        nb_roles = len(self.motor_roles)
        return [self.CalcPhysical(axis, pseudo_pos, curr_physical_pos)
                for axis in range(1, nb_roles + 1)]

    def CalcPseudo(self, axis, physical_pos, curr_pseudo_pos):
        """**Pseudo Motor Controller API**. Override is **MANDATORY**.

        Calculate pseudo motor position given the physical motor positions

        :param int axis: the pseudo motor role axis (the default
                         :meth:`~PseudoMotorController.CalcAllPseudo`
                         passes 1 for the first role)
        :param sequence<float> physical_pos: a sequence containing motor
                                             positions
        :param sequence<float> curr_pseudo_pos: a sequence containing the
                                                current pseudo motor
                                                positions
        :return: a pseudo motor position corresponding to the given axis
                 pseudo motor role
        :rtype: float
        :raises NotImplementedError: always, unless overridden

        .. versionadded:: 1.0"""
        raise NotImplementedError("CalcPseudo must be defined in the "
                                  "controller")

    def CalcPhysical(self, axis, pseudo_pos, curr_physical_pos):
        """**Pseudo Motor Controller API**. Override is **MANDATORY**.

        Calculate physical motor position given the pseudo motor positions.

        :param axis: the motor role axis (the default
                     :meth:`~PseudoMotorController.CalcAllPhysical` passes
                     1 for the first role)
        :type axis: int
        :param pseudo_pos: a sequence containing pseudo motor positions
        :type pseudo_pos: sequence<float>
        :param curr_physical_pos: a sequence containing the current physical
                                  motor positions
        :type curr_physical_pos: sequence<float>
        :return: a motor position corresponding to the given axis motor role
        :rtype: float
        :raises NotImplementedError: always, unless overridden

        .. versionadded:: 1.0"""
        raise NotImplementedError("CalcPhysical must be defined in the "
                                  "controller")

    def GetMotor(self, index_or_role):
        """Returns the motor for a given role/index.

        .. warning::
            * Use with care: Executing motor methods can be dangerous!
            * Since the controller is built before any element (including
              motors), this method will **FAIL** when called from the
              controller constructor

        :param index_or_role: index number or role name
        :type index_or_role: int or str
        :return: Motor object for the given role/index
        :rtype: :class:`~sardana.pool.poolmotor.PoolMotor`"""
        return self._getElem(index_or_role, self.motor_roles,
                             self.__motor_role_elements,
                             self._kwargs['motor_ids'])

    def GetPseudoMotor(self, index_or_role):
        """Returns the pseudo motor for a given role/index.

        .. warning::
            * Use with care: Executing pseudo motor methods can be dangerous!
            * Since the controller is built before any element (including
              pseudo motors), this method will **FAIL** when called from the
              controller constructor

        :param index_or_role: index number or role name
        :type index_or_role: int or str
        :return: PseudoMotor object for the given role/index
        :rtype: :class:`~sardana.pool.poolpseudomotor.PoolPseudoMotor`"""
        pool_ctrl = self._getPoolController()
        elements_by_id = pool_ctrl.get_element_ids()
        elements_by_axis = pool_ctrl.get_element_axis()

        # Build the reverse-lookup lists once instead of once per element.
        known_ids = list(elements_by_id.keys())
        known_elements = list(elements_by_id.values())

        # NOTE(review): ids are collected in the iteration order of the axis
        # mapping; presumably that matches the order of pseudo_motor_roles --
        # confirm against the pool controller.
        pseudo_motor_ids = [known_ids[known_elements.index(element)]
                            for element in list(elements_by_axis.values())]

        return self._getElem(index_or_role, self.pseudo_motor_roles,
                             self.__pseudo_motor_role_elements,
                             pseudo_motor_ids)
class PseudoCounterController(Controller):
    """Base class for pseudo counter controllers.

    Derive from this class to write your own pseudo counter controller for
    the device pool. Every Pseudo Counter implementation must be a subclass
    of this class.

    Current procedure for a correct implementation of a Pseudo Counter
    class:

    - mandatory:
        - define the class level attribute
          :attr:`~PseudoCounterController.counter_roles`
        - write :meth:`~PseudoCounterController.Calc` method"""

    #: a sequence of strings describing the role of each pseudo counter axis
    #: in this controller
    pseudo_counter_roles = ()

    #: a sequence of strings describing the role of each counter in this
    #: controller
    counter_roles = ()

    #: A :class:`dict` with the standard attributes available on every axis
    #: device
    standard_axis_attributes = {
        'IntegrationTime': {
            'type': float,
            'description': 'Integration time used in independent acquisition',
        },
        'Value': {
            'type': float,
            'description': 'Value',
        },
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        'ValueBuffer': {
            'type': str,
            'description': 'Data',
        },
        'Shape': {
            'type': (int,),
            'description': 'Shape of the value, it is an empty array',
        },
    }

    #: A :obj:`str` representing the controller gender
    gender = 'Pseudo counter controller'

    def Calc(self, axis, values):
        """**Pseudo Counter Controller API**. Override is **MANDATORY**.

        Calculate the pseudo counter value from the counter values.

        :param int axis: the pseudo counter role axis
        :param sequence<float> values: a sequence containing current values
                                       of underlying elements
        :return: a pseudo counter value corresponding to the given axis
                 pseudo counter role
        :rtype: float
        :raises NotImplementedError: always, unless overridden

        .. versionadded:: 1.0"""
        raise NotImplementedError("Calc must be defined in the controller")

    def CalcAll(self, values):
        """**Pseudo Counter Controller API**. Override if necessary.

        Calculate every pseudo counter value from the counter values.

        Default implementation calls :meth:`PseudoCounterController.Calc`
        once per pseudo counter role, with axis numbers starting at 1.

        :param sequence<float> values: a sequence containing current values
                                       of underlying elements
        :return: a sequence of pseudo counter values (one for each pseudo
                 counter role)
        :rtype: sequence<float>

        .. versionadded:: 1.2"""
        calc = self.Calc
        results = []
        for axis in range(1, len(self.pseudo_counter_roles) + 1):
            results.append(calc(axis, values))
        return results
class IORegisterController(Controller, Readable):
    """Base class for IORegister controllers.

    Derive from this class to write your own IORegister controller for the
    device pool.
    """

    #: A :class:`dict` with the standard attributes available on every axis
    #: device
    standard_axis_attributes = {
        'Value': {
            'type': float,
            'description': 'Value',
        },
        # merged last: on a key clash the base controller entry wins
        **Controller.standard_axis_attributes,
    }

    #: A :obj:`str` representing the controller gender
    gender = 'I/O register controller'

    def __init__(self, inst, props, *args, **kwargs):
        """Forward every argument unchanged to the base controller."""
        Controller.__init__(self, inst, props, *args, **kwargs)

    def WriteOne(self, axis, value):
        """**IORegister Controller API**. Override if necessary.

        Default implementation does nothing.

        :param int axis: axis number
        :param value: value to write"""