Source code for sardana.pool.poolutil

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""Pool utils"""

import threading
from typing import Any

from taurus.core.util.containers import CaselessDict
import PyTango


__all__ = ["PoolUtil"]

__docformat__ = 'restructuredtext'



[docs] class _PoolUtil(object): def __init__(self): self._ctrl_proxies = CaselessDict() self._lock = threading.Lock() def __call__(self, *args, **kwargs): return self
[docs] def get_device(self, *args: Any, **kwargs: Any) -> PyTango.DeviceProxy: """Factory method to create a single `tango.DeviceProxy` instance per controller instance. :param ctrl_name: Controller name to which assign the proxy object :param device_name: Tango device name :return: single device proxy object """ ctrl_name = args[0] device_name = args[1] with self._lock: ctrl_devs = self._ctrl_proxies.get(ctrl_name) if ctrl_devs is None: self._ctrl_proxies[ctrl_name] = ctrl_devs = CaselessDict() dev = ctrl_devs.get(device_name) if dev is None: ctrl_devs[device_name] = dev = PyTango.DeviceProxy(device_name) return dev
get_motor = get_phy_motor = get_pseudo_motor = get_motor_group = \ get_exp_channel = get_ct_channel = get_zerod_channel = \ get_oned_channel = get_twod_channel = get_pseudo_counter_channel = \ get_measurement_group = get_com_channel = get_ioregister = get_device
#: Singleton instance of the `~sardana.pool.poolutil._PoolUtil` class. #: #: It is a factory of `tango.DeviceProxy` objects and ensures only one #: instance of such objects is created for the whole process. #: Please refer to the `~sardana.pool.poolutil._PoolUtil` API on the available #: methods. PoolUtil = _PoolUtil()