#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module contains the most generic sardana constants and enumerations"""
import collections
# Public names of this module (what ``from <module> import *`` exports).
# NOTE(review): "to_dtype_dformat" is listed below but its definition is not
# visible in this part of the file -- confirm that it is defined.
# NOTE(review): TYPE_COUNTABLE_ELEMENTS is defined in this module but is not
# listed below -- confirm whether that omission is intentional.
__all__ = [
    # generic constants, server information and states
    "EpsilonError", "SardanaServer", "ServerRunMode", "State",
    # data description enumerations
    "DataType", "DataFormat", "DataAccess", "AttrQuality",
    # data description lookup tables
    "DTYPE_MAP", "R_DTYPE_MAP", "DACCESS_MAP",
    # data description conversion helpers
    "from_dtype_str", "from_access_str", "to_dtype_dformat",
    "to_daccess",
    # element identification
    "InvalidId", "InvalidAxis", "ElementType",
    # interface enumeration and lookup tables
    "Interface", "Interfaces", "InterfacesExpanded",
    # element type families
    "TYPE_ELEMENTS", "TYPE_GROUP_ELEMENTS", "TYPE_MOVEABLE_ELEMENTS",
    "TYPE_PHYSICAL_ELEMENTS", "TYPE_ACQUIRABLE_ELEMENTS",
    "TYPE_EXP_CHANNEL_ELEMENTS", "TYPE_TIMERABLE_ELEMENTS",
    "TYPE_PSEUDO_ELEMENTS",
    # interface description tables (keyed by name)
    "INTERFACES", "INTERFACES_EXPANDED",
    # miscellaneous helpers
    "ScalarNumberFilter",
]

# markup language used by the docstrings of this module
__docformat__ = 'restructuredtext'
import math
from enum import IntEnum
from taurus.core.util.enumeration import Enumeration
#: maximum difference between two floats so that they are considered equal
EpsilonError = 1E-16

#: sardana element state enumeration
State = Enumeration(
    "State",
    (
        "On",
        "Off",
        "Close",
        "Open",
        "Insert",
        "Extract",
        "Moving",
        "Standby",
        "Fault",
        "Init",
        "Running",
        "Alarm",
        "Disable",
        "Unknown",
        "Invalid",
    ),
)
class _SardanaServer(object):
    """Class representing the current sardana server state.

    A new instance starts with its ``server_state`` attribute set to
    ``State.Invalid``.
    """

    def __init__(self):
        # the server state is not known when the object is created
        self.server_state = State.Invalid

    def __repr__(self):
        # fixed representation, independent of the stored state
        return "SardanaServer()"
#: the global object containing the SardanaServer information
SardanaServer = _SardanaServer()

#:
#: The sardana server run mode:
#:
#: - **SynchPure** : pure synchronous: start the server and run the server
#:   loop until it stops
#: - **SynchThread** : separate thread synchronous: start a thread running
#:   the server loop. Block until the server loop ends
#: - **SynchProcess** : separate process synchronous: start a sub-process
#:   running the server loop. Block until the server loop ends
#: - **AsynchThread** : separate thread asynchronous: start a thread running
#:   the server loop. Return immediately
#: - **AsynchProcess** : separate process asynchronous: start a sub-process
#:   running the server loop. Return immediately
ServerRunMode = Enumeration(
    "ServerRunMode",
    (
        "SynchPure",
        "SynchThread",
        "SynchProcess",
        "AsynchThread",
        "AsynchProcess",
    ),
)
#: sardana data types (used by device pool controllers)
DataType = Enumeration(
    "DataType",
    (
        "Integer",
        "Double",
        "String",
        "Boolean",
        "Encoded",
        "Invalid",
    ),
)

#: sardana data format enumeration (used by device pool controllers)
DataFormat = Enumeration(
    "DataFormat",
    (
        "Scalar",
        "OneD",
        "TwoD",
        "Invalid",
    ),
)

