Source code for spinnman.processes.write_memory_flood_process
# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from spinnman.messages.scp.impl import (
FloodFillEnd, FloodFillStart, FloodFillData)
from .abstract_multi_connection_process import AbstractMultiConnectionProcess
from spinnman.constants import UDP_MESSAGE_MAX_SIZE
class WriteMemoryFloodProcess(AbstractMultiConnectionProcess):
    """
    A process for writing memory on multiple SpiNNaker chips at once.

    A write is carried out as a flood fill: a ``FloodFillStart`` request,
    a sequence of numbered ``FloodFillData`` requests each carrying at most
    ``UDP_MESSAGE_MAX_SIZE`` bytes, and finally a ``FloodFillEnd`` request.
    """
    __slots__ = []

    def __init__(self, next_connection_selector):
        """
        :param next_connection_selector:
            handed unchanged to the base-class constructor
        """
        AbstractMultiConnectionProcess.__init__(
            self, next_connection_selector, n_channels=3,
            intermediate_channel_waits=2)

    def _start_flood_fill(self, n_bytes, nearest_neighbour_id):
        """
        Send the request that opens a flood fill and check it for errors.

        :param int n_bytes: total number of bytes that will be written
        :param int nearest_neighbour_id:
        """
        # NOTE(review): this divides the number of 4-byte *words* by
        # UDP_MESSAGE_MAX_SIZE, whereas the write loops below emit
        # ceil(n_bytes / UDP_MESSAGE_MAX_SIZE) data blocks.  The announced
        # count can therefore be smaller than the number of blocks sent;
        # confirm against the flood-fill protocol before changing either.
        n_words = math.ceil(n_bytes / 4.0)
        n_blocks = int(math.ceil(n_words / UDP_MESSAGE_MAX_SIZE))
        self._send_request(
            FloodFillStart(nearest_neighbour_id, n_blocks))
        self._finish()
        self.check_for_error()

    def _end_flood_fill(self, nearest_neighbour_id):
        """
        Send the request that closes a flood fill and check it for errors.

        :param int nearest_neighbour_id:
        """
        self._send_request(FloodFillEnd(nearest_neighbour_id))
        self._finish()
        self.check_for_error()

    @staticmethod
    def _read_block(reader, n_bytes):
        """
        Read up to ``n_bytes`` from ``reader``, retrying after short reads.

        A single ``read`` call on a raw stream may return fewer bytes than
        requested without being at end-of-stream, so keep reading until the
        block is full or the reader reports that nothing more is available.

        :param reader: object with a ``read(size)`` method
        :param int n_bytes: the number of bytes wanted
        :return: the data read; shorter than ``n_bytes`` only if the reader
            ran out of data
        """
        chunk = reader.read(n_bytes)
        # Common case: the first read already satisfied the request (or
        # returned nothing at all), so pass the reader's result straight on.
        if not chunk or len(chunk) >= n_bytes:
            return chunk
        block = bytearray(chunk)
        while len(block) < n_bytes:
            chunk = reader.read(n_bytes - len(block))
            if not chunk:
                break
            block.extend(chunk)
        return bytes(block)

    def write_memory_from_bytearray(self, nearest_neighbour_id, base_address,
                                    data, offset, n_bytes=None):
        """
        Flood-fill memory with bytes taken from an in-memory buffer.

        :param int nearest_neighbour_id:
        :param int base_address: address at which the first block is written
        :param data: buffer holding the bytes to write
        :type data: bytes or bytearray
        :param int offset: position in ``data`` of the first byte to write
        :param int n_bytes:
            number of bytes to write; if ``None``, everything in ``data``
            from ``offset`` to the end is written
        """
        # pylint: disable=too-many-arguments
        if n_bytes is None:
            # Only the bytes from ``offset`` onwards are available; using
            # len(data) would make the final blocks refer past the end of
            # the buffer whenever offset is non-zero.
            n_bytes = max(len(data) - offset, 0)
        self._start_flood_fill(n_bytes, nearest_neighbour_id)

        data_offset = offset
        address = base_address
        block_no = 0
        while n_bytes > 0:
            bytes_to_send = min(int(n_bytes), UDP_MESSAGE_MAX_SIZE)
            self._send_request(FloodFillData(
                nearest_neighbour_id, block_no, address,
                data, data_offset, bytes_to_send))
            block_no += 1
            n_bytes -= bytes_to_send
            address += bytes_to_send
            data_offset += bytes_to_send
        self._finish()
        self.check_for_error()

        self._end_flood_fill(nearest_neighbour_id)

    def write_memory_from_reader(self, nearest_neighbour_id, base_address,
                                 reader, n_bytes):
        """
        Flood-fill memory with bytes pulled from a readable stream.

        Each block is filled completely from the reader before it is sent,
        so a reader that returns short reads does not leave gaps in the
        written memory.  If the reader runs out of data before ``n_bytes``
        have been read, the affected blocks are sent with whatever data was
        available (possibly none).

        :param int nearest_neighbour_id:
        :param int base_address: address at which the first block is written
        :param reader: stream supplying the bytes to write
        :type reader: ~io.RawIOBase or ~io.BufferedIOBase
        :param int n_bytes: number of bytes to write
        """
        self._start_flood_fill(n_bytes, nearest_neighbour_id)

        address = base_address
        block_no = 0
        while n_bytes > 0:
            bytes_to_send = min(int(n_bytes), UDP_MESSAGE_MAX_SIZE)
            data_array = self._read_block(reader, bytes_to_send)
            self._send_request(FloodFillData(
                nearest_neighbour_id, block_no, address,
                data_array, 0, len(data_array)))
            block_no += 1
            # Advance by the planned block size so the loop always
            # terminates, even when the reader is exhausted early.
            n_bytes -= bytes_to_send
            address += bytes_to_send
        self._finish()
        self.check_for_error()

        self._end_flood_fill(nearest_neighbour_id)