Source code for sardana.pool.controller

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module contains the definition of the Controller base classes"""

import copy
from collections import OrderedDict
from typing import Any, Sequence, Union, List, Dict

from taurus import getLogLevel
from taurus.core.util.log import Logger
from sardana import DataAccess
from sardana.sardanavalue import SardanaValue
from sardana.pool.pooldefs import ControllerAPI, AcqSynch
import sardana


# Public names exported by ``from sardana.pool.controller import *``.
# ``Memorize`` is the *key* under which the ``Memorized`` /
# ``MemorizedNoInit`` / ``NotMemorized`` *values* are used in attribute
# definitions, so it is exported together with them.
__all__ = ["DataAccess", "SardanaValue", "Type", "Access", "Description",
           "DefaultValue", "FGet", "FSet",
           "Memorize", "Memorized", "MemorizedNoInit", "NotMemorized",
           "MaxDimSize",
           "Controller", "Readable", "Startable", "Stopable", "Loadable",
           "Referable", "Synchronizer",
           "MotorController", "CounterTimerController", "ZeroDController",
           "OneDController", "TwoDController", "TriggerGateController",
           "PseudoMotorController", "PseudoCounterController",
           "IORegisterController"]

# Markup language used by the docstrings of this module.
__docformat__ = 'restructuredtext'


# The constants below are the dictionary keys (and, for the memorization
# constants, the dictionary values) understood in the definitions of
# :attr:`~Controller.axis_attributes` and :attr:`~Controller.ctrl_attributes`.

#: Key selecting the data type of an attribute in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
Type = "type"

#: Key selecting the data access (read / read-write) of an attribute in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
Access = "r/w type"

#: Key selecting the description of an attribute in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
Description = "description"

#: Key selecting the default value of an attribute in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
DefaultValue = "defaultvalue"

#: Key selecting the getter function of an attribute in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
FGet = "fget"

#: Key selecting the setter function of an attribute in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
FSet = "fset"

#: Key selecting the memorization policy of an attribute in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`.
#: Possible values for this key are :obj:`Memorized`, :obj:`MemorizedNoInit`
#: and :obj:`NotMemorized`
Memorize = "memorized"

#: Value for the :obj:`Memorize` key in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`:
#: the attribute is memorized
Memorized = "true"

#: Value for the :obj:`Memorize` key in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`:
#: the attribute is memorized but not written at initialization
MemorizedNoInit = "true_without_hard_applied"

#: Value for the :obj:`Memorize` key in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`:
#: the attribute is not memorized
NotMemorized = "false"

#: Key selecting the maximum dimension size of an attribute in
#: :attr:`~Controller.axis_attributes` or :attr:`~Controller.ctrl_attributes`
MaxDimSize = "maxdimsize"


class Controller(object):
    """Base controller class. Do **NOT** inherit from this class directly.

    :param inst: controller instance name
    :param props: a dictionary containing pairs of property name,
                  property value; every pair is set as an attribute of the
                  controller object
    :param args: extra positional arguments (stored as given)
    :param kwargs: extra keyword arguments (stored as given). If a
                   ``pool_controller`` entry is present it is called
                   without arguments to obtain the pool controller
                   (presumably a weak reference -- confirm with the caller)
    """

    #: A sequence of :obj:`str` representing the controller features
    ctrl_features = []

    #: A :class:`dict` containing controller properties where:
    #:
    #: - key : (:obj:`str`) controller property name
    #: - value : :class:`dict` with three :obj:`str` keys ("type",
    #:   "description" and "defaultvalue" case insensitive):
    #:
    #:     - for :obj:`Type`, value is one of the values described in
    #:       :ref:`sardana-controller-data-type`
    #:
    #:     - for :obj:`Description`, value is a :obj:`str` description of the
    #:       property.
    #:       If it is not given it defaults to empty string.
    #:
    #:     - for :obj:`DefaultValue`, value is a python object or None if no
    #:       default value exists for the property.
    #:
    #: Example::
    #:
    #:     from sardana.pool.controller import MotorController, \
    #:         Type, Description, DefaultValue
    #:
    #:     class MyCtrl(MotorController):
    #:
    #:         ctrl_properties = \
    #:         {
    #:             'host' : { Type : str,
    #:                        Description : "host name" },
    #:             'port' : { Type : int,
    #:                        Description : "port number",
    #:                        DefaultValue: 5000 }
    #:         }
    #:
    ctrl_properties = {}

    #: A :class:`dict` containing controller extra attributes where:
    #:
    #: - key : (:obj:`str`) controller attribute name
    #: - value : :class:`dict` with :obj:`str` possible keys: "type",
    #:   "r/w type", "description", "fget", "fset" and "maxdimsize"
    #:   (case insensitive):
    #:
    #:     - for :obj:`Type`, value is one of the values described in
    #:       :ref:`sardana-controller-data-type`
    #:
    #:     - for :obj:`Access`, value is one of
    #:       :obj:`~sardana.sardanadefs.DataAccess` ("read" or "read_write"
    #:       (case insensitive) strings are also accepted) [default: ReadWrite]
    #:
    #:     - for :obj:`Description`, value is a :obj:`str` description of the
    #:       attribute [default: "" (empty string)]
    #:
    #:     - for :obj:`FGet`, value is a :obj:`str` with the method name for
    #:       the attribute getter
    #:       [default: "get"<controller attribute name>]
    #:
    #:     - for :obj:`FSet`, value is a :obj:`str` with the method name for
    #:       the attribute setter. [default, if :obj:`Access` = "read_write":
    #:       "set"<controller attribute name>]
    #:
    #:     - for :obj:`DefaultValue`, value is a python object or None if no
    #:       default value exists for the attribute. If given, the attribute is
    #:       set when the controller is first created.
    #:
    #:     - for :obj:`Memorize`, value is a :obj:`str` with possible values:
    #:       :obj:`Memorized`, :obj:`MemorizedNoInit` and
    #:       :obj:`NotMemorized` [default: :obj:`Memorized`]
    #:
    #:       .. versionadded:: 1.1
    #:
    #:     - for :obj:`MaxDimSize`, value is a :obj:`tuple` with possible
    #:       values:
    #:
    #:         - for scalar **must** be an empty tuple ( () or [] )
    #:           [default: ()]
    #:         - for 1D arrays a sequence with one value (example: (1024,))
    #:           [default: (2048,)]
    #:         - for 2D arrays a sequence with two values
    #:           (example: (1024, 1024)) [default: (2048, 2048)]
    #:
    #:       .. versionadded:: 1.1
    #:
    #: .. versionadded:: 1.0
    #:
    #: Example::
    #:
    #:     from sardana.pool.controller import PseudoMotorController, \
    #:         Type, Description, DefaultValue, DataAccess
    #:
    #:     class HKLCtrl(PseudoMotorController):
    #:
    #:         ctrl_attributes = \
    #:         {
    #:             'ReflectionMatrix' : { Type : ( (float,), ),
    #:                                    Description : "The reflection matrix",
    #:                                    Access : DataAccess.ReadOnly,
    #:                                    FGet : 'getReflectionMatrix', },
    #:         }
    #:
    #:         def getReflectionMatrix(self):
    #:             return ( (1.0, 0.0), (0.0, 1.0) )
    ctrl_attributes = {}

    #: A :class:`dict` containing controller extra attributes for each axis
    #: where:
    #:
    #: - key : (:obj:`str`) axis attribute name
    #: - value : :class:`dict` with :obj:`str` possible keys
    #:   (case insensitive):
    #:
    #:     - for :obj:`Type`, value is one of the values described in
    #:       :ref:`sardana-controller-data-type`
    #:
    #:     - for :obj:`Access`, value is one of
    #:       :obj:`~sardana.sardanadefs.DataAccess` ("read" or "read_write"
    #:       (case insensitive) strings are also accepted)
    #:
    #:     - for :obj:`Description`, value is a :obj:`str` description of the
    #:       attribute
    #:
    #:     - for :obj:`DefaultValue`, value is a python object or None if no
    #:       default value exists for the attribute. If given, the attribute is
    #:       set when the axis is first created.
    #:
    #:     - for :obj:`Memorize`, value is a :obj:`str` with possible values:
    #:       :obj:`Memorized`, :obj:`MemorizedNoInit` and
    #:       :obj:`NotMemorized` [default: :obj:`Memorized`]
    #:
    #:       .. versionadded:: 1.1
    #:
    #:     - for :obj:`MaxDimSize`, value is a :obj:`tuple` with possible
    #:       values:
    #:
    #:         - for scalar **must** be an empty tuple ( () or [] )
    #:           [default: ()]
    #:         - for 1D arrays a sequence with one value (example: (1024,))
    #:           [default: (2048,)]
    #:         - for 2D arrays a sequence with two values
    #:           (example: (1024, 1024)) [default: (2048, 2048)]
    #:
    #:       .. versionadded:: 1.1
    #:
    #: .. versionadded:: 1.0
    #:
    #: Example::
    #:
    #:     from sardana.pool.controller import MotorController, \
    #:         Type, Description, DefaultValue, DataAccess
    #:
    #:     class MyMCtrl(MotorController):
    #:
    #:         axis_attributes = \
    #:         {
    #:             'EncoderSource' : { Type : str,
    #:                                 Description : 'motor encoder source', },
    #:         }
    #:
    #:         def GetAxisExtraPar(self, axis, name):
    #:             name = name.lower()
    #:             if name == 'encodersource':
    #:                 return self._encodersource[axis]
    #:
    #:         def SetAxisExtraPar(self, axis, name, value):
    #:             name = name.lower()
    #:             if name == 'encodersource':
    #:                 self._encodersource[axis] = value
    axis_attributes = {}

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {}

    #: A :obj:`str` representing the controller gender
    gender = None

    #: A :obj:`str` representing the controller model name
    model = 'Generic'

    #: A :obj:`str` representing the controller organization
    organization = 'Sardana team'

    #: A :obj:`str` containing the path to the image file
    image = None

    #: A :obj:`str` containing the path to the image logo file
    logo = None

    def __init__(self, inst: str, props: Dict, *args, **kwargs):
        self._inst_name = inst
        self._log = Logger("Controller.%s" % inst)
        self._log.log_obj.setLevel(getLogLevel())
        self._args = args
        self._kwargs = kwargs
        self._api_version = self._findAPIVersion()
        # Expose every controller property as an attribute of this object.
        for prop_name, prop_value in props.items():
            setattr(self, prop_name, prop_value)

    def _findAPIVersion(self):
        """*Internal*. By default return the Pool Controller API version of
        the pool where the controller is running"""
        return ControllerAPI

    def _getPoolController(self):
        """*Internal*. Return the object obtained by calling the
        ``pool_controller`` entry of the constructor keyword arguments.

        :return: the pool controller, or None when the keyword arguments
                 were never stored on this object, are None, or carry no
                 ``pool_controller`` entry
        """
        # ``_kwargs`` is only set by Controller.__init__; a subclass that
        # does not call it must not break this lookup.
        kwargs = getattr(self, '_kwargs', None)
        if kwargs is None:
            return None
        ctrl_ref = kwargs.get('pool_controller')
        if ctrl_ref is None:
            return None
        return ctrl_ref()

    def AddDevice(self, axis: int):
        """**Controller API**. Override if necessary.
        Default implementation does nothing.

        :param axis: axis number"""
        pass

    def DeleteDevice(self, axis: int):
        """**Controller API**. Override if necessary.
        Default implementation does nothing.

        :param axis: axis number"""
        pass

    def GetName(self) -> str:
        """**Controller API**. The controller instance name.

        :return: the controller instance name

        .. versionadded:: 1.0"""
        return self._inst_name

    def GetAxisName(self, axis: int) -> str:
        """**Controller API**. The axis name.

        When no pool controller is available the axis number converted to
        :obj:`str` is returned instead.

        :param axis: axis number
        :return: the axis name

        .. versionadded:: 1.0"""
        ctrl = self._getPoolController()
        if ctrl is not None:
            return ctrl.get_element(axis=axis).name
        return str(axis)

    def PreStateAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare a read of the state of all axis.
        Default implementation does nothing."""
        pass

    def PreStateOne(self, axis: int):
        """**Controller API**. Override if necessary.
        Called to prepare a read of the state of a single axis.
        Default implementation does nothing.

        :param axis: axis number"""
        pass

    def StateAll(self):
        """**Controller API**. Override if necessary.
        Called to read the state of all selected axis.
        Default implementation does nothing."""
        pass

    def StateOne(self, axis: int):
        """**Controller API**. Override is MANDATORY.
        Called to read the state of one axis.
        Default implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :raises NotImplementedError: always, unless overridden"""
        raise NotImplementedError("StateOne must be defined in the controller")

    def SetCtrlPar(self, parameter, value):
        """**Controller API**. Override if necessary.
        Called to set a parameter with a value. Default implementation sets
        this object member named '_'+parameter (lower case) with the given
        value.

        :param parameter: parameter name (case insensitive)
        :param value: new parameter value

        .. versionadded:: 1.0"""
        setattr(self, '_' + parameter.lower(), value)

    def GetCtrlPar(self, parameter):
        """**Controller API**. Override if necessary.
        Called to get a parameter value. Default implementation returns the
        value contained in this object's member named '_'+parameter
        (lower case).

        :param parameter: parameter name (case insensitive)
        :return: the parameter value
        :raises AttributeError: if the member does not exist

        .. versionadded:: 1.0"""
        return getattr(self, '_' + parameter.lower())

    def SetAxisPar(self, axis: int, parameter, value):
        """**Controller API**. Override is MANDATORY.
        Called to set a parameter with a value on the given axis. Default
        implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :param parameter: parameter name
        :param value: new parameter value

        .. versionadded:: 1.0"""
        raise NotImplementedError("SetAxisPar must be defined in the "
                                  "controller")

    def GetAxisPar(self, axis: int, parameter):
        """**Controller API**. Override is MANDATORY.
        Called to get a parameter value on the given axis. Default
        implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :param parameter: parameter name

        .. versionadded:: 1.0"""
        raise NotImplementedError("GetAxisPar must be defined in the "
                                  "controller")

    def SetAxisExtraPar(self, axis: int, parameter, value):
        """**Controller API**. Override if necessary.
        Called to set a parameter with a value on the given axis. Default
        implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :param parameter: parameter name
        :param value: new parameter value

        .. versionadded:: 1.0"""
        raise NotImplementedError("SetAxisExtraPar must be defined in the "
                                  "controller")

    def GetAxisExtraPar(self, axis: int, parameter):
        """**Controller API**. Override if necessary.
        Called to get a parameter value on the given axis. Default
        implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :param parameter: parameter name

        .. versionadded:: 1.0"""
        raise NotImplementedError("GetAxisExtraPar must be defined in the "
                                  "controller")

    def GetAxisAttributes(self, axis: int):
        """**Controller API**. Override if necessary.
        Returns a dictionary of all attributes per axis.
        Default implementation returns a new :class:`dict` with the standard
        attributes plus, for :class:`Referable` controllers, the referable
        attributes, plus the :attr:`~Controller.axis_attributes`. Later
        groups override earlier ones on name clashes.

        :param axis: axis number
        :return: a dict containing attribute information as defined in
                 :attr:`~Controller.axis_attributes`

        .. versionadded:: 1.0"""
        # Deep copies keep callers from mutating the class-level definitions.
        ret = copy.deepcopy(self.standard_axis_attributes)
        if isinstance(self, Referable):
            ref_attrs = copy.deepcopy(self.referable_axis_attributes)
            ret.update(ref_attrs)
        axis_attrs = copy.deepcopy(self.axis_attributes)
        ret.update(axis_attrs)
        return ret

    def SendToCtrl(self, stream: str) -> str:
        """**Controller API**. Override if necessary.
        Sends a string to the controller.
        Default implementation raises :exc:`NotImplementedError`.

        :param stream: stream to be sent
        :return: any relevant information e.g. response of the controller
        """
        raise NotImplementedError("SendToCtrl not implemented")
