#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the base classes
for"""
__all__ = ["PoolBaseElement"]
__docformat__ = 'restructuredtext'
import weakref
import threading
from typing import Any, Dict, Tuple, Optional
from taurus.core.util.lock import TaurusLock
import sardana
from sardana import State
from sardana.sardanaevent import EventType
from sardana.pool.poolobject import PoolObject
[docs]
class PoolBaseElement(PoolObject):
"""A Pool object that besides the name, reference to the pool, ID, full_name
and user_full_name has:
- _simulation_mode : boolean telling if in simulation mode
- _state : element state
- _status : element status"""
def __init__(self, **kwargs):
self._simulation_mode = False
self._state = None
self._state_event = None
self._status = None
self._status_event = None
self._failed_init_attrs = []
self._action_cache = None
self._aborted = False
self._stopped = False
self._released = False
lock_name = kwargs['name'] + "Lock"
# A lock for high level operations: monitoring, motion or acquisition
self._lock = TaurusLock(name=lock_name, lock=threading.RLock())
# The operation context in which the element is involved
self._operation = None
# The :class:`PoolAction` in which element is involved
self._pool_action = None
super(PoolBaseElement, self).__init__(**kwargs)
def __enter__(self):
self.lock()
def __exit__(self, exc_type, exc_value, traceback):
self.unlock()
return False
[docs]
def lock(self, blocking: bool = True) -> bool:
"""Acquires the this element lock
:param blocking:
whether or not to block if lock is already acquired [default: True]
"""
ret = self._lock.acquire(blocking)
return ret
[docs]
def unlock(self):
ret = self._lock.release()
return ret
[docs]
def get_action_cache(self):
"""Returns the internal action cache object"""
return self._action_cache
[docs]
def serialize(self, *args, **kwargs):
ret = PoolObject.serialize(self, *args, **kwargs)
return ret
[docs]
def init_attribute_values(self, attr_values: Optional[Dict[str, Any]] = None) -> None:
"""Initialize attributes with values.
:param attr_values: map of attribute names and values
"""
self._failed_init_attrs = []
# --------------------------------------------------------------------------
# dependent elements
# --------------------------------------------------------------------------
# --------------------------------------------------------------------------
# simulation mode
# --------------------------------------------------------------------------
[docs]
def get_simulation_mode(self, cache: bool = True, propagate: int = 1) -> bool:
"""Returns the simulation mode for this object.
:param cache: not used [default: True]
:param propagate: [default: 1]
:return: the current simulation mode
"""
return self._simulation_mode
[docs]
def set_simulation_mode(self, simulation_mode, propagate=1):
self._simulation_mode = simulation_mode
if not propagate:
return
if simulation_mode == self._simulation_mode:
# current state is equal to last state_event. Skip event
return
self.fire_event(EventType("simulation_mode", priority=propagate),
simulation_mode)
[docs]
def put_simulation_mode(self, simulation_mode):
self._simulation_mode = simulation_mode
simulation_mode = property(get_simulation_mode, set_simulation_mode,
doc="element simulation mode")
# --------------------------------------------------------------------------
# state
# --------------------------------------------------------------------------
[docs]
def get_state(self, cache: bool = True, propagate: int = 1) -> sardana.State:
"""Returns the state for this object. If cache is True (default) it
returns the current state stored in cache (it will force an update if
cache is empty). If propagate > 0 and if the state changed since last
read, it will propagate the state event to all listeners.
:param cache:
tells if return value from local cache or update from HW read
[default: True]
:param propagate:
if > 0 propagates the event in case it changed since last HW read.
Values bigger that mean the event if sent should be a priority event
[default: 1]
:return: the current object state
"""
if not cache or self._state is None:
state_info = self.read_state_info()
self._set_state_info(state_info, propagate=propagate)
return self._state
[docs]
def inspect_state(self) -> sardana.State:
"""Looks at the current cached value of state
:return: the current object state
"""
return self._state
[docs]
def set_state(self, state, propagate=1):
self._set_state(state, propagate=propagate)
def _set_state(self, state, propagate=1):
self._state = state
if not propagate:
return
if state == self._state_event:
# current state is equal to last state_event. Skip event
return
self._state_event = state
self.fire_event(EventType("state", priority=propagate), state)
[docs]
def put_state(self, state):
self._state = state
state = property(get_state, set_state, doc="element state")
# --------------------------------------------------------------------------
# status
# --------------------------------------------------------------------------
[docs]
def inspect_status(self) -> str:
"""Looks at the current cached value of status
:return: the current object status
"""
return self._status
[docs]
def get_status(self, cache: bool = True, propagate: int = 1) -> str:
"""Returns the status for this object. If cache is True (default) it
returns the current status stored in cache (it will force an update if
cache is empty). If propagate > 0 and if the status changed since last
read, it will propagate the status event to all listeners.
:param cache:
tells if return value from local cache or update from HW read
[default: True]
:param propagate:
if > 0 propagates the event in case it changed since last HW read.
Values bigger that mean the event if sent should be a priority event
[default: 1]
:return: the current object status
"""
if not cache or self._status is None:
state_info = self.read_state_info()
self._set_state_info(state_info, propagate=propagate)
return self._status
[docs]
def set_status(self, status, propagate=1):
self._set_status(status, propagate=propagate)
def _set_status(self, status, propagate=1):
self._status = status
if not propagate:
return
s_evt = self._status_event
if s_evt is not None and len(status) == len(s_evt) and status == s_evt:
# current status is equal to last status_event. Skip event
return
self._status_event = status
self.fire_event(EventType("status", priority=propagate), status)
[docs]
def put_status(self, status):
self._status = status
status = property(get_status, set_status, doc="element status")
# --------------------------------------------------------------------------
# state information
# --------------------------------------------------------------------------
def _calculate_init_attr_state_info(self, state, status):
# append init attributes status i.e. which attrs failed to init
# and switch state to Alarm
if self._failed_init_attrs:
if state not in (State.Alarm, State.Fault):
state = State.Alarm
status += "\n{} failed to initialize value".format(
", ".join(self._failed_init_attrs)
)
return state, status
_STD_STATUS = "{name} is {state}"
[docs]
def calculate_state_info(self, status_info: Optional[Tuple[sardana.State, str]] = None) -> Tuple[sardana.State, str]:
"""Transforms the given state information. This specific base
implementation transforms the given state,status tuple into a
state, new_status tuple where new_status is "*self.name* is *state*
plus the given status.
It is assumed that the given status comes directly from the controller
status information.
:param status_info:
given status information [default: None, meaning use current state
status.
:return: a transformed state information
"""
if status_info is None:
status_info = self._state, self._status
state, status = status_info
state_str = State[state]
new_status = self._STD_STATUS.format(name=self.name, state=state_str)
if status is not None and len(status) > 0:
new_status += "\n{}".format(status) # append ctrl status
state, new_status = self._calculate_init_attr_state_info(
state, new_status
)
return state, new_status
[docs]
def set_state_info(self, state_info, propagate=1, safe=False):
if safe:
with self:
self._set_state_info(state_info, propagate=propagate)
else:
self._set_state_info(state_info, propagate=propagate)
def _set_state_info(self, state_info, propagate=1):
state_info = self.calculate_state_info(state_info)
state, status = state_info[:2]
self._set_status(status, propagate=propagate)
self._set_state(state, propagate=propagate)
[docs]
def read_state_info(self):
action_cache = self.get_action_cache()
ctrl_state_info = action_cache.read_state_info(serial=True)[self]
return self._from_ctrl_state_info(ctrl_state_info)
[docs]
def put_state_info(self, state_info):
self.set_state_info(state_info, propagate=0)
def _from_ctrl_state_info(self, state_info):
state_info, _ = state_info # ignoring exc_info
state, status = state_info
return int(state), status
# --------------------------------------------------------------------------
# default attribute
# --------------------------------------------------------------------------
[docs]
def get_default_attribute(self):
return NotImplementedError("%s doesn't have default attribute" % self.__class__.__name__)
# --------------------------------------------------------------------------
# default acquisition channel name
# --------------------------------------------------------------------------
[docs]
def get_default_acquisition_channel(self):
return self.get_default_attribute().name
# --------------------------------------------------------------------------
# stop
# --------------------------------------------------------------------------
[docs]
def stop(self):
self._stopped = True
[docs]
def was_stopped(self):
return self._stopped
# --------------------------------------------------------------------------
# abort
# --------------------------------------------------------------------------
[docs]
def abort(self):
self._aborted = True
[docs]
def was_aborted(self):
return self._aborted
# --------------------------------------------------------------------------
# release
# --------------------------------------------------------------------------
[docs]
def release(self):
if not self.is_in_local_operation():
self.warning("Not in local operation, can not release")
return
operation = self.get_operation()
if not operation.is_running():
self.warning("Operation is not running, can not release")
return
self._released = True
self._state_event = None
self.info("Release!")
operation.release_action()
[docs]
def was_released(self):
return self._released
# --------------------------------------------------------------------------
# interrupted
# --------------------------------------------------------------------------
[docs]
def was_interrupted(self):
"""Tells if action ended by an abort or stop"""
return self.was_aborted() or self.was_stopped()
# --------------------------------------------------------------------------
# involved in an operation
# --------------------------------------------------------------------------
[docs]
def is_action_running(self):
"""Determines if the element action is running or not."""
return self.get_action_cache().is_running()
[docs]
def is_in_operation(self):
"""Returns True if this element is involved in any operation"""
return self.get_operation() is not None
[docs]
def is_in_local_operation(self):
return self.get_operation() == self.get_action_cache()
[docs]
def get_operation(self):
return self._operation
[docs]
def set_operation(self, operation):
if self.is_in_operation() and operation is not None:
raise Exception("%s is already involved in an operation"
% self.name)
if operation is not None:
self._aborted = False
self._stopped = False
self._released = False
self._operation = operation
[docs]
def clear_operation(self):
return self.set_operation(None)