# Copyright (c) 2021 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Implementation of the client for the Spalloc web service.
"""
from logging import getLogger
from multiprocessing import Process, Queue
from time import sleep
from packaging.version import Version
import queue
import requests
import sqlite3
import struct
import threading
from typing import Dict, List, Tuple
from websocket import WebSocket
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
from spinn_utilities.abstract_context_manager import AbstractContextManager
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinnman.connections.abstract_classes import Connection, Listenable
from spinnman.constants import SCP_SCAMP_PORT, UDP_BOOT_CONNECTION_DEFAULT_PORT
from spinnman.exceptions import SpinnmanTimeoutException
from .spalloc_state import SpallocState
from .proxy_protocol import ProxyProtocol
from .session import Session, SessionAware
from .utils import parse_service_url, get_hostname
from .abstract_spalloc_client import AbstractSpallocClient
from .spalloc_machine import SpallocMachine
from .spalloc_job import SpallocJob
from .spalloc_proxied_connection import SpallocProxiedConnection
from .spalloc_boot_connection import SpallocBootConnection
from .spalloc_eieio_connection import SpallocEIEIOConnection
from .spalloc_eieio_listener import SpallocEIEIOListener
from .spalloc_scp_connection import SpallocSCPConnection
from spinnman.exceptions import SpallocException
from spinnman.transceiver import Transceiver
logger = FormatAdapter(getLogger(__name__))
_open_req = struct.Struct("<IIIII")
_close_req = struct.Struct("<III")
_open_listen_req = struct.Struct("<II")
# Open and close share the response structure
_open_close_res = struct.Struct("<III")
_open_listen_res = struct.Struct("<IIIBBBBI")
_msg = struct.Struct("<II")
_msg_to = struct.Struct("<IIIII")
[docs]class SpallocClient(AbstractContextManager, AbstractSpallocClient):
"""
Basic client library for talking to new Spalloc.
"""
__slots__ = ("__session",
"__machines_url", "__jobs_url", "version")
def __init__(
self, service_url, username=None, password=None,
bearer_token=None):
"""
:param str service_url: The reference to the service.
May have username and password supplied as part of the network
location; if so, the ``username`` and ``password`` arguments
*must* be ``None``. If ``username`` and ``password`` are not given,
not even within the URL, the ``bearer_token`` must be not ``None``.
:param str username: The user name to use
:param str password: The password to use
:param str bearer_token: The bearer token to use
"""
if username is None and password is None:
service_url, username, password = parse_service_url(service_url)
self.__session = Session(service_url, username, password, bearer_token)
obj = self.__session.renew()
v = obj["version"]
self.version = Version(
f"{v['major-version']}.{v['minor-version']}.{v['revision']}")
self.__machines_url = obj["machines-ref"]
self.__jobs_url = obj["jobs-ref"]
logger.info("established session to {} for {}", service_url, username)
[docs] @staticmethod
def open_job_from_database(conn: sqlite3.Cursor) -> SpallocJob:
"""
Create a job from the description in the attached database. This is
intended to allow for access to the job's allocated resources from
visualisers and other third party code participating in the Spinnaker
Tools Notification Protocol.
.. note::
The job is not verified to exist and be running. The session
credentials may have expired; if so, the job will be unable to
regenerate them.
:param ~sqlite3.Cursor conn:
The database cursor to retrieve the job details from. Assumes
the presence of a ``proxy_configuration`` table with ``kind``,
``name`` and ``value`` columns.
:return:
The job handle, or ``None`` if the records in the database are
absent or incomplete.
:rtype: SpallocJob
"""
service_url = None
job_url = None
cookies = {}
headers = {}
for row in conn.execute("""
SELECT kind, name, value FROM proxy_configuration
"""):
kind, name, value = row
if kind == "SPALLOC":
if name == "service uri":
service_url = value
elif name == "job uri":
job_url = value
elif kind == "COOKIE":
cookies[name] = value
elif kind == "HEADER":
headers[name] = value
if not service_url or not job_url or not cookies or not headers:
# Cannot possibly work without a session or job
return None
session = Session(service_url, session_credentials=(cookies, headers))
return _SpallocJob(session, job_url)
[docs] @overrides(AbstractSpallocClient.list_machines)
def list_machines(self):
obj = self.__session.get(self.__machines_url).json()
return {m["name"]: _SpallocMachine(self, m) for m in obj["machines"]}
[docs] @overrides(AbstractSpallocClient.list_jobs)
def list_jobs(self, deleted=False):
obj = self.__session.get(
self.__jobs_url,
deleted=("true" if deleted else "false")).json()
while obj["jobs"]:
for u in obj["jobs"]:
yield _SpallocJob(self.__session, u)
if "next" not in obj:
break
obj = self.__session.get(obj["next"]).json()
def _create(self, create, machine_name):
if machine_name:
create["machine-name"] = machine_name
else:
create["tags"] = ["default"]
r = self.__session.post(self.__jobs_url, create)
url = r.headers["Location"]
return _SpallocJob(self.__session, url)
[docs] @overrides(AbstractSpallocClient.create_job)
def create_job(self, num_boards=1, machine_name=None, keepalive=45):
return self._create({
"num-boards": int(num_boards),
"keepalive-interval": f"PT{int(keepalive)}S"
}, machine_name)
[docs] @overrides(AbstractSpallocClient.create_job_rect)
def create_job_rect(self, width, height, machine_name=None, keepalive=45):
return self._create({
"dimensions": {
"width": int(width),
"height": int(height)
},
"keepalive-interval": f"PT{int(keepalive)}S"
}, machine_name)
[docs] @overrides(AbstractSpallocClient.create_job_board)
def create_job_board(
self, triad=None, physical=None, ip_address=None,
machine_name=None, keepalive=45):
if triad:
x, y, z = triad
board = {"x": x, "y": y, "z": z}
elif physical:
c, f, b = physical
board = {"cabinet": c, "frame": f, "board": b}
elif ip_address:
board = {"address": str(ip_address)}
else:
raise KeyError("at least one of triad, physical and ip_address "
"must be given")
return self._create({
"board": board,
"keepalive-interval": f"PT{int(keepalive)}S"
}, machine_name)
[docs] def close(self):
# pylint: disable=protected-access
if self.__session is not None:
self.__session._purge()
self.__session = None
def _SpallocKeepalive(url, interval, term_queue, cookies, headers):
"""
Actual keepalive task implementation. Don't use directly.
"""
headers["Content-Type"] = "text/plain; charset=UTF-8"
while True:
requests.put(url, data="alive", cookies=cookies, headers=headers,
allow_redirects=False, timeout=10)
try:
term_queue.get(True, interval)
break
except queue.Empty:
continue
class _SpallocMachine(SessionAware, SpallocMachine):
"""
Represents a spalloc-controlled machine.
Don't make this yourself. Use :py:class:`SpallocClient` instead.
"""
__slots__ = ("__name", "__tags", "__width", "__height",
"__dead_boards", "__dead_links")
def __init__(self, session, machine_data):
"""
:param _Session session:
:param dict machine_data:
"""
super().__init__(session, machine_data["uri"])
self.__name = machine_data["name"]
self.__tags = frozenset(machine_data["tags"])
self.__width = machine_data["width"]
self.__height = machine_data["height"]
self.__dead_boards = machine_data["dead-boards"]
self.__dead_links = machine_data["dead-links"]
@property
@overrides(SpallocMachine.name)
def name(self):
return self.__name
@property
@overrides(SpallocMachine.tags)
def tags(self):
return self.__tags
@property
@overrides(SpallocMachine.width)
def width(self):
return self.__width
@property
@overrides(SpallocMachine.height)
def height(self):
return self.__height
@property
@overrides(SpallocMachine.dead_boards)
def dead_boards(self):
return self.__dead_boards
@property
@overrides(SpallocMachine.dead_links)
def dead_links(self):
return self.__dead_links
@property
@overrides(SpallocMachine.area)
def area(self):
return (self.width, self.height)
def __repr__(self):
return "SpallocMachine" + str((
self.name, self.tags, self.width, self.height, self.dead_boards,
self.dead_links))
class _ProxyPing(threading.Thread):
"""
Sends ping messages to an open websocket
"""
def __init__(self, ws, sleep_time=30):
super().__init__(daemon=True)
self.__ws = ws
self.__sleep_time = sleep_time
self.__closed = False
self.start()
def run(self):
"""
The handler loop of this thread
"""
while self.__ws.connected:
try:
self.__ws.ping()
except Exception: # pylint: disable=broad-except
# If closed, ignore error and get out of here
if self.__closed:
break
# Make someone aware of the error
logger.exception("Error in websocket before close")
sleep(self.__sleep_time)
def close(self):
"""
Mark as closed to avoid error messages.
"""
self.__closed = True
class _ProxyReceiver(threading.Thread):
"""
Receives all messages off an open websocket and dispatches them to
registered listeners.
"""
def __init__(self, ws):
super().__init__(daemon=True)
self.__ws = ws
self.__returns = {}
self.__handlers = {}
self.__correlation_id = 0
self.__closed = False
self.start()
def run(self):
"""
The handler loop of this thread.
"""
while self.__ws.connected:
try:
result = self.__ws.recv_data()
frame = result[1]
if len(frame) < _msg.size:
# Message is out of protocol
continue
except Exception: # pylint: disable=broad-except
# If closed, ignore error and get out of here
if self.__closed:
break
# Make someone aware of the error
logger.exception("Error in websocket before close")
# If we are disconnected before closing, make errors happen
if not self.__ws.connected:
for rt in self.__returns.values():
rt(None)
for hd in self.__handlers.values():
hd(None)
break
code, num = _msg.unpack_from(frame, 0)
if code == ProxyProtocol.MSG:
self.dispatch_message(num, frame)
else:
self.dispatch_return(num, frame)
def expect_return(self, handler) -> int:
"""
Register a one-shot listener for a call-like message's return.
:return: The correlation ID
"""
c = self.__correlation_id
self.__correlation_id += 1
self.__returns[c] = handler
return c
def listen(self, channel_id: int, handler):
"""
Register a persistent listener for one-way messages.
"""
self.__handlers[channel_id] = handler
def dispatch_return(self, correlation_id: int, msg: bytes):
"""
Dispatch a received call-return message.
"""
handler = self.__returns.pop(correlation_id, None)
if handler:
handler(msg)
def dispatch_message(self, channel_id: int, msg: bytes):
"""
Dispatch a received one-way message.
"""
handler = self.__handlers.get(channel_id, None)
if handler:
handler(msg)
def unlisten(self, channel_id):
"""
De-register a listener for a channel
"""
self.__handlers.pop(channel_id)
def close(self):
"""
Mark receiver closed to avoid errors
"""
self.__closed = True
class _SpallocJob(SessionAware, SpallocJob):
"""
Represents a job in spalloc.
Don't make this yourself. Use :py:class:`SpallocClient` instead.
"""
__slots__ = ("__machine_url", "__chip_url",
"_keepalive_url", "__keepalive_handle", "__proxy_handle",
"__proxy_thread", "__proxy_ping")
def __init__(self, session, job_handle):
"""
:param _Session session:
:param str job_handle:
"""
super().__init__(session, job_handle)
logger.info("established job at {}", job_handle)
self.__machine_url = self._url + "machine"
self.__chip_url = self._url + "chip"
self._keepalive_url = self._url + "keepalive"
self.__keepalive_handle = None
self.__proxy_handle = None
self.__proxy_thread = None
self.__proxy_ping = None
@overrides(SpallocJob._write_session_credentials_to_db)
def _write_session_credentials_to_db(self, cur):
config = {}
config["SPALLOC", "service uri"] = self._service_url
config["SPALLOC", "job uri"] = self._url
cookies, headers = self._session_credentials
for k, v in cookies.items():
config["COOKIE", k] = v
for k, v in headers.items():
config["HEADER", k] = v
if "Authorization" in headers:
# We never write the auth headers themselves; we just extend the
# session
del headers["Authorization"]
cur.executemany("""
INSERT INTO proxy_configuration(kind, name, value)
VALUES(?, ?, ?)
""", [(k1, k2, v) for (k1, k2), v in config.items()])
@overrides(SpallocJob.get_state)
def get_state(self):
obj = self._get(self._url).json()
return SpallocState[obj["state"]]
@overrides(SpallocJob.get_root_host)
def get_root_host(self):
r = self._get(self.__machine_url)
if r.status_code == 204:
return None
obj = r.json()
for c in obj["connections"]:
[x, y], host = c
if x == 0 and y == 0:
return host
return None
@overrides(SpallocJob.get_connections)
def get_connections(self):
r = self._get(self.__machine_url)
if r.status_code == 204:
return None
return {
(int(x), int(y)): str(host)
for ((x, y), host) in r.json()["connections"]
}
@property
def __proxy_url(self):
"""
The URL for talking to the proxy connection system.
"""
r = self._get(self._url)
if r.status_code == 204:
return None
try:
url = r.json()["proxy-ref"]
logger.info("Connecting to proxy on {}", url)
return url
except KeyError:
return None
def __init_proxy(self):
if self.__proxy_handle is None or not self.__proxy_handle.connected:
self.__proxy_handle = self._websocket(
self.__proxy_url, origin=get_hostname(self._url))
self.__proxy_thread = _ProxyReceiver(self.__proxy_handle)
self.__proxy_ping = _ProxyPing(self.__proxy_handle)
@overrides(SpallocJob.connect_to_board)
def connect_to_board(self, x, y, port=SCP_SCAMP_PORT):
self.__init_proxy()
return _ProxiedSCAMPConnection(
self.__proxy_handle, self.__proxy_thread, x, y, port)
@overrides(SpallocJob.connect_for_booting)
def connect_for_booting(self):
self.__init_proxy()
return _ProxiedBootConnection(self.__proxy_handle, self.__proxy_thread)
@overrides(SpallocJob.open_eieio_connection)
def open_eieio_connection(self, x, y):
self.__init_proxy()
return _ProxiedEIEIOConnection(
self.__proxy_handle, self.__proxy_thread, x, y, SCP_SCAMP_PORT)
@overrides(SpallocJob.open_listener_connection)
def open_listener_connection(self):
self.__init_proxy()
return _ProxiedEIEIOListener(
self.__proxy_handle, self.__proxy_thread, self.get_connections())
@overrides(SpallocJob.wait_for_state_change)
def wait_for_state_change(self, old_state):
while old_state != SpallocState.DESTROYED:
obj = self._get(self._url, wait="true", timeout=None).json()
s = SpallocState[obj["state"]]
if s != old_state or s == SpallocState.DESTROYED:
return s
return old_state
@overrides(SpallocJob.wait_until_ready)
def wait_until_ready(self):
state = SpallocState.UNKNOWN
while state != SpallocState.READY:
state = self.wait_for_state_change(state)
if state == SpallocState.DESTROYED:
raise SpallocException("job was unexpectedly destroyed")
@overrides(SpallocJob.destroy)
def destroy(self, reason="finished"):
if self.__keepalive_handle:
self.__keepalive_handle.close()
self.__keepalive_handle = None
if self.__proxy_handle is not None:
self.__proxy_thread.close()
self.__proxy_ping.close()
self.__proxy_handle.close()
self._delete(self._url, reason=reason)
logger.info("deleted job at {}", self._url)
@overrides(SpallocJob.keepalive)
def keepalive(self):
self._put(self._keepalive_url, "alive")
@overrides(SpallocJob.launch_keepalive_task)
def launch_keepalive_task(self, period=30):
"""
.. note::
Tricky! *Cannot* be done with a thread, as the main thread is known
to do significant amounts of CPU-intensive work.
"""
class Closer(AbstractContextManager):
def __init__(self):
self._queue = Queue(1)
self._p = None
def close(self):
self._queue.put("quit")
# Give it a second, and if it still isn't dead, kill it
p.join(1)
if p.is_alive():
p.kill()
self._keepalive_handle = Closer()
# pylint: disable=protected-access
p = Process(target=_SpallocKeepalive, args=(
self._keepalive_url, period, self._keepalive_handle._queue,
*self._session_credentials), daemon=True)
p.start()
self._keepalive_handle._p = p
return self._keepalive_handle
@overrides(SpallocJob.where_is_machine)
def where_is_machine(self, x: int, y: int) -> Tuple[int, int, int]:
r = self._get(self.__chip_url, x=int(x), y=int(y))
if r.status_code == 204:
return None
return tuple(r.json()["physical-board-coordinates"])
@property
def _keepalive_handle(self):
return self.__keepalive_handle
@_keepalive_handle.setter
def _keepalive_handle(self, handle):
if self.__keepalive_handle is not None:
raise SpallocException("cannot keep job alive from two tasks")
self.__keepalive_handle = handle
@overrides(SpallocJob.create_transceiver)
def create_transceiver(self) -> Transceiver:
if self.get_state() != SpallocState.READY:
raise SpallocException("job not ready to execute scripts")
proxies = [
self.connect_to_board(x, y) for (x, y) in self.get_connections()]
# Also need a boot connection
proxies.append(self.connect_for_booting())
return Transceiver(version=5, connections=proxies)
def __repr__(self):
return f"SpallocJob({self._url})"
class _ProxiedConnection(metaclass=AbstractBase):
"""
Core mux/demux emulating a connection that is proxied over a websocket.
None of the methods are public because subclasses may expose a profile of
them to conform to a particular type of connection.
"""
def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
self.__ws = ws
self.__receiver = receiver
self.__msgs = queue.SimpleQueue()
self.__call_queue = queue.Queue(1)
self.__call_lock = threading.RLock()
self.__current_msg = None
self.__handle = self._open_connection()
self.__receiver.listen(self.__handle, self.__msgs.put)
@abstractmethod
def _open_connection(self) -> int:
pass
def _call(self, proto: ProxyProtocol, packer: struct.Struct,
unpacker: struct.Struct, *args) -> List[int]:
if not self._connected:
raise IOError("socket closed")
with self.__call_lock:
# All calls via websocket use correlation_id
correlation_id = self.__receiver.expect_return(
self.__call_queue.put)
self.__ws.send_binary(packer.pack(proto, correlation_id, *args))
if not self._connected:
raise IOError("socket closed after send!")
return unpacker.unpack(self.__call_queue.get())[2:]
@property
def _connected(self) -> bool:
return self.__ws and self.__ws.connected
def _throw_if_closed(self):
if not self._connected:
raise IOError("socket closed")
def _close(self):
if self._connected:
channel_id, = self._call(
ProxyProtocol.CLOSE, _close_req, _open_close_res,
self.__handle)
if channel_id != self.__handle:
raise IOError("failed to close proxy socket")
self.__receiver.unlisten(self.__handle)
self.__ws = None
self.__receiver = None
def _send(self, message: bytes):
self._throw_if_closed()
# Put the header on the front and send it
self.__ws.send_binary(_msg.pack(
ProxyProtocol.MSG, self.__handle) + message)
def _send_to(self, message: bytes, x: int, y: int, port: int):
self._throw_if_closed()
# Put the header on the front and send it
self.__ws.send_binary(_msg_to.pack(
ProxyProtocol.MSG_TO, self.__handle, x, y, port) + message)
def __get(self, timeout: float = 0.5) -> bytes:
"""
Get a value from the queue. Handles block/non-block switching and
trimming of the message protocol prefix.
"""
if not timeout:
return self.__msgs.get(block=False)[_msg.size:]
else:
return self.__msgs.get(timeout=timeout)[_msg.size:]
def _receive(self, timeout=None) -> bytes:
if self.__current_msg is not None:
try:
return self.__current_msg
finally:
self.__current_msg = None
if timeout is None:
while True:
try:
return self.__get()
except queue.Empty:
self._throw_if_closed()
else:
try:
return self.__get(timeout)
except queue.Empty as e:
self._throw_if_closed()
raise SpinnmanTimeoutException("receive", timeout) from e
def _is_ready_to_receive(self, timeout=0) -> bool:
# If we already have a message or the queue peek succeeds, return now
if self.__current_msg is not None or not self.__msgs.empty():
return True
try:
self.__current_msg = self.__get(timeout)
return True
except queue.Empty:
return False
class _ProxiedBidirectionalConnection(
_ProxiedConnection, SpallocProxiedConnection):
"""
A connection that talks to a particular board via the proxy.
"""
def __init__(
self, ws: WebSocket, receiver: _ProxyReceiver,
x: int, y: int, port: int):
self.__connect_args = (x, y, port)
super().__init__(ws, receiver)
@overrides(_ProxiedConnection._open_connection)
def _open_connection(self):
handle, = self._call(
ProxyProtocol.OPEN, _open_req, _open_close_res,
*self.__connect_args)
return handle
@overrides(Connection.is_connected)
def is_connected(self) -> bool:
return self._connected
@overrides(Connection.close)
def close(self):
self._close()
@overrides(SpallocProxiedConnection.send)
def send(self, data: bytes):
self._send(data)
@overrides(SpallocProxiedConnection.receive)
def receive(self, timeout=None) -> bytes:
return self._receive(timeout)
@overrides(Listenable.is_ready_to_receive)
def is_ready_to_receive(self, timeout=0) -> bool:
return self._is_ready_to_receive(timeout)
class _ProxiedUnboundConnection(
_ProxiedConnection, SpallocProxiedConnection):
"""
A connection that can listen to all boards via the proxy, but which can
only send if a target board is provided.
"""
def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
super().__init__(ws, receiver)
self.__addr = None
self.__port = None
@overrides(_ProxiedConnection._open_connection)
def _open_connection(self) -> int:
handle, ip1, ip2, ip3, ip4, self.__port = self._call(
ProxyProtocol.OPEN_UNBOUND, _open_listen_req, _open_listen_res)
# Assemble the address into the format expected elsewhere
self.__addr = f"{ip1}.{ip2}.{ip3}.{ip4}"
return handle
@property
def _addr(self) -> str:
return self.__addr if self._connected else None
@property
def _port(self) -> int:
return self.__port if self._connected else None
@overrides(Connection.is_connected)
def is_connected(self) -> bool:
return self._connected
@overrides(Connection.close)
def close(self):
self._close()
@overrides(SpallocProxiedConnection.send)
def send(self, data: bytes):
self._throw_if_closed()
raise IOError("socket is not open for sending")
@overrides(SpallocProxiedConnection.receive)
def receive(self, timeout=None) -> bytes:
return self._receive(timeout)
@overrides(Listenable.is_ready_to_receive)
def is_ready_to_receive(self, timeout=0) -> bool:
return self._is_ready_to_receive(timeout)
class _ProxiedSCAMPConnection(
_ProxiedBidirectionalConnection, SpallocSCPConnection):
__slots__ = ("__chip_x", "__chip_y")
def __init__(
self, ws: WebSocket, receiver: _ProxyReceiver,
x: int, y: int, port: int):
super().__init__(ws, receiver, x, y, port)
SpallocSCPConnection.__init__(self, x, y)
def __str__(self):
return f"SCAMPConnection[proxied]({self.chip_x},{self.chip_y})"
class _ProxiedBootConnection(
_ProxiedBidirectionalConnection, SpallocBootConnection):
__slots__ = ()
def __init__(self, ws: WebSocket, receiver: _ProxyReceiver):
super().__init__(ws, receiver, 0, 0, UDP_BOOT_CONNECTION_DEFAULT_PORT)
def __str__(self):
return "BootConnection[proxied]()"
class _ProxiedEIEIOConnection(
_ProxiedBidirectionalConnection,
SpallocEIEIOConnection, SpallocProxiedConnection):
# Special: This is a unidirectional receive-only connection
__slots__ = ("__addr", "__port", "__chip_x", "__chip_y")
def __init__(
self, ws: WebSocket, receiver: _ProxyReceiver,
x: int, y: int, port: int):
super().__init__(ws, receiver, x, y, port)
self.__chip_x = x
self.__chip_y = y
@property
@overrides(SpallocEIEIOConnection._coords)
def _coords(self):
return self.__chip_x, self.__chip_y
def send_to(
self,
data: bytes, address: tuple): # pylint: disable=unused-argument
"""
Direct ``send_to`` is unsupported.
"""
self._throw_if_closed()
raise IOError("socket is not open for sending")
def __str__(self):
return (f"EIEIOConnection[proxied](remote:{self.__chip_x},"
f"{self.__chip_y})")
class _ProxiedEIEIOListener(_ProxiedUnboundConnection, SpallocEIEIOListener):
__slots__ = ("__conns", )
def __init__(self, ws: WebSocket, receiver: _ProxyReceiver,
conns: Dict[Tuple[int, int], str]):
super().__init__(ws, receiver)
# Invert the map
self.__conns = {ip: xy for (xy, ip) in conns.items()}
@overrides(SpallocEIEIOListener.send_to_chip)
def send_to_chip(
self, message: bytes, x: int, y: int, port: int = SCP_SCAMP_PORT):
self._send_to(message, x, y, port)
@property
@overrides(SpallocEIEIOListener.local_ip_address)
def local_ip_address(self) -> str:
return self._addr
@property
@overrides(SpallocEIEIOListener.local_port)
def local_port(self) -> int:
return self._port
@overrides(SpallocEIEIOListener._get_chip_coords)
def _get_chip_coords(self, ip_address: str) -> Tuple[int, int]:
return self.__conns[ip_address]
def __str__(self):
return f"EIEIOConnection[proxied](local:{self._addr}:{self._port})"