# Copyright (c) 2016 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import ExitStack
import logging
import math
from typing import Dict, Optional, Tuple

from spalloc import Job
from spalloc.states import JobState

from spinn_utilities.abstract_context_manager import AbstractContextManager
from spinn_utilities.config_holder import (
    get_config_bool, get_config_int, get_config_str, get_config_str_list)
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides

from spinn_machine import Machine

from spinnman.constants import SCP_SCAMP_PORT
from spinnman.spalloc import (
    is_server_address, SpallocClient, SpallocJob, SpallocState)

from spinn_front_end_common.abstract_models import (
    AbstractMachineAllocationController)
from spinn_front_end_common.abstract_models.impl import (
    MachineAllocationController)
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.interface.provenance import ProvenanceWriter
from spinn_front_end_common.utilities.utility_calls import parse_old_spalloc
# Module-level logger; FormatAdapter enables brace-style lazy formatting.
logger = FormatAdapter(logging.getLogger(__name__))
_MACHINE_VERSION = 5 # Spalloc only ever works with v5 boards
#: The number of chips per board to use in calculations to ensure that
#: the number of boards allocated is enough. This is 2 less than the maximum
#: as there are a few boards with 2 down chips in the big machine.
CALC_CHIPS_PER_BOARD = Machine.MAX_CHIPS_PER_48_BOARD - 2
class SpallocJobController(MachineAllocationController):
    """
    Machine allocation controller that manages a job on a new-style
    Spalloc server, tracking its state and tearing it down on exit.
    """
    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state",
        "__client",
        "__closer",
        "__use_proxy"
    )

    def __init__(
            self, client: SpallocClient, job: SpallocJob,
            task: AbstractContextManager, use_proxy: bool):
        """
        :param ~spinnman.spalloc.SpallocClient client:
            The client that owns the job; closed during teardown.
        :param ~spinnman.spalloc.SpallocJob job: The job being controlled.
        :param task: The keep-alive task context for the job.
        :type task:
            ~spinn_utilities.abstract_context_manager.AbstractContextManager
        :param bool use_proxy:
            Whether connections are to be proxied via Spalloc.
        :raises TypeError: If ``job`` is ``None``.
        """
        if job is None:
            raise TypeError("must have a real job")
        self.__client = client
        self.__closer = task
        self._job = job
        self._state = job.get_state()
        self.__use_proxy = use_proxy
        super().__init__("SpallocJobController")

    @overrides(AbstractMachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time):
        # Does Nothing in this allocator - machines are held until exit
        pass

    @overrides(AbstractMachineAllocationController.close)
    def close(self):
        super().close()
        # Release resources in order: keep-alive task, job, then client.
        self.__closer.close()
        self._job.destroy()
        self.__client.close()

    @overrides(AbstractMachineAllocationController.where_is_machine)
    def where_is_machine(self, chip_x, chip_y):
        """
        :param int chip_x:
        :param int chip_y:
        :rtype: tuple(int,int,int)
        """
        return self._job.where_is_machine(x=chip_x, y=chip_y)

    @overrides(MachineAllocationController._wait)
    def _wait(self):
        try:
            if self._state != SpallocState.DESTROYED:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            # NOTE(review): deliberately swallowed in the original code;
            # presumably raised when the job has already gone — confirm.
            pass
        except Exception as exn:  # pylint: disable=broad-except
            # Only re-raise if we are not already shutting down.
            if not self._exited:
                raise exn
        return self._state != SpallocState.DESTROYED

    @overrides(MachineAllocationController._teardown)
    def _teardown(self):
        if not self._exited:
            # Same shutdown order as close(): task, job, client.
            self.__closer.close()
            self._job.destroy()
            self.__client.close()
        super()._teardown()

    @overrides(AbstractMachineAllocationController.create_transceiver)
    def create_transceiver(self):
        """
        .. note::
            This allocation controller proxies the transceiver's connections
            via Spalloc. This allows it to work even outside the UNIMAN
            firewall.
        """
        if not self.__use_proxy:
            return super().create_transceiver()
        transceiver = self._job.create_transceiver()
        transceiver.ensure_board_is_ready()
        return transceiver

    @overrides(AbstractMachineAllocationController.open_sdp_connection)
    def open_sdp_connection(self, chip_x, chip_y, udp_port=SCP_SCAMP_PORT):
        """
        .. note::
            This allocation controller proxies connections via Spalloc. This
            allows it to work even outside the UNIMAN firewall.
        """
        return self._job.connect_to_board(chip_x, chip_y, udp_port)

    @overrides(AbstractMachineAllocationController.open_eieio_connection)
    def open_eieio_connection(self, chip_x, chip_y):
        return self._job.open_eieio_connection(chip_x, chip_y, SCP_SCAMP_PORT)

    @overrides(AbstractMachineAllocationController.open_eieio_listener)
    def open_eieio_listener(self):
        return self._job.open_listener_connection()

    @property
    @overrides(AbstractMachineAllocationController.proxying)
    def proxying(self):
        return self.__use_proxy

    @overrides(MachineAllocationController.make_report)
    def make_report(self, filename):
        # Minimal report: just record which job this run used.
        with open(filename, "w", encoding="utf-8") as report:
            report.write(f"Job: {self._job}")