#: sardana data access (used by device pool controllers)
DataAccess = Enumeration(
    "DataAccess",
    (
        "ReadOnly",
        "ReadWrite",
        "Invalid",
    ),
)
class AttrQuality(IntEnum):
    """Attribute quality factor"""

    Valid = 0  #: Attribute is valid
    Invalid = 1  #: Attribute is invalid
    Alarm = 2  #: Attribute is in alarm
    Changing = 3  #: Attribute is changing e.g. element is in operation
    Warning = 4  #: Attribute is in warning
#: dictionary dict<data type, :class:`sardana.DataType`>. The keys are the
#: accepted spellings of a data type: lower case name, python type or the
#: :class:`sardana.DataType` value itself
DTYPE_MAP = {
    # integer
    'int': DataType.Integer,
    'integer': DataType.Integer,
    int: DataType.Integer,
    'long': DataType.Integer,
    DataType.Integer: DataType.Integer,

    # floating point
    'float': DataType.Double,
    'double': DataType.Double,
    float: DataType.Double,
    DataType.Double: DataType.Double,

    # string
    'str': DataType.String,
    'string': DataType.String,
    str: DataType.String,
    DataType.String: DataType.String,

    # boolean
    'bool': DataType.Boolean,
    'boolean': DataType.Boolean,
    bool: DataType.Boolean,
    DataType.Boolean: DataType.Boolean,
}
#: dictionary dict<data type, python type>. Same keys as :data:`DTYPE_MAP`
#: (lower case name, python type or :class:`sardana.DataType` value) but the
#: values are the corresponding python types (int, float, str, bool)
R_DTYPE_MAP = {
    # integer
    'int': int,
    'integer': int,
    int: int,
    'long': int,
    DataType.Integer: int,

    # floating point
    'float': float,
    'double': float,
    float: float,
    DataType.Double: float,

    # string
    'str': str,
    'string': str,
    str: str,
    DataType.String: str,

    # boolean
    'bool': bool,
    'boolean': bool,
    bool: bool,
    DataType.Boolean: bool,
}
# DTYPE_MAP.setdefault(DataType.Invalid)
#: dictionary dict<access type, :class:`sardana.DataAccess`>. The keys are
#: the accepted spellings of an access type: lower case name or the
#: :class:`sardana.DataAccess` value itself
DACCESS_MAP = {
    # read only
    'read': DataAccess.ReadOnly,
    DataAccess.ReadOnly: DataAccess.ReadOnly,

    # read and write
    'readwrite': DataAccess.ReadWrite,
    'read_write': DataAccess.ReadWrite,
    DataAccess.ReadWrite: DataAccess.ReadWrite,
}
# DACCESS_MAP.setdefault(DataAccess.Invalid)
def from_dtype_str(dtype):
    """Transforms the given dtype parameter (string/:obj:`DataType` or None)
    into a tuple of two elements (str, :obj:`DataFormat`) where the first
    element is a string with a simplified data type.

    - If None is given, it returns
      ('float', :obj:`DataFormat.Scalar`)
    - If a string is given, it is lower-cased, the leading "pytango.",
      "dev" and "var" prefixes are removed (in that order) and, if it ends
      with "array", that suffix is removed and the format becomes
      :obj:`DataFormat.OneD`
    - If anything else (e.g. a :obj:`DataType`) is given, it is returned
      unchanged together with :obj:`DataFormat.Scalar`

    :param dtype: the data type to be transformed
    :type dtype: :obj:`str` or None or :obj:`DataType`
    :return: a tuple <str, :obj:`DataFormat`> for the given dtype
    :rtype: tuple<str, :obj:`DataFormat`>"""
    dformat = DataFormat.Scalar
    if dtype is None:
        return 'float', dformat
    if not isinstance(dtype, str):
        # not a string: nothing to simplify
        return dtype, dformat

    dtype = dtype.lower()
    # each prefix is stripped at most once, in this exact order
    for prefix in ("pytango.", "dev", "var"):
        if dtype.startswith(prefix):
            dtype = dtype[len(prefix):]
    if dtype.endswith("array"):
        # drop the trailing suffix itself (not the first occurrence of
        # "array" found anywhere in the name)
        dtype = dtype[:-len("array")]
        dformat = DataFormat.OneD
    return dtype, dformat
