Source code for pacman.model.routing_tables.uncompressed_multicast_routing_table
# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import gzip
import logging
from spinn_utilities.log import FormatAdapter
from spinn_machine import MulticastRoutingEntry
from pacman.exceptions import PacmanAlreadyExistsException
from pacman.model.routing_tables import AbstractMulticastRoutingTable
from spinn_utilities.overrides import overrides
logger = FormatAdapter(logging.getLogger(__name__))
[docs]class UnCompressedMulticastRoutingTable(AbstractMulticastRoutingTable):
"""
Represents a uncompressed routing table for a chip.
"""
__slots__ = [
# The x-coordinate of the chip for which this is the routing table
"_x",
# The y-coordinate of the chip for which this is the routing tables
"_y",
# dict of multicast routing entries.
# (key, mask) -> multicast_routing_entry
"_entries_by_key_mask",
# counter of how many entries in their multicast routing table are
# defaultable
"_number_of_defaulted_routing_entries"
]
def __init__(self, x, y, multicast_routing_entries=None):
"""
:param int x:
The x-coordinate of the chip for which this is the routing table
:param int y:
The y-coordinate of the chip for which this is the routing tables
:param multicast_routing_entries:
The routing entries to add to the table
:type multicast_routing_entries:
iterable(~spinn_machine.MulticastRoutingEntry)
:raise PacmanAlreadyExistsException:
If any two routing entries contain the same key-mask combination
"""
self._x = x
self._y = y
self._number_of_defaulted_routing_entries = 0
self._entries_by_key_mask = dict()
if multicast_routing_entries is not None:
for multicast_routing_entry in multicast_routing_entries:
self.add_multicast_routing_entry(multicast_routing_entry)
[docs] def add_multicast_routing_entry(self, multicast_routing_entry):
"""
Adds a routing entry to this table.
:param ~spinn_machine.MulticastRoutingEntry multicast_routing_entry:
The route to add
:raise PacmanAlreadyExistsException:
If a routing entry with the same key-mask combination already
exists
"""
routing_entry_key = multicast_routing_entry.routing_entry_key
mask = multicast_routing_entry.mask
tuple_key = (routing_entry_key, mask)
if tuple_key in self._entries_by_key_mask:
# Only fail if they don't go to the same place
if self._entries_by_key_mask[tuple_key] == multicast_routing_entry:
return
raise PacmanAlreadyExistsException(
f"Multicast_routing_entry {tuple_key}: "
f"{self._entries_by_key_mask[tuple_key]} on "
f"{self._x}, {self._y}",
str(multicast_routing_entry))
self._entries_by_key_mask[tuple_key] = multicast_routing_entry
# update default routed counter if required
if multicast_routing_entry.defaultable:
self._number_of_defaulted_routing_entries += 1
@property
@overrides(AbstractMulticastRoutingTable.x)
def x(self):
return self._x
@property
@overrides(AbstractMulticastRoutingTable.y)
def y(self):
return self._y
@property
@overrides(AbstractMulticastRoutingTable.multicast_routing_entries)
def multicast_routing_entries(self):
return self._entries_by_key_mask.values()
@property
@overrides(AbstractMulticastRoutingTable.number_of_entries)
def number_of_entries(self):
return len(self._entries_by_key_mask)
@property
@overrides(AbstractMulticastRoutingTable.number_of_defaultable_entries)
def number_of_defaultable_entries(self):
return self._number_of_defaulted_routing_entries
@overrides(AbstractMulticastRoutingTable.__eq__)
def __eq__(self, other):
if not isinstance(other, UnCompressedMulticastRoutingTable):
return False
if self._x != other.x and self._y != other.y:
return False
return self._entries_by_key_mask == other._entries_by_key_mask
@overrides(AbstractMulticastRoutingTable.__ne__)
def __ne__(self, other):
return not self.__eq__(other)
@overrides(AbstractMulticastRoutingTable.__repr__)
def __repr__(self):
entry_string = ""
for entry in self.multicast_routing_entries:
entry_string += f"{entry}\n"
return f"{self._x}:{self._y}\n\n{entry_string}"
@overrides(AbstractMulticastRoutingTable.__hash__)
def __hash__(self):
return id(self)
def _from_csv_file(csvfile):
table_reader = csv.reader(csvfile)
table = UnCompressedMulticastRoutingTable(0, 0)
for row in table_reader:
try:
if len(row) == 3:
key = int(row[0], base=16)
mask = int(row[1], base=16)
route = int(row[2], base=16)
table.add_multicast_routing_entry(
MulticastRoutingEntry(key, mask, spinnaker_route=route))
elif len(row) == 6:
key = int(row[1], base=16)
mask = int(row[2], base=16)
route = int(row[3], base=16)
table.add_multicast_routing_entry(
MulticastRoutingEntry(key, mask, spinnaker_route=route))
except ValueError as ex:
logger.warning(f"csv read error {ex}")
return table
def from_csv(file_name):
if file_name.endswith(".gz"):
with gzip.open(file_name, mode="rt", newline='') as csvfile:
return _from_csv_file(csvfile)
else:
with open(file_name, newline='', encoding="utf-8") as csvfile:
return _from_csv_file(csvfile)