# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import copy
import os
import select
import socket
import threading
import time
import uuid
import warnings
from ..extern.six.moves import queue, range
from ..extern.six.moves import xmlrpc_client as xmlrpc
from ..extern.six.moves.urllib.parse import urlunparse
from .. import log
from .constants import SAMP_STATUS_OK
from .constants import __profile_version__
from .errors import SAMPWarning, SAMPHubError, SAMPProxyError
from .utils import internet_on, ServerProxyPool, _HubAsClient
from .lockfile_helpers import read_lockfile, create_lock_file
from .standard_profile import ThreadingXMLRPCServer
from .web_profile import WebProfileXMLRPCServer, web_profile_text_dialog
__all__ = ['SAMPHubServer', 'WebProfileDialog']
__doctest_skip__ = ['.', 'SAMPHubServer.*']
[docs]class SAMPHubServer(object):
"""
SAMP Hub Server.
Parameters
----------
secret : str, optional
The secret code to use for the SAMP lockfile. If none is is specified,
the :func:`uuid.uuid1` function is used to generate one.
addr : str, optional
Listening address (or IP). This defaults to 127.0.0.1 if the internet
is not reachable, otherwise it defaults to the host name.
port : int, optional
Listening XML-RPC server socket port. If left set to 0 (the default),
the operating system will select a free port.
lockfile : str, optional
Custom lockfile name.
timeout : int, optional
Hub inactivity timeout. If ``timeout > 0`` then the Hub automatically
stops after an inactivity period longer than ``timeout`` seconds. By
default ``timeout`` is set to 0 (Hub never expires).
client_timeout : int, optional
Client inactivity timeout. If ``client_timeout > 0`` then the Hub
automatically unregisters the clients which result inactive for a
period longer than ``client_timeout`` seconds. By default
``client_timeout`` is set to 0 (clients never expire).
mode : str, optional
Defines the Hub running mode. If ``mode`` is ``'single'`` then the Hub
runs using the standard ``.samp`` lock-file, having a single instance
for user desktop session. Otherwise, if ``mode`` is ``'multiple'``,
then the Hub runs using a non-standard lock-file, placed in
``.samp-1`` directory, of the form ``samp-hub-<UUID>``, where
``<UUID>`` is a unique UUID assigned to the hub.
label : str, optional
A string used to label the Hub with a human readable name. This string
is written in the lock-file assigned to the ``hub.label`` token.
web_profile : bool, optional
Enables or disables the Web Profile support.
web_profile_dialog : class, optional
Allows a class instance to be specified using ``web_profile_dialog``
to replace the terminal-based message with e.g. a GUI pop-up. Two
`queue.Queue` instances will be added to the instance as attributes
``queue_request`` and ``queue_result``. When a request is received via
the ``queue_request`` queue, the pop-up should be displayed, and a
value of `True` or `False` should be added to ``queue_result``
depending on whether the user accepted or refused the connection.
web_port : int, optional
The port to use for web SAMP. This should not be changed except for
testing purposes, since web SAMP should always use port 21012.
pool_size : int, optional
The number of socket connections opened to communicate with the
clients.
"""
def __init__(self, secret=None, addr=None, port=0, lockfile=None,
timeout=0, client_timeout=0, mode='single', label="",
web_profile=True, web_profile_dialog=None, web_port=21012,
pool_size=20):
# Generate random ID for the hub
self._id = str(uuid.uuid1())
# General settings
self._is_running = False
self._customlockfilename = lockfile
self._lockfile = None
self._addr = addr
self._port = port
self._mode = mode
self._label = label
self._timeout = timeout
self._client_timeout = client_timeout
self._pool_size = pool_size
# Web profile specific attributes
self._web_profile = web_profile
self._web_profile_dialog = web_profile_dialog
self._web_port = web_port
self._web_profile_server = None
self._web_profile_callbacks = {}
self._web_profile_requests_queue = None
self._web_profile_requests_result = None
self._web_profile_requests_semaphore = None
self._host_name = "127.0.0.1"
if internet_on():
try:
self._host_name = socket.getfqdn()
socket.getaddrinfo(self._addr or self._host_name,
self._port or 0)
except socket.error:
self._host_name = "127.0.0.1"
# Threading stuff
self._thread_lock = threading.Lock()
self._thread_run = None
self._thread_hub_timeout = None
self._thread_client_timeout = None
self._launched_threads = []
# Variables for timeout testing:
self._last_activity_time = None
self._client_activity_time = {}
# Hub message id counter, used to create hub msg ids
self._hub_msg_id_counter = 0
# Hub secret code
self._hub_secret_code_customized = secret
self._hub_secret = self._create_secret_code()
# Hub public id (as SAMP client)
self._hub_public_id = ""
# Client ids
# {private_key: (public_id, timestamp)}
self._private_keys = {}
# Metadata per client
# {private_key: metadata}
self._metadata = {}
# List of subscribed clients per MType
# {mtype: private_key list}
self._mtype2ids = {}
# List of subscribed MTypes per client
# {private_key: mtype list}
self._id2mtypes = {}
# List of XML-RPC addresses per client
# {public_id: (XML-RPC address, ServerProxyPool instance)}
self._xmlrpc_endpoints = {}
# Synchronous message id heap
self._sync_msg_ids_heap = {}
# Public ids counter
self._client_id_counter = -1
@property
def id(self):
"""
The unique hub ID.
"""
return self._id
def _register_standard_api(self, server):
# Standard Profile only operations
server.register_function(self._ping, 'samp.hub.ping')
server.register_function(self._set_xmlrpc_callback, 'samp.hub.setXmlrpcCallback')
# Standard API operations
server.register_function(self._register, 'samp.hub.register')
server.register_function(self._unregister, 'samp.hub.unregister')
server.register_function(self._declare_metadata, 'samp.hub.declareMetadata')
server.register_function(self._get_metadata, 'samp.hub.getMetadata')
server.register_function(self._declare_subscriptions, 'samp.hub.declareSubscriptions')
server.register_function(self._get_subscriptions, 'samp.hub.getSubscriptions')
server.register_function(self._get_registered_clients, 'samp.hub.getRegisteredClients')
server.register_function(self._get_subscribed_clients, 'samp.hub.getSubscribedClients')
server.register_function(self._notify, 'samp.hub.notify')
server.register_function(self._notify_all, 'samp.hub.notifyAll')
server.register_function(self._call, 'samp.hub.call')
server.register_function(self._call_all, 'samp.hub.callAll')
server.register_function(self._call_and_wait, 'samp.hub.callAndWait')
server.register_function(self._reply, 'samp.hub.reply')
def _register_web_profile_api(self, server):
# Web Profile methods like Standard Profile
server.register_function(self._ping, 'samp.webhub.ping')
server.register_function(self._unregister, 'samp.webhub.unregister')
server.register_function(self._declare_metadata, 'samp.webhub.declareMetadata')
server.register_function(self._get_metadata, 'samp.webhub.getMetadata')
server.register_function(self._declare_subscriptions, 'samp.webhub.declareSubscriptions')
server.register_function(self._get_subscriptions, 'samp.webhub.getSubscriptions')
server.register_function(self._get_registered_clients, 'samp.webhub.getRegisteredClients')
server.register_function(self._get_subscribed_clients, 'samp.webhub.getSubscribedClients')
server.register_function(self._notify, 'samp.webhub.notify')
server.register_function(self._notify_all, 'samp.webhub.notifyAll')
server.register_function(self._call, 'samp.webhub.call')
server.register_function(self._call_all, 'samp.webhub.callAll')
server.register_function(self._call_and_wait, 'samp.webhub.callAndWait')
server.register_function(self._reply, 'samp.webhub.reply')
# Methods particularly for Web Profile
server.register_function(self._web_profile_register, 'samp.webhub.register')
server.register_function(self._web_profile_allowReverseCallbacks, 'samp.webhub.allowReverseCallbacks')
server.register_function(self._web_profile_pullCallbacks, 'samp.webhub.pullCallbacks')
def _start_standard_server(self):
self._server = ThreadingXMLRPCServer(
(self._addr or self._host_name, self._port or 0),
log, logRequests=False, allow_none=True)
prot = 'http'
self._port = self._server.socket.getsockname()[1]
addr = "{0}:{1}".format(self._addr or self._host_name, self._port)
self._url = urlunparse((prot, addr, '', '', '', ''))
self._server.register_introspection_functions()
self._register_standard_api(self._server)
def _start_web_profile_server(self):
self._web_profile_requests_queue = queue.Queue(1)
self._web_profile_requests_result = queue.Queue(1)
self._web_profile_requests_semaphore = queue.Queue(1)
if self._web_profile_dialog is not None:
# TODO: Some sort of duck-typing on the web_profile_dialog object
self._web_profile_dialog.queue_request = \
self._web_profile_requests_queue
self._web_profile_dialog.queue_result = \
self._web_profile_requests_result
try:
self._web_profile_server = WebProfileXMLRPCServer(
('localhost', self._web_port), log, logRequests=False,
allow_none=True)
self._web_port = self._web_profile_server.socket.getsockname()[1]
self._web_profile_server.register_introspection_functions()
self._register_web_profile_api(self._web_profile_server)
log.info("Hub set to run with Web Profile support enabled.")
except socket.error:
log.warning("Port {0} already in use. Impossible to run the "
"Hub with Web Profile support.".format(self._web_port),
SAMPWarning)
self._web_profile = False
# Cleanup
self._web_profile_requests_queue = None
self._web_profile_requests_result = None
self._web_profile_requests_semaphore = None
def _launch_thread(self, group=None, target=None, name=None, args=None):
# Remove inactive threads
remove = []
for t in self._launched_threads:
if not t.is_alive():
remove.append(t)
for t in remove:
self._launched_threads.remove(t)
# Start new thread
t = threading.Thread(group=group, target=target, name=name, args=args)
t.start()
# Add to list of launched threads
self._launched_threads.append(t)
def _join_launched_threads(self, timeout=None):
for t in self._launched_threads:
t.join(timeout=timeout)
def _timeout_test_hub(self):
if self._timeout == 0:
return
last = time.time()
while self._is_running:
time.sleep(0.05) # keep this small to check _is_running often
now = time.time()
if now - last > 1.:
with self._thread_lock:
if self._last_activity_time is not None:
if now - self._last_activity_time >= self._timeout:
warnings.warn("Timeout expired, Hub is shutting down!",
SAMPWarning)
self.stop()
return
last = now
def _timeout_test_client(self):
if self._client_timeout == 0:
return
last = time.time()
while self._is_running:
time.sleep(0.05) # keep this small to check _is_running often
now = time.time()
if now - last > 1.:
for private_key in self._client_activity_time.keys():
if (now - self._client_activity_time[private_key] > self._client_timeout
and private_key != self._hub_private_key):
warnings.warn(
"Client {} timeout expired!".format(private_key),
SAMPWarning)
self._notify_disconnection(private_key)
self._unregister(private_key)
last = now
def _hub_as_client_request_handler(self, method, args):
if method == 'samp.client.receiveCall':
return self._receive_call(*args)
elif method == 'samp.client.receiveNotification':
return self._receive_notification(*args)
elif method == 'samp.client.receiveResponse':
return self._receive_response(*args)
elif method == 'samp.app.ping':
return self._ping(*args)
def _setup_hub_as_client(self):
hub_metadata = {"samp.name": "Astropy SAMP Hub",
"samp.description.text": self._label,
"author.name": "The Astropy Collaboration",
"samp.documentation.url": "http://docs.astropy.org/en/stable/vo/samp",
"samp.icon.url": self._url + "/samp/icon"}
result = self._register(self._hub_secret)
self._hub_public_id = result["samp.self-id"]
self._hub_private_key = result["samp.private-key"]
self._set_xmlrpc_callback(self._hub_private_key, self._url)
self._declare_metadata(self._hub_private_key, hub_metadata)
self._declare_subscriptions(self._hub_private_key,
{"samp.app.ping": {},
"x-samp.query.by-meta": {}})
[docs] def start(self, wait=False):
"""
Start the current SAMP Hub instance and create the lock file. Hub
start-up can be blocking or non blocking depending on the ``wait``
parameter.
Parameters
----------
wait : bool
If `True` then the Hub process is joined with the caller, blocking
the code flow. Usually `True` option is used to run a stand-alone
Hub in an executable script. If `False` (default), then the Hub
process runs in a separated thread. `False` is usually used in a
Python shell.
"""
if self._is_running:
raise SAMPHubError("Hub is already running")
if self._lockfile is not None:
raise SAMPHubError("Hub is not running but lockfile is set")
if self._web_profile:
self._start_web_profile_server()
self._start_standard_server()
self._lockfile = create_lock_file(lockfilename=self._customlockfilename,
mode=self._mode, hub_id=self.id,
hub_params=self.params)
self._update_last_activity_time()
self._setup_hub_as_client()
self._start_threads()
log.info("Hub started")
if wait and self._is_running:
self._thread_run.join()
self._thread_run = None
@property
def params(self):
"""
The hub parameters (which are written to the logfile)
"""
params = {}
# Keys required by standard profile
params['samp.secret'] = self._hub_secret
params['samp.hub.xmlrpc.url'] = self._url
params['samp.profile.version'] = __profile_version__
# Custom keys
params['hub.id'] = self.id
params['hub.label'] = self._label or "Hub {0}".format(self.id)
return params
def _start_threads(self):
self._thread_run = threading.Thread(target=self._serve_forever)
self._thread_run.daemon = True
if self._timeout > 0:
self._thread_hub_timeout = threading.Thread(
target=self._timeout_test_hub,
name="Hub timeout test")
self._thread_hub_timeout.daemon = True
else:
self._thread_hub_timeout = None
if self._client_timeout > 0:
self._thread_client_timeout = threading.Thread(
target=self._timeout_test_client,
name="Client timeout test")
self._thread_client_timeout.daemon = True
else:
self._thread_client_timeout = None
self._is_running = True
self._thread_run.start()
if self._thread_hub_timeout is not None:
self._thread_hub_timeout.start()
if self._thread_client_timeout is not None:
self._thread_client_timeout.start()
def _create_secret_code(self):
if self._hub_secret_code_customized is not None:
return self._hub_secret_code_customized
else:
return str(uuid.uuid1())
[docs] def stop(self):
"""
Stop the current SAMP Hub instance and delete the lock file.
"""
if not self._is_running:
return
log.info("Hub is stopping...")
self._notify_shutdown()
self._is_running = False
if self._lockfile and os.path.isfile(self._lockfile):
lockfiledict = read_lockfile(self._lockfile)
if lockfiledict['samp.secret'] == self._hub_secret:
os.remove(self._lockfile)
self._lockfile = None
# Reset variables
# TODO: What happens if not all threads are stopped after timeout?
self._join_all_threads(timeout=10.)
self._hub_msg_id_counter = 0
self._hub_secret = self._create_secret_code()
self._hub_public_id = ""
self._metadata = {}
self._private_keys = {}
self._mtype2ids = {}
self._id2mtypes = {}
self._xmlrpc_endpoints = {}
self._last_activity_time = None
log.info("Hub stopped.")
def _join_all_threads(self, timeout=None):
# In some cases, ``stop`` may be called from some of the sub-threads,
# so we just need to make sure that we don't try and shut down the
# calling thread.
current_thread = threading.current_thread()
if self._thread_run is not current_thread:
self._thread_run.join(timeout=timeout)
if not self._thread_run.is_alive():
self._thread_run = None
if self._thread_hub_timeout is not None and self._thread_hub_timeout is not current_thread:
self._thread_hub_timeout.join(timeout=timeout)
if not self._thread_hub_timeout.is_alive():
self._thread_hub_timeout = None
if self._thread_client_timeout is not None and self._thread_client_timeout is not current_thread:
self._thread_client_timeout.join(timeout=timeout)
if not self._thread_client_timeout.is_alive():
self._thread_client_timeout = None
self._join_launched_threads(timeout=timeout)
@property
def is_running(self):
"""Return an information concerning the Hub running status.
Returns
-------
running : bool
Is the hub running?
"""
return self._is_running
def _serve_forever(self):
while self._is_running:
try:
read_ready = select.select([self._server.socket], [], [], 0.01)[0]
except select.error as exc:
warnings.warn("Call to select() in SAMPHubServer failed: {0}".format(exc),
SAMPWarning)
else:
if read_ready:
self._server.handle_request()
if self._web_profile:
# We now check if there are any connection requests from the
# web profile, and if so, we initialize the pop-up.
if self._web_profile_dialog is None:
try:
request = self._web_profile_requests_queue.get_nowait()
except queue.Empty:
pass
else:
web_profile_text_dialog(request, self._web_profile_requests_result)
# We now check for requests over the web profile socket, and we
# also update the pop-up in case there are any changes.
try:
read_ready = select.select([self._web_profile_server.socket], [], [], 0.01)[0]
except select.error as exc:
warnings.warn("Call to select() in SAMPHubServer failed: {0}".format(exc),
SAMPWarning)
else:
if read_ready:
self._web_profile_server.handle_request()
self._server.server_close()
if self._web_profile_server is not None:
self._web_profile_server.server_close()
def _notify_shutdown(self):
msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.shutdown")
for mtype in msubs:
if mtype in self._mtype2ids:
for key in self._mtype2ids[mtype]:
self._notify_(self._hub_private_key,
self._private_keys[key][0],
{"samp.mtype": "samp.hub.event.shutdown",
"samp.params": {}})
def _notify_register(self, private_key):
msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.register")
for mtype in msubs:
if mtype in self._mtype2ids:
public_id = self._private_keys[private_key][0]
for key in self._mtype2ids[mtype]:
# if key != private_key:
self._notify(self._hub_private_key,
self._private_keys[key][0],
{"samp.mtype": "samp.hub.event.register",
"samp.params": {"id": public_id}})
def _notify_unregister(self, private_key):
msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.unregister")
for mtype in msubs:
if mtype in self._mtype2ids:
public_id = self._private_keys[private_key][0]
for key in self._mtype2ids[mtype]:
if key != private_key:
self._notify(self._hub_private_key,
self._private_keys[key][0],
{"samp.mtype": "samp.hub.event.unregister",
"samp.params": {"id": public_id}})
def _notify_metadata(self, private_key):
msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.metadata")
for mtype in msubs:
if mtype in self._mtype2ids:
public_id = self._private_keys[private_key][0]
for key in self._mtype2ids[mtype]:
# if key != private_key:
self._notify(self._hub_private_key,
self._private_keys[key][0],
{"samp.mtype": "samp.hub.event.metadata",
"samp.params": {"id": public_id,
"metadata": self._metadata[private_key]}
})
def _notify_subscriptions(self, private_key):
msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.subscriptions")
for mtype in msubs:
if mtype in self._mtype2ids:
public_id = self._private_keys[private_key][0]
for key in self._mtype2ids[mtype]:
self._notify(self._hub_private_key,
self._private_keys[key][0],
{"samp.mtype": "samp.hub.event.subscriptions",
"samp.params": {"id": public_id,
"subscriptions": self._id2mtypes[private_key]}
})
def _notify_disconnection(self, private_key):
def _xmlrpc_call_disconnect(endpoint, private_key, hub_public_id, message):
endpoint.samp.client.receiveNotification(private_key, hub_public_id, message)
msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.disconnect")
public_id = self._private_keys[private_key][0]
endpoint = self._xmlrpc_endpoints[public_id][1]
for mtype in msubs:
if mtype in self._mtype2ids and private_key in self._mtype2ids[mtype]:
log.debug("notify disconnection to {}".format(public_id))
self._launch_thread(target=_xmlrpc_call_disconnect,
args=(endpoint, private_key,
self._hub_public_id,
{"samp.mtype": "samp.hub.disconnect",
"samp.params": {"reason": "Timeout expired!"}}))
def _ping(self):
self._update_last_activity_time()
log.debug("ping")
return "1"
def _query_by_metadata(self, key, value):
public_id_list = []
for private_id in self._metadata:
if key in self._metadata[private_id]:
if self._metadata[private_id][key] == value:
public_id_list.append(self._private_keys[private_id][0])
return public_id_list
def _set_xmlrpc_callback(self, private_key, xmlrpc_addr):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
if private_key == self._hub_private_key:
public_id = self._private_keys[private_key][0]
self._xmlrpc_endpoints[public_id] = \
(xmlrpc_addr, _HubAsClient(self._hub_as_client_request_handler))
return ""
# Dictionary stored with the public id
log.debug("set_xmlrpc_callback: {} {}".format(private_key,
xmlrpc_addr))
server_proxy_pool = None
server_proxy_pool = ServerProxyPool(self._pool_size,
xmlrpc.ServerProxy,
xmlrpc_addr, allow_none=1)
public_id = self._private_keys[private_key][0]
self._xmlrpc_endpoints[public_id] = (xmlrpc_addr,
server_proxy_pool)
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
return ""
def _perform_standard_register(self):
with self._thread_lock:
private_key, public_id = self._get_new_ids()
self._private_keys[private_key] = (public_id, time.time())
self._update_last_activity_time(private_key)
self._notify_register(private_key)
log.debug("register: private-key = {} and self-id = {}"
.format(private_key, public_id))
return {"samp.self-id": public_id,
"samp.private-key": private_key,
"samp.hub-id": self._hub_public_id}
def _register(self, secret):
self._update_last_activity_time()
if secret == self._hub_secret:
return self._perform_standard_register()
else:
# return {"samp.self-id": "", "samp.private-key": "", "samp.hub-id": ""}
raise SAMPProxyError(7, "Bad secret code")
def _get_new_ids(self):
private_key = str(uuid.uuid1())
self._client_id_counter += 1
public_id = 'cli#hub'
if self._client_id_counter > 0:
public_id = "cli#{}".format(self._client_id_counter)
return private_key, public_id
def _unregister(self, private_key):
self._update_last_activity_time()
public_key = ""
self._notify_unregister(private_key)
with self._thread_lock:
if private_key in self._private_keys:
public_key = self._private_keys[private_key][0]
del self._private_keys[private_key]
else:
return ""
if private_key in self._metadata:
del self._metadata[private_key]
if private_key in self._id2mtypes:
del self._id2mtypes[private_key]
for mtype in self._mtype2ids.keys():
if private_key in self._mtype2ids[mtype]:
self._mtype2ids[mtype].remove(private_key)
if public_key in self._xmlrpc_endpoints:
del self._xmlrpc_endpoints[public_key]
if private_key in self._client_activity_time:
del self._client_activity_time[private_key]
if self._web_profile:
if private_key in self._web_profile_callbacks:
del self._web_profile_callbacks[private_key]
self._web_profile_server.remove_client(private_key)
log.debug("unregister {} ({})".format(public_key, private_key))
return ""
def _declare_metadata(self, private_key, metadata):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
log.debug("declare_metadata: private-key = {} metadata = {}"
.format(private_key, str(metadata)))
self._metadata[private_key] = metadata
self._notify_metadata(private_key)
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
return ""
def _get_metadata(self, private_key, client_id):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
client_private_key = self._public_id_to_private_key(client_id)
log.debug("get_metadata: private-key = {} client-id = {}"
.format(private_key, client_id))
if client_private_key is not None:
if client_private_key in self._metadata:
log.debug("--> metadata = {}"
.format(self._metadata[client_private_key]))
return self._metadata[client_private_key]
else:
return {}
else:
raise SAMPProxyError(6, "Invalid client ID")
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
def _declare_subscriptions(self, private_key, mtypes):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
log.debug("declare_subscriptions: private-key = {} mtypes = {}"
.format(private_key, str(mtypes)))
# remove subscription to previous mtypes
if private_key in self._id2mtypes:
prev_mtypes = self._id2mtypes[private_key]
for mtype in prev_mtypes:
try:
self._mtype2ids[mtype].remove(private_key)
except ValueError: # private_key is not in list
pass
self._id2mtypes[private_key] = copy.deepcopy(mtypes)
# remove duplicated MType for wildcard overwriting
original_mtypes = copy.deepcopy(mtypes)
for mtype in original_mtypes:
if mtype.endswith("*"):
for mtype2 in original_mtypes:
if mtype2.startswith(mtype[:-1]) and \
mtype2 != mtype:
if mtype2 in mtypes:
del(mtypes[mtype2])
log.debug("declare_subscriptions: subscriptions accepted from "
"{} => {}".format(private_key, str(mtypes)))
for mtype in mtypes:
if mtype in self._mtype2ids:
if private_key not in self._mtype2ids[mtype]:
self._mtype2ids[mtype].append(private_key)
else:
self._mtype2ids[mtype] = [private_key]
self._notify_subscriptions(private_key)
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
return ""
def _get_subscriptions(self, private_key, client_id):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
client_private_key = self._public_id_to_private_key(client_id)
if client_private_key is not None:
if client_private_key in self._id2mtypes:
log.debug("get_subscriptions: client-id = {} mtypes = {}"
.format(client_id,
str(self._id2mtypes[client_private_key])))
return self._id2mtypes[client_private_key]
else:
log.debug("get_subscriptions: client-id = {} mtypes = "
"missing".format(client_id))
return {}
else:
raise SAMPProxyError(6, "Invalid client ID")
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
def _get_registered_clients(self, private_key):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
reg_clients = []
for pkey in self._private_keys.keys():
if pkey != private_key:
reg_clients.append(self._private_keys[pkey][0])
log.debug("get_registered_clients: private_key = {} clients = {}"
.format(private_key, reg_clients))
return reg_clients
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
def _get_subscribed_clients(self, private_key, mtype):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
sub_clients = {}
for pkey in self._private_keys.keys():
if pkey != private_key and self._is_subscribed(pkey, mtype):
sub_clients[self._private_keys[pkey][0]] = {}
log.debug("get_subscribed_clients: private_key = {} mtype = {} "
"clients = {}".format(private_key, mtype, sub_clients))
return sub_clients
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
@staticmethod
[docs] def get_mtype_subtypes(mtype):
"""
Return a list containing all the possible wildcarded subtypes of MType.
Parameters
----------
mtype : str
MType to be parsed.
Returns
-------
types : list
List of subtypes
Examples
--------
>>> from astropy.samp import SAMPHubServer
>>> SAMPHubServer.get_mtype_subtypes("samp.app.ping")
['samp.app.ping', 'samp.app.*', 'samp.*', '*']
"""
subtypes = []
msubs = mtype.split(".")
indexes = list(range(len(msubs)))
indexes.reverse()
indexes.append(-1)
for i in indexes:
tmp_mtype = ".".join(msubs[:i + 1])
if tmp_mtype != mtype:
if tmp_mtype != "":
tmp_mtype = tmp_mtype + ".*"
else:
tmp_mtype = "*"
subtypes.append(tmp_mtype)
return subtypes
def _is_subscribed(self, private_key, mtype):
subscribed = False
msubs = SAMPHubServer.get_mtype_subtypes(mtype)
for msub in msubs:
if msub in self._mtype2ids:
if private_key in self._mtype2ids[msub]:
subscribed = True
return subscribed
def _notify(self, private_key, recipient_id, message):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
if self._is_subscribed(self._public_id_to_private_key(recipient_id),
message["samp.mtype"]) is False:
raise SAMPProxyError(2, "Client {} not subscribed to MType {}"
.format(recipient_id, message["samp.mtype"]))
self._launch_thread(target=self._notify_, args=(private_key,
recipient_id,
message))
return {}
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
def _notify_(self, sender_private_key, recipient_public_id, message):
if sender_private_key not in self._private_keys:
return
sender_public_id = self._private_keys[sender_private_key][0]
try:
log.debug("notify {} from {} to {}".format(
message["samp.mtype"], sender_public_id,
recipient_public_id))
recipient_private_key = self._public_id_to_private_key(recipient_public_id)
arg_params = (sender_public_id, message)
samp_method_name = "receiveNotification"
self._retry_method(recipient_private_key, recipient_public_id, samp_method_name, arg_params)
except Exception as exc:
warnings.warn("{} notification from client {} to client {} "
"failed [{}]".format(message["samp.mtype"],
sender_public_id,
recipient_public_id, exc),
SAMPWarning)
def _notify_all(self, private_key, message):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
if "samp.mtype" not in message:
raise SAMPProxyError(3, "samp.mtype keyword is missing")
recipient_ids = self._notify_all_(private_key, message)
return recipient_ids
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
def _notify_all_(self, sender_private_key, message):
recipient_ids = []
msubs = SAMPHubServer.get_mtype_subtypes(message["samp.mtype"])
for mtype in msubs:
if mtype in self._mtype2ids:
for key in self._mtype2ids[mtype]:
if key != sender_private_key:
_recipient_id = self._private_keys[key][0]
recipient_ids.append(_recipient_id)
self._launch_thread(target=self._notify,
args=(sender_private_key,
_recipient_id, message)
)
return recipient_ids
def _call(self, private_key, recipient_id, msg_tag, message):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
if self._is_subscribed(self._public_id_to_private_key(recipient_id),
message["samp.mtype"]) is False:
raise SAMPProxyError(2, "Client {} not subscribed to MType {}"
.format(recipient_id, message["samp.mtype"]))
public_id = self._private_keys[private_key][0]
msg_id = self._get_new_hub_msg_id(public_id, msg_tag)
self._launch_thread(target=self._call_, args=(private_key, public_id,
recipient_id, msg_id,
message))
return msg_id
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
def _call_(self, sender_private_key, sender_public_id,
recipient_public_id, msg_id, message):
if sender_private_key not in self._private_keys:
return
try:
log.debug("call {} from {} to {} ({})".format(
msg_id.split(";;")[0], sender_public_id,
recipient_public_id, message["samp.mtype"]))
recipient_private_key = self._public_id_to_private_key(recipient_public_id)
arg_params = (sender_public_id, msg_id, message)
samp_methodName = "receiveCall"
self._retry_method(recipient_private_key, recipient_public_id, samp_methodName, arg_params)
except Exception as exc:
warnings.warn("{} call {} from client {} to client {} failed "
"[{},{}]".format(message["samp.mtype"],
msg_id.split(";;")[0],
sender_public_id,
recipient_public_id, type(exc), exc),
SAMPWarning)
def _call_all(self, private_key, msg_tag, message):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
if "samp.mtype" not in message:
raise SAMPProxyError(3, "samp.mtype keyword is missing in "
"message tagged as {}".format(msg_tag))
public_id = self._private_keys[private_key][0]
msg_id = self._call_all_(private_key, public_id, msg_tag, message)
return msg_id
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
def _call_all_(self, sender_private_key, sender_public_id, msg_tag,
message):
msg_id = {}
msubs = SAMPHubServer.get_mtype_subtypes(message["samp.mtype"])
for mtype in msubs:
if mtype in self._mtype2ids:
for key in self._mtype2ids[mtype]:
if key != sender_private_key:
_msg_id = self._get_new_hub_msg_id(sender_public_id,
msg_tag)
receiver_public_id = self._private_keys[key][0]
msg_id[receiver_public_id] = _msg_id
self._launch_thread(target=self._call_,
args=(sender_private_key,
sender_public_id,
receiver_public_id, _msg_id,
message))
return msg_id
def _call_and_wait(self, private_key, recipient_id, message, timeout):
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
timeout = int(timeout)
now = time.time()
response = {}
msg_id = self._call(private_key, recipient_id, "samp::sync::call",
message)
self._sync_msg_ids_heap[msg_id] = None
while self._is_running:
if timeout > 0 and time.time() - now >= timeout:
del(self._sync_msg_ids_heap[msg_id])
raise SAMPProxyError(1, "Timeout expired!")
if self._sync_msg_ids_heap[msg_id] is not None:
response = copy.deepcopy(self._sync_msg_ids_heap[msg_id])
del(self._sync_msg_ids_heap[msg_id])
break
time.sleep(0.01)
return response
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
def _reply(self, private_key, msg_id, response):
"""
The main method that gets called for replying. This starts up an
asynchronous reply thread and returns.
"""
self._update_last_activity_time(private_key)
if private_key in self._private_keys:
self._launch_thread(target=self._reply_, args=(private_key, msg_id,
response))
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
return {}
def _reply_(self, responder_private_key, msg_id, response):
if responder_private_key not in self._private_keys or not msg_id:
return
responder_public_id = self._private_keys[responder_private_key][0]
counter, hub_public_id, recipient_public_id, recipient_msg_tag = msg_id.split(";;", 3)
try:
log.debug("reply {} from {} to {}".format(
counter, responder_public_id, recipient_public_id))
if recipient_msg_tag == "samp::sync::call":
if msg_id in self._sync_msg_ids_heap.keys():
self._sync_msg_ids_heap[msg_id] = response
else:
recipient_private_key = self._public_id_to_private_key(recipient_public_id)
arg_params = (responder_public_id, recipient_msg_tag, response)
samp_method_name = "receiveResponse"
self._retry_method(recipient_private_key, recipient_public_id, samp_method_name, arg_params)
except Exception as exc:
warnings.warn("{} reply from client {} to client {} failed [{}]"
.format(recipient_msg_tag, responder_public_id,
recipient_public_id, exc),
SAMPWarning)
def _retry_method(self, recipient_private_key, recipient_public_id, samp_method_name, arg_params):
"""
This method is used to retry a SAMP call several times.
Parameters
----------
recipient_private_key
The private key of the receiver of the call
recipient_public_key
The public key of the receiver of the call
samp_method_name : str
The name of the SAMP method to call
arg_params : tuple
Any additional arguments to be passed to the SAMP method
"""
if recipient_private_key is None:
raise SAMPHubError("Invalid client ID")
from . import conf
for attempt in range(conf.n_retries):
if not self._is_running:
time.sleep(0.01)
continue
try:
if (self._web_profile and
recipient_private_key in self._web_profile_callbacks):
# Web Profile
callback = {"samp.methodName": samp_method_name,
"samp.params": arg_params}
self._web_profile_callbacks[recipient_private_key].put(callback)
else:
# Standard Profile
hub = self._xmlrpc_endpoints[recipient_public_id][1]
getattr(hub.samp.client, samp_method_name)(recipient_private_key, *arg_params)
except xmlrpc.Fault as exc:
log.debug("{} XML-RPC endpoint error (attempt {}): {}"
.format(recipient_public_id, attempt + 1,
exc.faultString))
time.sleep(0.01)
else:
return
# If we are here, then the above attempts failed
error_message = samp_method_name + " failed after " + conf.n_retries + " attempts"
raise SAMPHubError(error_message)
def _public_id_to_private_key(self, public_id):
for private_key in self._private_keys.keys():
if self._private_keys[private_key][0] == public_id:
return private_key
return None
def _get_new_hub_msg_id(self, sender_public_id, sender_msg_id):
with self._thread_lock:
self._hub_msg_id_counter += 1
return "msg#{};;{};;{};;{}".format(self._hub_msg_id_counter,
self._hub_public_id,
sender_public_id, sender_msg_id)
def _update_last_activity_time(self, private_key=None):
with self._thread_lock:
self._last_activity_time = time.time()
if private_key is not None:
self._client_activity_time[private_key] = time.time()
def _receive_notification(self, private_key, sender_id, message):
return ""
def _receive_call(self, private_key, sender_id, msg_id, message):
if private_key == self._hub_private_key:
if "samp.mtype" in message and message["samp.mtype"] == "samp.app.ping":
self._reply(self._hub_private_key, msg_id,
{"samp.status": SAMP_STATUS_OK, "samp.result": {}})
elif ("samp.mtype" in message and
(message["samp.mtype"] == "x-samp.query.by-meta" or
message["samp.mtype"] == "samp.query.by-meta")):
ids_list = self._query_by_metadata(message["samp.params"]["key"],
message["samp.params"]["value"])
self._reply(self._hub_private_key, msg_id,
{"samp.status": SAMP_STATUS_OK,
"samp.result": {"ids": ids_list}})
return ""
else:
return ""
def _receive_response(self, private_key, responder_id, msg_tag, response):
return ""
def _web_profile_register(self, identity_info,
client_address=("unknown", 0),
origin="unknown"):
self._update_last_activity_time()
if not client_address[0] in ["localhost", "127.0.0.1"]:
raise SAMPProxyError(403, "Request of registration rejected "
"by the Hub.")
if not origin:
origin = "unknown"
if isinstance(identity_info, dict):
# an old version of the protocol provided just a string with the app name
if "samp.name" not in identity_info:
raise SAMPProxyError(403, "Request of registration rejected "
"by the Hub (application name not "
"provided).")
# Red semaphore for the other threads
self._web_profile_requests_semaphore.put("wait")
# Set the request to be displayed for the current thread
self._web_profile_requests_queue.put((identity_info, client_address,
origin))
# Get the popup dialogue response
response = self._web_profile_requests_result.get()
# OK, semaphore green
self._web_profile_requests_semaphore.get()
if response:
register_map = self._perform_standard_register()
translator_url = ("http://localhost:{}/translator/{}?ref="
.format(self._web_port, register_map["samp.private-key"]))
register_map["samp.url-translator"] = translator_url
self._web_profile_server.add_client(register_map["samp.private-key"])
return register_map
else:
raise SAMPProxyError(403, "Request of registration rejected by "
"the user.")
def _web_profile_allowReverseCallbacks(self, private_key, allow):
self._update_last_activity_time()
if private_key in self._private_keys:
if allow == "0":
if private_key in self._web_profile_callbacks:
del self._web_profile_callbacks[private_key]
else:
self._web_profile_callbacks[private_key] = queue.Queue()
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
return ""
def _web_profile_pullCallbacks(self, private_key, timeout_secs):
self._update_last_activity_time()
if private_key in self._private_keys:
callback = []
callback_queue = self._web_profile_callbacks[private_key]
try:
while self._is_running:
item_queued = callback_queue.get_nowait()
callback.append(item_queued)
except queue.Empty:
pass
return callback
else:
raise SAMPProxyError(5, "Private-key {} expired or invalid."
.format(private_key))
[docs]class WebProfileDialog(object):
"""
A base class to make writing Web Profile GUI consent dialogs
easier.
The concrete class must:
1) Poll ``handle_queue`` periodically, using the timer services
of the GUI's event loop. This function will call
``self.show_dialog`` when a request requires authorization.
``self.show_dialog`` will be given the arguments:
- ``samp_name``: The name of the application making the request.
- ``details``: A dictionary of details about the client
making the request.
- ``client``: A hostname, port pair containing the client
address.
- ``origin``: A string containing the origin of the
request.
2) Call ``consent`` or ``reject`` based on the user's response to
the dialog.
"""
[docs] def handle_queue(self):
try:
request = self.queue_request.get_nowait()
except queue.Empty: # queue is set but empty
pass
except AttributeError: # queue has not been set yet
pass
else:
if isinstance(request[0], str): # To support the old protocol version
samp_name = request[0]
else:
samp_name = request[0]["samp.name"]
self.show_dialog(samp_name, request[0], request[1], request[2])
[docs] def consent(self):
self.queue_result.put(True)
[docs] def reject(self):
self.queue_result.put(False)