Source code for spinn_front_end_common.utility_models.extra_monitor_support_machine_vertex

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum, IntEnum
import logging
import struct
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_machine import CoreSubsets, Router
from pacman.model.graphs.machine import MachineVertex
from pacman.model.resources import ConstantSDRAM
from spinn_utilities.config_holder import get_config_bool
from spinn_front_end_common.abstract_models import (
    AbstractHasAssociatedBinary, AbstractGeneratesDataSpecification)
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.utility_objs import ExecutableType
from spinn_front_end_common.utilities.utility_objs.\
    extra_monitor_scp_processes import (
        ReadStatusProcess, ResetCountersProcess, SetPacketTypesProcess,
        SetRouterTimeoutProcess, ClearQueueProcess,
        LoadApplicationMCRoutesProcess, LoadSystemMCRoutesProcess)
from spinn_front_end_common.utilities.constants import (
    SARK_PER_MALLOC_SDRAM_USAGE, DATA_SPECABLE_BASIC_SETUP_INFO_N_BYTES,
    BYTES_PER_WORD, BYTES_PER_KB)
from spinn_front_end_common.utilities.helpful_functions import (
    convert_vertices_to_core_subset, get_region_base_address_offset)
from spinn_front_end_common.utilities.emergency_recovery import (
    emergency_recover_state_from_failure)
from .data_speed_up_packet_gatherer_machine_vertex import (
    DataSpeedUpPacketGatherMachineVertex as
    Gatherer)
from spinn_front_end_common.interface.provenance import (
    AbstractProvidesProvenanceDataFromMachine, ProvenanceWriter)

log = FormatAdapter(logging.getLogger(__name__))

_CONFIG_REGION_REINJECTOR_SIZE_IN_BYTES = 5 * BYTES_PER_WORD
#: 1.new seq key, 2.first data key, 3. transaction id key 4.end flag key,
# 5.base key
_CONFIG_DATA_SPEED_UP_SIZE_IN_BYTES = 5 * BYTES_PER_WORD
_CONFIG_MAX_EXTRA_SEQ_NUM_SIZE_IN_BYTES = 460 * BYTES_PER_KB
_CONFIG_DATA_IN_KEYS_SDRAM_IN_BYTES = 3 * BYTES_PER_WORD
_MAX_DATA_SIZE_FOR_DATA_IN_MULTICAST_ROUTING = ((49 * 3) + 1) * BYTES_PER_WORD
_BIT_SHIFT_TO_MOVE_APP_ID = 24

_ONE_WORD = struct.Struct("<I")
# typedef struct extra_monitor_provenance_t {
#     uint n_sdp_packets;
#     uint n_in_streams;
#     uint n_out_streams;
#     uint n_router_changes;
# } extra_monitor_provenance_t;
_PROVENANCE_FORMAT = struct.Struct("<IIII")

# cap for stopping wrap arounds
TRANSACTION_ID_CAP = 0xFFFFFFFF

# SDRAM requirement for containing router table entries
# 16 bytes per entry:
# 4 for a key, 4 for mask,
# 4 for word alignment for 18 cores and 6 links
# (24 bits, for word aligning)
_SDRAM_FOR_ROUTER_TABLE_ENTRIES = 1024 * 4 * BYTES_PER_WORD


class _DSG_REGIONS(IntEnum):
    REINJECT_CONFIG = 0
    DATA_OUT_CONFIG = 1
    DATA_IN_CONFIG = 2
    PROVENANCE_AREA = 3


class _KEY_OFFSETS(Enum):
    ADDRESS_KEY_OFFSET = 0
    DATA_KEY_OFFSET = 1
    BOUNDARY_KEY_OFFSET = 2


