Source code for sardana.tango.pool.Pool

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

""" """

__all__ = ["Pool", "PoolClass"]

__docformat__ = 'restructuredtext'

import json
import operator
import os.path

import PyTango

from taurus import Factory
from taurus.core.util.containers import CaselessDict
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.log import Logger, DebugIt

from sardana import sardanacustomsettings
from sardana import State, SardanaServer, ElementType, Interface, \
    TYPE_ACQUIRABLE_ELEMENTS, TYPE_PSEUDO_ELEMENTS
from sardana.pool.pool import Pool as POOL
from sardana.pool.poolmetacontroller import TYPE_MAP_OBJ
from sardana.tango.core.util import get_tango_version_number
import collections


[docs]class Pool(PyTango.LatestDeviceImpl, Logger): ElementsCache = None def __init__(self, cl, name): PyTango.LatestDeviceImpl.__init__(self, cl, name) Logger.__init__(self, name) self._use_physical_roles_property = getattr( sardanacustomsettings, "USE_PHYSICAL_ROLES_PROPERTY", False ) if not self._use_physical_roles_property: self.warning( "sardanacustomsetting.USE_PHYSICAL_ROLES_PROPERTY=False " "is deprecated since 3.4.0, use True instead " "(check docs for the implications)") self.init(name) self.init_device()
[docs] def init(self, full_name): try: db = Factory().getDatabase() alias = db.get_alias(full_name) if alias.lower() == 'nada': alias = None except: alias = None if alias is None: alias = PyTango.Util.instance().get_ds_inst_name() self._pool = POOL(self.get_full_name(), alias) self._pool.add_listener(self.on_pool_changed)
[docs] def get_full_name(self): """Compose full name from the TANGO_HOST information and device name. In case Sardana is used with Taurus 3 the full name is of format "dbhost:dbport/<domain>/<family>/<member>" where dbhost may be either FQDN or PQDN, depending on the TANGO_HOST configuration. In case Sardana is used with Taurus 4 the full name is of format "tango://dbhost:dbport/<domain>/<family>/<member>" where dbhost is always FQDN. :return: this device full name :rtype: :obj:`str` """ db = PyTango.Util.instance().get_database() db_name = db.get_db_host() + ":" + db.get_db_port() # try to use Taurus 4 to retrieve FQDN try: from taurus.core.tango.tangovalidator import \ TangoAuthorityNameValidator db_name = "//%s" % db_name db_name, _, _ = TangoAuthorityNameValidator().getNames(db_name) # if Taurus3 in use just continue except ImportError: pass full_name = db_name + "/" + self.get_name() return full_name
@property def pool(self): return self._pool
[docs] @DebugIt() def delete_device(self): # self.pool.monitor.pause() pass
[docs] @DebugIt() def init_device(self): self.set_state(PyTango.DevState.INIT) self.get_device_properties(self.get_device_class()) p = self.pool p.set_python_path(self.PythonPath) p.set_pool_path(self.PoolPath) p.set_motion_loop_sleep_time(self.MotionLoop_SleepTime / 1000) p.set_motion_loop_states_per_position( self.MotionLoop_StatesPerPosition) p.set_acq_loop_sleep_time(self.AcqLoop_SleepTime / 1000) p.set_acq_loop_states_per_value(self.AcqLoop_StatesPerValue) p.set_drift_correction(self.DriftCorrection) if self.RemoteLog is None: p.clear_remote_logging() else: try: h_p = self.RemoteLog.split(":", 1) if len(h_p) == 1: host = h_p[0] port = None else: host, port = h_p port = int(port) p.init_remote_logging(host=host, port=port) except: self.warning("Invalid property value for 'RemoteLog': %s", self.RemoteLog) p.clear_remote_logging() self._recalculate_instruments() for attr in self.get_device_class().attr_list: if attr.lower().endswith("list"): self.set_change_event(attr, True, False) self.set_change_event("State", True, False) self.set_change_event("Status", True, False) self.set_change_event("Elements", True, False) # hold the monitor thread for now! # self.pool.monitor.resume() self.set_state(PyTango.DevState.ON)
def _recalculate_instruments(self): il = self.InstrumentList = list(self.InstrumentList) p = self.pool if not p.use_numeric_element_ids: try: int(il[2]) except Exception: # no instruments - IndexError # no numeric ids - ValueError pass else: self.warning( "Forcing sardanacustomsettings.USE_NUMERIC_ELEMENT_IDS=True " "because numeric element ids are still configured in Tango DB.") p.use_numeric_element_ids = True # TODO make sure the instrument didn't change if p.use_numeric_element_ids: for i in range(0, len(il), 3): iklass, iname, iid = il[i:i + 3] iid = int(iid) p.create_instrument(full_name=iname, klass=iklass, id=iid) else: for i in range(0, len(il), 2): iklass, iname, = il[i:i + 2] p.create_instrument(full_name=iname, klass=iklass, id=iname) #@DebugIt()
[docs] def always_executed_hook(self): pass
#@DebugIt() def read_attr_hardware(self, data): pass #@DebugIt()
[docs] def read_ControllerLibList(self, attr): info = self.pool.get_elements_str_info(ElementType.ControllerLibrary) attr.set_value(info)
#@DebugIt()
[docs] def read_ControllerClassList(self, attr): info = self.pool.get_elements_str_info(ElementType.ControllerClass) attr.set_value(info)
#@PyTango.DebugIt(show_args=True,show_ret=True)
[docs] def read_ControllerList(self, attr): info = self.pool.get_elements_str_info(ElementType.Controller) attr.set_value(info)
[docs] def read_InstrumentList(self, attr): #instruments = self._pool.get_elements_by_type(ElementType.Instrument) #instrument_names = map(PoolInstrument.get_full_name, instruments) # attr.set_value(instrument_names) info = self.pool.get_elements_str_info(ElementType.Instrument) attr.set_value(info)
#@DebugIt()
[docs] def read_ExpChannelList(self, attr): info = [] info.extend(self.pool.get_elements_str_info(ElementType.CTExpChannel)) info.extend(self.pool.get_elements_str_info( ElementType.ZeroDExpChannel)) info.extend(self.pool.get_elements_str_info( ElementType.OneDExpChannel)) info.extend(self.pool.get_elements_str_info( ElementType.TwoDExpChannel)) info.extend(self.pool.get_elements_str_info(ElementType.PseudoCounter)) attr.set_value(info)
#@DebugIt()
[docs] def read_AcqChannelList(self, attr): info = self.pool.get_acquisition_elements_str_info() attr.set_value(info)
#@DebugIt()
[docs] def read_MotorGroupList(self, attr): info = self.pool.get_elements_str_info(ElementType.MotorGroup) attr.set_value(info)
#@DebugIt()
[docs] def read_MotorList(self, attr): info = self.pool.get_elements_str_info(ElementType.Motor) info.extend(self.pool.get_elements_str_info(ElementType.PseudoMotor)) attr.set_value(info)
#@DebugIt()
[docs] def read_TriggerGateList(self, attr): info = self.pool.get_elements_str_info(ElementType.TriggerGate) attr.set_value(info)
#@DebugIt()
[docs] def read_MeasurementGroupList(self, attr): info = self.pool.get_elements_str_info(ElementType.MeasurementGroup) attr.set_value(info)
#@DebugIt()
[docs] def read_IORegisterList(self, attr): info = self.pool.get_elements_str_info(ElementType.IORegister) attr.set_value(info)
#@DebugIt()
[docs] def read_ComChannelList(self, attr): info = self.pool.get_elements_str_info(ElementType.Communication) attr.set_value(info)
#@DebugIt()
[docs] def getElements(self, cache=True): value = self.ElementsCache if cache and value is not None: return value value = dict(new=self.pool.get_elements_info()) value = CodecFactory().encode('utf8_json', ('', value)) self.ElementsCache = value return value
#@DebugIt()
[docs] def read_Elements(self, attr): element_list = self.getElements() attr.set_value(*element_list)
[docs] def is_Elements_allowed(self, req_type): return True return SardanaServer.server_state == State.Running
is_ControllerLibList_allowed = \ is_ControllerClassList_allowed = \ is_ControllerList_allowed = \ is_InstrumentList_allowed = \ is_ExpChannelList_allowed = \ is_TriggerGateList_allowed = \ is_AcqChannelList_allowed = \ is_MotorGroupList_allowed = \ is_MotorList_allowed = \ is_MeasurementGroupList_allowed = \ is_IORegisterList_allowed = \ is_ComChannelList_allowed = is_Elements_allowed def _get_interface_ids(self, interface, elem_names): _pool, motor_ids = self.pool, [] for elem_name in elem_names: try: element = _pool.get_element_by_name(elem_name) except: element = _pool.get_element_by_full_name(elem_name) elem_interface = element.get_interface() if not Interface.Moveable & elem_interface: raise Exception("%s is a %s. It MUST be a moveable" % (element.name, Interface[elem_interface])) motor_ids.append(element.id) return motor_ids def _get_moveable_ids(self, elem_names): return self._get_interface_ids(Interface.Moveable, elem_names) def _get_acquirable_ids(self, elem_names): return self._get_interface_ids(Interface.Acquirable, elem_names) #@DebugIt()
[docs] def CreateController(self, argin): kwargs = self._format_CreateController_arguments(argin) # TODO: Support in future sequence of elements kwargs = kwargs[0] type_str = kwargs['type'] lib = kwargs['library'] class_name = kwargs['klass'] name = kwargs['name'] properties = kwargs['properties'] elem_type = ElementType[type_str] mod_name, _ = os.path.splitext(lib) kwargs['module'] = mod_name td = TYPE_MAP_OBJ[ElementType.Controller] auto_full_name = td.auto_full_name ctrl_class = td.ctrl_klass full_name = kwargs.get("full_name", auto_full_name.format(**kwargs)) util = PyTango.Util.instance() # check that element doesn't exist yet self._check_element(name, full_name) # check library exists ctrl_manager = self.pool.ctrl_manager mod_name, _ = os.path.splitext(lib) ctrl_lib = ctrl_manager.getControllerLib(mod_name) if ctrl_lib is None: raise Exception("Controller library '%s' not found" % lib) # check class exists ctrl_class = ctrl_lib.get_controller(class_name) if ctrl_class is None: raise Exception("Controller class '%s' not found in '%s'" % (class_name, lib)) # check that class type matches the required type if elem_type not in ctrl_class.types: raise Exception("Controller class '%s' does not implement '%s' " "interface" % (class_name, type_str)) # check that necessary property values are set for prop_name, prop_info in list(ctrl_class.ctrl_properties.items()): prop_value = properties.get(prop_name) if prop_value is None: if prop_info.default_value is None: raise Exception("Controller class '%s' needs property '%s'" % (class_name, prop_name)) else: properties[prop_name] = prop_value # for pseudo motor check that motors are given if elem_type == ElementType.PseudoMotor: klass_roles = ctrl_class.controller_class.motor_roles klass_pseudo_roles = ctrl_class.controller_class.pseudo_motor_roles if not len(klass_pseudo_roles): klass_pseudo_roles = class_name, roles = kwargs.get('roles') if roles is None: raise Exception("Pseudo motor controller class %s needs motors " "for roles: %s and pseudo roles: %s" % (class_name, ", ".join(klass_roles), ", ".join(klass_pseudo_roles))) motor_ids = [] for klass_role in klass_roles: if klass_role not in roles: raise Exception("Pseudo motor controller class '%s' needs " "motor(s) for role(s) %s" % (class_name, klass_role)) motor_name = roles[klass_role] if self._use_physical_roles_property: motor_ids.append(klass_role) motor_ids.extend(self._get_moveable_ids((motor_name,))) if self._use_physical_roles_property: property_name = 'physical_roles' else: property_name = 'motor_role_ids' properties[property_name] = motor_ids pseudo_motor_infos = {} pseudo_motor_ids = [] for i, klass_pseudo_role in enumerate(klass_pseudo_roles): if klass_pseudo_role not in roles: raise Exception("Pseudo motor controller class '%s' needs " "pseudo motor name for role '%s'" % (class_name, klass_pseudo_role)) pm_id = self.pool.get_new_id() pm_name = roles[klass_pseudo_role] info = dict(id=pm_id, name=pm_name, ctrl_name=name, axis=i + 1, type='PseudoMotor') if pm_name.count(',') > 0: n, fn = list(map(str.strip, pm_name.split(',', 1))) info['name'], info['full_name'] = n, fn pseudo_motor_infos[klass_pseudo_role] = info pseudo_motor_ids.append(pm_id) # for pseudo counter check counters are given elif elem_type == ElementType.PseudoCounter: klass_roles = ctrl_class.controller_class.counter_roles klass_pseudo_roles = ctrl_class.controller_class.pseudo_counter_roles if not len(klass_pseudo_roles): klass_pseudo_roles = class_name, roles = kwargs.get('roles') if roles is None: raise Exception("Pseudo counter controller class '%s' needs " "counter(s) for role(s): %s and pseudo " "role(s): %s" % (class_name, ", ".join(klass_roles), ", ".join(klass_pseudo_roles))) counter_ids = [] for klass_role in klass_roles: if klass_role not in roles: raise Exception("Pseudo counter controller class '%s' " "needs counter name for role '%s'" % (class_name, klass_role)) counter_name = roles[klass_role] if self._use_physical_roles_property: counter_ids.append(klass_role) counter_ids.extend(self._get_acquirable_ids((counter_name,))) if self._use_physical_roles_property: property_name = 'physical_roles' else: property_name = 'counter_role_ids' properties[property_name] = counter_ids pseudo_counter_infos = {} pseudo_counter_ids = [] for i, klass_pseudo_role in enumerate(klass_pseudo_roles): if klass_pseudo_role not in roles: raise Exception("Pseudo counter controller class %s needs " "pseudo motor name for role %s" % (class_name, klass_pseudo_role)) pc_id = self.pool.get_new_id() pc_name = roles[klass_pseudo_role] info = dict(id=pc_id, name=pc_name, ctrl_name=name, axis=i + 1, type='PseudoCounter') if pc_name.count(',') > 0: n, fn = list(map(str.strip, pc_name.split(',', 1))) info['name'], info['full_name'] = n, fn pseudo_counter_infos[klass_pseudo_role] = info pseudo_counter_ids.append(pc_id) properties['type'] = type_str properties['library'] = lib properties['klass'] = class_name def create_controller_cb(device_name): if self.pool.use_numeric_element_ids: properties['id'] = self.pool.get_new_id() try: db = util.get_database() #types = [ pool.ElementType.whatis(t) for t in ctrl.get_ctrl_types() ] db.put_device_property(device_name, properties) except: self.warning("Unexpected error in controller creation callback", exc_info=True) raise util.create_device('Controller', full_name, name, cb=create_controller_cb) # for pseudo motor/counter controller also create pseudo # motor(s)/counter(s) automatically if elem_type == ElementType.PseudoMotor: for pseudo_motor_info in list(pseudo_motor_infos.values()): self._create_single_element(pseudo_motor_info) elif elem_type == ElementType.PseudoCounter: for pseudo_counter_info in list(pseudo_counter_infos.values()): self._create_single_element(pseudo_counter_info)
#@DebugIt()
[docs] def CreateInstrument(self, argin): kwargs = self._format_CreateInstrument_arguments(argin) instrument = self.pool.create_instrument(**kwargs) instrument_list = self.InstrumentList # update database property instrument_list.append(instrument.instrument_class) instrument_list.append(instrument.full_name) if self.pool.use_numeric_element_ids: instrument_list.append(instrument.id) db = PyTango.Util.instance().get_database() props = {'InstrumentList': instrument_list} db.put_device_property(self.get_name(), props)
#@DebugIt()
[docs] def CreateElement(self, argin): kwargs_seq = self._format_CreateElement_arguments(argin) for kwargs in kwargs_seq: self._create_single_element(kwargs)
[docs] def RenameElement(self, argin): old_name = argin[0] new_name = argin[1] self.pool.rename_element(old_name, new_name) # obtain normal name (without database) and apply new alias element = self.pool.get_element_by_name(new_name) db = PyTango.Util.instance().get_database() normal_name = '/'.join(element.full_name.split('/')[-3:]) db.put_device_alias(normal_name, new_name)
def _create_single_element(self, kwargs): elem_type_str = kwargs['type'] ctrl_name = kwargs['ctrl_name'] axis = kwargs['axis'] try: elem_type = ElementType[elem_type_str] except KeyError: raise Exception("Unknown element type '%s'" % elem_type_str) name = kwargs['name'] td = TYPE_MAP_OBJ[elem_type] auto_full_name = td.auto_full_name full_name = kwargs.get("full_name", auto_full_name.format(**kwargs)) ctrl = self.pool.get_element(name=ctrl_name) if ctrl.get_type() != ElementType.Controller: type_str = ElementType.whatis(ctrl.get_type()) raise Exception("'%s' is not a controller (It is a %s)" % (ctrl_name, type_str)) ctrl_types = ctrl.get_ctrl_types() if elem_type not in ctrl_types: ctrl_type_str = ElementType.whatis(ctrl_types[0]) raise Exception("Cannot create %s in %s controller" % (type, ctrl_type_str)) elem_axis = ctrl.get_element(axis=axis) if elem_axis is not None: raise Exception("Controller already contains axis %d (%s)" % (axis, elem_axis.get_name())) self._check_element(name, full_name) util = PyTango.Util.instance() def create_element_cb(device_name): try: db = util.get_database() if self.pool.use_numeric_element_ids: data = { "id": self.pool.get_new_id(), "ctrl_id": ctrl.get_id() } else: data = { "ctrl_id": ctrl.name } data["axis"] = axis db.put_device_property(device_name, data) except: import traceback traceback.print_exc() util.create_device(elem_type_str, full_name, name, cb=create_element_cb) #@DebugIt()
[docs] def CreateMotorGroup(self, argin): kwargs = self._format_CreateMotorGroup_arguments(argin) # TODO: Support in future sequence of elements kwargs = kwargs[0] util = PyTango.Util.instance() name = kwargs['name'] kwargs['pool_name'] = self.pool.name td = TYPE_MAP_OBJ[ElementType.MotorGroup] auto_full_name = td.auto_full_name full_name = kwargs.get("full_name", auto_full_name.format(**kwargs)) self._check_element(name, full_name) elem_ids = self._get_moveable_ids(kwargs["elements"]) def create_motgrp_cb(device_name): db = util.get_database() data = {"id": self.pool.get_new_id(), "elements": elem_ids} db.put_device_property(device_name, data) data = {} data["position"] = {"abs_change": "1.0"} db.put_device_attribute_property(device_name, data) util.create_device("MotorGroup", full_name, name, cb=create_motgrp_cb)
#@DebugIt()
[docs] def CreateMeasurementGroup(self, argin): kwargs = self._format_CreateMeasurementGroup_arguments(argin) # TODO: Support in future sequence of elements kwargs = kwargs[0] util = PyTango.Util.instance() name = kwargs['name'] kwargs['pool_name'] = self.pool.name td = TYPE_MAP_OBJ[ElementType.MeasurementGroup] auto_full_name = td.auto_full_name full_name = kwargs.get("full_name", auto_full_name.format(**kwargs)) self._check_element(name, full_name) elem_ids = [] for elem_name in kwargs["elements"]: # if internal pool element (channel, motor, ioregister, etc) store # it's id try: elem = self.pool.get_element(name=elem_name) elem_ids.append(elem.id) except: # otherwise assume a tango attribute/command elem_ids.append(elem_name) def create_mntgrp_cb(device_name): data = {"elements": elem_ids} if self.pool.use_numeric_element_ids: data["id"] = self.pool.get_new_id() db = util.get_database() db.put_device_property(device_name, data) data = {} db.put_device_attribute_property(device_name, data) util.create_device("MeasurementGroup", full_name, name, cb=create_mntgrp_cb)
def _check_element(self, name, full_name): self.pool.check_element(name, full_name) db = PyTango.Util.instance().get_database() e = None try: db.import_device(name) e = Exception("The tango alias '%s' already exists" % name) except: pass if e: raise e try: db.import_device(full_name) e = Exception("The tango device '%s' already exists" % full_name) except: pass if e: raise e
[docs] def on_pool_changed(self, evt_src, evt_type, evt_value): # during server startup and shutdown avoid processing element # creation events if SardanaServer.server_state != State.Running: return evt_name = evt_type.name.lower() if evt_name in ("elementcreated", "elementdeleted", "elementchanged"): elem = evt_value elem_type = elem.get_type() td = TYPE_MAP_OBJ[elem_type] attribute_list_name = td.family + "List" info = self.pool.get_elements_str_info(elem_type) self.push_change_event(attribute_list_name, info) if elem_type in TYPE_ACQUIRABLE_ELEMENTS: info = self.pool.get_acquisition_elements_str_info() self.push_change_event('AcqChannelList', info) # force the element list cache to be rebuild next time someone reads # the element list self.ElementsCache = None value = {} if evt_name == "elementcreated": key = 'new' elif evt_name == "elementdeleted": key = 'del' else: key = 'change' json_elem = elem.serialize(pool=self.pool.full_name) value[key] = json_elem, value = CodecFactory().getCodec('utf8_json').encode(('', value)) self.push_change_event('Elements', *value) elif evt_name == "elementschanged": # force the element list cache to be rebuild next time someone reads # the element list self.ElementsCache = None pool_name = self.pool.full_name new_values, changed_values, deleted_values = [], [], [] for elem in evt_value['new']: json_elem = elem.serialize(pool=pool_name) new_values.append(json_elem) for elem in evt_value['change']: json_elem = elem.serialize(pool=pool_name) changed_values.append(json_elem) for elem in evt_value['del']: json_elem = elem.serialize(pool=pool_name) deleted_values.append(json_elem) value = {"new": new_values, "change": changed_values, "del": deleted_values} value = CodecFactory().getCodec('utf8_json').encode(('', value)) self.push_change_event('Elements', *value)
def _format_create_json_arguments(self, argin): elems, ret = json.loads(argin[0]), [] if isinstance(elems, collections.abc.Mapping): elems = [elems] for elem in elems: d = {} for k, v in list(elem.items()): d[str(k)] = str(v) ret.append(d) return ret def _format_CreateElement_arguments(self, argin): if len(argin) == 0: msg = PoolClass.cmd_list["CreateElement"][0][1] raise Exception(msg) if len(argin) == 1: return self._format_create_json_arguments(argin) ret = {'type': argin[0], 'ctrl_name': argin[1], 'axis': int(argin[2]), 'name': argin[3]} if len(argin) > 4: ret['full_name'] = argin[4] return [ret] def _format_CreateController_arguments(self, argin): if len(argin) == 0: msg = PoolClass.cmd_list["CreateController"][0][1] raise Exception(msg) if len(argin) == 1: ret = self._format_create_json_arguments(argin) if 'type' not in ret: raise KeyError("Missing key 'type'") if 'library' not in ret: raise KeyError("Missing key 'library'") if 'klass' not in ret: raise KeyError("Missing key 'klass'") if 'name' not in ret: raise KeyError("Missing key 'name'") if 'properties' not in ret: ret['properties'] = CaselessDict() return ret ret = {'type': argin[0], 'library': argin[1], 'klass': argin[2], 'name': argin[3]} i = 4 roles = {} for arg in argin[4:]: role_data = arg.split('=', 1) if len(role_data) < 2: break role_name, role_element = role_data roles[role_name] = role_element i += 1 if len(roles) > 0: ret['roles'] = roles p = argin[i:] if len(p) % 2: raise Exception("must give pair of property name, property value") props = CaselessDict() for name, value in zip(p[0::2], p[1::2]): props[name] = value ret['properties'] = props return [ret] def _format_CreateMotorGroup_arguments(self, argin): if len(argin) == 0: msg = PoolClass.cmd_list["CreateMotorGroup"][0][1] raise Exception(msg) if len(argin) == 1: ret = [] try: elems = json.loads(argin[0]) except: elems = argin if isinstance(elems, collections.abc.Mapping): elems = [elems] for elem in elems: d = {} for k, v in list(elem.items()): d[str(k)] = str(v) ret.append(d) return ret ret = {'name': argin[0]} if argin[-1].count('/') == 2: ret['full_name'] = argin[-1] del argin[-1] ret['elements'] = argin[1:] return [ret] def _format_CreateMeasurementGroup_arguments(self, argin): if len(argin) == 0: msg = PoolClass.cmd_list["CreateMeasurementGroup"][0][1] raise Exception(msg) if len(argin) == 1: ret = [] try: elems = json.loads(argin[0]) except: elems = argin if isinstance(elems, collections.abc.Mapping): elems = [elems] for elem in elems: d = {} for k, v in list(elem.items()): d[str(k)] = str(v) ret.append(d) return ret ret = {'name': argin[0]} # if argin[-1].count('/') == 2: # ret['full_name'] = argin[-1] # del argin[-1] channels = [] for arg in argin[1:]: try: channel = self.pool.get_element_by_full_name(arg) channels.append(channel.name) except: try: channel = self.pool.get_element_by_name(arg) channels.append(channel.name) except: channels.append(arg) ret['elements'] = channels return [ret] def _format_CreateInstrument_arguments(self, argin): ret = {'full_name': argin[0], 'klass': argin[1]} if len(argin) > 2: ret['id'] = argin[2] return ret #@DebugIt()
[docs] def DeleteElement(self, name): try: elem = self.pool.get_element(full_name=name) except: elem = self.pool.get_element(name=name) elem_type = elem.get_type() td = TYPE_MAP_OBJ[elem_type] type_name = td.name full_name = elem.get_full_name() self.pool.delete_element(name) if elem_type == ElementType.Instrument: # update database property il = self.InstrumentList idx = il.index(full_name) if self.pool.use_numeric_element_ids: self.InstrumentList = il[:idx - 1] + il[idx + 2:] else: # no id in property self.InstrumentList = il[:idx - 1] + il[idx + 1:] db = PyTango.Util.instance().get_database() props = {'InstrumentList': self.InstrumentList} db.put_device_property(self.get_name(), props) else: util = PyTango.Util.instance() util.delete_device(type_name, full_name)
#@DebugIt()
[docs] def GetControllerClassInfo(self, names): if names.startswith('['): names = json.loads(names) else: names = (names,) ctrl_classes = self.pool.get_controller_classes_info(names) ret = [] for name in names: ctrl_class = ctrl_classes[name] data = None if ctrl_class is not None: data = ctrl_class.serialize() ret.append(data) return json.dumps(ret)
#@DebugIt()
[docs] def ReloadControllerLib(self, lib_name): self.pool.reload_controller_lib(lib_name)
#@DebugIt()
[docs] def ReloadControllerClass(self, class_name): self.pool.reload_controller_class(class_name)
[docs] def Stop(self): self.pool.stop()
[docs] def Abort(self): self.pool.abort()
[docs] def SendToController(self, stream): ctrl_name, stream = stream[:2] try: ctrl = self.pool.get_element_by_name(ctrl_name) except KeyError: ctrl = self.pool.get_element_by_full_name(ctrl_name) return ctrl.send_to_controller(stream)
[docs] def GetFile(self, name): p = self.pool manager = p.ctrl_manager lib = manager.getControllerLib(name) if lib is None: raise Exception("Unknown controller file '%s'", name) return lib.path, "".join(lib.get_code())
[docs] def PutFile(self, file_data): p = self.pool manager = p.ctrl_manager manager.setControllerLib(*file_data)
[docs] def GetControllerCode(self, argin): pass
[docs] def SetControllerCode(self, argin): pass
CREATE_CONTROLLER_PAR_IN_DOC = """\ Must give either: * A JSON encoded dict as first string with: * mandatory keys: 'type', 'library', 'klass' and 'name' (values are strings). * optional keys: * 'properties': a dict with keys being property names and values the property values * 'roles': a dict with keys being controller roles and values being element names. (example: { 'gap' : 'motor21', 'offset' : 'motor55' }). Only applicable of pseudo controllers * a sequence of strings: <type>, <library>, <class>, <name> [, <role_name>'='<element name>] [, <property name>, <property value>] Examples:: data = dict(type='Motor', library='DummyMotorController', klass='DummyMotorController', name='my_motor_ctrl_1') pool.CreateController([json.dumps(data)]) pool.CreateController(['Motor', 'DummyMotorController', 'DummyMotorController', 'my_motor_ctrl_2']) """ CREATE_CONTROLLER_PAR_OUT_DOC = "None" CREATE_CONTROLLER_DOC = """\ Tango command to create controller. :param argin: {0} :type argin: list<str> :return: {1} """.format(CREATE_CONTROLLER_PAR_IN_DOC, CREATE_CONTROLLER_PAR_OUT_DOC) CREATE_ELEMENT_PAR_IN_DOC = """\ Must give either: * A JSON encoded dict as first string with: * mandatory keys: 'type', 'ctrl_name', 'axis', 'name' (values are strings). * optional keys: * 'full_name' : a string representing the full tango device name * a sequence of strings: <type>, <ctrl_name>, <axis>, <name> [, <full_name>] Examples:: data = dict(type='Motor', ctrl_name='my_motor_ctrl_1', axis='4', name='theta', full_name='BL99/EH/THETA') pool.CreateElement([json.dumps(data)]) pool.CreateElement(['Motor', 'my_motor_ctrl_1', '1', 'phi', 'BL99/EH/PHI']) """ CREATE_ELEMENT_PAR_OUT_DOC = "None" CREATE_ELEMENT_DOC = """\ Tango command to create element (motor, counter/timer, 0D, 1D, 2D, IORegister). :param argin: {0} :type argin: list<str> :return: {1} """.format(CREATE_ELEMENT_PAR_IN_DOC, CREATE_ELEMENT_PAR_OUT_DOC) CREATE_INSTRUMENT_PAR_IN_DOC = """\ Must give either: * A JSON encoded dict as first string with: * mandatory keys: 'full_name', 'klass' (values are strings). * a sequence of strings: <full_name>, <class> Examples:: pool.CreateInstrument(['/OH', 'NXhutch']) pool.CreateInstrument(['/OH/Mono', 'NXmonochromator']) pool.CreateInstrument(['/EH', 'NXhutch']) pool.CreateInstrument(['/EH/Pilatus', 'NXdetector']) """ CREATE_INSTRUMENT_PAR_OUT_DOC = "None" CREATE_INSTRUMENT_DOC = """\ Tango command to create instrument. :param argin: {0} :type argin: list<str> :return: {1} """.format(CREATE_INSTRUMENT_PAR_IN_DOC, CREATE_INSTRUMENT_PAR_OUT_DOC) CREATE_MOTOR_GROUP_PAR_IN_DOC = """\ Must give either: * A JSON encoded dict as first string with: * mandatory keys: 'name', 'elements' (with value being a list of moveables) * optional keys: * 'full_name': with value being a full tango device name * a sequence of strings: <motor group name> [, <element> ]" Examples:: data = dict(name='diffrac_motor_group', elements=['theta', 'theta2', 'phi']) pool.CreateMotorGroup([json.dumps(data)]) pool.CreateMotorGroup(['diffrac_mg', 'theta', 'theta2' ]) """ CREATE_MOTOR_GROUP_PAR_OUT_DOC = "None" CREATE_MOTOR_GROUP_DOC = """\ Tango command to create motor group. :param argin: {0} :type argin: list<str> :return: {1} """.format(CREATE_MOTOR_GROUP_PAR_IN_DOC, CREATE_MOTOR_GROUP_PAR_OUT_DOC) Pool.CreateMotorGroup.__doc__ = CREATE_MOTOR_GROUP_DOC CREATE_MEASUREMENT_GROUP_PAR_IN_DOC = """\ Must give either: * A JSON encoded dict as first string with: * mandatory keys: 'name', 'elements' (with value being a list of acquirables)" * optional keys: * 'full_name': with value being a full tango device name * a sequence of strings: <motor group name> [, <element> ]" An acquirable is either a sardana element (counter/timer, 0D, 1D, 2D, motor) or a tango attribute (ex: sys/tg_test/1/short_spectrum_ro) Examples:: data = dict(name='my_exp_01', elements=['timer', 'C1', 'sys/tg_test/1/double_scalar']) pool.CreateMeasurementGroup([json.dumps(data)]) pool.CreateMeasurementGroup(['my_exp_02', 'timer', 'CCD1', 'sys/tg_test/1/short_spectrum_ro']) """ CREATE_MEASUREMENT_GROUP_PAR_OUT_DOC = "None" CREATE_MEASUREMENT_GROUP_DOC = """\ Tango command to create measurement group. :param argin: {0} :type argin: list<str> :return: {1} """.format(CREATE_MEASUREMENT_GROUP_PAR_IN_DOC, CREATE_MEASUREMENT_GROUP_PAR_OUT_DOC) DELETE_ELEMENT_PAR_IN_DOC = """\ name of element to be deleted """ DELETE_ELEMENT_PAR_OUT_DOC = "None" DELETE_ELEMENT_DOC = """\ Tango command to delete element. :param argin: {0} :type argin: :obj:`str` :return: {1} """.format(DELETE_ELEMENT_PAR_IN_DOC, DELETE_ELEMENT_PAR_OUT_DOC) GET_CONTROLLER_CLASS_INFO_PAR_IN_DOC = """\ Must give either: * A JSON encoded list of controller class names * a controller class name Examples:: data = "DummyMotorController", "DummyCounterTimerController" pool.GetControllerClassInfo(json.dumps(data)) pool.GetControllerClassInfo("DummyMotorController") """ GET_CONTROLLER_CLASS_INFO_PAR_OUT_DOC = """ a JSON encoded string describing the controller class """ GET_CONTROLLER_CLASS_INFO_DOC = """\ Tango command to get detailed information about a controller class. :param argin: {0} :type argin: :obj:`str` :return: {1} :rtype: :obj:`str` """.format(GET_CONTROLLER_CLASS_INFO_PAR_IN_DOC, GET_CONTROLLER_CLASS_INFO_PAR_OUT_DOC) RELOAD_CONTROLLER_LIB_PAR_IN_DOC = """\ the controller library name (without extension) """ RELOAD_CONTROLLER_LIB_PAR_OUT_DOC = "None" RELOAD_CONTROLLER_LIB_INFO_DOC = """\ Tango command to reload the controller library code. :param argin: {0} :type argin: :obj:`str` :return: {1} """.format(RELOAD_CONTROLLER_LIB_PAR_IN_DOC, RELOAD_CONTROLLER_LIB_PAR_OUT_DOC) RELOAD_CONTROLLER_CLASS_PAR_IN_DOC = """\ the controller class name """ RELOAD_CONTROLLER_CLASS_PAR_OUT_DOC = "None" RELOAD_CONTROLLER_CLASS_INFO_DOC = """\ Tango command to reload the controller class code (reloads the entire library where the class is described). :param argin: {0} :type argin: :obj:`str` :return: {1} """.format(RELOAD_CONTROLLER_CLASS_PAR_IN_DOC, RELOAD_CONTROLLER_CLASS_PAR_OUT_DOC) RENAME_ELEMENT_PAR_IN_DOC = """ Two elements sequence of strings: <old element name>, <new element name> """ RENAME_ELEMENT_PAR_OUT_DOC = "None" RENAME_ELEMENT_CLASS_INFO_DOC = """\ Tango command to rename the element (rename Pool element and put new alias in the Tango Database). :param argin: {0} :type argin: list<str> :return: {1} """.format(RENAME_ELEMENT_PAR_IN_DOC, RENAME_ELEMENT_PAR_OUT_DOC) STOP_PAR_IN_DOC = "None" STOP_PAR_OUT_DOC = "None" STOP_DOC = """\ Stops all elements managed by this Pool :param argin: {0} :return: {1} """.format(STOP_PAR_IN_DOC, STOP_PAR_OUT_DOC) ABORT_PAR_IN_DOC = "None" ABORT_PAR_OUT_DOC = "None" ABORT_DOC = """\ Aborts all elements managed by this Pool :param argin: {0} :return: {1} """.format(ABORT_PAR_IN_DOC, ABORT_PAR_OUT_DOC) SEND_TO_CONTROLLER_PAR_IN_DOC = """\ a sequence of two strings: <controller name>, <data> """ SEND_TO_CONTROLLER_PAR_OUT_DOC = """\ the controller response """ SEND_TO_CONTROLLER_DOC = """\ Sends a string to a controller. :param argin: {0} :return: {1} """.format(SEND_TO_CONTROLLER_PAR_IN_DOC, SEND_TO_CONTROLLER_PAR_OUT_DOC) Pool.CreateController.__doc__ = CREATE_CONTROLLER_DOC Pool.CreateElement.__doc__ = CREATE_ELEMENT_DOC Pool.CreateInstrument.__doc__ = CREATE_INSTRUMENT_DOC Pool.CreateMotorGroup.__doc__ = CREATE_MOTOR_GROUP_DOC Pool.CreateMeasurementGroup.__doc__ = CREATE_MEASUREMENT_GROUP_DOC Pool.DeleteElement.__doc__ = DELETE_ELEMENT_DOC Pool.GetControllerClassInfo.__doc__ = GET_CONTROLLER_CLASS_INFO_DOC Pool.ReloadControllerLib.__doc__ = RELOAD_CONTROLLER_LIB_INFO_DOC Pool.ReloadControllerClass.__doc__ = RELOAD_CONTROLLER_CLASS_INFO_DOC Pool.RenameElement.__doc__ = RENAME_ELEMENT_CLASS_INFO_DOC Pool.Stop.__doc__ = STOP_DOC Pool.Abort.__doc__ = ABORT_DOC
[docs]class PoolClass(PyTango.DeviceClass): # Class Properties class_property_list = { } # Device Properties device_property_list = { 'PoolPath': [PyTango.DevVarStringArray, "list of directories to search for controllers (path separators " "can be '\n' or character conventionally used by the OS to" "separate search path components, such as ':' for POSIX" "or ';' for Windows)", []], 'PythonPath': [PyTango.DevVarStringArray, "list of directories to be appended to sys.path at startup (path " "separators can be '\n' or ':')", []], 'MotionLoop_SleepTime': [PyTango.DevLong, "Sleep time in the motion loop in mS [default: %dms]" % int(POOL.Default_MotionLoop_SleepTime * 1000), int(POOL.Default_MotionLoop_SleepTime * 1000)], 'MotionLoop_StatesPerPosition': [PyTango.DevLong, "Number of State reads done before doing a position read in the " "motion loop [default: %d]" % POOL.Default_MotionLoop_StatesPerPosition, POOL.Default_MotionLoop_StatesPerPosition], 'AcqLoop_SleepTime': [PyTango.DevLong, "Sleep time in the acquisition loop in mS [default: %dms]" % int(POOL.Default_AcqLoop_SleepTime * 1000), int(POOL.Default_AcqLoop_SleepTime * 1000)], 'AcqLoop_StatesPerValue': [PyTango.DevLong, "Number of State reads done before doing a value read in the " "acquisition loop [default: %d]" % POOL.Default_AcqLoop_StatesPerValue, POOL.Default_AcqLoop_StatesPerValue], 'RemoteLog': [PyTango.DevString, "Logging (python logging) host:port [default: None]", None], 'DriftCorrection': [PyTango.DevBoolean, "Globally apply drift correction on pseudo motors (can be " "overwritten at PseudoMotor level [default: %d]." % POOL.Default_DriftCorrection, POOL.Default_DriftCorrection], 'InstrumentList': [PyTango.DevVarStringArray, "List of instruments (internal property)", []], 'LogstashHost': [PyTango.DevString, "Hostname where Logstash runs. " "This property has been included in Sardana on a provisional " "basis. Backwards incompatible changes (up to and including " "its removal) may occur if deemed necessary by the " "core developers.", None], 'LogstashPort': [PyTango.DevLong, "Port on which Logstash will listen on events [default: 12345]. " "This property has been included in Sardana on a provisional " "basis. Backwards incompatible changes (up to and including " "its removal) may occur if deemed necessary by the " "core developers.", 12345], 'LogstashCacheDbPath': [PyTango.DevString, "Path to the Logstash cache database [default: None]. " "It is advised not to use the database cache, as it may " "have negative effects on logging performance. See #895. " "This property has been included in Sardana on a provisional " "basis. Backwards incompatible changes (up to and including " "its removal) may occur if deemed necessary by the " "core developers.", None] } # Command definitions cmd_list = { 'CreateController': [[PyTango.DevVarStringArray, CREATE_CONTROLLER_PAR_IN_DOC], [PyTango.DevVoid, CREATE_CONTROLLER_PAR_OUT_DOC]], 'CreateElement': [[PyTango.DevVarStringArray, CREATE_ELEMENT_PAR_IN_DOC], [PyTango.DevVoid, CREATE_ELEMENT_PAR_OUT_DOC]], 'CreateInstrument': [[PyTango.DevVarStringArray, CREATE_INSTRUMENT_PAR_IN_DOC], [PyTango.DevVoid, CREATE_INSTRUMENT_PAR_OUT_DOC]], 'CreateMotorGroup': [[PyTango.DevVarStringArray, CREATE_MOTOR_GROUP_PAR_IN_DOC], [PyTango.DevVoid, CREATE_MOTOR_GROUP_PAR_OUT_DOC]], 'CreateMeasurementGroup': [[PyTango.DevVarStringArray, CREATE_MEASUREMENT_GROUP_PAR_IN_DOC], [PyTango.DevVoid, CREATE_MEASUREMENT_GROUP_PAR_OUT_DOC]], 'DeleteElement': [[PyTango.DevString, DELETE_ELEMENT_PAR_IN_DOC], [PyTango.DevVoid, DELETE_ELEMENT_PAR_OUT_DOC]], 'GetControllerClassInfo': [[PyTango.DevString, GET_CONTROLLER_CLASS_INFO_PAR_IN_DOC], [PyTango.DevString, GET_CONTROLLER_CLASS_INFO_PAR_OUT_DOC]], 'ReloadControllerLib': [[PyTango.DevString, RELOAD_CONTROLLER_LIB_PAR_IN_DOC], [PyTango.DevVoid, RELOAD_CONTROLLER_LIB_PAR_OUT_DOC]], 'ReloadControllerClass': [[PyTango.DevString, RELOAD_CONTROLLER_CLASS_PAR_IN_DOC], [PyTango.DevVoid, RELOAD_CONTROLLER_CLASS_PAR_OUT_DOC]], 'RenameElement': [[PyTango.DevVarStringArray, RENAME_ELEMENT_PAR_IN_DOC], [PyTango.DevVoid, RENAME_ELEMENT_PAR_OUT_DOC]], 'GetControllerCode': [[PyTango.DevVarStringArray, "<Controller library name> [, <Controller class name>]"], [PyTango.DevVarStringArray, "result is a sequence of 3 strings:\n" "<full path and file name>, <code>, <line number>"]], 'SetControllerCode': [[PyTango.DevVarStringArray, "<Controller library name>, <code> [, <Auto reload>=True]\n" "- if controller library is a simple module name:\n" " - if it exists, it is overwritten, otherwise a new python " "file is created in the directory of the first element in " "the PoolPath property" "- if controller library is the full path name:\n" " - if path is not in the PoolPath, an exception is thrown" " - if file exists it is overwritten otherwise a new file " "is created"], [PyTango.DevVoid, ""]], 'Stop': [[PyTango.DevVoid, STOP_PAR_IN_DOC], [PyTango.DevVoid, STOP_PAR_OUT_DOC]], 'Abort': [[PyTango.DevVoid, ABORT_PAR_IN_DOC], [PyTango.DevVoid, ABORT_PAR_OUT_DOC]], 'SendToController': [[PyTango.DevVarStringArray, SEND_TO_CONTROLLER_PAR_IN_DOC], [PyTango.DevString, SEND_TO_CONTROLLER_PAR_OUT_DOC]], 'GetFile': [[PyTango.DevString, "name (may be module name, file name or full (with absolute path) file name"], [PyTango.DevVarStringArray, "[complete(with absolute path) file name, file contents]"]], 'PutFile': [[PyTango.DevVarStringArray, "[name (may be module name, file name or full (with absolute path) file name, file contents]"], [PyTango.DevVoid, ""]], } # Attribute definitions attr_list = { 'InstrumentList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Instrument list", 'description': "the list of instruments (a JSON encoded dict)", }], 'ControllerList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Controller list", 'description': "the list of controllers (a JSON encoded dict)", }], 'ExpChannelList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Experiment channel list", 'description': "The list of experiment channels (a JSON encoded dict)", }], 'AcqChannelList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Acquisition channel list", 'description': "The list of all acquisition channels (a JSON encoded dict)", }], 'MotorGroupList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Motor group list", 'description': "the list of motor groups (a JSON encoded dict)", }], 'ControllerLibList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Controller library list", 'description': "the list of controller libraries (a JSON encoded dict)", }], 'ControllerClassList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Controller class list", 'description': "the list of controller classes (a JSON encoded dict)", }], 'MotorList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Motor list", 'description': "the list of motors (a JSON encoded dict)", }], 'TriggerGateList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "TriggerGate list", 'description': "the list of trigger/gates (a JSON encoded dict)", }], 'MeasurementGroupList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Measurement group list", 'description': "the list of measurement groups (a JSON encoded dict)", }], 'IORegisterList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "IORegister list", 'description': "the list of IORegisters (a JSON encoded dict)", }], 'ComChannelList': [[PyTango.DevString, PyTango.SPECTRUM, PyTango.READ, 4096], { 'label': "Communication channel list", 'description': "the list of communication channels (a JSON encoded dict)", }], 'Elements': [[PyTango.DevEncoded, PyTango.SCALAR, PyTango.READ], { 'label': "Elements", 'description': "the list of all elements (a JSON encoded dict)", }], } def __init__(self, name): PyTango.DeviceClass.__init__(self, name) self.set_type(name) def _get_class_properties(self): return dict(ProjectTitle="Sardana", Description="Device Pool management class", doc_url="http://sardana-controls.org/", InheritedFrom="Device_5Impl") def write_class_property(self): util = PyTango.Util.instance() db = util.get_database() if db is None: return db.put_class_property(self.get_name(), self._get_class_properties())