class Startable(object):
    """Interface for controllers whose axes can be started.

    A controller whose axes are 'startable' (a motor, for example) should
    implement this interface.

    .. note:: Do not inherit directly from :class:`Startable`.
    """

    def PreStartAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare a start of all axes (whatever pre-start means).
        Default implementation does nothing."""

    def PreStartOne(self, axis: int, value: float) -> bool:
        """**Controller API**. Override if necessary.
        Called to prepare a start of the given axis (whatever pre-start
        means). Default implementation returns True.

        :param axis: axis number
        :param value: new value
        :return: True means a successful pre-start or False for a failure
        """
        return True

    def StartOne(self, axis: int, value: float):
        """**Controller API**. Override if necessary.
        Called to do a start of the given axis (whatever start means).
        Default implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :param value: new value"""
        raise NotImplementedError("StartOne must be defined in the controller")

    def StartAll(self):
        """**Controller API**. Override is MANDATORY!
        Default implementation does nothing."""
class Stopable(object):
    """Interface for controllers whose axes can be stopped / aborted.

    A controller whose axes are 'stoppable' (a motor, for example) should
    implement this interface.

    .. note:: Do not inherit directly from :class:`Stopable`.
    """

    def PreAbortAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare an abort of all axes (whatever pre-abort means).
        Default implementation does nothing."""

    def PreAbortOne(self, axis: int) -> bool:
        """**Controller API**. Override if necessary.
        Called to prepare an abort of the given axis (whatever pre-abort
        means). Default implementation returns True.

        :param axis: axis number
        :return: True means a successful pre-abort or False for a failure
        """
        return True

    def AbortOne(self, axis: int):
        """**Controller API**. Override is MANDATORY!
        Aborts one of the axes.
        Default implementation raises :exc:`NotImplementedError`.

        :param axis: axis number"""
        raise NotImplementedError("AbortOne must be defined in the controller")

    def AbortAll(self):
        """**Controller API**. Override if necessary.
        Aborts all active axes of this controller.
        Default implementation does nothing."""

    def PreStopAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare a stop of all axes (whatever pre-stop means).
        Default implementation does nothing."""

    def PreStopOne(self, axis: int) -> bool:
        """**Controller API**. Override if necessary.
        Called to prepare a stop of the given axis (whatever pre-stop
        means). Default implementation returns True.

        :param axis: axis number
        :return: True means a successful pre-stop or False for a failure
        """
        return True

    def StopOne(self, axis: int):
        """**Controller API**. Override if necessary.
        Stops one of the axes.
        *This method is reserved for future implementation.*
        Default implementation calls :meth:`AbortOne`.

        :param axis: axis number

        .. versionadded:: 1.0"""
        self.AbortOne(axis)

    def StopAll(self):
        """**Controller API**. Override if necessary.
        Stops all active axes of this controller.
        Default implementation calls :meth:`AbortAll`."""
        self.AbortAll()
class Readable(object):
    """Interface for controllers whose axes can be read.

    A controller whose axes are 'readable' (a motor, counter or 1D, for
    example) should implement this interface.

    .. note:: Do not inherit directly from Readable.
    """

    def PreReadAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare a read of the value of all axes.
        Default implementation does nothing."""

    def PreReadOne(self, axis: int):
        """**Controller API**. Override if necessary.
        Called to prepare a read of the value of a single axis.
        Default implementation does nothing.

        :param axis: axis number"""

    def ReadAll(self):
        """**Controller API**. Override if necessary.
        Called to read the value of all selected axes.
        Default implementation does nothing."""

    def ReadOne(self, axis: int) -> Any:
        """**Controller API**. Override is MANDATORY!
        Default implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :return: the axis value
        """
        raise NotImplementedError("ReadOne must be defined in the controller")
