Source code for spinn_front_end_common.interface.interface_functions.application_runner
# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from spinn_utilities.log import FormatAdapter
from spinnman.messages.scp.enums import Signal
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spinn_front_end_common.utilities.utility_objs import ExecutableType
from spinn_front_end_common.utilities.constants import (
MICRO_TO_MILLISECOND_CONVERSION)
SAFETY_FINISH_TIME = 0.1
logger = FormatAdapter(logging.getLogger(__name__))
[docs]def application_runner(runtime, time_threshold, run_until_complete):
"""
Ensures all cores are initialised correctly, ran, and completed
successfully.
:param int runtime:
:param int time_threshold:
:param bool run_until_complete:
:raises ConfigurationException:
"""
runner = _ApplicationRunner()
# pylint: disable=protected-access
runner._run(runtime, time_threshold, run_until_complete)
class _ApplicationRunner(object):
"""
Ensures all cores are initialised correctly, ran, and completed
successfully.
"""
__slots__ = ["__txrx", "__app_id"]
def __init__(self):
self.__txrx = FecDataView.get_transceiver()
self.__app_id = FecDataView.get_app_id()
# Wraps up as a PACMAN algorithm
def _run(
self, runtime, time_threshold, run_until_complete=False):
"""
:param int runtime:
:param int time_threshold:
:param bool run_until_complete:
:return: Number of synchronisation changes
:rtype: int
:raises ConfigurationException:
"""
# pylint: disable=too-many-arguments
logger.info("*** Running simulation... *** ")
# wait for all cores to be ready
self._wait_for_start()
buffer_manager = FecDataView.get_buffer_manager()
notification_interface = FecDataView.get_notification_protocol()
# set the buffer manager into a resume state, so that if it had ran
# before it'll work again
buffer_manager.resume()
# every thing is in sync0 so load the initial buffers
buffer_manager.load_initial_buffers()
# clear away any router diagnostics that have been set due to all
# loading applications
for chip in FecDataView.get_machine().chips:
self.__txrx.clear_router_diagnostic_counters(chip.x, chip.y)
# wait till external app is ready for us to start if required
notification_interface.wait_for_confirmation()
# set off the executables that are in sync state
# (sending to all is just as safe)
self._send_sync_signal()
# Send start notification to external applications
notification_interface.send_start_resume_notification()
if runtime is None and not run_until_complete:
# Do NOT stop the buffer manager at end; app is using it still
logger.info("Application is set to run forever; exiting")
else:
# Wait for the application to finish
self._run_wait(
run_until_complete, runtime, time_threshold)
# Send stop notification to external applications
notification_interface.send_stop_pause_notification()
def _run_wait(self, run_until_complete, runtime, time_threshold):
"""
:param bool run_until_complete:
:param int runtime:
:param float time_threshold:
"""
if not run_until_complete:
factor = (FecDataView.get_time_scale_factor() /
MICRO_TO_MILLISECOND_CONVERSION)
scaled_runtime = runtime * factor
time_to_wait = scaled_runtime + SAFETY_FINISH_TIME
logger.info(
"Application started; waiting {}s for it to stop",
time_to_wait)
time.sleep(time_to_wait)
self._wait_for_end(timeout=time_threshold)
else:
logger.info("Application started; waiting until finished")
self._wait_for_end()
def _wait_for_start(self, timeout=None):
"""
:param timeout:
:type timeout: float or None
"""
for ex_type, cores in FecDataView.get_executable_types().items():
self.__txrx.wait_for_cores_to_be_in_state(
cores, self.__app_id, ex_type.start_state, timeout=timeout)
def _send_sync_signal(self):
"""
Let apps that use the simulation interface or sync signals commence
running their main processing loops. This is done with a very fast
synchronisation barrier and a signal.
"""
executable_types = FecDataView.get_executable_types()
if (ExecutableType.USES_SIMULATION_INTERFACE in executable_types
or ExecutableType.SYNC in executable_types):
# locate all signals needed to set off executables
sync_signal = self._determine_simulation_sync_signals()
# fire all signals as required
self.__txrx.send_signal(self.__app_id, sync_signal)
def _wait_for_end(self, timeout=None):
"""
:param timeout:
:type timeout: float or None
"""
for ex_type, cores in FecDataView.get_executable_types().items():
self.__txrx.wait_for_cores_to_be_in_state(
cores, self.__app_id, ex_type.end_state, timeout=timeout)
def _determine_simulation_sync_signals(self):
"""
Determines the start states, and creates core subsets of the
states for further checks.
:return: the sync signal
:rtype: ~.Signal
:raises ConfigurationException:
"""
sync_signal = None
executable_types = FecDataView.get_executable_types()
if ExecutableType.USES_SIMULATION_INTERFACE in executable_types:
sync_signal = FecDataView.get_next_sync_signal()
# handle the sync states, but only send once if they work with
# the simulation interface requirement
if ExecutableType.SYNC in executable_types:
if sync_signal == Signal.SYNC1:
raise ConfigurationException(
"There can only be one SYNC signal per run. This is "
"because we cannot ensure the cores have not reached the "
"next SYNC state before we send the next SYNC. Resulting "
"in uncontrolled behaviour")
sync_signal = Signal.SYNC0
return sync_signal