def from_access_str(access):
    """Transforms the given access parameter (string or :obj:`DataAccess`) into
    a simplified data access string.

    A string is lower-cased and a leading "pytango." prefix, if present, is
    removed. Any other object is returned unchanged.

    :param access: the access to be transformed
    :type access: :obj:`str` or :obj:`DataAccess`
    :return: a simple string for the given access (or the given object
             itself if it is not a string)
    :rtype: :obj:`str`"""
    if not isinstance(access, str):
        # not a string: nothing to simplify
        return access
    prefix = "pytango."
    access = access.lower()
    if access.startswith(prefix):
        access = access[len(prefix):]
    return access
def to_daccess(daccess):
    """Transforms the given access parameter (string or None) into a
    :obj:`DataAccess`. If None is given returns :obj:`DataAccess.ReadWrite`

    A string is simplified with :func:`from_access_str` and looked up in
    :data:`DACCESS_MAP`; a string which is not found there also results in
    :obj:`DataAccess.ReadWrite`. Any other object is returned unchanged.

    :param daccess: the access to be transformed
    :type daccess: :obj:`str` or None
    :return: a :obj:`DataAccess` for the given access
    :rtype: :obj:`DataAccess`"""
    if daccess is None:
        return DataAccess.ReadWrite
    if isinstance(daccess, str):
        # unknown access strings fall back to read-write
        return DACCESS_MAP.get(from_access_str(daccess),
                               DataAccess.ReadWrite)
    return daccess
#: A constant representing an invalid ID
InvalidId = 0

#: A constant representing an invalid axis
InvalidAxis = 0

#: An enumeration describing all the possible element types in sardana
ElementType = Enumeration(
    "ElementType",
    (
        "Pool",
        "Controller",
        "Motor",
        "CTExpChannel",
        "ZeroDExpChannel",
        "OneDExpChannel",
        "TwoDExpChannel",
        "ComChannel",
        "IORegister",
        "TriggerGate",
        "PseudoMotor",
        "PseudoCounter",
        "Constraint",
        "MotorGroup",
        "MeasurementGroup",
        "Instrument",
        "ControllerClass",
        "ControllerLibrary",
        "RecorderClass",
        "RecorderLibrary",
        "MacroServer",
        "Door",
        "MacroClass",
        "MacroLibrary",
        "MacroFunction",
        "External",
        "Meta",
        "ParameterType",
        "Unknown",
    ),
)

# short alias for ElementType
ET = ElementType
#: a set containing all "controllable" element types.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_ELEMENTS = {
    ET.Motor, ET.CTExpChannel, ET.ZeroDExpChannel,
    ET.OneDExpChannel, ET.TwoDExpChannel, ET.TriggerGate,
    ET.ComChannel, ET.IORegister, ET.PseudoMotor,
    ET.PseudoCounter, ET.Constraint,
}

#: a set containing all group element types.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_GROUP_ELEMENTS = {
    ET.MotorGroup, ET.MeasurementGroup,
}

#: a set containing the type of elements which are moveable.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_MOVEABLE_ELEMENTS = {
    ET.Motor, ET.PseudoMotor, ET.MotorGroup,
}

#: a set containing the possible types of physical elements.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_PHYSICAL_ELEMENTS = {
    ET.Motor, ET.CTExpChannel, ET.ZeroDExpChannel,
    ET.OneDExpChannel, ET.TwoDExpChannel, ET.TriggerGate,
    ET.ComChannel, ET.IORegister,
}

#: a set containing the possible types of acquirable elements.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_ACQUIRABLE_ELEMENTS = {
    ET.Motor, ET.CTExpChannel, ET.ZeroDExpChannel,
    ET.OneDExpChannel, ET.TwoDExpChannel,
    ET.ComChannel, ET.IORegister, ET.PseudoMotor,
    ET.PseudoCounter,
}

