Source code for pacman.operations.multi_cast_router_check_functionality.valid_routes_checker

# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Collection of functions which together validate routes.
"""
from collections import namedtuple, defaultdict
import logging
from spinn_utilities.ordered_set import OrderedSet
from spinn_utilities.progress_bar import ProgressBar
from spinn_utilities.log import FormatAdapter
from pacman.data import PacmanDataView
from pacman.exceptions import (
    PacmanConfigurationException, PacmanRoutingException)
from pacman.model.graphs.application import ApplicationVertex
from pacman.utilities.constants import FULL_MASK
from pacman.utilities.algorithm_utilities.routing_algorithm_utilities import (
    get_app_partitions)


logger = FormatAdapter(logging.getLogger(__name__))
range_masks = {FULL_MASK - ((2 ** i) - 1) for i in range(33)}

# Define an internal class for placements
PlacementTuple = namedtuple('PlacementTuple', 'x y p')
# Define an internal class for failures
_Failure = namedtuple('_Failure', 'router_x router_y keys source_mask')


[docs]def validate_routes(placements, routing_tables): """ Go though the placements given and check that the routing entries within the routing tables support reach the correction destinations as well as not producing any cycles. :param Placements placements: the placements container :param RoutingInfo routing_infos: the routing info container :param MulticastRoutingTables routing_tables: the routing tables generated by the routing algorithm :raises PacmanRoutingException: when either no routing table entry is found by the search on a given router, or a cycle is detected """ # Find all partitions that need to be dealt with partitions = get_app_partitions() routing_infos = PacmanDataView.get_routing_infos() # Now go through the app edges and route app vertex by app vertex progress = ProgressBar(len(partitions), "Checking Routes") for partition in progress.over(partitions): source = partition.pre_vertex # Destination cores by source machine vertices destinations = defaultdict(OrderedSet) for edge in partition.edges: target = edge.post_vertex target_vertices = \ target.splitter.get_source_specific_in_coming_vertices( source, partition.identifier) for tgt, srcs in target_vertices: place = placements.get_placement_of_vertex(tgt) for src in srcs: if isinstance(src, ApplicationVertex): for s in src.splitter.get_out_going_vertices( partition.identifier): destinations[s].add(PlacementTuple( x=place.x, y=place.y, p=place.p)) else: destinations[src].add(PlacementTuple( x=place.x, y=place.y, p=place.p)) outgoing = OrderedSet(source.splitter.get_out_going_vertices( partition.identifier)) internal = source.splitter.get_internal_multicast_partitions() for in_part in internal: if in_part.partition_id == partition.identifier: outgoing.add(in_part.pre_vertex) for edge in in_part.edges: place = placements.get_placement_of_vertex( edge.post_vertex) destinations[in_part.pre_vertex].add(PlacementTuple( x=place.x, y=place.y, p=place.p)) # locate all placements to which this placement/vertex will # communicate with for a given key_and_mask and search its # determined destinations for m_vertex in outgoing: placement = placements.get_placement_of_vertex(m_vertex) r_info = routing_infos.get_routing_info_from_pre_vertex( m_vertex, partition.identifier) # search for these destinations _search_route( placement, destinations[m_vertex], r_info.key_and_mask, routing_tables, m_vertex.vertex_slice.n_atoms)
def _search_route(source_placement, dest_placements, key_and_mask, routing_tables, n_atoms): """ Locate if the routing tables work for the source to desks as defined. :param Placement source_placement: the placement from which the search started :param iterable(Placement) dest_placements: the placements to which this trace should visit only once :param BaseKeyAndMask key_and_mask: the key and mask associated with this set of edges :param MulticastRoutingTables routing_tables: :param int n_atoms: the number of atoms going through this path :param bool is_continuous: whether the keys and atoms mapping is continuous :raise PacmanRoutingException: when the trace completes and there are still destinations not visited """ if logger.isEnabledFor(logging.DEBUG): for dest in dest_placements: logger.debug("[{}:{}:{}]", dest.x, dest.y, dest.p) located_destinations = set() failed_to_cover_all_keys_routers = list() _start_trace_via_routing_tables( source_placement, key_and_mask, located_destinations, routing_tables, n_atoms, failed_to_cover_all_keys_routers) # start removing from located_destinations and check if destinations not # reached failed_to_reach_destinations = list() for dest in dest_placements: if dest in located_destinations: located_destinations.remove(dest) else: failed_to_reach_destinations.append(dest) # check for error if trace didn't reach a destination it was meant to error_message = "" if failed_to_reach_destinations: failures = ", ".join( f"[{dest.x}:{dest.y}:{dest.p}]" for dest in failed_to_reach_destinations) error_message += ( "failed to locate all destinations with vertex " f"{source_placement.vertex.label} on processor " f"[{source_placement.x}:{source_placement.y}:{source_placement.p}]" f" with keys {key_and_mask} as it did not reach destinations " f"{failures}") # check for error if the trace went to a destination it shouldn't have if located_destinations: failures = ", ".join( f"[{dest.x}:{dest.y}:{dest.p}]" for dest in located_destinations) error_message += ( "trace went to more failed to locate all destinations with " f"vertex {source_placement.vertex.label} on processor " f"[{source_placement.x}:{source_placement.y}:{source_placement.p}]" f" with keys {key_and_mask} as it didn't reach destinations " f"{failures}") if failed_to_cover_all_keys_routers: failures = ", ".join( f"[{router.router_x}, {router.router_y}, " f"{router.keys}, {router.source_mask}]" for router in failed_to_cover_all_keys_routers) error_message += ( "trace detected that there were atoms which the routing entries " "won't cover and therefore packets will fly off to unknown places." f" These keys came from the vertex {source_placement.vertex.label}" f" on processor [{source_placement.x}:{source_placement.y}:" f"{source_placement.p}] and the failed routers are {failures}") # raise error if required if error_message != "": raise PacmanRoutingException(error_message) logger.debug("successful test between {} and {}", source_placement.vertex.label, dest_placements) def _start_trace_via_routing_tables( source_placement, key_and_mask, reached_placements, routing_tables, n_atoms, failed_to_cover_all_keys_routers): """ Start the trace, by using the source placement's router and tracing from the route. :param Placement source_placement: the source placement used by the trace :param BaseKeyAndMask key_and_mask: the key being used by the vertex which resides on the source placement :param set(PlacementTuple) reached_placements: the placements reached during the trace :param MulticastRoutingTables routing_tables: :param int n_atoms: the number of atoms going through this path :param list(_Failure) failed_to_cover_all_keys_routers: list of failed routers for all keys """ current_router_table = routing_tables.get_routing_table_for_chip( source_placement.x, source_placement.y) visited_routers = set() visited_routers.add((current_router_table.x, current_router_table.y)) # get src router entry = _locate_routing_entry( current_router_table, key_and_mask.key, n_atoms) _recursive_trace_to_destinations( entry, current_router_table, source_placement.x, source_placement.y, key_and_mask, visited_routers, reached_placements, routing_tables, n_atoms, failed_to_cover_all_keys_routers) def _check_all_keys_hit_entry(entry, n_atoms, base_key): """ :param ~spinn_machine.MulticastRoutingEntry entry: routing entry discovered :param int n_atoms: the number of atoms this partition covers :param int base_key: the base key of the partition :return: the list of keys which this entry doesn't cover which it should :rtype: list(int) """ bad_entries = list() for atom_id in range(0, n_atoms): key = base_key + atom_id if entry.mask & key != entry.routing_entry_key: bad_entries.append(key) return bad_entries # locates the next dest position to check def _recursive_trace_to_destinations( entry, current_router, chip_x, chip_y, key_and_mask, visited_routers, reached_placements, routing_tables, n_atoms, failed_to_cover_all_keys_routers): """ Recursively search though routing tables until no more entries are registered with this key. :param ~spinn_machine.MulticastRoutingEntry entry: the original entry used by the first router which resides on the source placement chip. :param MulticastRoutingTable current_router: the router currently being visited during the trace :param int chip_x: the x coordinate of the chip being considered :param int chip_y: the y coordinate of the chip being considered :param BaseKeyAndMask key_and_mask: the key and mask being used by the vertex which resides on the source placement :param set(tuple(int,int)) visited_routers: the list of routers which have been visited during this trace so far :param set(PlacementTuple) reached_placements: the placements reached during the trace :param MulticastRoutingTables routing_tables: :param bool is_continuous: whether the keys and atoms mapping is continuous :param int n_atoms: the number of atoms going through this path :param list(_Failure) failed_to_cover_all_keys_routers: list of failed routers for all keys """ # determine where the route takes us chip_links = entry.link_ids processor_values = entry.processor_ids # if goes down a chip link if chip_links: # also goes to a processor if processor_values: _is_dest(processor_values, current_router, reached_placements) # only goes to new chip for link_id in chip_links: # locate next chips router machine_router = PacmanDataView.get_chip_at(chip_x, chip_y).router link = machine_router.get_link(link_id) next_router = routing_tables.get_routing_table_for_chip( link.destination_x, link.destination_y) # check that we've not visited this router before _check_visited_routers( next_router.x, next_router.y, visited_routers) # locate next entry entry = _locate_routing_entry( next_router, key_and_mask.key, n_atoms) bad_entries = _check_all_keys_hit_entry( entry, n_atoms, key_and_mask.key) if bad_entries: failed_to_cover_all_keys_routers.append( _Failure(next_router.x, next_router.y, bad_entries, key_and_mask.mask)) # get next route value from the new router _recursive_trace_to_destinations( entry, next_router, link.destination_x, link.destination_y, key_and_mask, visited_routers, reached_placements, routing_tables, n_atoms, failed_to_cover_all_keys_routers) # only goes to a processor elif processor_values: _is_dest(processor_values, current_router, reached_placements) def _check_visited_routers(chip_x, chip_y, visited_routers): """ Check if the trace has visited this router already. :param int chip_x: the x coordinate of the chip being checked :param int chip_y: the y coordinate of the chip being checked :param set(tuple(int,int)) visited_routers: routers already visited :raise PacmanRoutingException: when a router has been visited twice. """ visited_routers_router = (chip_x, chip_y) if visited_routers_router in visited_routers: raise PacmanRoutingException( "visited this router before, there is a cycle here. " f"The routers I've currently visited are {visited_routers} and " f"the router i'm visiting is {visited_routers_router}") visited_routers.add(visited_routers_router) def _is_dest(processor_ids, current_router, reached_placements): """ Collect processors to be removed. :param list(int) processor_ids: the processor IDs which the last router entry said the trace should visit :param MulticastRoutingTable current_router: the current router being used in the trace :param set(PlacementTuple) reached_placements: the placements to which the trace visited """ dest_x, dest_y = current_router.x, current_router.y for processor_id in processor_ids: reached_placements.add(PlacementTuple(dest_x, dest_y, processor_id)) def _locate_routing_entry(current_router, key, n_atoms): """ Locate the entry from the router based off the edge. :param MulticastRoutingTable current_router: the current router being used in the trace :param int key: the key being used by the source placement :param int n_atoms: the number of atoms :rtype: ~spinn_machine.MulticastRoutingEntry :raise PacmanRoutingException: when there is no entry located on this router """ found_entry = None for entry in current_router.multicast_routing_entries: key_combo = entry.mask & key e_key = entry.routing_entry_key if key_combo == e_key: if found_entry is None: found_entry = entry else: logger.warning( "Found more than one entry for key {}. This could be " "an error, as currently no router supports overloading" " of entries.", hex(key)) if entry.mask in range_masks: last_atom = key + n_atoms - 1 last_key = e_key + (~entry.mask & FULL_MASK) if last_key < last_atom: raise PacmanRoutingException( f"Full key range not covered: key:0x{key:x} " f"key_combo:0x{key_combo:x} mask:0x{entry.mask:x}, " f"last_key:0x{last_key:x}, e_key:0x{e_key:x}") elif entry.mask in range_masks: last_atom = key + n_atoms last_key = e_key + (~entry.mask & FULL_MASK) if min(last_key, last_atom) - max(e_key, key) + 1 > 0: raise PacmanConfigurationException( f"Key range partially covered: key:0x{key:x}, " f"key_combo:0x{key_combo:x} mask:0x{entry.mask:x}, " f"last_key:0x{last_key:x}, e_key:0x{e_key:x}") if found_entry is None: raise PacmanRoutingException("no entry located") return found_entry