Source code for spinn_front_end_common.interface.java_caller

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
import json
import logging
import os
import subprocess
from spinn_utilities.config_holder import get_config_str
from spinn_utilities.log import FormatAdapter
from pacman.exceptions import PacmanExternalAlgorithmFailedToCompleteException
from pacman.model.graphs import AbstractVirtual
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.report_functions import (
    write_json_machine)
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spinn_front_end_common.interface.buffer_management.buffer_models import (
    AbstractReceiveBuffersToHost)
from spinn_front_end_common.interface.buffer_management.storage_objects \
    import BufferDatabase
from spinn_front_end_common.interface.ds import DsSqlliteDatabase

logger = FormatAdapter(logging.getLogger(__name__))


[docs]class JavaCaller(object): """ Support class that holds all the stuff for running stuff in Java. This includes the work of preparing data for transmitting to Java and back. This separates the choices of how to call the Java batch vs streaming, jar locations, parameters, etc from the rest of the python code. """ __slots__ = [ "_chipxy_by_ethernet", # The call to get java to work. Including the path if required. "_java_call", # The location of the java jar file "_jar_file", # The location where the machine json is written "_machine_json_path", # Dict of chip (x, y) to the p of the monitor vertex "_monitor_cores", # Flag to indicate if at least one placement is recording "_recording", # Dict of ethernet (x, y) and the packetGather IPtago "_gatherer_iptags", # Dict of ethernet (x, y) to the p of the packetGather vertex "_gatherer_cores", # The location where the latest placement json is written "_placement_json", # Properties flag to be passed to Java "_java_properties" ] def __init__(self): """ Creates a java caller and checks the user/config parameters. :raise ConfigurationException: if simple parameter checking fails. """ self._recording = None self._java_call = get_config_str("Java", "java_call") result = subprocess.call([self._java_call, '-version']) if result != 0: raise ConfigurationException( f" {self._java_call} -version failed. " "Please set [Java] java_call to the absolute path " "to start java. (in config file)") self._find_java_jar() self._machine_json_path = None self._placement_json = None self._monitor_cores = None self._gatherer_iptags = None self._gatherer_cores = None self._java_properties = get_config_str("Java", "java_properties") self._chipxy_by_ethernet = None if self._java_properties is not None: self._java_properties = self._java_properties.split() # pylint: disable=not-an-iterable for _property in self._java_properties: if _property[:2] != "-D": raise ConfigurationException( "Java Properties must start with -D " f"found at {_property}") def _find_java_jar(self): java_spinnaker_path = get_config_str("Java", "java_spinnaker_path") java_jar_path = get_config_str("Java", "java_jar_path") if java_spinnaker_path is None: interface = os.path.dirname(os.path.realpath(__file__)) spinn_front_end_common = os.path.dirname(interface) github_checkout_dir = os.path.dirname(spinn_front_end_common) parent = os.path.dirname(github_checkout_dir) java_spinnaker_path = os.path.join(parent, "JavaSpiNNaker") else: # As I don't know how to write pwd and /JavaSpiNNaker to one line indirect_path = os.path.join( java_spinnaker_path, "JavaSpiNNaker") if os.path.isdir(indirect_path): java_spinnaker_path = indirect_path auto_jar_file = os.path.join( java_spinnaker_path, "SpiNNaker-front-end", "target", "spinnaker-exe.jar") if os.path.exists(auto_jar_file): if (java_jar_path is None) or (java_jar_path == auto_jar_file): self._jar_file = auto_jar_file else: raise ConfigurationException( f"Found a jar file at {auto_jar_file} " f"while java_jar_path as set. " f"Please delete on of the two.") else: if java_jar_path is None: if not os.path.isdir(java_spinnaker_path): raise ConfigurationException( f"No Java code found at {java_spinnaker_path} " f"Nor is java_jar_path set.") else: raise ConfigurationException( f"No jar file at {auto_jar_file} " f"Nor is java_jar_path set.") elif os.path.exists(java_jar_path): self._jar_file = auto_jar_file else: raise ConfigurationException( f"No file found at java_jar_path: {java_jar_path}")
[docs] def set_advanced_monitors(self): """ Create information describing what's going on with the monitor cores. """ tags = FecDataView.get_tags() self._monitor_cores = dict() for core, monitor_core in FecDataView.iterate_monitor_items(): placement = FecDataView.get_placement_of_vertex(monitor_core) self._monitor_cores[core] = placement.p self._gatherer_iptags = dict() self._gatherer_cores = dict() for core, packet_gather in FecDataView.iterate_gather_items(): self._gatherer_iptags[core] = \ tags.get_ip_tags_for_vertex(packet_gather)[0] placement = FecDataView.get_placement_of_vertex(packet_gather) self._gatherer_cores[core] = placement.p self._chipxy_by_ethernet = defaultdict(list) machine = FecDataView.get_machine() for chip in machine.chips: chip_xy = (chip.x, chip.y) ethernet = (chip.nearest_ethernet_x, chip.nearest_ethernet_y) self._chipxy_by_ethernet[ethernet].append(chip_xy)
def _machine_json(self): """ Converts the machine in this class to JSON. :return: the name of the file containing the JSON """ if self._machine_json_path is None: self._machine_json_path = write_json_machine( progress_bar=False, validate=False) return self._machine_json_path
[docs] def set_placements(self, used_placements): """ Passes in the placements leaving this class to decide pass it to Java. Currently the extra information extracted is recording region base address but this could change if recording region saved in the database. Currently this method uses JSON but that may well change to using the database. :param ~pacman.model.placements.Placements used_placements: Placements that are recording. May not be all placements """ path = os.path.join( FecDataView.get_json_dir_path(), "java_placements.json") self._recording = False if self._gatherer_iptags is None: self._placement_json = self._write_placements( used_placements, path) else: self._placement_json = self._write_gather( used_placements, path)
def _json_placement(self, placement): """ :param ~pacman.model.placements.Placement placement: :rtype: dict """ vertex = placement.vertex json_placement = dict() json_placement["x"] = placement.x json_placement["y"] = placement.y json_placement["p"] = placement.p json_vertex = dict() json_vertex["label"] = vertex.label if isinstance(vertex, AbstractReceiveBuffersToHost) and \ vertex.get_recorded_region_ids(): self._recording = True json_vertex["recordedRegionIds"] = vertex.get_recorded_region_ids() json_vertex["recordingRegionBaseAddress"] = \ vertex.get_recording_region_base_address(placement) else: json_vertex["recordedRegionIds"] = [] json_vertex["recordingRegionBaseAddress"] = 0 json_placement["vertex"] = json_vertex return json_placement def _json_iptag(self, iptag): """ :param ~pacman.model.tags.IPTag iptag: :rtype: dict """ json_tag = dict() json_tag["x"] = iptag.destination_x json_tag["y"] = iptag.destination_y json_tag["boardAddress"] = iptag.board_address json_tag["targetAddress"] = iptag.ip_address # Intentionally not including port! json_tag["stripSDP"] = iptag.strip_sdp json_tag["tagID"] = iptag.tag json_tag["trafficIdentifier"] = iptag.traffic_identifier return json_tag def _placements_grouped(self, recording_placements): """ :param ~pacman.model.placements.Placements recording_placementss: :rtype: dict(tuple(int,int),dict(tuple(int,int), ~pacman.model.placements.Placement)) """ by_ethernet = defaultdict(lambda: defaultdict(list)) for placement in recording_placements: if not isinstance(placement.vertex, AbstractVirtual): machine = FecDataView.get_machine() chip = machine.get_chip_at(placement.x, placement.y) chip_xy = (placement.x, placement.y) ethernet = (chip.nearest_ethernet_x, chip.nearest_ethernet_y) by_ethernet[ethernet][chip_xy].append(placement) return by_ethernet def _write_gather(self, used_placements, path): """ :param ~pacman.model.placements.Placements used_placements: placements that are being used. May not eb all placements :param str path: :rtype: str """ placements_by_ethernet = self._placements_grouped(used_placements) json_obj = list() for ethernet in self._chipxy_by_ethernet: by_chip = placements_by_ethernet[ethernet] json_gather = dict() json_gather["x"] = ethernet[0] json_gather["y"] = ethernet[1] json_gather["p"] = self._gatherer_cores[ethernet] json_gather["iptag"] = self._json_iptag( self._gatherer_iptags[ethernet]) json_chips = list() for chip_xy in self._chipxy_by_ethernet[ethernet]: json_chip = dict() json_chip["x"] = chip_xy[0] json_chip["y"] = chip_xy[1] json_chip["p"] = self._monitor_cores[chip_xy] if chip_xy in by_chip: json_placements = list() for placement in by_chip[chip_xy]: json_p = self._json_placement(placement) if json_p: json_placements.append(json_p) if len(json_placements) > 0: json_chip["placements"] = json_placements json_chips.append(json_chip) json_gather["monitors"] = json_chips json_obj.append(json_gather) # dump to json file with open(path, "w", encoding="utf-8") as f: json.dump(json_obj, f) return path def _write_placements(self, used_placements, path): """ :param ~pacman.model.placements.Placements placements: Placements that are being used. May not be all placements :param str path: :rtype: str """ # Read back the regions json_obj = list() for placement in used_placements: if not isinstance(placement.vertex, AbstractVirtual): json_p = self._json_placement(placement) if json_p: json_obj.append(json_p) # dump to json file with open(path, "w", encoding="utf-8") as f: json.dump(json_obj, f) return path def _run_java(self, *args): """ Does the actual running of JavaSpiNNaker. Arguments are those that will be processed by the `main` method on the Java side. :rtype: int """ if self._java_properties is None: params = [self._java_call, '-jar', self._jar_file] else: params = [self._java_call] + self._java_properties \ + ['-jar', self._jar_file] params.extend(args) return subprocess.call(params)
[docs] def get_all_data(self): """ Gets all the data from the previously set placements and put these in the previously set database. :raises PacmanExternalAlgorithmFailedToCompleteException: On failure of the Java code. """ if not self._recording: return if self._gatherer_iptags is None: result = self._run_java( 'download', self._placement_json, self._machine_json(), BufferDatabase.default_database_file(), FecDataView.get_run_dir_path()) else: result = self._run_java( 'gather', self._placement_json, self._machine_json(), BufferDatabase.default_database_file(), FecDataView.get_run_dir_path()) if result != 0: log_file = os.path.join( FecDataView.get_run_dir_path(), "jspin.log") raise PacmanExternalAlgorithmFailedToCompleteException( "Java call exited with value " + str(result) + " see " + str(log_file) + " for logged info")
[docs] def execute_data_specification(self): """ Writes all the data specs, uploading the result to the machine. :raises PacmanExternalAlgorithmFailedToCompleteException: On failure of the Java code. """ result = self._run_java( 'dse', self._machine_json(), DsSqlliteDatabase.default_database_file(), FecDataView.get_run_dir_path()) if result != 0: log_file = os.path.join( FecDataView.get_run_dir_path(), "jspin.log") raise PacmanExternalAlgorithmFailedToCompleteException( "Java call exited with value " + str(result) + " see " + str(log_file) + " for logged info")
[docs] def execute_system_data_specification(self): """ Writes all the data specs for system cores, uploading the result to the machine. :raises PacmanExternalAlgorithmFailedToCompleteException: On failure of the Java code. """ result = self._run_java( 'dse_sys', self._machine_json(), DsSqlliteDatabase.default_database_file(), FecDataView.get_run_dir_path()) if result != 0: log_file = os.path.join( FecDataView.get_run_dir_path(), "jspin.log") raise PacmanExternalAlgorithmFailedToCompleteException( "Java call exited with value " + str(result) + " see " + str(log_file) + " for logged info")
[docs] def execute_app_data_specification(self, use_monitors): """ Writes all the data specs for application cores, uploading the result to the machine. .. note:: May assume that system cores are already loaded and running if `use_monitors` is set to `True`. :param bool use_monitors: :raises PacmanExternalAlgorithmFailedToCompleteException: On failure of the Java code. """ if use_monitors: result = self._run_java( 'dse_app_mon', self._placement_json, self._machine_json(), DsSqlliteDatabase.default_database_file(), FecDataView.get_run_dir_path()) else: result = self._run_java( 'dse_app', self._machine_json(), DsSqlliteDatabase.default_database_file(), FecDataView.get_run_dir_path()) if result != 0: log_file = os.path.join( FecDataView.get_run_dir_path(), "jspin.log") raise PacmanExternalAlgorithmFailedToCompleteException( "Java call exited with value " + str(result) + " see " + str(log_file) + " for logged info")