Source code for spinnman.processes.get_heap_process

# Copyright (c) 2016 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
import functools
from spinnman.processes import AbstractMultiConnectionProcess
from spinnman.constants import SYSTEM_VARIABLE_BASE_ADDRESS
from spinnman.model import HeapElement
from spinnman.messages.spinnaker_boot import SystemVariableDefinition
from spinnman.messages.scp.impl import ReadMemory


HEAP_ADDRESS = SystemVariableDefinition.sdram_heap_address
_ADDRESS = struct.Struct("<I")
_HEAP_POINTER = struct.Struct("<4xI")
_ELEMENT_HEADER = struct.Struct("<II")


[docs]class GetHeapProcess(AbstractMultiConnectionProcess): __slots__ = [ "_blocks", "_heap_address", "_next_block_address"] def __init__(self, connection_selector): """ :param connection_selector: :type connection_selector: AbstractMultiConnectionProcessConnectionSelector """ super().__init__(connection_selector) self._heap_address = None self._next_block_address = None self._blocks = list() def _read_heap_address_response(self, response): self._heap_address = _ADDRESS.unpack_from( response.data, response.offset)[0] def _read_heap_pointer(self, response): self._next_block_address = _HEAP_POINTER.unpack_from( response.data, response.offset)[0] def _read_next_block(self, block_address, response): self._next_block_address, free = _ELEMENT_HEADER.unpack_from( response.data, response.offset) if self._next_block_address != 0: self._blocks.append(HeapElement( block_address, self._next_block_address, free)) def _read_address(self, chip_address, address, size, callback): (x, y) = chip_address self._send_request( ReadMemory(x, y, address, size), callback) self._finish() self.check_for_error()
[docs] def get_heap(self, chip_address, pointer=HEAP_ADDRESS): """ :param tuple(int,int) chip_address: x, y :param SystemVariableDefinition pointer: :rtype: list(HeapElement) """ self._read_address( chip_address, SYSTEM_VARIABLE_BASE_ADDRESS + pointer.offset, pointer.data_type.value, self._read_heap_address_response) self._read_address( chip_address, self._heap_address, 8, self._read_heap_pointer) while self._next_block_address != 0: self._read_address( chip_address, self._next_block_address, 8, functools.partial( self._read_next_block, self._next_block_address)) return self._blocks