Source code for spinn_front_end_common.utilities.report_functions.drift_report

# Copyright (c) 2022 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import struct
import logging
from spinn_utilities.progress_bar import ProgressBar
from spinnman.messages.spinnaker_boot import SystemVariableDefinition
from spinn_utilities.config_holder import get_config_bool
from spinn_utilities.log import FormatAdapter
from spinn_front_end_common.data import FecDataView

# The fixed point position for drift readings
DRIFT_FP = 1 << 17
CLOCK_DRIFT_REPORT = "clock_drift.csv"

logger = FormatAdapter(logging.getLogger(__name__))


[docs]def drift_report(): """ A report on the clock drift as reported by each chip """ ethernet_only = get_config_bool( "Reports", "drift_report_ethernet_only") machine = FecDataView.get_machine() eth_chips = machine.ethernet_connected_chips n_chips = machine.n_chips if ethernet_only: n_chips = len(eth_chips) # create file path directory_name = os.path.join( FecDataView.get_run_dir_path(), CLOCK_DRIFT_REPORT) # If the file is new, write a header if not os.path.exists(directory_name): with open(directory_name, "w", encoding="utf-8") as writer: for eth_chip in eth_chips: if ethernet_only: writer.write(f'"{eth_chip.x} {eth_chip.y}",') else: for chip in machine.get_chips_by_ethernet( eth_chip.x, eth_chip.y): writer.write(f'"{chip.x} {chip.y}",') writer.write("\n") # create the progress bar for end users progress = ProgressBar(n_chips, "Writing clock drift report") # iterate over ethernet chips and then the chips on that board txrx = FecDataView.get_transceiver() with open(directory_name, "a", encoding="utf-8") as writer: for eth_chip in eth_chips: if ethernet_only: __write_drift(txrx, eth_chip, writer) progress.update() else: last_drift = None for chip in machine.get_chips_by_ethernet( eth_chip.x, eth_chip.y): drift = __write_drift(txrx, chip, writer) if last_drift is None: last_drift = drift elif last_drift != drift: logger.warning( "On board {}, chip {}, {} is not in sync" " ({} vs {})", eth_chip.ip_address, chip.x, chip.y, drift, last_drift) progress.update() writer.write("\n")
def __write_drift(txrx, chip, writer): # pylint: disable=protected-access drift = txrx._get_sv_data( chip.x, chip.y, SystemVariableDefinition.clock_drift) drift = struct.unpack("<i", struct.pack("<I", drift))[0] drift = drift / (1 << 17) writer.write(f'"{drift}",') return drift