#: a set containing the possible measure-able elements.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_COUNTABLE_ELEMENTS = {
    ET.CTExpChannel, ET.OneDExpChannel,
    ET.TwoDExpChannel, ET.MeasurementGroup,
}

#: a set containing the possible types of experimental channel elements.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_EXP_CHANNEL_ELEMENTS = {
    ET.CTExpChannel, ET.ZeroDExpChannel,
    ET.OneDExpChannel, ET.TwoDExpChannel,
    ET.PseudoCounter,
}

#: a set containing the possible timer-able elements.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_TIMERABLE_ELEMENTS = {
    ET.CTExpChannel, ET.OneDExpChannel,
    ET.TwoDExpChannel,
}

#: a set containing the possible types of pseudo elements.
#: Constant values belong to :class:`~sardana.sardanadefs.ElementType`
TYPE_PSEUDO_ELEMENTS = {
    ET.PseudoMotor, ET.PseudoCounter,
}
# : An enumeration describing the all possible sardana interfaces
# SardanaInterface = Enumeration("SardanaInterface", ( \
# ("Object", 0b0000000000000001),
# ("Element", 0b0000000000000011),
# ("Class", 0b0000000000000101),
# ("Library", 0b0000000000001001),
# ("PoolObject", 0b0000000000010001),
# ("PoolElement", 0b0000000000010011),
# ("Pool", 0b0000000000110011),
# ("Controller", 0b0000000001000001),
# ("Moveable", 0b0000000010000001),
# ("Acquirable", 0b0000000100000001),
# ("Instrument", 0b0000001000000001),
# ("Motor", 0b0000010000000001),
# ("PseudoMotor", 0b0000100000000001),
# ("IORegister", 0b0001000000000001),
# ("ExpChannel", 0b0010000000000001),
# ("CTExpChannel", 0b0100000000000001),
# ("ZeroDExpChannel", 0b1000000000000001),
# ("OneDExpChannel", 0b0000000000000001),
# ("TwoDExpChannel", 0b0000000000000001),
# ("PseudoCounter", 0b0000000000000001),
# ("ComChannel", 0b0000000000000001),
# ("MotorGroup", 0b0000000000000001),
# ("MeasurementGroup", 0b0000000000000001),
# ("ControllerLibrary", 0b0000000000000001),
# ("ControllerClass", 0b0000000000000001),
# ("Constraint", 0b0000000000000001),
# ("External", 0b0000000000000001),
# ("MacroServerObject", 0b0000000000000001),
# ("MacroServerElement",0b0000000000000001),
# ("MacroServer", 0b0000000000000001),
# ("MacroLibrary", 0b0000000000000001),
# ("MacroClass", 0b0000000000000001),
# ("Macro", 0b0000000000000001), ) )
#: a dictionary containing the direct interfaces supported by each type
#: (:obj:`dict`\<:obj:`str`\, :obj:`tuple`\<:obj:`set`\<:obj:`str`\, :obj:`str`\>>>)
#: Each value is a pair: the set with the names of the direct parent
#: interfaces and a short human readable description.
INTERFACES = {
    # --- generic interfaces
    "Meta": (set(), "A generic sardana meta object"),
    "Object": (set(), "A generic sardana object"),
    "Element": ({"Object"}, "A generic sardana element"),
    "Class": ({"Object"}, "A generic sardana class"),
    "Function": ({"Object"}, "A generic sardana function"),
    "Library": ({"Object"}, "A generic sardana library"),

    # --- pool interfaces
    "PoolObject": ({"Object"}, "A Pool object"),
    "PoolElement": ({"Element", "PoolObject"}, "A Pool element"),
    "Pool": ({"PoolElement"}, "A Pool"),
    "Controller": ({"PoolElement"}, "A controller"),
    "Moveable": ({"PoolElement"}, "A moveable element"),
    "Acquirable": ({"PoolElement"}, "An acquirable element"),
    "Countable": ({"PoolElement"}, "A countable element"),
    "Instrument": ({"PoolElement"}, "An instrument"),
    "Motor": ({"Moveable", "Acquirable"}, "a motor"),
    "PseudoMotor": ({"Moveable", "Acquirable"}, "A pseudo motor"),
    "IORegister": ({"Acquirable"}, "An IO register"),
    "ExpChannel": ({"Acquirable"}, "A generic experimental channel"),
    "CTExpChannel": (
        {"ExpChannel", "Countable"},
        "A counter/timer experimental channel",
    ),
    "ZeroDExpChannel": ({"ExpChannel"}, "A 0D experimental channel"),
    "OneDExpChannel": (
        {"ExpChannel", "Countable"},
        "A 1D experimental channel",
    ),
    "TwoDExpChannel": (
        {"ExpChannel", "Countable"},
        "A 2D experimental channel",
    ),
    "TriggerGate": ({"PoolElement"}, "A trigger/gate"),
    "PseudoCounter": ({"ExpChannel"}, "A pseudo counter"),
    "ComChannel": ({"PoolElement"}, "A communication channel"),
    "MotorGroup": ({"PoolElement"}, "A motor group"),
    "MeasurementGroup": (
        {"PoolElement", "Countable"},
        "A measurement group",
    ),
    "ControllerLibrary": ({"Library", "PoolObject"}, "A controller library"),
    "ControllerClass": ({"Class", "PoolObject"}, "A controller class"),
    "Constraint": ({"PoolObject"}, "A constraint"),
    "External": ({"Object"}, "An external object"),

    # --- macro server interfaces
    "MacroServerObject": ({"Object"}, "A generic macro server object"),
    "MacroServerElement": (
        {"Element", "MacroServerObject"},
        "A generic macro server element",
    ),
    "MacroServer": ({"MacroServerElement"}, "A MacroServer"),
    "Door": ({"MacroServerElement"}, "A macro server door"),
    "MacroLibrary": (
        {"Library", "MacroServerObject"},
        "A macro server library",
    ),
    "MacroCode": ({"MacroServerObject"}, "A macro server macro code"),
    "MacroClass": ({"Class", "MacroCode"}, "A macro server macro class"),
    "MacroFunction": (
        {"Function", "MacroCode"},
        "A macro server macro function",
    ),
    "Macro": ({"MacroClass", "MacroFunction"}, "A macro server macro"),
    "ParameterType": ({"Meta"}, "A generic macro server parameter type"),
}
#: a dictionary containing *all* the interfaces supported by each type
#: (:obj:`dict` <:obj:`str`, :obj:`set` < :obj:`str`> >)
INTERFACES_EXPANDED = {}


