Source code for spinnman.connections.connection_listener
# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
from spinn_utilities.abstract_context_manager import AbstractContextManager
from spinn_utilities.log import FormatAdapter
from spinnman.exceptions import SpinnmanEOFException
logger = FormatAdapter(logging.getLogger(__name__))
_POOL_SIZE = 4
_TIMEOUT = 1
[docs]class ConnectionListener(Thread, AbstractContextManager):
"""
Thread that listens to a connection and calls callbacks with new
messages when they arrive.
"""
__slots__ = [
"__callback_pool",
"__callbacks",
"__connection",
"__done",
"__timeout"]
def __init__(self, connection, n_processes=_POOL_SIZE, timeout=_TIMEOUT):
"""
:param Listenable connection: A connection to listen to
:param int n_processes:
The number of threads to use when calling callbacks
:param float timeout:
How long to wait for messages before checking to see if the
connection is to be terminated.
"""
super().__init__(
name=f"Connection listener for connection {connection}")
self.daemon = True
self.__connection = connection
self.__timeout = timeout
self.__callback_pool = ThreadPoolExecutor(max_workers=n_processes)
self.__done = False
self.__callbacks = set()
def __run_step(self, handler):
"""
:param ~collections.abc.Callable handler:
"""
if self.__connection.is_ready_to_receive(timeout=self.__timeout):
message = handler()
for callback in self.__callbacks:
future = self.__callback_pool.submit(callback, message)
future.add_done_callback(self.__done_callback)
def __done_callback(self, future):
"""
:param ~concurrent.futures.Future future:
"""
try:
future.result()
except Exception: # pylint: disable=broad-except
logger.exception("problem in listener call")
[docs] def run(self):
"""
Implements the listening thread.
"""
with self.__callback_pool:
handler = self.__connection.get_receive_method()
while not self.__done:
try:
self.__run_step(handler)
except SpinnmanEOFException:
self.__done = True
except Exception: # pylint: disable=broad-except
if not self.__done:
logger.warning("problem when dispatching message",
exc_info=True)
[docs] def add_callback(self, callback):
"""
Add a callback to be called when a message is received.
:param ~collections.abc.Callable callback:
A callable which takes a single parameter, which is the message
received; the result of the callback will be ignored.
"""
self.__callbacks.add(callback)
[docs] def close(self):
"""
Closes the listener.
.. note::
This does not close the provider of the messages; this instead
marks the listener as closed. The listener will not truly stop
until the get message call returns.
"""
self.__done = True
self.join()