Source code for pacman.operations.router_algorithms.basic_dijkstra_routing

# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys
from collections import defaultdict
from spinn_utilities.log import FormatAdapter
from spinn_utilities.progress_bar import ProgressBar
from spinn_utilities.ordered_set import OrderedSet
from pacman.data import PacmanDataView
from pacman.exceptions import PacmanRoutingException
from pacman.model.routing_table_by_partition import (
    MulticastRoutingTableByPartition, MulticastRoutingTableByPartitionEntry)
from pacman.utilities.algorithm_utilities.routing_algorithm_utilities import (
    get_app_partitions, vertex_xy_and_route)
from pacman.model.graphs.application import ApplicationVertex

logger = FormatAdapter(logging.getLogger(__name__))
infinity = float("inf")

BW_PER_ROUTE_ENTRY = 0.01
MAX_BW = 250


class _NodeInfo(object):
    """
    :ivar list(~spinn_machine.Link) neighbours:
    :ivar list(float) bws:
    :ivar list(float) weights:
    """
    __slots__ = ["neighbours", "bws", "weights"]

    def __init__(self):
        self.neighbours = list()
        self.bws = list()
        self.weights = list()

    @property
    def neighweights(self):
        return zip(self.neighbours, self.weights)


class _DijkstraInfo(object):
    __slots__ = ["activated", "cost"]

    def __init__(self):
        self.activated = False
        self.cost = None


