# Source code for spinn_front_end_common.interface.buffer_management.buffer_manager

# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import ctypes
from spinn_utilities.config_holder import get_config_bool
from spinn_utilities.log import FormatAdapter
from spinn_utilities.ordered_set import OrderedSet
from spinn_utilities.progress_bar import ProgressBar
from spinnman.messages.eieio.command_messages import EventStopRequest
from spinnman.messages.eieio import EIEIOType
from spinnman.messages.eieio.data_messages import EIEIODataMessage
from data_specification.constants import BYTES_PER_WORD
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.exceptions import (
    BufferableRegionTooSmall, SpinnFrontEndException)
from spinn_front_end_common.utilities.helpful_functions import (
    locate_memory_region_for_placement, locate_extra_monitor_mc_receiver)
from spinn_front_end_common.interface.buffer_management.storage_objects \
    import (BuffersSentDeque, BufferDatabase)
from spinn_front_end_common.interface.buffer_management.buffer_models import (
    AbstractReceiveBuffersToHost, AbstractSendsBuffersFromHost)
from spinn_front_end_common.utility_models.streaming_context_manager import (
    StreamingContextManager)
from .recording_utilities import get_recording_header_size


# Module-level logger wrapped for brace-style deferred formatting
logger = FormatAdapter(logging.getLogger(__name__))

# The minimum size of any message - this is the headers plus one entry
_MIN_MESSAGE_SIZE = EIEIODataMessage.min_packet_length(
    eieio_type=EIEIOType.KEY_32_BIT, is_timestamp=True)

# The number of bytes in each key to be sent
_N_BYTES_PER_KEY = EIEIOType.KEY_32_BIT.key_bytes  # @UndefinedVariable

# Maximum number of payload bytes packed into one message when filling send
# buffers; presumably the SDP packet data limit ("PACKAGE" looks like a typo
# for "PACKET" but the name is kept) — TODO confirm against SDP spec
_SDP_MAX_PACKAGE_SIZE = 272

# Identifier tag for traffic generated by the buffer manager
TRAFFIC_IDENTIFIER = "BufferTraffic"

# When True, every extra-monitor data fetch is cross-checked against a
# direct transceiver read (very slow; debugging aid only)
VERIFY = False


class _RecordingRegion(ctypes.LittleEndianStructure):
    """
    Recording Region data.

    One instance describes a single recording region as laid out in SDRAM
    by the on-machine recording code: three little-endian 32-bit words
    (space; size with the missing flag packed in the top bit; data
    address). NOTE(review): this must stay byte-compatible with the C-side
    header — verify against the on-chip recording implementation before
    changing any field or bit width.
    """
    _fields_ = [
        # Space available for recording
        ("space", ctypes.c_uint32),
        # The size of the recording region (low 31 bits of the second word)
        ("size", ctypes.c_uint32, 31),
        # Whether any data is missing (top bit of the same word)
        ("missing", ctypes.c_uint32, 1),
        # The address of the data
        ("data", ctypes.c_uint32)
    ]


