#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool library. It defines the base classes
for"""
__all__ = ["PoolBaseGroup"]
__docformat__ = 'restructuredtext'
from typing import Sequence, Iterator, Dict
from taurus.core.tango.tangovalidator import TangoAttributeNameValidator
import sardana
from sardana import State, ElementType, TYPE_PHYSICAL_ELEMENTS
from sardana.pool.poolexternal import PoolExternalObject
from sardana.pool.poolcontainer import PoolContainer
from sardana.pool.poolelement import PoolElement
from sardana.pool.poolcontroller import PoolController
[docs]
class PoolBaseGroup(PoolContainer):
def __init__(self, **kwargs):
self._pending = True
self._user_element_ids = None
self._user_elements = None
self._physical_elements = None
self._physical_elements_set = None
self._state_statistics = {}
self._pool = kwargs.pop('pool')
self.set_user_element_ids(kwargs.pop('user_elements'))
PoolContainer.__init__(self)
try:
self._build_elements()
except KeyError:
# Failed to build element information. No problem.
# Probably one or more underlying elements have not
# been constructed yet.
pass
def _get_pool(self):
return self._pool
def _create_action_cache(self):
raise NotImplementedError
def _get_action_cache(self):
if self._action_cache is None:
self._action_cache = self._fill_action_cache()
return self._action_cache
def _set_action_cache(self, action_cache):
physical_elements = self.get_physical_elements()
if self._action_cache is not None:
for ctrl_physical_elements in list(physical_elements.values()):
for physical_element in ctrl_physical_elements:
action_cache.remove_element(physical_element)
self._action_cache = self._fill_action_cache(action_cache)
def _fill_action_cache(self, action_cache=None, physical_elements=None):
if action_cache is None:
action_cache = self._create_action_cache()
if physical_elements is None:
physical_elements = self.get_physical_elements()
for _, ctrl_physical_elements in list(physical_elements.items()):
for physical_element in ctrl_physical_elements:
action_cache.add_element(physical_element)
return action_cache
def _calculate_element_state(self, elem, elem_state_info):
u_state, u_status = elem_state_info
if u_status is None:
state_str = "None" if u_state is None else State.whatis(u_state)
u_status = '{} is {}'.format(elem.name, state_str)
else:
u_status = u_status.split("\n", 1)[0]
return u_state, u_status
def _calculate_states(self, state_info=None):
user_elements = self.get_user_elements()
none, unknown = set(), set()
fault, alarm, on, moving = set(), set(), set(), set()
status = []
if not state_info:
state_info = {}
for elem in user_elements:
if elem.get_type() == ElementType.External:
continue
# cannot call get_state(us) here since it may lead to dead
# lock!
si = elem.inspect_state(), elem.inspect_status()
state_info[elem] = si
for elem, elem_state_info in list(state_info.items()):
elem_type = elem.get_type()
if elem_type == ElementType.External:
continue
u_state, u_status = self._calculate_element_state(
elem, elem_state_info)
if u_state == State.Moving:
moving.add(elem)
elif u_state == State.On:
on.add(elem)
elif u_state == State.Fault:
fault.add(elem)
elif u_state == State.Alarm:
alarm.add(elem)
elif u_state is State.Unknown:
unknown.add(elem)
elif u_state is None:
none.add(elem)
status.append(u_status)
state = State.On
if none or unknown:
state = State.Unknown
if fault:
state = State.Fault
elif alarm:
state = State.Alarm
elif moving:
state = State.Moving
self._state_statistics = {State.On: on, State.Fault: fault,
State.Alarm: alarm, State.Moving: moving,
State.Unknown: unknown, None: none}
status = "\n".join(status)
return state, status
def _is_managed_element(self, element):
return True
def _is_internal_element(self, element_id):
if self.pool.use_numeric_element_ids:
return isinstance(element_id, int)
return True
def _build_elements(self):
self._user_elements = []
self._physical_elements = {}
self._physical_elements_set = set()
pool = self._get_pool()
for user_element_id in self._user_element_ids:
if self._is_internal_element(user_element_id):
try:
user_element = pool.get_element(id=user_element_id)
except KeyError:
self._pending = True
self._user_elements = None
self._physical_elements = None
self._physical_elements_set = None
raise
if not self._is_managed_element(user_element):
user_element_id = user_element.get_source()
# a tango channel or non internal element (ex: ioregister or motor
# in measurement group)
else:
validator = TangoAttributeNameValidator()
params = validator.getUriGroups(user_element_id)
params['pool'] = self._get_pool()
user_element = PoolExternalObject(**params)
self.add_user_element(user_element)
self._pending = False
[docs]
def on_element_changed(self, evt_src, evt_type, evt_value):
pass
[docs]
def set_user_element_ids(self, new_element_ids):
self.clear_user_elements()
self._user_element_ids = new_element_ids
[docs]
def get_user_element_ids(self) -> Sequence[int]:
"""Returns the sequence of user element IDs
:return: the sequence of user element IDs
"""
return self._user_element_ids
user_element_ids = property(get_user_element_ids)
[docs]
def get_user_elements(self) -> Sequence[PoolElement]:
"""Returns the sequence of user elements
:return: the sequence of user elements
"""
if self._pending:
self._build_elements()
return self._user_elements
[docs]
def get_user_elements_attribute_iterator(self) -> Iterator[sardana.sardanaattribute.SardanaAttribute]:
"""Returns an iterator over the main attribute of each user element.
:return: an iterator over the main attribute of each user element.
"""
for element in self.get_user_elements():
yield element.get_default_attribute()
get_user_elements_attribute = get_user_elements_attribute_iterator
get_user_elements_attribute.__doc__ = get_user_elements_attribute_iterator.__doc__
[docs]
def get_user_elements_attribute_sequence(self) -> Sequence[sardana.sardanaattribute.SardanaAttribute]:
"""Returns a sequence of main attribute of each user element.
In loops use preferably :meth:`get_user_elements_attribute_iterator` for
performance and memory reasons.
:return: a sequence of main attribute of each user element.
"""
return list(self.get_user_elements_attribute_iterator())
[docs]
def get_user_elements_attribute_map(self) -> Dict[PoolElement, sardana.sardanaattribute.SardanaAttribute]:
"""Returns a dictionary of main attribute of each user element.
:return: a dictionary of main attribute of each user element.
"""
ret = {}
for element in self.get_user_elements():
ret[element] = element.get_default_attribute()
return ret
[docs]
def get_physical_elements(self) -> Dict[PoolController, Sequence[PoolElement]]:
"""Returns a dictionary or physical elements where key is a controller
object and value is a sequence of pool elements
:return: a dictionary of physical elements
"""
if self._pending:
self._build_elements()
return self._physical_elements
[docs]
def get_physical_elements_iterator(self) -> Iterator[PoolElement]:
"""Returns an iterator over the physical elements.
.. warning:: The order is non deterministic.
:return: an iterator over the physical elements.
"""
for _, elements in list(self.get_physical_elements().items()):
for element in elements:
yield element
[docs]
def get_physical_elements_attribute_iterator(self) -> Iterator[sardana.sardanaattribute.SardanaAttribute]:
"""Returns an iterator over the main attribute of each physical element.
.. warning:: The order is non deterministic.
:return: an iterator over the main attribute of each physical element.
"""
for element in self.get_physical_elements_iterator():
yield element.get_default_attribute()
[docs]
def get_physical_elements_set(self):
if self._pending:
self._build_elements()
return self._physical_elements_set
[docs]
def add_user_element(self, element, index=None):
user_elements = self._user_elements
physical_elements = self._physical_elements
physical_elements_set = self._physical_elements_set
if element in user_elements:
raise Exception("Group already contains %s" % element.name)
if index is None:
index = len(user_elements)
user_elements.insert(index, element)
if not self._is_managed_element(element):
return index
self.add_element(element)
self._find_physical_elements(element,
physical_elements=physical_elements,
physical_elements_set=physical_elements_set)
action_cache = self._action_cache
if action_cache is not None:
self._fill_action_cache(action_cache=action_cache,
physical_elements=physical_elements)
element.add_listener(self.on_element_changed)
return index
def _find_physical_elements(self, element, physical_elements=None,
physical_elements_set=None):
elem_type = element.get_type()
if physical_elements is None:
physical_elements = {}
if physical_elements_set is None:
physical_elements_set = set()
if elem_type in TYPE_PHYSICAL_ELEMENTS:
ctrl = element.controller
own_elements = physical_elements.get(ctrl)
if own_elements is None:
physical_elements[ctrl] = own_elements = set()
own_elements.add(element)
physical_elements_set.add(element)
else:
elem_physical_elements = element.get_physical_elements()
for ctrl, elements in list(elem_physical_elements.items()):
own_elements = physical_elements.get(ctrl)
if own_elements is None:
physical_elements[ctrl] = own_elements = set()
own_elements.update(elements)
physical_elements_set.update(elements)
return physical_elements
# TODO: too complicated to implement for now
# def remove_user_element(self, element):
# try:
# idx = self._user_elements.index(element)
# except ValueError:
# raise Exception("Group doesn't contain %s" % element.name)
# action_cache = self.get_action_cache()
# element.remove_listener(self.on_element_changed)
# action_cache.remove_element(element)
# del self._user_elements[idx]
# del self._user_element_ids[self._user_element_ids.index(element.id)]
# self.remove_element(element)
[docs]
def clear_user_elements(self):
user_elements = self._user_elements
if user_elements is not None:
for element in user_elements:
if element.get_type() != ElementType.External:
element.remove_listener(self.on_element_changed)
self.remove_element(element)
self._action_cache = None
self._pending = True
self._user_elements = None
self._user_element_ids = None
self._physical_elements = None
# --------------------------------------------------------------------------
# stop
# --------------------------------------------------------------------------
[docs]
def stop(self):
msg = ""
for ctrl, elements in list(self.get_physical_elements().items()):
self.debug("Stopping %s %s", ctrl.name,
[e.name for e in elements])
for el in elements:
el._stopped = True
try:
error_elements = ctrl.stop_elements(elements=elements)
if len(error_elements) > 0:
element_names = [elem.name for elem in error_elements]
msg += ("Controller %s -> %s\n" %
(ctrl.name, element_names))
self.error("Unable to stop %s controller: "
"Stop of elements %s failed" %
(ctrl.name, element_names))
except Exception:
self.error("Unable to stop controller %s", ctrl.name)
self.debug("Details:", exc_info=1)
if msg:
msg_init = "Elements which could not be stopped:\n"
# TODO: think about a more specific type of exception
raise Exception(msg_init + msg)
# --------------------------------------------------------------------------
# abort
# --------------------------------------------------------------------------
[docs]
def abort(self):
msg = ""
for ctrl, elements in list(self.get_physical_elements().items()):
self.debug("Aborting %s %s", ctrl.name,
[e.name for e in elements])
try:
error_elements = ctrl.abort_elements(elements=elements)
# TODO: report elements that could not be aborted by their
# names
if len(error_elements) > 0:
element_names = [elem.name for elem in error_elements]
msg += ("Controller %s -> %s\n" %
(ctrl.name, element_names))
self.error("Unable to abort %s controller: "
"Abort of axes %s failed" %
(ctrl.name, element_names))
except Exception:
self.error("Unable to abort controller %s", ctrl.name)
self.debug("Details:", exc_info=1)
if msg:
msg_init = "Elements which could not be aborted:\n"
# TODO: think about a more specific type of exception
raise Exception(msg_init + msg)
# --------------------------------------------------------------------------
# involved in an operation
# --------------------------------------------------------------------------
[docs]
def get_operation(self):
for _, elements in list(self.get_physical_elements().items()):
for element in elements:
op = element.get_operation()
if op is not None:
return op
return None