Source code for sardana.sardanadefs

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module contains the most generic sardana constants and enumerations"""


import collections

__all__ = ["EpsilonError", "SardanaServer", "ServerRunMode", "State",
           "DataType", "DataFormat", "DataAccess", "AttrQuality",
           "DTYPE_MAP", "R_DTYPE_MAP", "DACCESS_MAP",
           "from_dtype_str", "from_access_str", "to_dtype_dformat",
           "to_daccess", "InvalidId", "InvalidAxis", "ElementType",
           "Interface", "Interfaces", "InterfacesExpanded",
           "TYPE_ELEMENTS", "TYPE_GROUP_ELEMENTS", "TYPE_MOVEABLE_ELEMENTS",
           "TYPE_PHYSICAL_ELEMENTS", "TYPE_ACQUIRABLE_ELEMENTS",
           "TYPE_EXP_CHANNEL_ELEMENTS", "TYPE_TIMERABLE_ELEMENTS",
           "TYPE_PSEUDO_ELEMENTS", "INTERFACES", "INTERFACES_EXPANDED",
           "ScalarNumberFilter"]

__docformat__ = 'restructuredtext'

import math
from enum import IntEnum

from taurus.core.util.enumeration import Enumeration

#: maximum difference between two floats so that they are considered equal
EpsilonError = 1E-16

#: sardana element state enumeration
State = Enumeration("State", (
    "On",
    "Off",
    "Close",
    "Open",
    "Insert",
    "Extract",
    "Moving",
    "Standby",
    "Fault",
    "Init",
    "Running",
    "Alarm",
    "Disable",
    "Unknown",
    "Invalid"))


class _SardanaServer(object):
    """Class representing the current sardana server state"""

    def __init__(self):
        self.server_state = State.Invalid

    def __repr__(self):
        return "SardanaServer()"


#: the global object containing the SardanaServer information
SardanaServer = _SardanaServer()

#:
#: The sardana server run mode:
#:
#: - **SynchPure** : Pure synchronous: Start the server and run the server loop
#:   until it stops
#: - **SynchThread** : separate thread synchronous: start a thread running the
#:   server loop. Block until the server loop ends
#: - **SynchProcess** : separate process synchronous: start a sub-process
#:   running the server loop. Block until the server loop ends
#: - **AsynchThread** : separate thread asynchronous: start a thread running the
#:   server loop. Return immediately
#: - **ASynchProcess** : separate process asynchronous: start a sub-process
#:   running the server loop. Return immediately
ServerRunMode = Enumeration("ServerRunMode",
                            ("SynchPure", "SynchThread", "SynchProcess",
                             "AsynchThread", "AsynchProcess"))

#: sardana data types (used by device pool controllers)
DataType = Enumeration("DataType", (
    "Integer",
    "Double",
    "String",
    "Boolean",
    "Encoded",
    "Invalid"))

#: sardana data format enumeration (used by device pool controllers)
DataFormat = Enumeration("DataFormat", (
    "Scalar",
    "OneD",
    "TwoD",
    "Invalid"))

#: sardana data access (used by device pool controllers)
DataAccess = Enumeration("DataAccess", (
    "ReadOnly",
    "ReadWrite",
    "Invalid"))


class AttrQuality(IntEnum):
    """Attribute quality factor"""

    #: Attribute is valid
    Valid = 0
    #: Attribute is invalid
    Invalid = 1
    #: Attribute is in alarm
    Alarm = 2
    #: Attribute is changing e.g. element is in operation
    Changing = 3
    #: Attribute is in warning
    Warning = 4

#: dictionary dict<data type, :class:`sardana.DataType`>
DTYPE_MAP = {
    'int': DataType.Integer,
    'integer': DataType.Integer,
    int: DataType.Integer,
    'long': DataType.Integer,
    DataType.Integer: DataType.Integer,
    'float': DataType.Double,
    'double': DataType.Double,
    float: DataType.Double,
    DataType.Double: DataType.Double,
    'str': DataType.String,
    'string': DataType.String,
    str: DataType.String,
    DataType.String: DataType.String,
    'bool': DataType.Boolean,
    'boolean': DataType.Boolean,
    bool: DataType.Boolean,
    DataType.Boolean: DataType.Boolean,
}

#: dictionary dict<data type, :class:`sardana.DataType`>
R_DTYPE_MAP = {
    'int': int,
    'integer': int,
    int: int,
    'long': int,
    DataType.Integer: int,
    'float': float,
    'double': float,
    float: float,
    DataType.Double: float,
    'str': str,
    'string': str,
    str: str,
    DataType.String: str,
    'bool': bool,
    'boolean': bool,
    bool: bool,
    DataType.Boolean: bool,
}

# DTYPE_MAP.setdefault(DataType.Invalid)

#: dictionary dict<access type, :class:`sardana.DataAccess`>
DACCESS_MAP = {
    'read': DataAccess.ReadOnly,
    DataAccess.ReadOnly: DataAccess.ReadOnly,
    'readwrite': DataAccess.ReadWrite,
    'read_write': DataAccess.ReadWrite,
    DataAccess.ReadWrite: DataAccess.ReadWrite,
}
# DACCESS_MAP.setdefault(DataAccess.Invalid)


[docs]def from_dtype_str(dtype): """Transforms the given dtype parameter (string/:obj:`DataType` or None) into a tuple of two elements (str, :obj:`DataFormat`) where the first element is a string with a simplified data type. - If None is given, it returns ('float', :obj:`DataFormat.Scalar`) - If :obj:`DataType` is given, it returns (:obj:`DataType`, :obj:`DataFormat.Scalar`) :param dtype: the data type to be transformed :type dtype: :obj:`str` or None or :obj:`DataType` :return: a tuple <str, :obj:`DataFormat`> for the given dtype :rtype: tuple<str, :obj:`DataFormat`>""" dformat = DataFormat.Scalar if dtype is None: dtype = 'float' elif isinstance(dtype, str): dtype = dtype.lower() if dtype.startswith("pytango."): dtype = dtype[len("pytango."):] if dtype.startswith("dev"): dtype = dtype[len("dev"):] if dtype.startswith("var"): dtype = dtype[len("var"):] if dtype.endswith("array"): dtype = dtype[:dtype.index("array")] dformat = DataFormat.OneD return dtype, dformat
[docs]def from_access_str(access): """Transforms the given access parameter (string or :obj:`DataAccess`) into a simplified data access string. :param dtype: the access to be transformed :type dtype: :obj:`str` :return: a simple string for the given access :rtype: :obj:`str`""" if isinstance(access, str): access = access.lower() if access.startswith("pytango."): access = access[len("pytango."):] return access
[docs]def to_dtype_dformat(data): """Transforms the given data parameter (string/ or sequence of string or sequence of sequence of string/:obj:`DataType`) into a tuple of two elements (:obj:`DataType`, :obj:`DataFormat`). :param data: the data information to be transformed :type data: :obj:`str` or seq<str> or seq<seq<str>> :return: a tuple <:obj:`DataType`, :obj:`DataFormat`> for the given data :rtype: tuple<:obj:`DataType`, :obj:`DataFormat`> """ import operator dtype, dformat = data, DataFormat.Scalar if isinstance(data, str): dtype, dformat = from_dtype_str(data) elif isinstance(data, collections.abc.Sequence): dformat = DataFormat.OneD dtype = data[0] if isinstance(dtype, str): dtype, dformat2 = from_dtype_str(dtype) if dformat2 == DataFormat.OneD: dformat = DataFormat.TwoD elif isinstance(dtype, collections.abc.Sequence): dformat = DataFormat.TwoD dtype = dtype[0] if isinstance(dtype, str): dtype, _ = from_dtype_str(dtype) dtype = DTYPE_MAP.get(dtype, DataType.Invalid) return dtype, dformat
[docs]def to_daccess(daccess): """Transforms the given access parameter (string or None) into a :obj:`DataAccess`. If None is given returns :obj:`DataAccess.ReadWrite` :param dtype: the access to be transformed :type dtype: :obj:`str` :return: a :obj:`DataAccess` for the given access :rtype: :obj:`DataAccess`""" if daccess is None: daccess = DataAccess.ReadWrite elif isinstance(daccess, str): daccess = DACCESS_MAP.get( from_access_str(daccess), DataAccess.ReadWrite) return daccess
#: A constant representing an invalid ID InvalidId = 0 #: A constant representing an invalid axis InvalidAxis = 0 #: An enumeration describing the all possible element types in sardana ElementType = Enumeration("ElementType", ( "Pool", "Controller", "Motor", "CTExpChannel", "ZeroDExpChannel", "OneDExpChannel", "TwoDExpChannel", "ComChannel", "IORegister", "TriggerGate", "PseudoMotor", "PseudoCounter", "Constraint", "MotorGroup", "MeasurementGroup", "Instrument", "ControllerClass", "ControllerLibrary", "RecorderClass", "RecorderLibrary", "MacroServer", "Door", "MacroClass", "MacroLibrary", "MacroFunction", "External", "Meta", "ParameterType", "Unknown")) ET = ElementType #: a set containning all "controllable" element types. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_ELEMENTS = {ET.Motor, ET.CTExpChannel, ET.ZeroDExpChannel, ET.OneDExpChannel, ET.TwoDExpChannel, ET.TriggerGate, ET.ComChannel, ET.IORegister, ET.PseudoMotor, ET.PseudoCounter, ET.Constraint} #: a set containing all group element types. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_GROUP_ELEMENTS = {ET.MotorGroup, ET.MeasurementGroup} #: a set containing the type of elements which are moveable. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_MOVEABLE_ELEMENTS = {ET.Motor, ET.PseudoMotor, ET.MotorGroup} #: a set containing the possible types of physical elements. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_PHYSICAL_ELEMENTS = {ET.Motor, ET.CTExpChannel, ET.ZeroDExpChannel, ET.OneDExpChannel, ET.TwoDExpChannel, ET.TriggerGate, ET.ComChannel, ET.IORegister} #: a set containing the possible types of acquirable elements. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_ACQUIRABLE_ELEMENTS = {ET.Motor, ET.CTExpChannel, ET.ZeroDExpChannel, ET.OneDExpChannel, ET.TwoDExpChannel, ET.ComChannel, ET.IORegister, ET.PseudoMotor, ET.PseudoCounter} #: a set containing the possible measure-able elements. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_COUNTABLE_ELEMENTS = {ET.CTExpChannel, ET.OneDExpChannel, ET.TwoDExpChannel, ET.MeasurementGroup} #: a set containing the possible types of experimental channel elements. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_EXP_CHANNEL_ELEMENTS = {ET.CTExpChannel, ET.ZeroDExpChannel, ET.OneDExpChannel, ET.TwoDExpChannel, ET.PseudoCounter} #: a set containing the possible timer-able elements. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_TIMERABLE_ELEMENTS = {ET.CTExpChannel, ET.OneDExpChannel, ET.TwoDExpChannel} #: a set containing the possible types of pseudo elements. #: Constant values belong to :class:`~sardana.sardanadefs.ElementType` TYPE_PSEUDO_ELEMENTS = {ET.PseudoMotor, ET.PseudoCounter} # : An enumeration describing the all possible sardana interfaces # SardanaInterface = Enumeration("SardanaInterface", ( \ # ("Object", 0b0000000000000001), # ("Element", 0b0000000000000011), # ("Class", 0b0000000000000101), # ("Library", 0b0000000000001001), # ("PoolObject", 0b0000000000010001), # ("PoolElement", 0b0000000000010011), # ("Pool", 0b0000000000110011), # ("Controller", 0b0000000001000001), # ("Moveable", 0b0000000010000001), # ("Acquirable", 0b0000000100000001), # ("Instrument", 0b0000001000000001), # ("Motor", 0b0000010000000001), # ("PseudoMotor", 0b0000100000000001), # ("IORegister", 0b0001000000000001), # ("ExpChannel", 0b0010000000000001), # ("CTExpChannel", 0b0100000000000001), # ("ZeroDExpChannel", 0b1000000000000001), # ("OneDExpChannel", 0b0000000000000001), # ("TwoDExpChannel", 0b0000000000000001), # ("PseudoCounter", 0b0000000000000001), # ("ComChannel", 0b0000000000000001), # ("MotorGroup", 0b0000000000000001), # ("MeasurementGroup", 0b0000000000000001), # ("ControllerLibrary", 0b0000000000000001), # ("ControllerClass", 0b0000000000000001), # ("Constraint", 0b0000000000000001), # ("External", 0b0000000000000001), # ("MacroServerObject", 0b0000000000000001), # ("MacroServerElement",0b0000000000000001), # ("MacroServer", 0b0000000000000001), # ("MacroLibrary", 0b0000000000000001), # ("MacroClass", 0b0000000000000001), # ("Macro", 0b0000000000000001), ) ) #: a dictionary containing the direct interfaces supported by each type #: (:obj:`dict`\<:obj:`str`\, :obj:`tuple`\<:obj:`set`\<:obj:`str`\, :obj:`str`\>>>) INTERFACES = { "Meta": (set(), "A generic sardana meta object"), "Object": (set(), "A generic sardana object"), "Element": ({"Object"}, "A generic sardana element"), "Class": ({"Object"}, "A generic sardana class"), "Function": ({"Object"}, "A generic sardana function"), "Library": ({"Object"}, "A generic sardana library"), "PoolObject": ({"Object"}, "A Pool object"), "PoolElement": ({"Element", "PoolObject"}, "A Pool element"), "Pool": ({"PoolElement"}, "A Pool"), "Controller": ({"PoolElement"}, "A controller"), "Moveable": ({"PoolElement"}, "A moveable element"), "Acquirable": ({"PoolElement"}, "An acquirable element"), "Countable": ({"PoolElement"}, "A countable element"), "Instrument": ({"PoolElement"}, "An instrument"), "Motor": ({"Moveable", "Acquirable"}, "a motor"), "PseudoMotor": ({"Moveable", "Acquirable"}, "A pseudo motor"), "IORegister": ({"Acquirable"}, "An IO register"), "ExpChannel": ({"Acquirable"}, "A generic experimental channel"), "CTExpChannel": ({"ExpChannel", "Countable"}, "A counter/timer experimental channel"), "ZeroDExpChannel": ({"ExpChannel"}, "A 0D experimental channel"), "OneDExpChannel": ({"ExpChannel", "Countable"}, "A 1D experimental channel"), "TwoDExpChannel": ({"ExpChannel", "Countable"}, "A 2D experimental channel"), "TriggerGate": ({"PoolElement"}, "A trigger/gate"), "PseudoCounter": ({"ExpChannel"}, "A pseudo counter"), "ComChannel": ({"PoolElement"}, "A communication channel"), "MotorGroup": (set(("PoolElement",),), "A motor group"), "MeasurementGroup": ({"PoolElement", "Countable"}, "A measurement group"), "ControllerLibrary": ({"Library", "PoolObject"}, "A controller library"), "ControllerClass": ({"Class", "PoolObject"}, "A controller class"), "Constraint": ({"PoolObject"}, "A constraint"), "External": ({"Object"}, "An external object"), "MacroServerObject": ({"Object"}, "A generic macro server object"), "MacroServerElement": ({"Element", "MacroServerObject"}, "A generic macro server element"), "MacroServer": ({"MacroServerElement"}, "A MacroServer"), "Door": ({"MacroServerElement"}, "A macro server door"), "MacroLibrary": ({"Library", "MacroServerObject"}, "A macro server library"), "MacroCode": ({"MacroServerObject"}, "A macro server macro code"), "MacroClass": ({"Class", "MacroCode"}, "A macro server macro class"), "MacroFunction": ({"Function", "MacroCode"}, "A macro server macro function"), "Macro": ({"MacroClass", "MacroFunction"}, "A macro server macro"), "ParameterType": ({"Meta"}, "A generic macro server parameter type"), } #: a dictionary containing the *all* interfaces supported by each type #: (:obj:`dict` <:obj:`str`, :obj:`set` < :obj:`str`> >) INTERFACES_EXPANDED = {} def __expand(name): direct_expansion, _ = INTERFACES[name] if isinstance(direct_expansion, str): direct_expansion = direct_expansion, exp = set(direct_expansion) for e in direct_expansion: e_value = INTERFACES_EXPANDED.get(e) if e_value is None: exp.update(__expand(e)) else: exp.update(e_value[0]) exp.add(name) return exp def __build_interfaces_expanded(): global INTERFACES_EXPANDED for i in INTERFACES: INTERFACES_EXPANDED[i] = __expand(i), INTERFACES[i][1] __build_interfaces_expanded() def __expand_sardana_interface_data(si_map, name, curr_id): if name in si_map: return curr_id d = 0 i_expanded = set(INTERFACES_EXPANDED[name][0]) i_expanded.remove(name) for interface in i_expanded: if interface not in si_map: curr_id = __expand_sardana_interface_data( si_map, interface, curr_id) d |= si_map[interface] si_map[name] = int(d | curr_id) return 2 * curr_id def __root_expand_sardana_interface_data(): curr_id = 1 si_map = {} for interface in INTERFACES_EXPANDED: curr_id = __expand_sardana_interface_data(si_map, interface, curr_id) return si_map #: An enumeration describing the all possible sardana interfaces Interface = Enumeration("Interface", list(__root_expand_sardana_interface_data().items())) def __create_sardana_interfaces(): interfaces, interfaces_expanded = {}, {} for i in INTERFACES: i_enum = Interface[i] i_items, i_items_expanded = INTERFACES[i][0], INTERFACES_EXPANDED[i][0] i_enum_items = set(map(Interface.get, i_items)) i_enum_items_expanded = set(map(Interface.get, i_items_expanded)) interfaces[i_enum] = i_enum_items interfaces_expanded[i_enum] = i_enum_items_expanded return interfaces, interfaces_expanded _Interfaces, _InterfacesExpanded = __create_sardana_interfaces() #: a dictionary containing the direct interfaces supported by each type #: (:obj:`dict` <:obj:`sardana.sardanadefs.Interface`, :obj:`set` < :obj:`sardana.sardanadefs.Interface`> >) Interfaces = _Interfaces #: a dictionary containing the *all* interfaces supported by each type. #: (:obj:`dict` <:obj:`sardana.sardanadefs.Interface`, :obj:`set` < :obj:`sardana.sardanadefs.Interface`> >) InterfacesExpanded = _InterfacesExpanded class ScalarNumberFilter(object): """A simple scalar number filter that returns ``False`` if two numbers are indentical (i.e. |a-b| < error)""" def __call__(self, a, b): try: return math.fabs(a - b) > EpsilonError except: return a != b