class _OldSpallocJobController(MachineAllocationController):
    """
    Machine allocation controller that manages a job on an old-style
    spalloc server.
    """
    __slots__ = (
        # the spalloc job object
        "_job",
        # the current job's old state
        "_state"
    )

    def __init__(self, job: Job, host: str):
        """
        :param ~spalloc.job.Job job: The job being controlled.
        :param str host: Host name of the allocated machine.
        :raises TypeError: If ``job`` is ``None``.
        """
        if job is None:
            raise TypeError("must have a real job")
        self._job = job
        self._state = job.state
        super().__init__("SpallocJobController", host)

    @overrides(AbstractMachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time):
        # Does Nothing in this allocator - machines are held until exit
        pass

    @overrides(AbstractMachineAllocationController.close)
    def close(self):
        super().close()
        self._job.destroy()

    @property
    def power(self) -> bool:
        """
        The power state that the job reports.

        :rtype: bool
        """
        return self._job.power

    def set_power(self, power: bool):
        """
        Set the power state of the job's boards; when powering on, wait
        for the job to report it is ready again.

        :param bool power:
        """
        self._job.set_power(power)
        if power:
            self._job.wait_until_ready()

    @overrides(AbstractMachineAllocationController.where_is_machine)
    def where_is_machine(self, chip_x, chip_y):
        return self._job.where_is_machine(chip_y=chip_y, chip_x=chip_x)

    @overrides(MachineAllocationController._wait)
    def _wait(self):
        try:
            if self._state != JobState.destroyed:
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            # NOTE(review): deliberately swallowed in the original code;
            # presumably raised when the job has already gone — confirm.
            pass
        except Exception as exn:  # pylint: disable=broad-except
            # Only re-raise if we are not already shutting down.
            if not self._exited:
                raise exn
        return self._state != JobState.destroyed

    @overrides(MachineAllocationController._teardown)
    def _teardown(self):
        if not self._exited:
            self._job.close()
        super()._teardown()
_MACHINE_VERSION = 5  # NOTE(review): re-assigns the constant already defined near the top of the file with the same value — likely a docs-extraction artifact; harmless
def spalloc_allocator(
        bearer_token: Optional[str] = None) -> Tuple[
            str, int, None, bool, bool, Dict[Tuple[int, int], str],
            MachineAllocationController]:
    """
    Request a machine from a SPALLOC server that will fit the given
    number of chips.

    :param bearer_token: The bearer token to use
    :type bearer_token: str or None
    :return:
        host, board version, BMP details, reset on startup flag,
        auto-detect BMP flag, board address map, allocation controller
    :rtype: tuple(str, int, object, bool, bool, dict(tuple(int,int),str),
        MachineAllocationController)
    """
    spalloc_server = get_config_str("Machine", "spalloc_server")

    # Work out how many boards are needed
    if FecDataView.has_n_boards_required():
        n_boards = FecDataView.get_n_boards_required()
    else:
        n_chips = FecDataView.get_n_chips_needed()
        n_boards_float = float(n_chips) / CALC_CHIPS_PER_BOARD
        logger.info("{:.2f} Boards Required for {} chips",
                    n_boards_float, n_chips)
        # If the number of boards rounded up is less than 50% of a board
        # bigger than the actual number of boards,
        # add another board just in case.
        n_boards = int(math.ceil(n_boards_float))
        if n_boards - n_boards_float < 0.5:
            n_boards += 1

    # Choose the protocol by the shape of the server setting
    if is_server_address(spalloc_server):
        host, connections, mac = _allocate_job_new(
            spalloc_server, n_boards, bearer_token)
    else:
        host, connections, mac = _allocate_job_old(spalloc_server, n_boards)
    return (host, _MACHINE_VERSION, None, False, False, connections, mac)
