Source code for pacman.operations.placer_algorithms.application_placer

# Copyright (c) 2021 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import numpy
import os

from spinn_utilities.config_holder import get_config_bool, get_config_str
from spinn_utilities.log import FormatAdapter
from spinn_utilities.ordered_set import OrderedSet
from spinn_utilities.progress_bar import ProgressBar

from pacman.data import PacmanDataView
from pacman.model.placements import Placements, Placement
from pacman.model.graphs import AbstractVirtual
from pacman.exceptions import (
    PacmanPlaceException, PacmanConfigurationException)

logger = FormatAdapter(logging.getLogger(__name__))


[docs]def place_application_graph(system_placements): """ Perform placement of an application graph on the machine. .. note:: app_graph must have been partitioned """ # Track the placements and space placements = Placements(system_placements) # board_colours = dict() machine = PacmanDataView.get_machine() plan_n_timesteps = PacmanDataView.get_plan_n_timestep() spaces = _Spaces(machine, placements, plan_n_timesteps) # Go through the application graph by application vertex progress = ProgressBar( PacmanDataView.get_n_vertices(), "Placing Vertices") for app_vertex in progress.over(PacmanDataView.iterate_vertices()): spaces.restore_chips() # Try placements from the next chip, but try again if fails placed = False while not placed: chips_attempted = list() try: same_chip_groups = app_vertex.splitter.get_same_chip_groups() if not same_chip_groups: placed = True break # Start a new space try: next_chip_space, space = spaces.get_next_chip_and_space() except PacmanPlaceException as e: _place_error( placements, system_placements, e, plan_n_timesteps, machine) logger.debug(f"Starting placement from {next_chip_space}") placements_to_make = list() # Go through the groups last_chip_space = None for vertices, sdram in same_chip_groups: vertices_to_place = list() for vertex in vertices: # No need to place virtual vertices if isinstance(vertex, AbstractVirtual): continue if not placements.is_vertex_placed(vertex): vertices_to_place.append(vertex) sdram = sdram.get_total_sdram(plan_n_timesteps) n_cores = len(vertices_to_place) if _do_fixed_location(vertices_to_place, sdram, placements, machine, next_chip_space): continue # Try to find a chip with space; this might result in a # SpaceExceededException while not next_chip_space.is_space(n_cores, sdram): next_chip_space = spaces.get_next_chip_space( space, last_chip_space) last_chip_space = None # If this worked, store placements to be made last_chip_space = next_chip_space chips_attempted.append(next_chip_space.chip) _store_on_chip( placements_to_make, vertices_to_place, sdram, next_chip_space) # Now make the placements having confirmed all can be done placements.add_placements(placements_to_make) placed = True logger.debug(f"Used {chips_attempted}") except _SpaceExceededException: # This might happen while exploring a space; this may not be # fatal since the last space might have just been bound by # existing placements, and there might be bigger spaces out # there to use logger.debug(f"Failed, saving {chips_attempted}") spaces.save_chips(chips_attempted) chips_attempted.clear() if get_config_bool("Reports", "draw_placements"): _draw_placements(placements, system_placements) return placements
def _place_error( placements, system_placements, exception, plan_n_timesteps, machine): unplaceable = list() vertex_count = 0 n_vertices = 0 for app_vertex in PacmanDataView.iterate_vertices(): same_chip_groups = app_vertex.splitter.get_same_chip_groups() app_vertex_placed = True found_placed_cores = False for vertices, _sdram in same_chip_groups: if isinstance(vertices[0], AbstractVirtual): break if placements.is_vertex_placed(vertices[0]): found_placed_cores = True elif found_placed_cores: vertex_count += len(vertices) n_vertices = len(same_chip_groups) app_vertex_placed = False break else: app_vertex_placed = False break if not app_vertex_placed: unplaceable.append(app_vertex) report_file = os.path.join( PacmanDataView.get_run_dir_path(), "placements_error.txt") with open(report_file, 'w', encoding="utf-8") as f: f.write(f"Could not place {len(unplaceable)} of " f"{PacmanDataView.get_n_vertices()} application vertices.\n") f.write(f" Could not place {vertex_count} of {n_vertices} in the" " last app vertex\n\n") for x, y in placements.chips_with_placements: first = True for placement in placements.placements_on_chip(x, y): if system_placements.is_vertex_placed(placement.vertex): continue if first: f.write(f"Chip ({x}, {y}):\n") first = False f.write(f" Processor {placement.p}:" f" Vertex {placement.vertex}\n") if not first: f.write("\n") f.write("\n") f.write("Not placed:\n") for app_vertex in unplaceable: f.write(f"Vertex: {app_vertex}\n") same_chip_groups = app_vertex.splitter.get_same_chip_groups() for vertices, sdram in same_chip_groups: f.write(f" Group of {len(vertices)} vertices uses " f"{sdram.get_total_sdram(plan_n_timesteps)} " "bytes of SDRAM:\n") for vertex in vertices: f.write(f" Vertex {vertex}") if placements.is_vertex_placed(vertex): plce = placements.get_placement_of_vertex(vertex) f.write(f" (placed at {plce.x}, {plce.y}, {plce.p})") f.write("\n") f.write("\n") f.write("Unused chips:\n") for x, y in machine.chip_coordinates: n_placed = placements.n_placements_on_chip(x, y) system_placed = system_placements.n_placements_on_chip(x, y) if n_placed - system_placed == 0: n_procs = machine.get_chip_at(x, y).n_user_processors f.write(f" {x}, {y} ({n_procs - system_placed}" " free cores)\n") if get_config_bool("Reports", "draw_placements_on_error"): _draw_placements(placements, system_placements) raise PacmanPlaceException( f" {exception}." f" Report written to {report_file}.") def _next_colour(): """ Get the next (random) RGB colour to use for a vertex for placement drawings :rtype: tuple(int, int, int) """ return tuple(numpy.concatenate( (numpy.random.choice(range(256), size=3) / 256, [1.0]))) def _draw_placements(placements, system_placements): try: # spinner as graphical library so # pylint: disable=import-error from spinner.scripts.contexts import PNGContextManager from spinner.diagrams.machine_map import ( get_machine_map_aspect_ratio, draw_machine_map) from spinner import board from collections import defaultdict import math except ImportError: logger.exception( "Unable to draw placements as no spinner install found") return report_file = os.path.join( PacmanDataView.get_run_dir_path(), "placements_error.png") # Colour the boards by placements unused = (0.5, 0.5, 0.5, 1.0) vertex_colours = defaultdict(_next_colour) board_colours = dict() machine = PacmanDataView.get_machine() for x, y in machine.chip_coordinates: if (placements.n_placements_on_chip(x, y) == system_placements.n_placements_on_chip(x, y)): board_colours[x, y] = unused else: vertex = None for placement in placements.placements_on_chip(x, y): if not system_placements.is_vertex_placed(placement.vertex): vertex = placement.vertex break if vertex is not None: board_colours[x, y] = vertex_colours[vertex.app_vertex] include_boards = [ (chip.x, chip.y) for chip in machine.ethernet_connected_chips] w = math.ceil(machine.width / 12) h = math.ceil(machine.height / 12) aspect_ratio = get_machine_map_aspect_ratio(w, h) image_width = 10000 image_height = int(image_width * aspect_ratio) output_filename = report_file hex_boards = board.create_torus(w, h) with PNGContextManager( output_filename, image_width, image_height) as ctx: draw_machine_map( ctx, image_width, image_height, machine.width, machine.height, hex_boards, dict(), board_colours, include_boards) class _SpaceExceededException(Exception): pass def _do_fixed_location(vertices, sdram, placements, machine, next_chip_space): """ :param vertices: :param sdram: :param placements: :param machine: :param _ChipWithSpace next_chip_space: :return: """ x = None y = None constrained = False for vertex in vertices: if vertex.get_fixed_location(): x = vertex.get_fixed_location().x y = vertex.get_fixed_location().y constrained = True if constrained: chip = machine.get_chip_at(x, y) if chip is None: raise PacmanConfigurationException( f"Constrained to chip {x, y} but no such chip") on_chip = placements.placements_on_chip(x, y) cores_used = {p.p for p in on_chip} cores = set(p.processor_id for p in chip.processors if not p.is_monitor) - cores_used next_cores = iter(cores) for vertex in vertices: next_core = None if vertex.get_fixed_location(): fixed = vertex.get_fixed_location() if fixed.p is not None: if fixed.p not in next_cores: raise PacmanConfigurationException( f"Core {fixed.p} on {x}, {y} not available to " f"place {vertex} on") next_core = fixed.p if next_core is None: try: next_core = next(next_cores) except StopIteration: # pylint: disable=raise-missing-from raise PacmanConfigurationException( f"No more cores available on {x}, {y}: {on_chip}") placements.add_placement(Placement(vertex, x, y, next_core)) if next_chip_space.x == x and next_chip_space.y == y: next_chip_space.cores.remove(next_core) next_chip_space.use_sdram(sdram) return True return False def _store_on_chip(placements_to_make, vertices, sdram, next_chip_space): """ :param placements_to_make: :param vertices: :param sdram: :param _ChipWithSpace next_chip_space: """ for vertex in vertices: core = next_chip_space.use_next_core() placements_to_make.append(Placement( vertex, next_chip_space.x, next_chip_space.y, core)) next_chip_space.use_sdram(sdram) class _Spaces(object): __slots__ = ["__machine", "__chips", "__next_chip", "__used_chips", "__system_placements", "__placements", "__plan_n_timesteps", "__last_chip_space", "__saved_chips", "__restored_chips"] def __init__(self, machine, placements, plan_n_timesteps): self.__machine = machine self.__placements = placements self.__plan_n_timesteps = plan_n_timesteps self.__chips = iter(_chip_order(machine)) self.__next_chip = next(self.__chips) self.__used_chips = set() self.__last_chip_space = None self.__saved_chips = OrderedSet() self.__restored_chips = OrderedSet() def __cores_and_sdram(self, chip): """ :param Chip chip: :rtype: (int, int) :return: """ on_chip = self.__placements.placements_on_chip(chip.x, chip.y) cores_used = {p.p for p in on_chip} sdram_used = sum( p.vertex.sdram_required.get_total_sdram( self.__plan_n_timesteps) for p in on_chip) return cores_used, sdram_used def get_next_chip_and_space(self): """ :rtype: (_ChipWithSpace, _Space) """ try: if self.__last_chip_space is None: chip = self.__get_next_chip() cores_used, sdram_used = self.__cores_and_sdram(chip) self.__last_chip_space = _ChipWithSpace( chip, cores_used, sdram_used) self.__used_chips.add(chip) # Start a new space by finding all the chips that can be reached # from the start chip but have not been used return (self.__last_chip_space, _Space(self.__last_chip_space.chip)) except StopIteration: raise PacmanPlaceException( # pylint: disable=raise-missing-from f"No more chips to place on; {self.n_chips_used} of " f"{self.__machine.n_chips} used") def __get_next_chip(self): """ :rtype: Chip """ while self.__restored_chips: chip = self.__restored_chips.pop(last=False) if chip not in self.__used_chips: return chip while (self.__next_chip in self.__used_chips): self.__next_chip = next(self.__chips) return self.__next_chip def get_next_chip_space(self, space, last_chip_space): """ :param _Space space: :param _ChipWithSpace last_chip_space: :rtype: _ChipWithSpace """ # If we are reporting a used chip, update with reachable chips if last_chip_space is not None: last_chip = last_chip_space.chip space.update(self.__usable_from_chip(last_chip)) # If no space, error if not space: self.__last_chip_space = None raise _SpaceExceededException( "No more chips to place on in this space; " f"{self.n_chips_used} of {self.__machine.n_chips} used") chip = space.pop() self.__used_chips.add(chip) self.__restored_chips.discard(chip) cores_used, sdram_used = self.__cores_and_sdram(chip) self.__last_chip_space = _ChipWithSpace(chip, cores_used, sdram_used) return self.__last_chip_space @property def n_chips_used(self): """ The number of chips used. :rtype: int """ return len(self.__used_chips) def __usable_from_chip(self, chip): """ :param Chip chip: :rtype set(Chip) """ chips = OrderedSet() for link in chip.router.links: chip_coords = (link.destination_x, link.destination_y) target_chip = self.__machine.get_chip_at(*chip_coords) if target_chip not in self.__used_chips: chips.add(target_chip) return chips def save_chips(self, chips): """ :param iter(Chip) chips: """ self.__saved_chips.update(chips) def restore_chips(self): for chip in self.__saved_chips: self.__used_chips.remove(chip) self.__restored_chips.add(chip) self.__saved_chips.clear() class _Space(object): __slots__ = ["__same_board_chips", "__remaining_chips", "__board_x", "__board_y", "__first_chip"] def __init__(self, chip): self.__board_x = chip.nearest_ethernet_x self.__board_y = chip.nearest_ethernet_y self.__same_board_chips = OrderedSet() self.__remaining_chips = OrderedSet() def __len__(self): return len(self.__same_board_chips) + len(self.__remaining_chips) def __on_same_board(self, chip): return (chip.nearest_ethernet_x == self.__board_x and chip.nearest_ethernet_y == self.__board_y) def pop(self): """ :type: Chip :return: """ if self.__same_board_chips: return self.__same_board_chips.pop(last=False) if self.__remaining_chips: next_chip = self.__remaining_chips.pop(last=False) self.__board_x = next_chip.nearest_ethernet_x self.__board_y = next_chip.nearest_ethernet_y to_remove = list() for chip in self.__remaining_chips: if self.__on_same_board(chip): to_remove.append(chip) self.__same_board_chips.add(chip) for chip in to_remove: self.__remaining_chips.remove(chip) return next_chip raise StopIteration def update(self, chips): """ :param iter(Chip) chips: """ for chip in chips: if self.__on_same_board(chip): self.__same_board_chips.add(chip) else: self.__remaining_chips.add(chip) class _ChipWithSpace(object): """ A chip with space for placement. """ __slots__ = ["chip", "cores", "sdram"] def __init__(self, chip, used_processors, used_sdram): self.chip = chip self.cores = set(p.processor_id for p in chip.processors if not p.is_monitor) self.cores -= used_processors self.sdram = chip.sdram.size - used_sdram @property def x(self): return self.chip.x @property def y(self): return self.chip.y def is_space(self, n_cores, sdram): return len(self.cores) >= n_cores and self.sdram >= sdram def use_sdram(self, sdram): self.sdram -= sdram def use_next_core(self): core = next(iter(self.cores)) self.cores.remove(core) return core def __repr__(self): return f"({self.x}, {self.y})" def _chip_order(machine): """ :param machine: :rtype: Chip """ s_x, s_y = get_config_str("Mapping", "placer_start_chip").split(",") s_x = int(s_x) s_y = int(s_y) for x in range(machine.width): for y in range(machine.height): c_x = (x + s_x) % machine.width c_y = (y + s_y) % machine.height chip = machine.get_chip_at(c_x, c_y) if chip: yield chip