# Source code for pacman.model.routing_tables.multicast_routing_tables

# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import gzip
from pacman.exceptions import PacmanAlreadyExistsException
from .uncompressed_multicast_routing_table import (
    UnCompressedMulticastRoutingTable)
from spinn_machine import MulticastRoutingEntry


class MulticastRoutingTables(object):
    """
    Represents the multicast routing tables for a number of chips.

    Tables are indexed by the ``(x, y)`` coordinates they report, and at
    most one table may be held per chip.
    """

    __slots__ = [
        # dict of (x,y) -> routing table
        "_routing_tables_by_chip"
    ]

    def __init__(self, routing_tables=None):
        """
        :param iterable(MulticastRoutingTable) routing_tables:
            The routing tables to add
        :raise PacmanAlreadyExistsException:
            If any two routing tables are for the same chip
        """
        self._routing_tables_by_chip = {}

        if routing_tables is not None:
            for routing_table in routing_tables:
                self.add_routing_table(routing_table)

    def add_routing_table(self, routing_table):
        """
        Add a routing table.

        :param MulticastRoutingTable routing_table: a routing table to add
        :raise PacmanAlreadyExistsException:
            If a routing table already exists for the chip
        """
        chip = (routing_table.x, routing_table.y)
        if chip in self._routing_tables_by_chip:
            raise PacmanAlreadyExistsException(
                "The Routing table for chip "
                f"{routing_table.x}:{routing_table.y} already exists in this "
                "collection and therefore is deemed an error to re-add it",
                str(routing_table))
        self._routing_tables_by_chip[chip] = routing_table

    @property
    def routing_tables(self):
        """
        The routing tables stored within.

        :return: an iterable of routing tables
        :rtype: iterable(MulticastRoutingTable)
        """
        return self._routing_tables_by_chip.values()

    @property
    def max_number_of_entries(self):
        """
        The maximum number of multicast routing entries there are in any
        multicast routing table.

        Will return zero if there are no routing tables

        :rtype: int
        """
        # Computed from the live tables rather than cached when a table is
        # added: a table may gain entries after it has been registered, and
        # a value sampled at insertion time would then under-report.
        return max(
            (table.number_of_entries
             for table in self._routing_tables_by_chip.values()),
            default=0)

    def get_routing_table_for_chip(self, x, y):
        """
        Get a routing table for a particular chip.

        :param int x: The X-coordinate of the chip
        :param int y: The Y-coordinate of the chip
        :return: The routing table, or `None` if no such table exists
        :rtype: MulticastRoutingTable or None
        """
        return self._routing_tables_by_chip.get((x, y))

    def __iter__(self):
        """
        Iterator for the multicast routing tables stored within.

        :return: iterator of multicast_routing_table
        """
        return iter(self._routing_tables_by_chip.values())
def to_json(router_table):
    """
    Convert a collection of routing tables into JSON-serialisable data.

    :param router_table:
        An iterable of routing tables; each must provide ``x``, ``y`` and
        ``multicast_routing_entries``, and each entry must provide
        ``routing_entry_key``, ``mask``, ``defaultable`` and
        ``spinnaker_route``
    :return:
        One dict per routing table with the keys ``x``, ``y`` and
        ``entries``; ``entries`` is a list of dicts with the keys ``key``,
        ``mask``, ``defaultable`` and ``spinnaker_route``
    :rtype: list(dict)
    """
    return [
        {
            "x": routing_table.x,
            "y": routing_table.y,
            "entries": [
                {
                    "key": entry.routing_entry_key,
                    "mask": entry.mask,
                    "defaultable": entry.defaultable,
                    "spinnaker_route": entry.spinnaker_route,
                }
                for entry in routing_table.multicast_routing_entries
            ],
        }
        for routing_table in router_table
    ]


def from_json(j_router):
    """
    Build routing tables from JSON data in the layout written by
    :py:func:`to_json`.

    :param j_router:
        Either the already-parsed JSON data (an iterable of table dicts),
        or a path to a JSON file; a path ending in ``.gz`` is read through
        :py:mod:`gzip`
    :type j_router: str or list(dict)
    :return: The routing tables described by the data
    :rtype: MulticastRoutingTables
    :raise PacmanAlreadyExistsException:
        If the data holds more than one table for the same chip
    """
    if isinstance(j_router, str):
        if j_router.endswith(".gz"):
            with gzip.open(j_router) as j_file:
                j_router = json.load(j_file)
        else:
            with open(j_router, encoding="utf-8") as j_file:
                j_router = json.load(j_file)

    tables = MulticastRoutingTables()
    for j_table in j_router:
        table = UnCompressedMulticastRoutingTable(j_table["x"], j_table["y"])
        for j_entry in j_table["entries"]:
            table.add_multicast_routing_entry(MulticastRoutingEntry(
                j_entry["key"], j_entry["mask"],
                defaultable=j_entry["defaultable"],
                spinnaker_route=j_entry["spinnaker_route"]))
        # Register the table only once it is fully populated:
        # add_routing_table reads number_of_entries at the moment of
        # insertion, so adding an empty table first would leave
        # max_number_of_entries unaware of the entries loaded above.
        # NOTE(review): presumes number_of_entries reflects entries added
        # via add_multicast_routing_entry - confirm against the table class.
        tables.add_routing_table(table)
    return tables