class BufferManager(object):
    """
    Manager of send buffers.

    Loads the initial EIEIO send buffers into machine memory for vertices
    that buffer their input from host, and extracts recorded data from the
    machine (directly, via extra monitor cores, or via Java) into a
    :py:class:`BufferDatabase`.
    """

    __slots__ = [
        # Set of vertices with buffers to be sent
        "_sender_vertices",
        # Dictionary of sender vertex -> buffers sent
        "_sent_messages",
        # Support class to help call Java
        "_java_caller",
        # The machine controller, in case it wants to make proxied connections
        # for us.  NOTE(review): never assigned in this class; slot kept for
        # compatibility — confirm whether it is still needed
        "_machine_controller"
    ]

    def __init__(self):
        # Set of vertices with buffers to be sent
        self._sender_vertices = set()

        # Dictionary of sender vertex -> buffers sent
        self._sent_messages = dict()

        if FecDataView.has_java_caller():
            # Record session credentials so the Java process can reach the DB
            with BufferDatabase() as db:
                db.write_session_credentials_to_db()
            self._java_caller = FecDataView.get_java_caller()
            if get_config_bool("Machine", "enable_advanced_monitor_support"):
                self._java_caller.set_advanced_monitors()
        else:
            self._java_caller = None

        # Collect every placed vertex that buffers its input from host
        for placement in FecDataView.iterate_placements_by_vertex_type(
                AbstractSendsBuffersFromHost):
            vertex = placement.vertex
            if vertex.buffering_input():
                self._sender_vertices.add(vertex)

    def _request_data(self, placement_x, placement_y, address, length):
        """ Uses the extra monitor cores for data extraction.

        :param int placement_x:
            the placement x coord where data is to be extracted from
        :param int placement_y:
            the placement y coord where data is to be extracted from
        :param int address: the memory address to start at
        :param int length: the number of bytes to extract
        :return: data as a byte array
        :rtype: bytearray
        """
        # pylint: disable=too-many-arguments
        if not get_config_bool("Machine", "enable_advanced_monitor_support"):
            # No fast path available; plain transceiver read
            return FecDataView.read_memory(
                placement_x, placement_y, address, length)

        # Round to word boundaries: the monitors transfer whole words, so
        # widen the request and remember how much padding was added
        initial = address % BYTES_PER_WORD
        address -= initial
        length += initial
        final = (BYTES_PER_WORD - (length % BYTES_PER_WORD)) % BYTES_PER_WORD
        length += final

        sender = FecDataView.get_monitor_by_xy(placement_x, placement_y)
        receiver = locate_extra_monitor_mc_receiver(placement_x, placement_y)
        extra_mon_data = receiver.get_data(
            sender, FecDataView.get_placement_of_vertex(sender),
            address, length)
        if VERIFY:
            # Debug cross-check against a direct (slow) read
            txrx_data = FecDataView.read_memory(
                placement_x, placement_y, address, length)
            self._verify_data(extra_mon_data, txrx_data)

        # If we rounded to word boundaries, strip the padding junk
        if initial and final:
            return extra_mon_data[initial:-final]
        elif initial:
            return extra_mon_data[initial:]
        elif final:
            return extra_mon_data[:-final]
        else:
            return extra_mon_data

    def _verify_data(self, extra_mon_data, txrx_data):
        """ Check two byte sequences match element-by-element.

        :param extra_mon_data: data fetched via the extra monitors
        :param txrx_data: reference data read via the transceiver
        :raises ValueError: on the first mismatching byte
        """
        for index, (extra_mon_element, txrx_element) in enumerate(
                zip(extra_mon_data, txrx_data)):
            if extra_mon_element != txrx_element:
                raise ValueError(f"WRONG (at index {index})")

    def load_initial_buffers(self):
        """ Load the initial buffers for the senders using memory writes.
        """
        # Size the progress bar over all the data to be written
        total_data = 0
        for vertex in self._sender_vertices:
            for region in vertex.get_regions():
                total_data += vertex.get_region_buffer_size(region)

        progress = ProgressBar(total_data, "Loading buffers")
        for vertex in self._sender_vertices:
            for region in vertex.get_regions():
                self._send_initial_messages(vertex, region, progress)
        progress.end()

    def reset(self):
        """ Resets the buffered regions to start transmitting from the
        beginning of its expected regions and clears the buffered out data
        files.
        """
        with BufferDatabase() as db:
            db.write_session_credentials_to_db()

        # rewind buffered in
        for vertex in self._sender_vertices:
            for region in vertex.get_regions():
                vertex.rewind(region)

    def resume(self):
        """ Resets any data structures needed before starting running again.
        """
        # Nothing to do currently; hook kept for the run lifecycle

    def clear_recorded_data(self, x, y, p, recording_region_id):
        """ Removes the recorded data stored in memory.

        :param int x: placement x coordinate
        :param int y: placement y coordinate
        :param int p: placement p coordinate
        :param int recording_region_id: the recording region ID
        """
        with BufferDatabase() as db:
            db.clear_region(x, y, p, recording_region_id)

    def _create_message_to_send(self, size, vertex, region):
        """ Creates a single message to send with the given boundaries.

        :param int size: The number of bytes available for the whole packet
        :param AbstractSendsBuffersFromHost vertex:
            The vertex to get the keys from
        :param int region: The region of the vertex to get keys from
        :return: A new message, or `None` if no keys can be added
        :rtype: None or ~spinnman.messages.eieio.data_messages.EIEIODataMessage
        """
        # If there are no more messages to send, return None
        if not vertex.is_next_timestamp(region):
            return None

        # Create a new message
        next_timestamp = vertex.get_next_timestamp(region)
        message = EIEIODataMessage.create(
            EIEIOType.KEY_32_BIT, timestamp=next_timestamp)

        # If there is no room for the message, return None
        if message.size + _N_BYTES_PER_KEY > size:
            return None

        # Add keys up to the limit
        bytes_to_go = size - message.size
        while (bytes_to_go >= _N_BYTES_PER_KEY and
                vertex.is_next_key(region, next_timestamp)):
            key = vertex.get_next_key(region)
            message.add_key(key)
            bytes_to_go -= _N_BYTES_PER_KEY
        return message

    def _send_initial_messages(self, vertex, region, progress):
        """ Send the initial set of messages for one region of one vertex,
        writing them to the machine in a single memory write.

        :param AbstractSendsBuffersFromHost vertex:
            The vertex to get the keys from
        :param int region: The region to get the keys from
        :param ~spinn_utilities.progress_bar.ProgressBar progress:
            Progress bar to update as data is written
        :raises SpinnFrontEndException: if the buffer region size is odd
        :raises BufferableRegionTooSmall:
            if no message at all fits in the region
        """
        # Get the vertex load details
        placement = FecDataView.get_placement_of_vertex(vertex)
        region_base_address = locate_memory_region_for_placement(
            placement, region)

        # Add packets until out of space
        sent_message = False
        bytes_to_go = vertex.get_region_buffer_size(region)
        if bytes_to_go % 2 != 0:
            raise SpinnFrontEndException(
                f"The buffer region of {vertex} must be divisible by 2")
        all_data = b""
        if vertex.is_empty(region):
            sent_message = True
        else:
            min_size_of_packet = _MIN_MESSAGE_SIZE
            while (vertex.is_next_timestamp(region) and
                    bytes_to_go > min_size_of_packet):
                space_available = min(bytes_to_go, _SDP_MAX_PACKAGE_SIZE)
                next_message = self._create_message_to_send(
                    space_available, vertex, region)
                if next_message is None:
                    break

                # Write the message to the memory
                data = next_message.bytestring
                all_data += data
                sent_message = True

                # Update the positions
                bytes_to_go -= len(data)
                progress.update(len(data))

        if not sent_message:
            raise BufferableRegionTooSmall(
                f"The buffer size {bytes_to_go} is too small for any data to "
                f"be added for region {region} of vertex {vertex}")

        # If there are no more messages and there is space, add a stop request
        if (not vertex.is_next_timestamp(region) and
                bytes_to_go >= EventStopRequest.get_min_packet_length()):
            data = EventStopRequest().bytestring
            all_data += data
            bytes_to_go -= len(data)
            progress.update(len(data))
            self._sent_messages[vertex] = BuffersSentDeque(
                region, sent_stop_message=True)

        # Do the writing all at once for efficiency
        FecDataView.write_memory(
            placement.x, placement.y, region_base_address, all_data)

    def __get_recording_placements(self):
        """ All placements whose vertex records data to be read back.

        :rtype: list(~.Placement)
        """
        return list(FecDataView.iterate_placements_by_vertex_type(
            AbstractReceiveBuffersToHost))

    def get_placement_data(self):
        """ Retrieve and store the recorded data for all recording
        placements, via Java or the extra monitors where enabled.
        """
        if self._java_caller is not None:
            self.__get_data_for_placements_using_java()
        else:
            recording_placements = self.__get_recording_placements()
            if get_config_bool(
                    "Machine", "enable_advanced_monitor_support"):
                self.__python_get_data_for_placements_with_monitors(
                    recording_placements)
            else:
                self.__python_get_data_for_placements(recording_placements)

    def __get_data_for_placements_using_java(self):
        """ Delegate the whole extraction to the Java tools. """
        logger.info("Starting buffer extraction using Java")
        self._java_caller.set_placements(
            FecDataView.iterate_placements_by_vertex_type(
                AbstractReceiveBuffersToHost))
        self._java_caller.get_all_data()

    def __python_get_data_for_placements_with_monitors(
            self, recording_placements):
        """ Extract data in Python, streaming through the extra monitors.

        :param list(~pacman.model.placements.Placement) recording_placements:
            Where to get the data from.
        """
        # locate receivers
        receivers = list(OrderedSet(
            locate_extra_monitor_mc_receiver(placement.x, placement.y)
            for placement in recording_placements))

        # update transaction id from the machine for all extra monitors
        for extra_mon in FecDataView.iterate_monitors():
            extra_mon.update_transaction_id_from_machine()

        with StreamingContextManager(receivers):
            # get data
            self.__python_get_data_for_placements(recording_placements)

    def __python_get_data_for_placements(self, recording_placements):
        """ Extract data in Python via plain memory reads.

        :param list(~pacman.model.placements.Placement) recording_placements:
            Where to get the data from.
        """
        # get data
        progress = ProgressBar(
            len(recording_placements), "Extracting buffers from the last run")

        with BufferDatabase() as db:
            for placement in progress.over(recording_placements):
                self._retrieve_by_placement(db, placement)

    def get_data_by_placement(self, placement, recording_region_id):
        """ Get the data container for all the data retrieved during the
        simulation from a specific region area of a core.

        :param ~pacman.model.placements.Placement placement:
            the placement to get the data from
        :param int recording_region_id: desired recording data region
        :return: an array contained all the data received during the
            simulation, and a flag indicating if any data was missing
        :rtype: tuple(bytearray, bool)
        :raises NotImplementedError:
            if the vertex is not an AbstractReceiveBuffersToHost
        """
        # Ensure that any transfers in progress are complete first
        if not isinstance(placement.vertex, AbstractReceiveBuffersToHost):
            raise NotImplementedError(
                f"vertex {placement.vertex} does not implement "
                "AbstractReceiveBuffersToHost so no data read")

        # data flush has been completed - return appropriate data
        with BufferDatabase() as db:
            return db.get_region_data(
                placement.x, placement.y, placement.p, recording_region_id)

    def _retrieve_by_placement(self, db, placement):
        """ Retrieve the data for a vertex; must be locked first.

        :param BufferDatabase db: database to store into
        :param ~pacman.model.placements.Placement placement:
            the placement to get the data from
        """
        vertex = placement.vertex
        addr = vertex.get_recording_region_base_address(placement)
        sizes_and_addresses = self._get_region_information(
            addr, placement.x, placement.y)

        # Read the data if not already received
        for region in vertex.get_recorded_region_ids():
            # Now read the data and store it
            size, addr, missing = sizes_and_addresses[region]
            data = self._request_data(
                placement.x, placement.y, addr, size)
            db.store_data_in_region_buffer(
                placement.x, placement.y, placement.p, region, missing, data)

    # Backward-compatible alias for the historical (misspelled) name
    _retreive_by_placement = _retrieve_by_placement

    def _get_region_information(self, addr, x, y):
        """ Get the recording information from all regions of a core.

        :param int addr: The recording region base address
        :param int x: The x-coordinate of the chip containing the data
        :param int y: The y-coordinate of the chip containing the data
        :return: (size, data address, missing flag) for each recording region
        :rtype: list(tuple(int, int, bool))
        """
        transceiver = FecDataView.get_transceiver()
        # First word is the number of regions; the rest of the header is an
        # array of _RecordingRegion structures
        n_regions = transceiver.read_word(x, y, addr)
        n_bytes = get_recording_header_size(n_regions)
        data = transceiver.read_memory(
            x, y, addr + BYTES_PER_WORD, n_bytes - BYTES_PER_WORD)
        data_type = _RecordingRegion * n_regions
        regions = data_type.from_buffer_copy(data)
        sizes_and_addresses = [
            (r.size, r.data, bool(r.missing)) for r in regions]
        return sizes_and_addresses

    @property
    def sender_vertices(self):
        """ The vertices which are buffered.

        :rtype: iterable(AbstractSendsBuffersFromHost)
        """
        return self._sender_vertices