def __expand(name):
    """Returns the set with the names of all the interfaces supported by the
    interface called *name*: its direct parents (taken from
    :data:`INTERFACES`), everything those parents expand to and *name*
    itself.

    Parents already present in :data:`INTERFACES_EXPANDED` are reused; the
    others are expanded recursively.

    :param name: the interface name (a key of :data:`INTERFACES`)
    :type name: :obj:`str`
    :return: the names of all the interfaces supported by *name*
    :rtype: :obj:`set` <:obj:`str`>"""
    parents, _description = INTERFACES[name]
    if isinstance(parents, str):
        # a single parent given as a plain string
        parents = (parents,)
    expanded = set(parents)
    for parent in parents:
        known = INTERFACES_EXPANDED.get(parent)
        if known is not None:
            expanded.update(known[0])
        else:
            expanded.update(__expand(parent))
    expanded.add(name)
    return expanded
def __build_interfaces_expanded():
    """Fills :data:`INTERFACES_EXPANDED` (in place) with one entry per key of
    :data:`INTERFACES`: a pair made of the fully expanded set of interface
    names and the description of the interface."""
    for name, (_, description) in INTERFACES.items():
        INTERFACES_EXPANDED[name] = __expand(name), description


# populate the table at import time
__build_interfaces_expanded()
def __expand_sardana_interface_data(si_map, name, curr_id):
    """Stores in *si_map* the numeric value of the interface called *name*.

    The value is *curr_id* OR-ed with the values of all the other
    interfaces listed for *name* in :data:`INTERFACES_EXPANDED`. Interfaces
    which do not have a value yet are handled first (recursively).

    :param si_map: map interface name -> value; it is updated in place
    :type si_map: :obj:`dict` <:obj:`str`, :obj:`int`>
    :param name: the interface name
    :type name: :obj:`str`
    :param curr_id: the next value which has not been used yet
    :type curr_id: :obj:`int`
    :return: the next value to be used: *curr_id* itself if *name* already
             had a value, otherwise twice the value used for *name*
    :rtype: :obj:`int`"""
    if name in si_map:
        return curr_id
    ancestors = set(INTERFACES_EXPANDED[name][0])
    ancestors.remove(name)
    inherited = 0
    for ancestor in ancestors:
        if ancestor not in si_map:
            # the ancestor consumes curr_id; continue with the next one
            curr_id = __expand_sardana_interface_data(
                si_map, ancestor, curr_id)
        inherited |= si_map[ancestor]
    si_map[name] = int(inherited | curr_id)
    return 2 * curr_id