class Loadable(object):
    """Interface for controllers whose axes can be loaded.

    A controller whose axes are 'loadable' (a counter, 1D or 2D, for
    example) should implement this interface.

    .. note:: Do not inherit directly from Loadable.
    """

    #: axis of the default timer
    default_timer = None

    def PrepareOne(self, axis: int, value: float, repetitions: int,
                   latency: float, nb_starts: int):
        """**Controller API**. Override if necessary.
        Called to prepare the master channel axis with the measurement
        parameters.
        Default implementation does nothing.

        :param axis: axis number
        :param value: integration time / monitor count
        :param repetitions: number of repetitions
        :param latency: latency time
        :param nb_starts: number of starts
        """

    def PreLoadAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare loading the integration time / monitor value.
        Default implementation does nothing."""

    def PreLoadOne(self, axis: int, value: float, repetitions: int,
                   latency: float) -> bool:
        """**Controller API**. Override if necessary.
        Called to prepare loading the master channel axis with the
        acquisition parameters.
        Default implementation returns True.

        :param axis: axis number
        :param value: integration time / monitor value
        :param repetitions: number of repetitions
        :param latency: latency time
        :return: True means a successful PreLoadOne or False for a failure
        """
        return True

    def LoadAll(self):
        """**Controller API**. Override if necessary.
        Called to load the integration time / monitor value.
        Default implementation does nothing."""

    def LoadOne(self, axis: int, value: float, repetitions: int,
                latency: float):
        """**Controller API**. Override is MANDATORY!
        Called to load the integration time / monitor value.
        Default implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :param value: integration time / monitor value
        :param repetitions: number of repetitions
        :param latency: latency time"""
        raise NotImplementedError("LoadOne must be defined in the controller")
class Referable(object):
    """Interface for controllers whose axes can report data references.

    A controller whose axes can report data references (a 1D or 2D, for
    example) should implement this interface.

    .. note:: Inherit from Referable together with either OneDController
        or TwoDController.

    .. note::
        The Referable class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the class) may occur if
        deemed necessary by the core developers.
    """

    #: A :class:`dict` containing the referable attributes present on each
    #: axis device
    referable_axis_attributes = OrderedDict()
    referable_axis_attributes["ValueRef"] = {
        Type: str,
        Access: DataAccess.ReadOnly,
        Description: "Value reference",
    }
    # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
    referable_axis_attributes["ValueRefBuffer"] = {
        Type: str,
        Access: DataAccess.ReadOnly,
        Description: "Value reference buffer",
    }
    referable_axis_attributes["ValueRefEnabled"] = {
        Type: bool,
        Access: DataAccess.ReadWrite,
        Description: "Value reference enabled",
    }
    referable_axis_attributes["ValueRefPattern"] = {
        Type: str,
        Access: DataAccess.ReadWrite,
        Description: "Value reference template",
    }

    def RefOne(self, axis: int) -> str:
        """**Controller API**. Override is MANDATORY!
        Default implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :return: the axis value reference
        """
        raise NotImplementedError("RefOne must be defined in the controller")
class Synchronizer(object):
    """Interface for controllers whose axes are able to synchronize.

    A controller whose axes are 'Able to Synchronize' should implement
    this interface.

    .. note:: Do not inherit directly from Synchronizer.
    """

    def PreSynchAll(self):
        """**Controller API**. Override if necessary.
        Called to prepare loading the synchronization description.
        Default implementation does nothing."""

    def PreSynchOne(self, axis: int, description: List[Dict]) -> bool:
        """**Controller API**. Override if necessary.
        Called to prepare loading the axis with the synchronization
        description.
        Default implementation returns True.

        :param axis: axis number
        :param description: synchronization description
        :return: True means a successful PreSynchOne or False for a failure
        """
        return True

    def SynchAll(self):
        """**Controller API**. Override if necessary.
        Called to load the synchronization description.
        Default implementation does nothing."""

    def SynchOne(self, axis: int, description: List[dict]):
        """**Controller API**. Override is MANDATORY!
        Called to load the axis with the synchronization description.
        Default implementation raises :exc:`NotImplementedError`.

        :param axis: axis number
        :param description: synchronization description"""
        raise NotImplementedError("SynchOne must be defined in the controller")
class MotorController(Controller, Startable, Stopable, Readable):
    """Base class for a motor controller. Inherit from this class to
    implement your own motor controller for the device pool.

    A motor controller should support these axis parameters:

        - acceleration
        - deceleration
        - velocity
        - base_rate
        - step_per_unit

    These parameters are configured through the
    :meth:`~Controller.GetAxisPar`/:meth:`~Controller.SetAxisPar`
    API.
    """

    #: A constant representing no active switch.
    NoLimitSwitch = 0

    #: A constant representing an active *home* switch.
    #: You can *OR* two or more switches together. For example, to say both
    #: home and lower limit switches are active::
    #:
    #:    limit_switches = self.HomeLimitSwitch | self.LowerLimitSwitch
    HomeLimitSwitch = 1

    #: A constant representing an active *upper limit* switch.
    #: You can *OR* two or more switches together. For example, to say both
    #: upper and lower limit switches are active::
    #:
    #:    limit_switches = self.UpperLimitSwitch | self.LowerLimitSwitch
    UpperLimitSwitch = 2

    #: A constant representing an active *lower limit* switch.
    #: You can *OR* two or more switches together. For example, to say both
    #: upper and lower limit switches are active::
    #:
    #:    limit_switches = self.UpperLimitSwitch | self.LowerLimitSwitch
    LowerLimitSwitch = 4

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    # All scalar attributes share the same float type, so they are generated
    # from (name, description) pairs; the array-typed limit switches entry
    # is appended afterwards to keep it last in the ordering.
    standard_axis_attributes = OrderedDict(
        (attr_name, {'type': float, 'description': attr_descr})
        for attr_name, attr_descr in (
            ('Position', 'Position'),
            ('DialPosition', 'Dial Position'),
            ('Offset', 'Offset'),
            ('Sign', 'Sign'),
            ('Step_per_unit', 'Steps per unit'),
            ('Base_rate', 'Base rate'),
            ('Velocity', 'Velocity'),
            ('Acceleration', 'Acceleration time (s)'),
            ('Deceleration', 'Deceleration time (s)'),
            ('Backlash', 'Backlash'),
        )
    )
    standard_axis_attributes['Limit_switches'] = {
        'type': (bool,),
        'description': "This attribute is the motor "
                       "limit switches state. It's an array with 3 \n"
                       "elements which are:\n"
                       "0 - The home switch\n"
                       "1 - The upper limit switch\n"
                       "2 - The lower limit switch\n"
                       "False means not active. True means active",
    }
    standard_axis_attributes.update(Controller.standard_axis_attributes)

    #: A :obj:`str` representing the controller gender
    gender = 'Motor controller'

    def GetAxisAttributes(self, axis: int):
        """**Motor Controller API**. Override if necessary.
        Returns a dictionary of all attributes per axis.
        Default implementation delegates to
        :meth:`Controller.GetAxisAttributes` and therefore returns a
        :class:`dict` containing:

        - Position
        - DialPosition
        - Offset
        - Sign
        - Step_per_unit
        - Acceleration
        - Deceleration
        - Base_rate
        - Velocity
        - Backlash
        - Limit_switches

        plus all attributes contained in :attr:`~Controller.axis_attributes`

        .. note::
            Normally you don't need to override this method. You just
            implement the class member :attr:`~Controller.axis_attributes`.
            Typically, you will need to override this method in two cases:

                - certain axes contain a different set of extra attributes
                  which cannot be simply defined in
                  :attr:`~Controller.axis_attributes`
                - some axes (or all) don't implement a set of standard
                  moveable parameters (ex.: if a motor controller is created
                  to control a power supply, it may have a position (current)
                  and a velocity (ramp speed) but it may not have
                  acceleration)

        :param axis: axis number
        :return: a dict containing attribute information as defined in
                 :attr:`~Controller.axis_attributes`

        .. versionadded:: 1.0"""
        attributes = Controller.GetAxisAttributes(self, axis)
        return attributes

    def DefinePosition(self, axis: int, position):
        """**Motor Controller API**. Override is recommended!
        This method is called to load a new motor position.
        Default implementation does nothing (it deliberately does not
        raise :exc:`NotImplementedError`).

        :param axis: axis number
        :param position: new motor position
        """
class CounterTimerController(Controller, Readable, Startable, Stopable,
                             Loadable):
    """Base class for a counter/timer controller. Inherit from this class to
    implement your own counter/timer controller for the device pool.

    A counter timer controller should support these controller parameters:

        - timer
        - monitor
    """

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = dict(
        IntegrationTime=dict(
            type=float,
            description='Integration time used in independent acquisition'),
        Timer=dict(
            type=str,
            description='Timer used in independent acquisition'),
        Value=dict(type=float, description='Value'),
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        ValueBuffer=dict(type=str, description='Value buffer'),
        Shape=dict(
            type=(int,),
            description='Shape of the value, it is an empty array'),
    )
    standard_axis_attributes.update(Controller.standard_axis_attributes)

    #: A :obj:`str` representing the controller gender
    gender = 'Counter/Timer controller'

    def __init__(self, inst, props, *args, **kwargs):
        Controller.__init__(self, inst, props, *args, **kwargs)
        # Initial values of the controller parameters; they are stored as
        # '_' + name members, the form Controller.GetCtrlPar/SetCtrlPar use.
        self._timer = self._monitor = self._master = None
        self._latency_time = 0
        self._synchronization = AcqSynch.SoftwareTrigger

    def StartOne(self, axis: int, value: float):
        """**Controller API**. Override if necessary.
        Called to do a start of the given axis (whatever start means).
        Default implementation does nothing.

        :param axis: axis number
        :param value: new value"""
class TriggerGateController(Controller, Synchronizer, Stopable, Startable):
    """Base class for a trigger/gate controller. Inherit from this class to
    implement your own trigger/gate controller for the device pool.
    """

    #: A :obj:`str` representing the controller gender
    gender = 'Trigger/Gate controller'

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = dict(
        MoveableOnInput=dict(type=str, description='Moveable'),
    )

    def __init__(self, inst, props, *args, **kwargs):
        Controller.__init__(self, inst, props, *args, **kwargs)

    # TODO: Implement a Preparable interface and move this method
    # and the Loadable.PrepareOne() there.
    def PrepareOne(self, axis: int, nb_starts: int):
        """**Controller API**. Override if necessary.
        Called to prepare the trigger/gate axis with the measurement
        parameters.
        Default implementation does nothing.

        :param axis: axis number
        :param nb_starts: number of starts
        """
class ZeroDController(Controller, Readable, Stopable):
    """Base class for a 0D controller. Inherit from this class to
    implement your own 0D controller for the device pool."""

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = dict(
        IntegrationTime=dict(
            type=float,
            description='Integration time used in independent acquisition'),
        Value=dict(type=float, description='Value'),
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        ValueBuffer=dict(type=str, description='Value buffer'),
        Shape=dict(
            type=(int,),
            description='Shape of the value, it is an empty array'),
    )
    standard_axis_attributes.update(Controller.standard_axis_attributes)

    #: A :obj:`str` representing the controller gender
    gender = '0D controller'

    def AbortOne(self, axis: int):
        """This method is not executed by the system.
        Default implementation does nothing.

        :param axis: axis number"""
class OneDController(Controller, Readable, Startable, Stopable, Loadable):
    """Base class for a 1D controller. Inherit from this class to
    implement your own 1D controller for the device pool.

    .. versionadded:: 1.2"""

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'IntegrationTime': {'type': float,
                            'description': 'Integration time used in '
                                           'independent acquisition'},
        'Timer': {'type': str,
                  'description': 'Timer used in independent acquisition'},
        'Value': {'type': (float,),
                  'description': 'Value',
                  'maxdimsize': (16 * 1024,)},
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        'ValueBuffer': {'type': str,
                        'description': 'Value buffer', },
        'Shape': {'type': (int,),
                  'description': 'Shape of the value, it is an array with '
                                 '1 element - X dimension'}
    }
    standard_axis_attributes.update(Controller.standard_axis_attributes)

    #: A :obj:`str` representing the controller gender
    gender = '1D controller'

    def __init__(self, inst, props, *args, **kwargs):
        Controller.__init__(self, inst, props, *args, **kwargs)
        self._latency_time = 0
        self._synchronization = AcqSynch.SoftwareTrigger

    def GetAxisPar(self, axis: int, parameter):
        """**Controller API**. Override is MANDATORY.
        Called to get a parameter value on the given axis.

        ``GetAxisPar`` with 'data_source' parameter is deprecated
        since 2.8.0. Inherit from :class:`~Referable` class in order to
        report value references.

        Default implementation returns None for the 'data_source'
        parameter (case insensitive). For any other parameter it calls the
        deprecated ``GetPar`` method when the controller defines one, and
        raises :exc:`NotImplementedError` otherwise.

        :param axis: axis number
        :param parameter: parameter name
        :return: the parameter value
        :raises NotImplementedError: if the controller defines no ``GetPar``

        .. versionadded:: 1.2"""
        if parameter.lower() == 'data_source':
            return None
        # The base Controller does not provide GetPar, so an unconditional
        # self.GetPar(...) would fail with AttributeError instead of the
        # NotImplementedError the controller API promises.
        legacy_getter = getattr(self, 'GetPar', None)
        if legacy_getter is None:
            raise NotImplementedError("GetAxisPar must be defined in the "
                                      "controller")
        return legacy_getter(axis, parameter)
class TwoDController(Controller, Readable, Startable, Stopable, Loadable):
    """Base class for a 2D controller.

    Derive from this class to write your own 2D controller for the device
    pool."""

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'IntegrationTime': dict(
            type=float,
            description='Integration time used in independent acquisition'),
        'Timer': dict(
            type=str,
            description='Timer used in independent acquisition'),
        'Value': dict(
            type=((float,),),
            description='Value',
            maxdimsize=(4 * 1024, 4 * 1024)),
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        'ValueBuffer': dict(type=str, description='Value buffer'),
        'Shape': dict(
            type=(int,),
            description='Shape of the value, it is an array with '
                        '2 elements: X and Y dimensions'),
        # The generic controller attributes are merged last, so an entry
        # with the same name replaces the one declared above.
        **Controller.standard_axis_attributes,
    }

    #: A :obj:`str` representing the controller gender
    gender = '2D controller'

    def __init__(self, inst, props, *args, **kwargs):
        """Initialize the base controller, then set the latency time to
        zero.

        All arguments are forwarded unchanged to
        :meth:`Controller.__init__`."""
        Controller.__init__(self, inst, props, *args, **kwargs)
        # NOTE(review): OneDController also initializes ``_synchronization``
        # here; confirm whether leaving it out for 2D is intentional.
        self._latency_time = 0

    def GetAxisPar(self, axis: int, parameter):
        """**Controller API**. Override is MANDATORY.
        Called to get a parameter value on the given axis.

        Asking for the 'data_source' parameter through ``GetAxisPar`` is
        deprecated since 2.8.0; inherit from :class:`~Referable` in order
        to report value references instead.

        The default implementation answers ``None`` for 'data_source'
        (compared case-insensitively) and hands every other parameter to
        the deprecated :meth:`~Controller.GetPar` which, by default,
        raises :exc:`NotImplementedError`.

        :param axis: axis number
        :param parameter: parameter name
        :return: the parameter value

        .. versionadded:: 1.2"""
        if parameter.lower() != 'data_source':
            return self.GetPar(axis, parameter)
        return None
class PseudoController(Controller):
    """Base class for all pseudo controllers.

    .. note:: Do not inherit directly from :class:`PseudoController`."""

    def _getElem(self, index_or_role, roles, local_cache, ids):
        """*Internal*. Return the pool element playing a given role.

        A lookup that is not yet in ``local_cache`` is resolved through the
        pool and then stored in the cache under both its index and its role
        name, so either form hits the cache next time.

        :param index_or_role: position in ``roles`` (:obj:`int`) or role
                              name
        :param roles: sequence of role names
        :param local_cache: :class:`dict` used to remember resolved elements
        :param ids: element ids, indexable by the position of the role in
                    ``roles``
        :return: the element found for the requested index/role"""
        cached = local_cache.get(index_or_role)
        if cached is not None:
            return cached

        pool = self._getPoolController().pool
        if isinstance(index_or_role, int):
            index, role = index_or_role, roles[index_or_role]
        else:
            role, index = index_or_role, roles.index(index_or_role)

        element_id = ids[index]
        element = pool.get_element_by_id(element_id)
        local_cache[index] = element
        local_cache[role] = element
        return element
class PseudoMotorController(PseudoController):
    """Base class for a pseudo motor controller. Inherit from this class to
    implement your own pseudo motor controller for the device pool.

    Every Pseudo Motor implementation must be a subclass of this class.
    Current procedure for a correct implementation of a Pseudo Motor class:

    - mandatory:
        - define the class level attributes
          :attr:`~PseudoMotorController.pseudo_motor_roles`,
          :attr:`~PseudoMotorController.motor_roles`
        - write :meth:`~PseudoMotorController.CalcPseudo` method
        - write :meth:`~PseudoMotorController.CalcPhysical` method.
    - optional:
        - write :meth:`~PseudoMotorController.CalcAllPseudo` and
          :meth:`~PseudoMotorController.CalcAllPhysical` if great performance
          gain can be achieved"""

    #: a sequence of strings describing the role of each pseudo motor axis in
    #: this controller
    pseudo_motor_roles = ()

    #: a sequence of strings describing the role of each motor in this
    #: controller
    motor_roles = ()

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'Position': {'type': float,
                     'description': 'Position', },
    }

    #: A :obj:`str` representing the controller gender
    gender = 'Pseudo motor controller'

    def __init__(self, inst, props, *args, **kwargs):
        """Create the per-instance element caches and initialize the base
        pseudo controller.

        All arguments are forwarded unchanged to
        :meth:`PseudoController.__init__`."""
        # Caches filled lazily by GetMotor / GetPseudoMotor. They are created
        # before the base class initializer runs (original statement order).
        self.__motor_role_elements = {}
        self.__pseudo_motor_role_elements = {}
        PseudoController.__init__(self, inst, props, *args, **kwargs)

    def CalcAllPseudo(self, physical_pos: Sequence[float],
                      curr_pseudo_pos: Sequence[float]) -> Sequence[float]:
        """**Pseudo Motor Controller API**. Override if necessary.

        Calculates the positions of all pseudo motors that belong to the
        pseudo motor system from the positions of the physical motors.
        Default implementation does a loop calling
        :meth:`PseudoMotorController.CalcPseudo` for each pseudo motor role
        (axis numbers start at 1).

        :param physical_pos: a sequence containing physical motor positions
        :param curr_pseudo_pos: a sequence containing the current pseudo
                                motor positions
        :return: a sequence of pseudo motor positions (one for each pseudo
                 motor role)

        .. versionadded:: 1.0"""
        last_axis = len(self.pseudo_motor_roles)
        return [self.CalcPseudo(axis, physical_pos, curr_pseudo_pos)
                for axis in range(1, last_axis + 1)]

    def CalcAllPhysical(self, pseudo_pos: Sequence[float],
                        curr_physical_pos: Sequence[float]
                        ) -> Sequence[float]:
        """**Pseudo Motor Controller API**. Override if necessary.

        Calculates the positions of all motors that belong to the pseudo
        motor system from the positions of the pseudo motors.
        Default implementation does a loop calling
        :meth:`PseudoMotorController.CalcPhysical` for each motor role
        (axis numbers start at 1).

        :param pseudo_pos: a sequence containing pseudo motor positions
        :param curr_physical_pos: a sequence containing the current physical
                                  motor positions
        :return: a sequence of motor positions (one for each motor role)

        .. versionadded:: 1.0"""
        last_axis = len(self.motor_roles)
        return [self.CalcPhysical(axis, pseudo_pos, curr_physical_pos)
                for axis in range(1, last_axis + 1)]

    def CalcPseudo(self, axis: int, physical_pos: Sequence[float],
                   curr_pseudo_pos: Sequence[float]) -> float:
        """**Pseudo Motor Controller API**. Override is **MANDATORY**.

        Calculate pseudo motor position given the physical motor positions.

        :param axis: the pseudo motor role axis
        :param physical_pos: a sequence containing motor positions
        :param curr_pseudo_pos: a sequence containing the current pseudo
                                motor positions
        :return: a pseudo motor position corresponding to the given axis
                 pseudo motor role
        :raises NotImplementedError: always, unless overridden

        .. versionadded:: 1.0"""
        raise NotImplementedError("CalcPseudo must be defined in the "
                                  "controller")

    def CalcPhysical(self, axis: int, pseudo_pos: Sequence[float],
                     curr_physical_pos: Sequence[float]) -> float:
        """**Pseudo Motor Controller API**. Override is **MANDATORY**.

        Calculate physical motor position given the pseudo motor positions.

        :param axis: the motor role axis
        :param pseudo_pos: a sequence containing pseudo motor positions
        :param curr_physical_pos: a sequence containing the current physical
                                  motor positions
        :return: a motor position corresponding to the given axis motor role
        :raises NotImplementedError: always, unless overridden

        .. versionadded:: 1.0"""
        raise NotImplementedError("CalcPhysical must be defined in the "
                                  "controller")

    def GetMotor(self, index_or_role: Union[int, str]
                 ) -> "sardana.pool.poolmotor.PoolMotor":
        """Returns the motor for a given role/index.

        .. warning::
            * Use with care: Executing motor methods can be dangerous!
            * Since the controller is built before any element (including
              motors), this method will **FAIL** when called from the
              controller constructor

        :param index_or_role: index number (position in
                              :attr:`~PseudoMotorController.motor_roles`)
                              or role name
        :return: Motor object for the given role/index
        """
        return self._getElem(index_or_role, self.motor_roles,
                             self.__motor_role_elements,
                             self._kwargs['motor_ids'])

    def GetPseudoMotor(self, index_or_role: Union[int, str]
                       ) -> "sardana.pool.poolpseudomotor.PoolPseudoMotor":
        """Returns the pseudo motor for a given role/index.

        .. warning::
            * Use with care: Executing pseudo motor methods can be dangerous!
            * Since the controller is built before any element (including
              pseudo motors), this method will **FAIL** when called from the
              controller constructor

        :param index_or_role: index number (position in
            :attr:`~PseudoMotorController.pseudo_motor_roles`) or role name
        :return: PseudoMotor object for the given role/index
        :raises ValueError: if a value of the pool controller's axis mapping
                            is missing from its ids mapping
        """
        dict_ids = self._getPoolController().get_element_ids()
        dict_axis = self._getPoolController().get_element_axis()
        # Reverse lookup tables, built once instead of on every iteration.
        # Presumably dict_ids maps element id -> element and dict_axis maps
        # axis -> element; verify against the pool controller.
        known_ids = list(dict_ids.keys())
        known_elements = list(dict_ids.values())
        # Iterate over a snapshot of the values, like the original loop did.
        # NOTE(review): the resulting order follows dict_axis iteration
        # order and is assumed to match the pseudo_motor_roles order.
        pseudo_motor_ids = [known_ids[known_elements.index(element)]
                            for element in list(dict_axis.values())]
        return self._getElem(index_or_role, self.pseudo_motor_roles,
                             self.__pseudo_motor_role_elements,
                             pseudo_motor_ids)
class PseudoCounterController(Controller):
    """Base class for a pseudo counter controller.

    Derive from this class to write your own pseudo counter controller for
    the device pool. Every Pseudo Counter implementation must be a subclass
    of this class.

    Current procedure for a correct implementation of a Pseudo Counter
    class:

    - mandatory:
        - define the class level attributes
          :attr:`~PseudoCounterController.counter_roles`,
        - write :meth:`~PseudoCounterController.Calc` method"""

    #: a sequence of strings describing the role of each pseudo counter axis in
    #: this controller
    pseudo_counter_roles = ()

    #: a sequence of strings describing the role of each counter in this
    #: controller
    counter_roles = ()

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'IntegrationTime': dict(
            type=float,
            description='Integration time used in independent acquisition'),
        'Value': dict(type=float, description='Value'),
        # TODO: in case of Tango ValueBuffer type is overridden by DevEncoded
        'ValueBuffer': dict(type=str, description='Data'),
        'Shape': dict(
            type=(int,),
            description='Shape of the value, it is an empty array'),
    }

    #: A :obj:`str` representing the controller gender
    gender = 'Pseudo counter controller'

    def Calc(self, axis: int, values: Sequence[float]) -> float:
        """**Pseudo Counter Controller API**. Override is **MANDATORY**.

        Compute the value of one pseudo counter from the counter values.

        :param axis: the pseudo counter role axis
        :param values: a sequence containing current values of underlying
                       elements
        :return: a pseudo counter value corresponding to the given axis
                 pseudo counter role
        :raises NotImplementedError: always, unless overridden

        .. versionadded:: 1.0"""
        raise NotImplementedError("Calc must be defined in the controller")

    def CalcAll(self, values: Sequence[float]) -> Sequence[float]:
        """**Pseudo Counter Controller API**. Override if necessary.

        Compute every pseudo counter value from the counter values.
        The default implementation invokes
        :meth:`PseudoCounterController.Calc` once per pseudo counter role,
        with axis numbers starting at 1.

        :param values: a sequence containing current values of underlying
                       elements
        :return: a sequence of pseudo counter values (one for each pseudo
                 counter role)

        .. versionadded:: 1.2"""
        calc = self.Calc
        last_axis = len(self.pseudo_counter_roles)
        results = []
        for axis in range(1, last_axis + 1):
            results.append(calc(axis, values))
        return results
class IORegisterController(Controller, Readable):
    """Base class for an IORegister controller.

    Derive from this class to write your own IORegister controller for the
    device pool.
    """

    #: A :class:`dict` containing the standard attributes present on each axis
    #: device
    standard_axis_attributes = {
        'Value': dict(type=float, description='Value'),
        # The generic controller attributes are merged last, so an entry
        # with the same name replaces the one declared above.
        **Controller.standard_axis_attributes,
    }

    #: A :obj:`str` representing the controller gender
    gender = 'I/O register controller'

    def __init__(self, inst, props, *args, **kwargs):
        """Initialize the base controller.

        All arguments are forwarded unchanged to
        :meth:`Controller.__init__`."""
        Controller.__init__(self, inst, props, *args, **kwargs)

    def WriteOne(self, axis: int, value):
        """**IORegister Controller API**. Override if necessary.

        The default implementation does nothing.

        :param axis: axis number
        :param value: value to be written"""