def _allocate_job_new(
        spalloc_server: str, n_boards: int,
        bearer_token: Optional[str] = None) -> Tuple[
            str, Dict[Tuple[int, int], str], MachineAllocationController]:
    """
    Request a machine from a new-style spalloc server that will fit the
    given number of boards.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :param bearer_token: The bearer token to use
    :type bearer_token: str or None
    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    logger.info(f"Requesting job with {n_boards} boards")
    with ExitStack() as stack:
        spalloc_machine = get_config_str("Machine", "spalloc_machine")
        use_proxy = get_config_bool("Machine", "spalloc_use_proxy")
        client = SpallocClient(spalloc_server, bearer_token=bearer_token)
        stack.enter_context(client)
        job = client.create_job(n_boards, spalloc_machine)
        stack.enter_context(job)
        task = job.launch_keepalive_task()
        stack.enter_context(task)
        job.wait_until_ready()
        connections = job.get_connections()
        # Record board provenance; same with-statement style as the
        # old-spalloc path for consistency.
        with ProvenanceWriter() as db:
            db.insert_board_provenance(connections)
        # Root board address is the connection for chip (0, 0)
        root = connections.get((0, 0), None)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "boards: {}",
                str(connections).replace("{", "[").replace("}", "]"))
        allocation_controller = SpallocJobController(
            client, job, task, use_proxy)
        # Success! We don't want to close the client, job or task now;
        # the allocation controller now owns them.
        stack.pop_all()
        return (root, connections, allocation_controller)
def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[
        str, Dict[Tuple[int, int], str], MachineAllocationController]:
    """
    Request a machine from an old-style spalloc server that will fit the
    requested number of boards.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    # Resolve server details, falling back on configured defaults
    host, port, owner = parse_old_spalloc(
        spalloc_server, get_config_int("Machine", "spalloc_port"),
        get_config_str("Machine", "spalloc_user"))
    job_args = {
        'hostname': host,
        'port': port,
        'owner': owner
    }
    machine_name = get_config_str("Machine", "spalloc_machine")
    if machine_name is not None:
        job_args['machine'] = machine_name

    job, hostname, connections = _launch_checked_job_old(n_boards, job_args)
    controller = _OldSpallocJobController(job, hostname)
    return (hostname, connections, controller)
def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[
        Job, str, Dict[Tuple[int, int], str]]:
    """
    Request a job from an old-style spalloc server, retrying while the
    allocated machine is in the configured ``spalloc_avoid_boards`` list.

    :param int n_boards: The number of boards required
    :param dict spalloc_kwargs: Keyword arguments for the ``Job`` constructor
    :rtype: tuple(~.Job, str, dict(tuple(int,int),str))
    """
    logger.info(f"Requesting job with {n_boards} boards")
    avoid_boards = get_config_str_list("Machine", "spalloc_avoid_boards")
    avoid_jobs = []
    # Initialised up front so the finally block cannot hit a NameError
    # (masking the real exception) if the very first job request fails
    # before any connections are known.
    connections: Dict[Tuple[int, int], str] = {}
    try:
        while True:
            job = Job(n_boards, **spalloc_kwargs)
            try:
                job.wait_until_ready()
                # get param from jobs before starting, so that hanging doesn't
                # occur
                hostname = job.hostname
            except Exception as ex:
                job.destroy(str(ex))
                raise
            connections = job.connections
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("boards: {}",
                             str(connections).replace("{", "[").replace(
                                 "}", "]"))
            with ProvenanceWriter() as db:
                db.insert_board_provenance(connections)
            if hostname not in avoid_boards:
                break
            # Keep the unwanted job alive (destroyed in finally) so that
            # spalloc does not hand us the same boards straight back.
            avoid_jobs.append(job)
            logger.warning(
                f"Asking for new job as {hostname} "
                f"as in the spalloc_avoid_boards list")
    finally:
        if avoid_boards:
            # Strip out connection info for any boards we must avoid
            for key in list(connections.keys()):
                if connections[key] in avoid_boards:
                    logger.warning(
                        f"Removing connection info for {connections[key]} "
                        f"as in the spalloc avoid_boards list")
                    del connections[key]
        for avoid_job in avoid_jobs:
            avoid_job.destroy("Asked to avoid by cfg")
    return job, hostname, connections