[docs]class ExtraMonitorSupportMachineVertex( MachineVertex, AbstractHasAssociatedBinary, AbstractGeneratesDataSpecification, AbstractProvidesProvenanceDataFromMachine): """ Machine vertex for talking to extra monitor cores. Supports reinjection control and the faster data transfer protocols. Usually deployed once per chip. .. note:: This is an unusual machine vertex, in that it has no associated application vertex. """ __slots__ = ( # if we reinject multicast packets "_reinject_multicast", # if we reinject point to point packets "_reinject_point_to_point", # if we reinject nearest neighbour packets "_reinject_nearest_neighbour", # if we reinject fixed route packets "_reinject_fixed_route", # placement holder for ease of access "_placement", # app id, used for reporting failures on system core RTE "_app_id", # the local transaction id "_transaction_id", # provenance region address "_prov_region" ) def __init__( self, reinject_point_to_point=False, reinject_nearest_neighbour=False, reinject_fixed_route=False): """ :param bool reinject_point_to_point: if we reinject point-to-point packets :param bool reinject_nearest_neighbour: if we reinject nearest-neighbour packets :param bool reinject_fixed_route: if we reinject fixed route packets """ # pylint: disable=too-many-arguments super().__init__( label="SYSTEM:ExtraMonitor", app_vertex=None) self._reinject_multicast = get_config_bool( "Machine", "enable_reinjection") self._reinject_point_to_point = reinject_point_to_point self._reinject_nearest_neighbour = reinject_nearest_neighbour self._reinject_fixed_route = reinject_fixed_route # placement holder for ease of access self._placement = None self._app_id = None self._transaction_id = 0 self._prov_region = None @property def reinject_multicast(self): """ :rtype: bool """ return self._reinject_multicast @property def transaction_id(self): return self._transaction_id
[docs] def update_transaction_id(self): self._transaction_id = (self._transaction_id + 1) & TRANSACTION_ID_CAP
[docs] def update_transaction_id_from_machine(self): """ Looks up from the machine what the current transaction id is and updates the extra monitor. """ self._transaction_id = FecDataView.get_transceiver().read_user_1( self._placement.x, self._placement.y, self._placement.p)
@property def reinject_point_to_point(self): """ :rtype: bool """ return self._reinject_point_to_point @property def reinject_nearest_neighbour(self): """ :rtype: bool """ return self._reinject_nearest_neighbour @property def reinject_fixed_route(self): """ :rtype: bool """ return self._reinject_fixed_route @property @overrides(MachineVertex.sdram_required) def sdram_required(self): return ConstantSDRAM( _CONFIG_REGION_REINJECTOR_SIZE_IN_BYTES + _CONFIG_DATA_SPEED_UP_SIZE_IN_BYTES + _CONFIG_MAX_EXTRA_SEQ_NUM_SIZE_IN_BYTES + # Data spec size DATA_SPECABLE_BASIC_SETUP_INFO_N_BYTES + # One malloc for extra sequence numbers SARK_PER_MALLOC_SDRAM_USAGE + _MAX_DATA_SIZE_FOR_DATA_IN_MULTICAST_ROUTING + _SDRAM_FOR_ROUTER_TABLE_ENTRIES + _CONFIG_DATA_IN_KEYS_SDRAM_IN_BYTES) @property def placement(self): """ :rtype: ~pacman.model.placements.Placement """ return self._placement
[docs] @overrides(AbstractHasAssociatedBinary.get_binary_start_type) def get_binary_start_type(self): return self.static_get_binary_start_type()
[docs] @staticmethod def static_get_binary_start_type(): """ The type of the binary implementing this vertex. :rtype: ExecutableType """ return ExecutableType.SYSTEM
[docs] @overrides(AbstractHasAssociatedBinary.get_binary_file_name) def get_binary_file_name(self): return self.static_get_binary_file_name()
[docs] @staticmethod def static_get_binary_file_name(): """ The name of the binary implementing this vertex. :rtype: str """ return "extra_monitor_support.aplx"
[docs] @overrides(AbstractGeneratesDataSpecification.generate_data_specification) def generate_data_specification(self, spec, placement): # storing for future usage self._placement = placement self._app_id = FecDataView.get_app_id() # write reinjection config self._generate_reinjection_config(spec, placement) # write data speed up out config self._generate_data_speed_up_out_config(spec) # write data speed up in config self._generate_data_speed_up_in_config( spec, FecDataView().get_chip_at(placement.x, placement.y)) self._generate_provenance_area(spec) spec.end_specification()
def _generate_data_speed_up_out_config(self, spec): """ :param ~.DataSpecificationGenerator spec: spec file """ spec.reserve_memory_region( region=_DSG_REGIONS.DATA_OUT_CONFIG, size=_CONFIG_DATA_SPEED_UP_SIZE_IN_BYTES, label="data speed-up out config region") spec.switch_write_focus(_DSG_REGIONS.DATA_OUT_CONFIG) spec.write_value(Gatherer.BASE_KEY) spec.write_value(Gatherer.NEW_SEQ_KEY) spec.write_value(Gatherer.FIRST_DATA_KEY) spec.write_value(Gatherer.TRANSACTION_ID_KEY) spec.write_value(Gatherer.END_FLAG_KEY) def _generate_reinjection_config(self, spec, placement): """ :param ~.DataSpecificationGenerator spec: spec file :param ~.Placement placement: """ spec.reserve_memory_region( region=_DSG_REGIONS.REINJECT_CONFIG, size=_CONFIG_REGION_REINJECTOR_SIZE_IN_BYTES, label="re-injection config region") spec.switch_write_focus(_DSG_REGIONS.REINJECT_CONFIG) for value in [ self._reinject_multicast, self._reinject_point_to_point, self._reinject_fixed_route, self._reinject_nearest_neighbour]: # Note that this is inverted! Why... I dunno! spec.write_value(int(not value)) # add the reinjection mc interface router_timeout_keys = \ FecDataView.get_system_multicast_router_timeout_keys() chip = FecDataView().get_chip_at(placement.x, placement.y) # pylint: disable=unsubscriptable-object reinjector_base_mc_key = ( router_timeout_keys[ (chip.nearest_ethernet_x, chip.nearest_ethernet_y)]) spec.write_value(reinjector_base_mc_key) def _generate_data_speed_up_in_config(self, spec, chip): """ :param ~.DataSpecificationGenerator spec: spec file :param ~.Chip chip: the chip where this monitor will run """ spec.reserve_memory_region( region=_DSG_REGIONS.DATA_IN_CONFIG, size=(_MAX_DATA_SIZE_FOR_DATA_IN_MULTICAST_ROUTING + _CONFIG_DATA_IN_KEYS_SDRAM_IN_BYTES), label="data speed-up in config region") spec.switch_write_focus(_DSG_REGIONS.DATA_IN_CONFIG) # write address key and data key mc_data_chips_to_keys = \ FecDataView.get_data_in_multicast_key_to_chip_map() # pylint: disable=unsubscriptable-object base_key = mc_data_chips_to_keys[chip.x, chip.y] spec.write_value(base_key + _KEY_OFFSETS.ADDRESS_KEY_OFFSET.value) spec.write_value(base_key + _KEY_OFFSETS.DATA_KEY_OFFSET.value) spec.write_value(base_key + _KEY_OFFSETS.BOUNDARY_KEY_OFFSET.value) # write table entries data_in_routing_tables = \ FecDataView.get_data_in_multicast_routing_tables() table = data_in_routing_tables.get_routing_table_for_chip( chip.x, chip.y) spec.write_value(table.number_of_entries) for entry in table.multicast_routing_entries: spec.write_value(entry.routing_entry_key) spec.write_value(entry.mask) spec.write_value(self.__encode_route(entry)) def __encode_route(self, entry): """ :param ~spinn_machine.MulticastRoutingEntry entry: :rtype: int """ route = self._app_id << _BIT_SHIFT_TO_MOVE_APP_ID route |= Router.convert_routing_table_entry_to_spinnaker_route(entry) return route def _generate_provenance_area(self, spec): """ :param ~.DataSpecificationGenerator spec: spec file """ spec.reserve_memory_region( region=_DSG_REGIONS.PROVENANCE_AREA, size=_PROVENANCE_FORMAT.size, label="provenance collection region", empty=True) def __get_provenance_region_address(self, txrx, place): """ :param ~spinnman.transceiver.Transceiver txrx: :param ~pacman.model.placements.Placement place: :rtype: int """ if self._prov_region is None: region_table_addr = txrx.get_cpu_information_from_core( place.x, place.y, place.p).user[0] region_entry_addr = get_region_base_address_offset( region_table_addr, _DSG_REGIONS.PROVENANCE_AREA) self._prov_region, = _ONE_WORD.unpack(txrx.read_memory( place.x, place.y, region_entry_addr, BYTES_PER_WORD)) return self._prov_region
[docs] @overrides(AbstractProvidesProvenanceDataFromMachine. get_provenance_data_from_machine) def get_provenance_data_from_machine(self, placement): # No standard provenance region, so no standard provenance data # But we do have our own. transceiver = FecDataView.get_transceiver() provenance_address = self.__get_provenance_region_address( transceiver, placement) data = transceiver.read_memory( placement.x, placement.y, provenance_address, _PROVENANCE_FORMAT.size) (n_sdp_packets, n_in_streams, n_out_streams, n_router_changes) = \ _PROVENANCE_FORMAT.unpack_from(data) with ProvenanceWriter() as db: db.insert_monitor( placement.x, placement.y, "Number_of_Router_Configuration_Changes", n_router_changes) db.insert_monitor( placement.x, placement.y, "Number_of_Relevant_SDP_Messages", n_sdp_packets) db.insert_monitor( placement.x, placement.y, "Number_of_Input_Streamlets", n_in_streams) db.insert_monitor( placement.x, placement.y, "Number_of_Output_Streamlets", n_out_streams)
[docs] def set_router_wait1_timeout( self, timeout, extra_monitor_cores_to_set): """ Supports setting of the router time outs for a set of chips via their extra monitor cores. This sets the timeout for the time between when a packet arrives and when it starts to be emergency routed. (Actual emergency routing is disabled by default.) :param tuple(int,int) timeout: The mantissa and exponent of the timeout value, each between 0 and 15 :param extra_monitor_cores_to_set: which monitors control the routers to set the timeout of :type extra_monitor_cores_to_set: iterable(ExtraMonitorSupportMachineVertex) """ mantissa, exponent = timeout core_subsets = convert_vertices_to_core_subset( extra_monitor_cores_to_set) process = SetRouterTimeoutProcess( FecDataView.get_scamp_connection_selector()) try: process.set_wait1_timeout(mantissa, exponent, core_subsets) except: # noqa: E722 emergency_recover_state_from_failure( self, FecDataView.get_placement_of_vertex(self)) raise
[docs] def set_router_wait2_timeout( self, timeout, extra_monitor_cores_to_set): """ Supports setting of the router time outs for a set of chips via their extra monitor cores. This sets the timeout for the time between when a packet starts to be emergency routed and when it is dropped. (Actual emergency routing is disabled by default.) :param tuple(int,int) timeout: The mantissa and exponent of the timeout value, each between 0 and 15 :param extra_monitor_cores_to_set: which monitors control the routers to set the timeout of :type extra_monitor_cores_to_set: iterable(ExtraMonitorSupportMachineVertex) """ mantissa, exponent = timeout core_subsets = convert_vertices_to_core_subset( extra_monitor_cores_to_set) process = SetRouterTimeoutProcess( FecDataView.get_scamp_connection_selector()) try: process.set_wait2_timeout(mantissa, exponent, core_subsets) except: # noqa: E722 emergency_recover_state_from_failure( self, FecDataView.get_placement_of_vertex(self)) raise
[docs] def reset_reinjection_counters(self, extra_monitor_cores_to_set): """ Resets the counters for reinjection. :param ~spinnman.transceiver.Transceiver transceiver: the spinnMan interface :param extra_monitor_cores_to_set: which monitors control the routers to reset the counters of :type extra_monitor_cores_to_set: iterable(ExtraMonitorSupportMachineVertex) """ core_subsets = convert_vertices_to_core_subset( extra_monitor_cores_to_set) process = ResetCountersProcess( FecDataView.get_scamp_connection_selector()) try: process.reset_counters(core_subsets) except: # noqa: E722 emergency_recover_state_from_failure( self, FecDataView.get_placement_of_vertex(self)) raise
[docs] def clear_reinjection_queue(self, extra_monitor_cores_to_set): """ Clears the queues for reinjection. :param extra_monitor_cores_to_set: Which extra monitors need to clear their queues. :type extra_monitor_cores_to_set: iterable(ExtraMonitorSupportMachineVertex) """ core_subsets = convert_vertices_to_core_subset( extra_monitor_cores_to_set) process = ClearQueueProcess( FecDataView.get_scamp_connection_selector()) try: process.reset_counters(core_subsets) except: # noqa: E722 emergency_recover_state_from_failure( self, FecDataView.get_placement_of_vertex(self)) raise
[docs] def get_reinjection_status(self): """ Get the reinjection status from this extra monitor vertex. :return: the reinjection status for this vertex :rtype: ReInjectionStatus """ placement = FecDataView.get_placement_of_vertex(self) process = ReadStatusProcess( FecDataView.get_scamp_connection_selector()) try: return process.get_reinjection_status( placement.x, placement.y, placement.p) except: # noqa: E722 emergency_recover_state_from_failure(self, placement) raise
[docs] def get_reinjection_status_for_vertices(self): """ Get the reinjection status from a set of extra monitor cores. :rtype: dict(tuple(int,int), ReInjectionStatus) """ core_subsets = convert_vertices_to_core_subset( FecDataView.iterate_monitors()) process = ReadStatusProcess( FecDataView.get_scamp_connection_selector()) return process.get_reinjection_status_for_core_subsets(core_subsets)
[docs] def set_reinjection_packets( self, point_to_point=None, multicast=None, nearest_neighbour=None, fixed_route=None): """ :param point_to_point: If point to point should be set, or `None` if left as before :type point_to_point: bool or None :param multicast: If multicast should be set, or `None` if left as before :type multicast: bool or None :param nearest_neighbour: If nearest neighbour should be set, or `None` if left as before :type nearest_neighbour: bool or None :param fixed_route: If fixed route should be set, or `None` if left as before. :type fixed_route: bool or None """ # pylint: disable=too-many-arguments if multicast is not None: self._reinject_multicast = multicast if point_to_point is not None: self._reinject_point_to_point = point_to_point if nearest_neighbour is not None: self._reinject_nearest_neighbour = nearest_neighbour if fixed_route is not None: self._reinject_fixed_route = fixed_route core_subsets = convert_vertices_to_core_subset( FecDataView.iterate_monitors()) process = SetPacketTypesProcess( FecDataView.get_scamp_connection_selector()) try: process.set_packet_types( core_subsets, self._reinject_point_to_point, self._reinject_multicast, self._reinject_nearest_neighbour, self._reinject_fixed_route) except: # noqa: E722 emergency_recover_state_from_failure( self, FecDataView.get_placement_of_vertex(self)) raise
[docs] def load_system_mc_routes(self): """ Get the extra monitor cores to load up the system-based multicast routes (used by the Data In protocol). :param ~spinnman.transceiver.Transceiver transceiver: the spinnMan interface """ core_subsets = self._convert_vertices_to_core_subset() process = LoadSystemMCRoutesProcess( FecDataView.get_scamp_connection_selector()) try: return process.load_system_mc_routes(core_subsets) except: # noqa: E722 emergency_recover_state_from_failure( self, FecDataView.get_placement_of_vertex(self)) raise
[docs] def load_application_mc_routes(self): """ Get the extra monitor cores to load up the application-based multicast routes (used by the Data In protocol). """ core_subsets = self._convert_vertices_to_core_subset() process = LoadApplicationMCRoutesProcess( FecDataView.get_scamp_connection_selector()) try: return process.load_application_mc_routes(core_subsets) except: # noqa: E722 emergency_recover_state_from_failure( self, FecDataView.get_placement_of_vertex(self)) raise
@staticmethod def _convert_vertices_to_core_subset(): """ Convert vertices into the subset of cores where they've been placed. :return: where the vertices have been placed :rtype: ~.CoreSubsets """ core_subsets = CoreSubsets() for vertex in FecDataView.iterate_monitors(): placement = FecDataView.get_placement_of_vertex(vertex) core_subsets.add_processor(placement.x, placement.y, placement.p) return core_subsets