def __root_expand_sardana_interface_data():
    """Builds the map interface name -> numeric value for all the keys of
    :data:`INTERFACES_EXPANDED`, starting with the value 1.

    :return: the map interface name -> value
    :rtype: :obj:`dict` <:obj:`str`, :obj:`int`>"""
    values = {}
    next_id = 1
    for name in INTERFACES_EXPANDED:
        next_id = __expand_sardana_interface_data(values, name, next_id)
    return values
#: An enumeration describing all the possible sardana interfaces
Interface = Enumeration(
    "Interface",
    list(__root_expand_sardana_interface_data().items()),
)
def __create_sardana_interfaces():
    """Builds the :obj:`Interface` based versions of :data:`INTERFACES` and
    :data:`INTERFACES_EXPANDED`.

    In both returned dictionaries the key is ``Interface[name]`` and the
    value is the set obtained by passing each interface name through
    ``Interface.get``.

    :return: a pair <direct interfaces, expanded interfaces>
    :rtype: :obj:`tuple` <:obj:`dict`, :obj:`dict`>"""
    direct, expanded = {}, {}
    for name, entry in INTERFACES.items():
        key = Interface[name]
        parent_names = entry[0]
        all_names = INTERFACES_EXPANDED[name][0]
        direct[key] = {Interface.get(item) for item in parent_names}
        expanded[key] = {Interface.get(item) for item in all_names}
    return direct, expanded
# build both lookup tables once, at import time; the public names below are
# plain aliases of these two objects
_Interfaces, _InterfacesExpanded = __create_sardana_interfaces()
#: a dictionary containing the direct interfaces supported by each type
#: (:obj:`dict` <:obj:`sardana.sardanadefs.Interface`, :obj:`set` < :obj:`sardana.sardanadefs.Interface`> >)
Interfaces = _Interfaces
#: a dictionary containing *all* the interfaces supported by each type.
#: (:obj:`dict` <:obj:`sardana.sardanadefs.Interface`, :obj:`set` < :obj:`sardana.sardanadefs.Interface`> >)
InterfacesExpanded = _InterfacesExpanded
class ScalarNumberFilter(object):
    """A simple scalar number filter that tells if two values are different.

    Calling the filter with two numbers returns ``True`` if they differ by
    more than ``EpsilonError`` (i.e. ``|a - b| > EpsilonError``) and
    ``False`` if they are considered identical. If the numeric comparison
    cannot be done (e.g. for strings or ``None``) the result is ``a != b``.
    """

    def __call__(self, a, b):
        """Compares the two given values.

        :param a: the first value
        :param b: the second value
        :return: ``True`` if the values are considered different, ``False``
                 otherwise
        """
        try:
            return math.fabs(a - b) > EpsilonError
        except Exception:
            # Best effort for operands which cannot be compared numerically.
            # Only Exception is caught so that KeyboardInterrupt and
            # SystemExit raised during the comparison are not swallowed.
            return a != b