Source code for sardana.pool.poolbaseelement

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool library. It defines the base classes
for"""

__all__ = ["PoolBaseElement"]

__docformat__ = 'restructuredtext'

import weakref
import threading
from typing import Any, Dict, Tuple, Optional

from taurus.core.util.lock import TaurusLock

import sardana
from sardana import State
from sardana.sardanaevent import EventType
from sardana.pool.poolobject import PoolObject


[docs] class PoolBaseElement(PoolObject): """A Pool object that besides the name, reference to the pool, ID, full_name and user_full_name has: - _simulation_mode : boolean telling if in simulation mode - _state : element state - _status : element status""" def __init__(self, **kwargs): self._simulation_mode = False self._state = None self._state_event = None self._status = None self._status_event = None self._failed_init_attrs = [] self._action_cache = None self._aborted = False self._stopped = False self._released = False lock_name = kwargs['name'] + "Lock" # A lock for high level operations: monitoring, motion or acquisition self._lock = TaurusLock(name=lock_name, lock=threading.RLock()) # The operation context in which the element is involved self._operation = None # The :class:`PoolAction` in which element is involved self._pool_action = None super(PoolBaseElement, self).__init__(**kwargs) def __enter__(self): self.lock() def __exit__(self, exc_type, exc_value, traceback): self.unlock() return False
[docs] def lock(self, blocking: bool = True) -> bool: """Acquires the this element lock :param blocking: whether or not to block if lock is already acquired [default: True] """ ret = self._lock.acquire(blocking) return ret
[docs] def unlock(self): ret = self._lock.release() return ret
[docs] def get_action_cache(self): """Returns the internal action cache object""" return self._action_cache
[docs] def serialize(self, *args, **kwargs): ret = PoolObject.serialize(self, *args, **kwargs) return ret
[docs] def init_attribute_values(self, attr_values: Optional[Dict[str, Any]] = None) -> None: """Initialize attributes with values. :param attr_values: map of attribute names and values """ self._failed_init_attrs = []
# -------------------------------------------------------------------------- # dependent elements # -------------------------------------------------------------------------- # -------------------------------------------------------------------------- # simulation mode # --------------------------------------------------------------------------
[docs] def get_simulation_mode(self, cache: bool = True, propagate: int = 1) -> bool: """Returns the simulation mode for this object. :param cache: not used [default: True] :param propagate: [default: 1] :return: the current simulation mode """ return self._simulation_mode
[docs] def set_simulation_mode(self, simulation_mode, propagate=1): self._simulation_mode = simulation_mode if not propagate: return if simulation_mode == self._simulation_mode: # current state is equal to last state_event. Skip event return self.fire_event(EventType("simulation_mode", priority=propagate), simulation_mode)
[docs] def put_simulation_mode(self, simulation_mode): self._simulation_mode = simulation_mode
simulation_mode = property(get_simulation_mode, set_simulation_mode, doc="element simulation mode") # -------------------------------------------------------------------------- # state # --------------------------------------------------------------------------
[docs] def get_state(self, cache: bool = True, propagate: int = 1) -> sardana.State: """Returns the state for this object. If cache is True (default) it returns the current state stored in cache (it will force an update if cache is empty). If propagate > 0 and if the state changed since last read, it will propagate the state event to all listeners. :param cache: tells if return value from local cache or update from HW read [default: True] :param propagate: if > 0 propagates the event in case it changed since last HW read. Values bigger that mean the event if sent should be a priority event [default: 1] :return: the current object state """ if not cache or self._state is None: state_info = self.read_state_info() self._set_state_info(state_info, propagate=propagate) return self._state
[docs] def inspect_state(self) -> sardana.State: """Looks at the current cached value of state :return: the current object state """ return self._state
[docs] def set_state(self, state, propagate=1): self._set_state(state, propagate=propagate)
def _set_state(self, state, propagate=1): self._state = state if not propagate: return if state == self._state_event: # current state is equal to last state_event. Skip event return self._state_event = state self.fire_event(EventType("state", priority=propagate), state)
[docs] def put_state(self, state): self._state = state
state = property(get_state, set_state, doc="element state") # -------------------------------------------------------------------------- # status # --------------------------------------------------------------------------
[docs] def inspect_status(self) -> str: """Looks at the current cached value of status :return: the current object status """ return self._status
[docs] def get_status(self, cache: bool = True, propagate: int = 1) -> str: """Returns the status for this object. If cache is True (default) it returns the current status stored in cache (it will force an update if cache is empty). If propagate > 0 and if the status changed since last read, it will propagate the status event to all listeners. :param cache: tells if return value from local cache or update from HW read [default: True] :param propagate: if > 0 propagates the event in case it changed since last HW read. Values bigger that mean the event if sent should be a priority event [default: 1] :return: the current object status """ if not cache or self._status is None: state_info = self.read_state_info() self._set_state_info(state_info, propagate=propagate) return self._status
[docs] def set_status(self, status, propagate=1): self._set_status(status, propagate=propagate)
def _set_status(self, status, propagate=1): self._status = status if not propagate: return s_evt = self._status_event if s_evt is not None and len(status) == len(s_evt) and status == s_evt: # current status is equal to last status_event. Skip event return self._status_event = status self.fire_event(EventType("status", priority=propagate), status)
[docs] def put_status(self, status): self._status = status
status = property(get_status, set_status, doc="element status") # -------------------------------------------------------------------------- # state information # -------------------------------------------------------------------------- def _calculate_init_attr_state_info(self, state, status): # append init attributes status i.e. which attrs failed to init # and switch state to Alarm if self._failed_init_attrs: if state not in (State.Alarm, State.Fault): state = State.Alarm status += "\n{} failed to initialize value".format( ", ".join(self._failed_init_attrs) ) return state, status _STD_STATUS = "{name} is {state}"
[docs] def calculate_state_info(self, status_info: Optional[Tuple[sardana.State, str]] = None) -> Tuple[sardana.State, str]: """Transforms the given state information. This specific base implementation transforms the given state,status tuple into a state, new_status tuple where new_status is "*self.name* is *state* plus the given status. It is assumed that the given status comes directly from the controller status information. :param status_info: given status information [default: None, meaning use current state status. :return: a transformed state information """ if status_info is None: status_info = self._state, self._status state, status = status_info state_str = State[state] new_status = self._STD_STATUS.format(name=self.name, state=state_str) if status is not None and len(status) > 0: new_status += "\n{}".format(status) # append ctrl status state, new_status = self._calculate_init_attr_state_info( state, new_status ) return state, new_status
[docs] def set_state_info(self, state_info, propagate=1, safe=False): if safe: with self: self._set_state_info(state_info, propagate=propagate) else: self._set_state_info(state_info, propagate=propagate)
def _set_state_info(self, state_info, propagate=1): state_info = self.calculate_state_info(state_info) state, status = state_info[:2] self._set_status(status, propagate=propagate) self._set_state(state, propagate=propagate)
[docs] def read_state_info(self): action_cache = self.get_action_cache() ctrl_state_info = action_cache.read_state_info(serial=True)[self] return self._from_ctrl_state_info(ctrl_state_info)
[docs] def put_state_info(self, state_info): self.set_state_info(state_info, propagate=0)
def _from_ctrl_state_info(self, state_info): state_info, _ = state_info # ignoring exc_info state, status = state_info return int(state), status # -------------------------------------------------------------------------- # default attribute # --------------------------------------------------------------------------
[docs] def get_default_attribute(self): return NotImplementedError("%s doesn't have default attribute" % self.__class__.__name__)
# -------------------------------------------------------------------------- # default acquisition channel name # --------------------------------------------------------------------------
[docs] def get_default_acquisition_channel(self): return self.get_default_attribute().name
# -------------------------------------------------------------------------- # stop # --------------------------------------------------------------------------
[docs] def stop(self): self._stopped = True
[docs] def was_stopped(self): return self._stopped
# -------------------------------------------------------------------------- # abort # --------------------------------------------------------------------------
[docs] def abort(self): self._aborted = True
[docs] def was_aborted(self): return self._aborted
# -------------------------------------------------------------------------- # release # --------------------------------------------------------------------------
[docs] def release(self): if not self.is_in_local_operation(): self.warning("Not in local operation, can not release") return operation = self.get_operation() if not operation.is_running(): self.warning("Operation is not running, can not release") return self._released = True self._state_event = None self.info("Release!") operation.release_action()
[docs] def was_released(self): return self._released
# -------------------------------------------------------------------------- # interrupted # --------------------------------------------------------------------------
[docs] def was_interrupted(self): """Tells if action ended by an abort or stop""" return self.was_aborted() or self.was_stopped()
# -------------------------------------------------------------------------- # involved in an operation # --------------------------------------------------------------------------
[docs] def is_action_running(self): """Determines if the element action is running or not.""" return self.get_action_cache().is_running()
[docs] def is_in_operation(self): """Returns True if this element is involved in any operation""" return self.get_operation() is not None
[docs] def is_in_local_operation(self): return self.get_operation() == self.get_action_cache()
[docs] def get_operation(self): return self._operation
[docs] def set_operation(self, operation): if self.is_in_operation() and operation is not None: raise Exception("%s is already involved in an operation" % self.name) if operation is not None: self._aborted = False self._stopped = False self._released = False self._operation = operation
[docs] def clear_operation(self): return self.set_operation(None)