[docs]def basic_dijkstra_routing( bw_per_route_entry=BW_PER_ROUTE_ENTRY, max_bw=MAX_BW): """ Find routes between the edges with the allocated information, placed in the given places :param bool use_progress_bar: whether to show a progress bar :return: The discovered routes :rtype: MulticastRoutingTables :raise PacmanRoutingException: If something goes wrong with the routing """ router = _BasicDijkstraRouting(bw_per_route_entry, max_bw) # pylint:disable=protected-access return router._run()
class _BasicDijkstraRouting(object): """ A routing algorithm that can find routes for edges between vertices in a machine graph that have been placed on a machine by the use of a Dijkstra shortest path algorithm. """ __slots__ = [ # the routing path objects used to be returned to the work flow "_routing_paths", # parameter to control ........... "_bw_per_route_entry", # parameter to control ........... "_max_bw" ] def __init__(self, bw_per_route_entry, max_bw): # set up basic data structures self._routing_paths = MulticastRoutingTableByPartition() self._bw_per_route_entry = bw_per_route_entry self._max_bw = max_bw def _run(self): """ Find routes between the edges with the allocated information, placed in the given places :param bool use_progress_bar: whether to show a progress bar :return: The discovered routes :rtype: MulticastRoutingTableByPartition :raise PacmanRoutingException: If something goes wrong with the routing """ nodes_info = self._initiate_node_info() tables = self._initiate_dijkstra_tables() self._update_all_weights(nodes_info) partitions = get_app_partitions() progress = ProgressBar(len(partitions), "Creating routing entries") for partition in progress.over(partitions): self._route(partition, nodes_info, tables) return self._routing_paths def _route(self, partition, node_info, tables): """ :param ApplicationEdgePartition partition: :param ApplicationGraph graph: :param dict(tuple(int,int),_NodeInfo) node_info: :param dict(tuple(int,int),_DijkstraInfo) tables: :param Machine machine: """ # pylint: disable=too-many-arguments source = partition.pre_vertex # Destination (xy, core, link) by source machine vertices destinations = defaultdict(lambda: defaultdict(lambda: (set(), set()))) dest_chips = defaultdict(set) for edge in partition.edges: target = edge.post_vertex target_vertices = \ target.splitter.get_source_specific_in_coming_vertices( source, partition.identifier) for tgt, srcs in target_vertices: xy, (m_vertex, core, link) = vertex_xy_and_route(tgt) for src in srcs: if isinstance(src, ApplicationVertex): for s in src.splitter.get_out_going_vertices( partition.identifier): if core is not None: destinations[s][xy][0].add(core) if link is not None: destinations[s][xy][1].add(link) dest_chips[s].add(xy) else: if core is not None: destinations[src][xy][0].add(core) if link is not None: destinations[src][xy][1].add(link) dest_chips[src].add(xy) outgoing = OrderedSet(source.splitter.get_out_going_vertices( partition.identifier)) for in_part in source.splitter.get_internal_multicast_partitions(): if in_part.identifier == partition.identifier: outgoing.add(in_part.pre_vertex) for edge in in_part.edges: xy, (_tgt, core, link) = vertex_xy_and_route( edge.post_vertex) if core is not None: destinations[in_part.pre_vertex][xy][0].add(core) if link is not None: destinations[in_part.pre_vertex][xy][1].add(link) dest_chips[in_part.pre_vertex].add(xy) for m_vertex in outgoing: source_xy, (m_vertex, core, link) = vertex_xy_and_route(m_vertex) if dest_chips[m_vertex]: self._update_all_weights(node_info) self._reset_tables(tables) tables[source_xy].activated = True tables[source_xy].cost = 0 x, y = source_xy self._propagate_costs_until_reached_destinations( tables, node_info, dest_chips[m_vertex], x, y) for xy in destinations[m_vertex]: dest_cores, dest_links = destinations[m_vertex][xy] self._retrace_back_to_source( xy, dest_cores, dest_links, tables, node_info, core, link, m_vertex, partition.identifier) def _initiate_node_info(self): """ Set up a dictionary which contains data for each chip in the machine. :return: nodes_info dictionary :rtype: dict(tuple(int,int),_NodeInfo) """ nodes_info = dict() for chip in PacmanDataView.get_machine().chips: # get_neighbours should return a list of # dictionaries of 'x' and 'y' values node = _NodeInfo() for source_id in range(6): n = chip.router.get_link(source_id) node.neighbours.append(n) node.weights.append(infinity) node.bws.append(None if n is None else self._max_bw) nodes_info[chip.x, chip.y] = node return nodes_info def _initiate_dijkstra_tables(self): """ Set up the Dijkstra's table which includes if you've reached a given node. :return: the Dijkstra's table dictionary :rtype: dict(tuple(int,int),_DijkstraInfo) """ # Holds all the information about nodes within one full run of # Dijkstra's algorithm tables = dict() for chip in PacmanDataView.get_machine().chips: tables[chip.x, chip.y] = _DijkstraInfo() return tables def _update_all_weights(self, nodes_info): """ Change the weights of the neighbouring nodes. :param dict(tuple(int,int),_NodeInfo) nodes_info: the node info dictionary """ for key in nodes_info: if nodes_info[key] is not None: self._update_neighbour_weights(nodes_info, key) def _update_neighbour_weights(self, nodes_info, key): """ Change the weights of the neighbouring nodes. :param dict(tuple(int,int),_NodeInfo) nodes_info: the node info dictionary :param tuple(int,int) key: the identifier to the object in `nodes_info` """ for n, neighbour in enumerate(nodes_info[key].neighbours): if neighbour is not None: nodes_info[key].weights[n] = 1 @staticmethod def _reset_tables(tables): """ Reset the Dijkstra tables for a new path search. :param dict(tuple(int,int),_DijkstraInfo) tables: the dictionary object for the Dijkstra-tables """ for key in tables: tables[key] = _DijkstraInfo() def _propagate_costs_until_reached_destinations( self, tables, nodes_info, dest_chips, x_source, y_source): """ Propagate the weights till the destination nodes of the source nodes are retraced. :param dict(tuple(int,int),_DijkstraInfo) tables: the dictionary object for the Dijkstra-tables :param dict(tuple(int,int),_NodeInfo) nodes_info: the dictionary object for the nodes inside a route scope :param set(tuple(int,int)) dest_chips: :param int x_source: :param int y_source: :raise PacmanRoutingException: when the destination node could not be reached from this source node """ dest_chips_to_find = set(dest_chips) source = (x_source, y_source) dest_chips_to_find.discard(source) current = source # Iterate only if the destination node hasn't been activated while dest_chips_to_find: # PROPAGATE! for neighbour, weight in nodes_info[current].neighweights: # "neighbours" is a list of 6 links or None objects. There is # a None object where there is no connection to that neighbour if (neighbour is not None and not (neighbour.destination_x == x_source and neighbour.destination_y == y_source)): # These variables change with every look at a new neighbour self._update_neighbour( tables, neighbour, current, source, weight) # Set the next activated node as the deactivated node with the # lowest current cost current = self._minimum(tables) tables[current].activated = True dest_chips_to_find.discard(current) @staticmethod def _minimum(tables): """ :param dict(tuple(int,int),_DijkstraInfo) tables: :rtype: tuple(int,int) """ # This is the lowest cost across ALL deactivated nodes in the graph. lowest_cost = sys.maxsize lowest = None # Find the next node to be activated for key in tables: # Don't continue if the node hasn't even been touched yet if (tables[key].cost is not None and not tables[key].activated and tables[key].cost < lowest_cost): lowest_cost = tables[key].cost lowest = key # If there were no deactivated nodes with costs, but the destination # was not reached this iteration, raise an exception if lowest is None: raise PacmanRoutingException( "Destination could not be activated, ending run") return int(lowest[0]), int(lowest[1]) @staticmethod def _update_neighbour(tables, neighbour, current, source, weight): """ Update the lowest cost for each neighbour_xy of a node. :param dict(tuple(int,int),_DijkstraInfo) tables: :param ~spinn_machine.Link neighbour: :param tuple(int,int) current: :param tuple(int,int) source: :param float weight: :raise PacmanRoutingException: when the algorithm goes to a node that doesn't exist in the machine or the node's cost was set too low. """ neighbour_xy = (neighbour.destination_x, neighbour.destination_y) if neighbour_xy not in tables: raise PacmanRoutingException( f"Tried to propagate to ({neighbour.destination_x}, " f"{neighbour.destination_y}), which is not in the" " graph: remove non-existent neighbours") chip_cost = tables[current].cost neighbour_cost = tables[neighbour_xy].cost # Only try to update if the neighbour_xy is within the graph and the # cost if the node hasn't already been activated and the lowest cost # if the new cost is less, or if there is no current cost. new_weight = float(chip_cost + weight) if (not tables[neighbour_xy].activated and (neighbour_cost is None or new_weight < neighbour_cost)): # update Dijkstra table tables[neighbour_xy].cost = new_weight if tables[neighbour_xy].cost == 0 and neighbour_xy != source: raise PacmanRoutingException( f"!!!Cost of non-source node ({neighbour.destination_x}, " f"{neighbour.destination_y}) was set to zero!!!") def _retrace_back_to_source( self, dest_xy, dest_cores, dest_links, tables, nodes_info, source_processor, source_link, pre_vertex, partition_id): """ :param Placement dest: Destination placement :param dict(tuple(int,int),_DijkstraInfo) tables: :param MachineEdge edge: :param dict(tuple(int,int),_NodeInfo) nodes_info: :param int source_processor: :param int source_link: :return: the next coordinates to look into :rtype: tuple(int, int) :raise PacmanRoutingException: when the algorithm doesn't find a next point to search from. AKA, the neighbours of a chip do not have a cheaper cost than the node itself, but the node is not the destination or when the algorithm goes to a node that's not considered in the weighted search. """ # Set the tracking node to the destination to begin with x, y = dest_xy entry = MulticastRoutingTableByPartitionEntry( dest_links, dest_cores) self._routing_paths.add_path_entry( entry, x, y, pre_vertex, partition_id) prev_entry = entry while tables[x, y].cost != 0: for idx, neighbour in enumerate(nodes_info[x, y].neighbours): if neighbour is not None: n_xy = (neighbour.destination_x, neighbour.destination_y) # Only check if it can be a preceding node if it actually # exists if n_xy not in tables: raise PacmanRoutingException( "Tried to trace back to node not in " "graph: remove non-existent neighbours") if tables[n_xy].cost is not None: x, y, prev_entry, added = self._create_routing_entry( n_xy, tables, idx, nodes_info, x, y, prev_entry, pre_vertex, partition_id) if added: break else: raise PacmanRoutingException( "Iterated through all neighbours of tracking node but" " did not find a preceding node! Consider increasing " "acceptable discrepancy between sought traceback cost" " and actual cost at node. Terminating...") if source_processor is not None: prev_entry.incoming_processor = source_processor if source_link is not None: prev_entry.incoming_link = source_link return x, y def _create_routing_entry( self, neighbour_xy, tables, neighbour_index, nodes_info, x, y, previous_entry, pre_vertex, partition_id): """ Create a new routing entry. :param tuple(int,int) neighbour_xy: :param dict(tuple(int,int),_DijkstraInfo) tables: :param int neighbour_index: :param dict(tuple(int,int),_NodeInfo) nodes_info: :param int x: :param int y: :param MulticastRoutingTableByPartitionEntry previous_entry: :return: x, y, previous_entry, made_an_entry :rtype: tuple(int, int, MulticastRoutingTableByPartitionEntry, bool) :raise PacmanRoutingException: when the bandwidth of a router is beyond expected parameters """ # Set the direction of the routing other_entry as that which is from # the preceding node to the current tracking node. # neighbour_xy is the 'old' coordinates since it is from the preceding # node. x and y are the 'new' coordinates since they are where the # router should send the packet to. dec_direction = self._get_reverse_direction(neighbour_index) made_an_entry = False neighbour_weight = nodes_info[neighbour_xy].weights[dec_direction] chip_sought_cost = tables[x, y].cost - neighbour_weight neighbours_lowest_cost = tables[neighbour_xy].cost if (neighbours_lowest_cost is not None and self._close_enough(neighbours_lowest_cost, chip_sought_cost)): # build the multicast entry entry = MulticastRoutingTableByPartitionEntry( dec_direction, None) previous_entry.incoming_link = neighbour_index # add entry for next hop going backwards into path self._routing_paths.add_path_entry( entry, neighbour_xy[0], neighbour_xy[1], pre_vertex, partition_id) previous_entry = entry made_an_entry = True # Finally move the tracking node x, y = neighbour_xy return x, y, previous_entry, made_an_entry @staticmethod def _close_enough(v1, v2, delta=0.00000000001): """ :param float v1: :param float v2: :param float delta: How close values have to be to be "equal" """ return abs(v1 - v2) < delta @staticmethod def _get_reverse_direction(neighbour_position): """ Determine the direction of a link to go down. :param int neighbour_position: the position the neighbour is at :return: The position of the opposite link :rtype: int """ if neighbour_position == 0: return 3 elif neighbour_position == 1: return 4 elif neighbour_position == 2: return 5 elif neighbour_position == 3: return 0 elif neighbour_position == 4: return 1 elif neighbour_position == 5: return 2 return None