# Source code for spinn_front_end_common.interface.interface_functions.host_bit_field_router_compressor

# Copyright (c) 2019 The University of Manchester
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
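# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and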
# limitations under the License.

import functools
import math
import os
import struct
from collections import defaultdict
from spinn_utilities.config_holder import get_config_bool
from spinn_utilities.find_max_success import find_max_success
from spinn_utilities.progress_bar import ProgressBar
from spinn_machine import Machine, MulticastRoutingEntry
from pacman.exceptions import (
    PacmanElementAllocationException, MinimisationFailedError)
from pacman.model.routing_tables import (
    MulticastRoutingTables, UnCompressedMulticastRoutingTable,
from pacman.utilities.algorithm_utilities.routes_format import format_route
from spinn_front_end_common.abstract_models import (
from import FecDataView
from spinn_front_end_common.utilities.helpful_functions import n_word_struct
from spinn_front_end_common.utilities.constants import (
from spinn_front_end_common.utilities.report_functions.\
    bit_field_compressor_report import (
from pacman.operations.router_compressors.pair_compressor import (

_REPORT_FOLDER_NAME = "router_compressor_with_bitfield"

def host_based_bit_field_router_compressor():
    """
    Entry point when using the PACMANAlgorithmExecutor.

    Compresses every uncompressed routing table held by the data view,
    merging in as many core bitfields as each table can take.

    :return: compressed routing table entries
    :rtype: ~pacman.model.routing_tables.MulticastRoutingTables
    """
    routing_tables = FecDataView.get_uncompressed().routing_tables

    # create progress bar
    progress = ProgressBar(
        len(routing_tables) * 2,
        "Compressing routing Tables with bitfields in host")

    # create report folder only when the report is switched on
    if get_config_bool(
            "Reports", "write_router_compression_with_bitfield_report"):
        report_folder_path = generate_report_path()
    else:
        report_folder_path = None

    # holder for the compressed router tables
    compressed_pacman_router_tables = MulticastRoutingTables()

    key_atom_map = generate_key_to_atom_map()

    # (x, y) -> processor -> number of incoming source vertices; used later
    # to decide which cores' bitfields should be merged first
    most_costly_cores = defaultdict(lambda: defaultdict(int))
    for partition in FecDataView.iterate_partitions():
        for edge in partition.edges:
            splitter = edge.post_vertex.splitter
            for vertex, _ in splitter.get_source_specific_in_coming_vertices(
                    partition.pre_vertex, partition.identifier):
                place = FecDataView.get_placement_of_vertex(vertex)
                most_costly_cores[place.xy][place.p] += 1

    # start the routing table choice conversion
    for router_table in progress.over(routing_tables):
        start_compression_selection_process(
            router_table, report_folder_path, most_costly_cores,
            compressed_pacman_router_tables, key_atom_map)

    # return compressed tables
    return compressed_pacman_router_tables

def generate_report_path():
    """
    Get the folder the per-chip compression reports are written to,
    creating it if it does not exist yet.

    :rtype: str
    """
    report_folder_path = os.path.join(
        FecDataView.get_run_dir_path(), _REPORT_FOLDER_NAME)
    # exist_ok avoids the check-then-create race of exists() + mkdir()
    os.makedirs(report_folder_path, exist_ok=True)
    return report_folder_path


def generate_key_to_atom_map():
    """
    *THIS IS NEEDED* due to the link from key to edge being lost.

    :return: key to atom map based of key to n atoms
    :rtype: dict(int,int)
    """
    # build key to n atoms map
    routing_infos = FecDataView.get_routing_infos()
    key_to_n_atoms_map = dict()
    for partition in FecDataView.iterate_partitions():
        for vertex in partition.pre_vertex.splitter.get_out_going_vertices(
                partition.identifier):
            key = routing_infos.get_first_key_from_pre_vertex(
                vertex, partition.identifier)
            key_to_n_atoms_map[key] = vertex.vertex_slice.n_atoms
    return key_to_n_atoms_map


def start_compression_selection_process(
        router_table, report_folder_path, most_costly_cores,
        compressed_pacman_router_tables, key_atom_map):
    """
    Entrance method for doing on host compression. Can be used as a
    public method for other compressors.

    :param router_table: the routing table in question to compress
    :type router_table:
        ~pacman.model.routing_tables.UnCompressedMulticastRoutingTable
    :param report_folder_path: the report folder base address
    :type report_folder_path: str or None
    :param dict(dict(int)) most_costly_cores:
        Map of chip x, y to processors to count of incoming on processor
    :param compressed_pacman_router_tables:
        a data holder for compressed tables
    :type compressed_pacman_router_tables:
        ~pacman.model.routing_tables.MulticastRoutingTables
    :param dict(int,int) key_atom_map: key to atoms map
        should be allowed to handle per time step
    """
    compressor = _HostBasedBitFieldRouterCompressor()
    # pylint: disable=protected-access
    compressor._run(
        router_table, report_folder_path, most_costly_cores,
        compressed_pacman_router_tables, key_atom_map)


class _BitFieldData:
    """
    One bitfield read from a core, plus where its header lives in SDRAM.
    """

    __slots__ = [
        # bit_field data
        "bit_field",
        # Key this applies to
        "master_pop_key",
        # address of n_atoms word to write back merged flags
        "n_atoms_address",
        # Word that holds merged: 1; all_ones: 1; n_atoms: 30;
        "n_atoms_word",
        # P coordinate of processor this applies to
        "processor_id",
        # index of this entry in the "sorted_list"
        "sort_index",
        # The shift to get the core id
        "core_shift",
        # The number of atoms per core
        "n_atoms_per_core"
    ]

    def __init__(self, processor_id, bit_field, master_pop_key,
                 n_atoms_address, n_atoms_word, core_shift,
                 n_atoms_per_core):
        self.processor_id = processor_id
        self.bit_field = bit_field
        self.master_pop_key = master_pop_key
        self.n_atoms_address = n_atoms_address
        self.n_atoms_word = n_atoms_word
        self.core_shift = core_shift
        self.n_atoms_per_core = n_atoms_per_core
        # assigned later when the bitfields are ordered
        self.sort_index = None

    def __str__(self):
        return f"{self.processor_id} {self.master_pop_key} {self.bit_field}"

    def bit_field_as_bit_array(self):
        """
        Convert the bitfield into an array of bits like bit_field.c

        :return: 32 bits per word, least significant bit of each word first
        :rtype: list(int)
        """
        return [(word >> i) & 1 for word in self.bit_field for i in range(32)]


class _HostBasedBitFieldRouterCompressor:
    """
    Host-based fancy router compressor using the bitfield filters of the
    cores. Compresses bitfields and router table entries together as
    much as feasible.
    """

    __slots__ = [
        # List of the entries from the highest successful midpoint
        "_best_routing_entries",
        # The highest successful midpoint
        "_best_midpoint",
        # Dict of lists of the original bitfields by key
        "_bit_fields_by_key",
        # Record of which midpoints were tried and with what result
        "_compression_attempts",
        # Number of bitfields for this core
        "_n_bitfields"
    ]

    # max entries that can be used by the application code
    _MAX_SUPPORTED_LENGTH = 1023

    # the amount of time each attempt at router compression can be allowed to
    # take (in seconds)
    _DEFAULT_TIME_PER_ITERATION = 10

    # report name
    _REPORT_NAME = "router_{}_{}.rpt"

    # key id for the initial entry
    _ORIGINAL_ENTRY = 0

    # key id for the bitfield entries
    _ENTRIES = 1

    # size of a filter info in bytes (key, n words, pointer)
    _SIZE_OF_FILTER_INFO_IN_BYTES = 12

    # bit to mask a bit
    _BIT_MASK = 1

    # mask for neuron level
    _NEURON_LEVEL_MASK = 0xFFFFFFFF

    # to remove the first   merged: 1; and all_ones: 1;
    N_ATOMS_MASK = 0x3FFFFFFF
    MERGED_SETTER = 0x80000000

    # struct for performance requirements.
    _FOUR_WORDS = struct.Struct("<IIII")

    # for router report
    _LOWER_16_BITS = 0xFFFF

    # rob paul to pay sam threshold starting point at 1ms time step
    _N_PACKETS_PER_SECOND = 100000

    # convert between milliseconds and second
    _MS_TO_SEC = 1000

    # fraction effect for threshold
    _THRESHOLD_FRACTION_EFFECT = 2

    # Number of bits per word as an int
    _BITS_PER_WORD = 32

    def __init__(self):
        self._best_routing_entries = None
        self._best_midpoint = -1
        self._bit_fields_by_key = None
        self._compression_attempts = dict()
        self._n_bitfields = None

    def get_bit_field_sdram_base_addresses(self, chip_x, chip_y):
        """
        Find the bitfield region base address of every core on a chip
        whose vertex supports bitfield routing compression.

        :param int chip_x:
        :param int chip_y:
        :return: map of processor id to bitfield base address
        :rtype: dict(int,int)
        """
        # pylint: disable=too-many-arguments, unused-argument
        # locate the bitfields in a chip level scope
        base_addresses = dict()
        for placement in FecDataView.iterate_placements_by_xy_and_type(
                chip_x, chip_y, AbstractSupportsBitFieldRoutingCompression):
            vertex = placement.vertex
            base_addresses[placement.p] = vertex.bit_field_base_address(
                FecDataView.get_placement_of_vertex(vertex))
        return base_addresses

    def _run(
            self, router_table, report_folder_path, most_costly_cores,
            compressed_pacman_router_tables, key_atom_map):
        """
        Entrance method for doing on host compression. Can be used as a
        public method for other compressors.

        :param router_table: the routing table in question to compress
        :type router_table:
            ~pacman.model.routing_tables.UnCompressedMulticastRoutingTable
        :param report_folder_path: the report folder base address
        :type report_folder_path: str or None
        :param dict(dict(int)) most_costly_cores:
            Map of chip x, y to processors to count of incoming on processor
        :param compressed_pacman_router_tables:
            a data holder for compressed tables
        :type compressed_pacman_router_tables:
            ~pacman.model.routing_tables.MulticastRoutingTables
        :param dict(int,int) key_atom_map: key to atoms map
            should be allowed to handle per time step
        """
        chip_x = router_table.x
        chip_y = router_table.y

        # Find the processors that have bitfield data and where it is
        bit_field_chip_base_addresses = (
            self.get_bit_field_sdram_base_addresses(chip_x, chip_y))

        # read in bitfields.
        self._read_in_bit_fields(
            chip_x, chip_y, bit_field_chip_base_addresses, most_costly_cores)

        # execute binary search
        self._start_binary_search(router_table, key_atom_map)

        # add final to compressed tables:
        # self._best_routing_entries is a list of entries
        best_router_table = CompressedMulticastRoutingTable(chip_x, chip_y)
        for entry in self._best_routing_entries:
            best_router_table.add_multicast_routing_entry(
                entry.to_MulticastRoutingEntry())
        compressed_pacman_router_tables.add_routing_table(best_router_table)

        # remove bitfields from cores that have been merged into the
        # router table
        self._remove_merged_bitfields_from_cores(chip_x, chip_y)

        # create report file if required
        if report_folder_path:
            report_file_path = os.path.join(
                report_folder_path,
                self._REPORT_NAME.format(chip_x, chip_y))
            with open(report_file_path, "w", encoding="utf-8") as report_out:
                self._create_table_report(router_table, report_out)

        generate_provenance_item(chip_x, chip_y, self._best_midpoint)

    def _convert_bitfields_into_router_table(
            self, router_table, mid_point, key_to_n_atoms_map):
        """
        Converts the bitfield into router table entries for compression,
        based off the entry located in the original router table.

        :param router_table: the original routing table
        :type router_table:
            ~pacman.model.routing_tables.UnCompressedMulticastRoutingTable
        :param int mid_point: cutoff for bitfields to use
        :param dict(int,int) key_to_n_atoms_map:
        :return: routing tables.
        :rtype: ~pacman.model.routing_tables.AbstractMulticastRoutingTable
        """
        new_table = UnCompressedMulticastRoutingTable(
            router_table.x, router_table.y)

        # go through the routing tables and convert when needed
        for original_entry in router_table.multicast_routing_entries:
            base_key = original_entry.routing_entry_key
            # NOTE(review): entries whose key has no bitfield are skipped
            # and so never reach new_table - confirm this is intended.
            if base_key not in self._bit_fields_by_key:
                continue
            n_neurons = key_to_n_atoms_map[base_key]
            entry_links = original_entry.link_ids

            # Assume all neurons for each processor to be kept
            core_map = {
                processor_id: [1] * n_neurons
                for processor_id in original_entry.processor_ids}

            # For those below the midpoint use the bitfield data
            for bf_data in self._bit_fields_by_key[base_key]:
                if bf_data.sort_index < mid_point:
                    core_map[bf_data.processor_id] = (
                        bf_data.bit_field_as_bit_array())

            # Add an Entry for each neuron
            for neuron in range(n_neurons):
                processors = [
                    processor_id
                    for processor_id in original_entry.processor_ids
                    if core_map[processor_id][neuron]]

                # build new entry for this neuron and add to table
                new_table.add_multicast_routing_entry(MulticastRoutingEntry(
                    routing_entry_key=base_key + neuron,
                    mask=self._NEURON_LEVEL_MASK, link_ids=entry_links,
                    defaultable=False, processor_ids=processors))

        # return the bitfield tables and the reduced original table
        return new_table

    def _bit_for_neuron_id(self, bit_field, neuron_id):
        """
        Locate the bit for the neuron in the bitfield.

        :param list(int) bit_field:
            the block of words which represent the bitfield
        :param int neuron_id: the neuron id to find the bit in the bitfield
        :return: the bit
        :rtype: int
        """
        word_id = int(neuron_id // self._BITS_PER_WORD)
        bit_in_word = neuron_id % self._BITS_PER_WORD
        return (bit_field[word_id] >> bit_in_word) & self._BIT_MASK

    def _read_in_bit_fields(
            self, chip_x, chip_y, bit_field_chip_base_addresses,
            most_costly_cores):
        """
        Read in the bitfields from the cores.

        Fills ``_bit_fields_by_key`` and then orders the bitfields, which
        sets each one's ``sort_index`` and ``_n_bitfields``. Nothing is
        returned.

        :param int chip_x: chip x coord
        :param int chip_y: chip y coord
        :param dict(int,int) bit_field_chip_base_addresses:
            maps core id to base address
        :param dict(dict(int)) most_costly_cores:
            Map of chip x, y to processors to count of incoming on processor
        """
        # data holders
        self._bit_fields_by_key = defaultdict(list)
        bit_fields_by_coverage = defaultdict(list)
        processor_coverage_by_bitfield = defaultdict(list)

        # read in for each app vertex that would have a bitfield
        for processor_id, bit_field_base_address in (
                bit_field_chip_base_addresses.items()):

            # from filter_region_t read how many bitfields there are
            # n_filters then array of filters
            # NOTE(review): confirm the 4th positional argument of
            # read_word is meant to be a byte count.
            n_filters = FecDataView.get_transceiver().read_word(
                chip_x, chip_y, bit_field_base_address, BYTES_PER_WORD)
            reading_address = bit_field_base_address + BYTES_PER_WORD

            # read in each bitfield
            for _ in range(n_filters):
                master_pop_key, n_atoms_word, n_per_core_word, read_pointer = \
                    self._FOUR_WORDS.unpack(FecDataView.read_memory(
                        chip_x, chip_y, reading_address, BYTES_PER_4_WORDS))
                # the n_atoms word is the second word of the filter header
                n_atoms_address = reading_address + BYTES_PER_WORD
                reading_address += BYTES_PER_4_WORDS

                # merged: 1; all_ones: 1; n_atoms: 30;
                atoms = n_atoms_word & self.N_ATOMS_MASK
                # get bitfield words
                n_words_to_read = math.ceil(atoms / self._BITS_PER_WORD)

                bit_field = n_word_struct(n_words_to_read).unpack(
                    FecDataView.read_memory(
                        chip_x, chip_y, read_pointer,
                        n_words_to_read * BYTES_PER_WORD))

                # low 5 bits are the core shift, the rest atoms per core
                data = _BitFieldData(
                    processor_id, bit_field, master_pop_key, n_atoms_address,
                    n_atoms_word, n_per_core_word & 0x1F,
                    n_per_core_word >> 5)

                # sorted by best coverage of redundant packets:
                # number of bits in the array that are zero instead of one
                as_array = data.bit_field_as_bit_array()
                n_redundant_packets = len(as_array) - sum(as_array)

                bit_fields_by_coverage[n_redundant_packets].append(data)
                processor_coverage_by_bitfield[processor_id].append(
                    n_redundant_packets)

                # add to the bitfields tracker
                self._bit_fields_by_key[master_pop_key].append(data)

        # use the ordered process to find the best ones to do first
        self._order_bit_fields(
            bit_fields_by_coverage, most_costly_cores, chip_x, chip_y,
            processor_coverage_by_bitfield)

    def _order_bit_fields(
            self, bit_fields_by_coverage, most_costly_cores, chip_x, chip_y,
            processor_coverage_by_bitfield):
        """
        Order the bit fields by redundancy setting the sorted index.

        Also counts the bitfields setting _n_bitfields.

        :param dict(int,list(_BitFieldData)) bit_fields_by_coverage:
            bitfields grouped by redundant packet count; selected bitfields
            are removed from these lists in place
        :param dict(dict(int)) most_costly_cores:
            Map of chip x, y to processors to count of incoming on processor
        :param int chip_x:
        :param int chip_y:
        :param dict(int,list(int)) processor_coverage_by_bitfield:
            redundant packet counts of the bitfields of each processor
        """
        sort_index = 0

        # get cores that are the most likely to have the worst time and order
        # bitfields accordingly
        chip_costs = most_costly_cores[chip_x, chip_y]
        cores = sorted(
            chip_costs.keys(), key=lambda k: chip_costs[k], reverse=True)

        # only add bit fields from the worst affected cores, which will
        # build as more and more are taken to make the worst a collection
        # instead of a individual
        cores_to_add_for = set()
        for worst_core, next_core in zip(cores, cores[1:]):
            # determine how many of the worst to add before it balances with
            # next one
            cores_to_add_for.add(worst_core)
            diff = chip_costs[worst_core] - chip_costs[next_core]

            # order over most effective bitfields
            coverage = sorted(
                processor_coverage_by_bitfield[worst_core], reverse=True)

            # cycle till at least the diff is covered
            covered = 0
            for redundant_packet_count in coverage:
                candidates = bit_fields_by_coverage[redundant_packet_count]
                remaining = list()
                for bit_field_data in candidates:
                    if (covered < diff and
                            bit_field_data.processor_id in cores_to_add_for):
                        bit_field_data.sort_index = sort_index
                        sort_index += 1
                        covered += 1
                    else:
                        remaining.append(bit_field_data)
                # drop the selected ones in place, keeping the others' order
                candidates[:] = remaining

        # take left overs, most redundant first
        for coverage_level in sorted(bit_fields_by_coverage, reverse=True):
            for bit_field_data in bit_fields_by_coverage[coverage_level]:
                bit_field_data.sort_index = sort_index
                sort_index += 1

        self._n_bitfields = sort_index

    def _start_binary_search(self, router_table, key_atom_map):
        """
        Start binary search of the merging of bitfield to router table.

        :param ~.UnCompressedMulticastRoutingTable router_table:
            uncompressed router table
        :param dict(int,int) key_atom_map: map from key to atoms
        :raises PacmanAlgorithmFailedToGenerateOutputsException:
            if the table cannot be compressed even without any bitfields
        """
        # try first just uncompressed. so see if its possible
        try:
            self._best_routing_entries = self._run_algorithm(router_table)
            self._best_midpoint = 0
            self._compression_attempts[0] = "succcess"
        except MinimisationFailedError as e:
            raise PacmanAlgorithmFailedToGenerateOutputsException(
                "host bitfield router compressor can't compress the "
                "uncompressed routing tables, regardless of bitfield merging. "
                "System is fundamentally flawed here") from e

        find_max_success(self._n_bitfields, functools.partial(
            self._binary_search_check, routing_table=router_table,
            key_to_n_atoms_map=key_atom_map))

    def _binary_search_check(
            self, mid_point, routing_table, key_to_n_atoms_map):
        """
        Check function for fix max success.

        On success the compressed entries and the midpoint are remembered
        as the best so far.

        :param int mid_point: the point if the list to stop at
        :param ~.UnCompressedMulticastRoutingTable routing_table:
            the basic routing table
        :param dict(int,int) key_to_n_atoms_map:
        :return: true if it compresses
        :rtype: bool
        """
        # convert bitfields into router tables
        bit_field_router_table = self._convert_bitfields_into_router_table(
            routing_table, mid_point, key_to_n_atoms_map)

        # try to compress
        try:
            self._best_routing_entries = self._run_algorithm(
                bit_field_router_table)
            self._best_midpoint = mid_point
            self._compression_attempts[mid_point] = "succcess"
            return True
        except MinimisationFailedError:
            self._compression_attempts[mid_point] = "fail"
            return False
        except PacmanElementAllocationException:
            self._compression_attempts[mid_point] = "Exception"
            return False

    def _run_algorithm(self, router_table):
        """
        Attempts to covert the mega router tables into 1 router table.

        :param list(~.AbsractMulticastRoutingTable) router_table:
            the set of router tables that together need to
            be merged into 1 router table
        :return: compressor router table
        :rtype: list(~.RoutingTableEntry)
        :raises MinimisationFailedError:
            if it fails to compress to the correct length.
        """
        compressor = _PairCompressor(ordered=True)
        compressed_entries = compressor.compress_table(router_table)
        if len(compressed_entries) > Machine.ROUTER_ENTRIES:
            raise MinimisationFailedError(
                f"Compression failed as {len(compressed_entries)} "
                f"entires found")
        return compressed_entries

    def _remove_merged_bitfields_from_cores(self, chip_x, chip_y):
        """
        Goes to SDRAM and removes said merged entries from the cores'
        bitfield region.

        :param int chip_x: the chip x coord from which this happened
        :param int chip_y: the chip y coord from which this happened
        """
        for entries in self._bit_fields_by_key.values():
            for entry in entries:
                if entry.sort_index < self._best_midpoint:
                    # set merged
                    n_atoms_word = entry.n_atoms_word | self.MERGED_SETTER
                    FecDataView.write_memory(
                        chip_x, chip_y, entry.n_atoms_address, n_atoms_word)

    def _create_table_report(self, router_table, report_out):
        """
        Create the report entry.

        :param ~.AbsractMulticastRoutingTable router_table:
            the uncompressed router table to process
        :param ~io.TextIOBase report_out: the report writer
        """
        n_bit_fields_merged = 0
        n_packets_filtered = 0
        n_possible_bit_fields = 0
        merged_by_core = defaultdict(list)

        for bit_fields in self._bit_fields_by_key.values():
            for bf_data in bit_fields:
                n_possible_bit_fields += 1
                if bf_data.sort_index >= self._best_midpoint:
                    continue
                n_bit_fields_merged += 1
                # NOTE(review): this sums the set bits, whereas the read-in
                # code counts redundant packets as the zero bits - confirm
                # which one the report should show.
                as_array = bf_data.bit_field_as_bit_array()
                n_packets_filtered += sum(as_array)
                merged_by_core[bf_data.processor_id].append(bf_data)

        percentage_done = 100
        if n_possible_bit_fields != 0:
            percentage_done = (
                (100.0 / float(n_possible_bit_fields)) *
                float(n_bit_fields_merged))

        report_out.write(
            f"\nTable {router_table.x}:{router_table.y} has integrated "
            f"{n_bit_fields_merged} out of {n_possible_bit_fields} available "
            "chip level bitfields into the routing table, thereby producing a "
            f"compression of {percentage_done}%.\n\n")

        report_out.write(
            "The uncompressed routing table had "
            f"{router_table.number_of_entries} entries, the compressed "
            f"one with {n_bit_fields_merged} integrated bitfields has "
            f"{len(self._best_routing_entries)} entries.\n\n")

        report_out.write(
            f"The integration of {n_bit_fields_merged} bitfields removes up "
            f"to {n_packets_filtered} MC packets that otherwise would be "
            "being processed by the cores on the chip, just to be dropped as "
            "they do not target anything.\n\n")

        report_out.write("The compression attempts are as follows:\n\n")
        for mid_point, result in self._compression_attempts.items():
            report_out.write(f"Midpoint {mid_point}: {result}\n")

        report_out.write("\nThe bit_fields merged are as follows:\n\n")

        for core, merged in merged_by_core.items():
            for bf_data in merged:
                report_out.write(
                    f"bitfield on core {core} for "
                    f"key {bf_data.master_pop_key}\n")

        report_out.write("\n\n\n")
        report_out.write("The final routing table entries are as follows:\n\n")

        report_out.write(
            "{: <5s} {: <10s} {: <10s} {: <10s} {: <7s} {}\n".format(
                "Index", "Key", "Mask", "Route", "Default", "[Cores][Links]"))
        report_out.write(
            "{:-<5s} {:-<10s} {:-<10s} {:-<10s} {:-<7s} {:-<14s}\n".format(
                "", "", "", "", "", ""))

        n_defaultable = 0
        # Note: _best_routing_entries is a list(), router_table is not
        for entry_count, entry in enumerate(self._best_routing_entries):
            index = entry_count & self._LOWER_16_BITS
            entry_str = format_route(entry.to_MulticastRoutingEntry())
            if entry.defaultable:
                n_defaultable += 1
            report_out.write(f"{index:>5d} {entry_str}\n")
        report_out.write(f"{n_defaultable} Defaultable entries\n")