Source code for spinn_front_end_common.interface.ds.ds_sqllite_database

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import logging
import os
import sqlite3
from spinn_utilities.log import FormatAdapter
from spinnman.spalloc.spalloc_job import SpallocJob
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.exceptions import DsDatabaseException
from spinn_front_end_common.utilities.sqlite_db import SQLiteDB
from .data_row_writer import DataRowWriter

# Path of the SQL schema script ("dse.sql") located in the same directory as
# this module; passed to the SQLiteDB constructor as ddl_file when a new
# database has to be initialised.
_DDL_FILE = os.path.join(os.path.dirname(__file__), "dse.sql")
# Module logger wrapped in FormatAdapter; messages logged in this module use
# "{}" placeholders for their arguments.
logger = FormatAdapter(logging.getLogger(__name__))


class DsSqlliteDatabase(SQLiteDB):
    """
    SQLite database holding the data specification byte code for each core
    together with the information on where it was written.

    Per-core data is kept in the ``core`` table (columns ``x``, ``y`` and
    ``processor`` identify the core); every core row references a row of
    the ``ethernet`` table through ``ethernet_id``.
    """

    __slots__ = [
        # The root ethernet id if required
        "_root_ethernet_id"
    ]

    def __init__(self, init=None):
        """
        :param init:
            Whether to create the database schema and fill in the ethernet
            table from the machine. If `None`, this is done only when the
            database file does not exist yet.
        :type init: bool or None
        :raises DsDatabaseException:
            If the ethernet table holds no rows
        """
        database_file = self.default_database_file()

        if init is None:
            init = not os.path.exists(database_file)

        super().__init__(
            database_file, ddl_file=_DDL_FILE if init else None)
        if init:
            self.__init_db_contents()
        self._root_ethernet_id = self.__find_root_id()

    @classmethod
    def default_database_file(cls):
        """
        Gets the path of the database file used by this class.

        The file is named ``ds<reset_str>.sqlite3`` and lives in the
        current run directory.

        :rtype: str
        """
        return os.path.join(FecDataView.get_run_dir_path(),
                            f"ds{FecDataView.get_reset_str()}.sqlite3")

    def __init_db_contents(self):
        """
        Set up the database contents from the machine.

        Adds one row to the ethernet table for each Ethernet-connected
        chip of the machine.
        """
        eth_chips = FecDataView.get_machine().ethernet_connected_chips
        with self.transaction() as cursor:
            cursor.executemany(
                """
                INSERT INTO ethernet(
                    ethernet_x, ethernet_y, ip_address)
                VALUES(?, ?, ?)
                """, (
                    (ethernet.x, ethernet.y, ethernet.ip_address)
                    for ethernet in eth_chips))

    def __find_root_id(self):
        """
        Finds the id of the ethernet row to use when a chip's nearest
        Ethernet is not in the ethernet table.

        The row that sorts first by (ethernet_x, ethernet_y) is used; a
        warning is logged if that is not at 0,0.

        :return: the ethernet_id of the chosen row
        :rtype: int
        :raises DsDatabaseException: If the ethernet table is empty
        """
        first_x = first_y = root_id = None
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT ethernet_id, ethernet_x, ethernet_y
                    FROM ethernet
                    ORDER BY ethernet_x, ethernet_y
                    LIMIT 1
                    """):
                root_id = row["ethernet_id"]
                first_x = row["ethernet_x"]
                first_y = row["ethernet_y"]
        if root_id is None:
            # Should only be reachable for an empty machine
            raise DsDatabaseException("No ethernet chip found")
        # Either coordinate being non-zero means the row is not for 0,0
        if first_x or first_y:
            logger.warning(
                "No Ethernet chip found at 0,0 using {},{} "
                "for all boards with no IP address.", first_x, first_y)
        return root_id

    def write_data_spec(self, core_x, core_y, core_p, ds):
        """
        Saves the data spec byte code for a core as a new row.

        The row is linked to the ethernet row of the chip's nearest
        Ethernet, falling back to the root ethernet row if there is none,
        and is given the current app id.

        :param int core_x: x of the core ds applies to
        :param int core_y: y of the core ds applies to
        :param int core_p: p of the core ds applies to
        :param bytearray ds: the data spec as byte code
        """
        chip = FecDataView().get_chip_at(core_x, core_y)
        with self.transaction() as cursor:
            cursor.execute(
                """
                INSERT INTO core(
                    x, y, processor, content, ethernet_id, app_id)
                VALUES(?, ?, ?, ?, (
                    SELECT COALESCE((
                        SELECT ethernet_id FROM ethernet
                        WHERE ethernet_x = ? AND ethernet_y = ?
                    ), ?)), ?)
                """, (
                    core_x, core_y, core_p, sqlite3.Binary(ds),
                    chip.nearest_ethernet_x, chip.nearest_ethernet_y,
                    self._root_ethernet_id, FecDataView().get_app_id()))

    def get_ds(self, x, y, p):
        """
        Retrieves the data spec as byte code for this core.

        :param int x: core x
        :param int y: core y
        :param int p: core p
        :return: data spec as byte code; empty bytes if there is no row
            for this core
        :rtype: bytes
        """
        # NOTE(review): rows created by set_write_info/set_size_info do not
        # set content, so this presumably returns None for such a core
        # until write_data_spec is called -- confirm callers cope.
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT content FROM core
                    WHERE x = ? AND y = ? AND processor = ?
                    LIMIT 1
                    """, (x, y, p)):
                return row["content"]
        return b""

    def keys(self):
        """
        Yields the keys of the cores that have a data spec saved.

        .. note::
            Do not use the database for anything else while iterating.

        :return: Yields the (x, y, p)
        :rtype: iterable(tuple(int,int,int))
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT x, y, processor FROM core
                    WHERE content IS NOT NULL
                    """):
                yield (row["x"], row["y"], row["processor"])

    def items(self):
        """
        Yields the keys and values for the DS data.

        .. note::
            Do not use the database for anything else while iterating.

        :return: Yields the (x, y, p) and saved ds pairs
        :rtype: iterable(tuple(tuple(int,int,int),~io.RawIOBase))
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT x, y, processor, content FROM core
                    WHERE content IS NOT NULL
                    """):
                yield ((row["x"], row["y"], row["processor"]),
                       io.BytesIO(row["content"]))

    def system_items(self):
        """
        Yields the keys and values for the DS data for system cores.

        .. note::
            Do not use the database for anything else while iterating.

        :return: Yields the (x, y, p), saved ds and memory_used triples
        :rtype: iterable(tuple(tuple(int,int,int),~io.RawIOBase, int))
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT x, y, processor, content, memory_used FROM core
                    WHERE content IS NOT NULL AND is_system = 1
                    """):
                yield ((row["x"], row["y"], row["processor"]),
                       io.BytesIO(row["content"]), row["memory_used"])

    def app_items(self):
        """
        Yields the keys and values for the DS data for application cores.

        .. note::
            Do not use the database for anything else while iterating.

        :return: Yields the (x, y, p), saved ds and memory_used triples
        :rtype: iterable(tuple(tuple(int,int,int),~io.RawIOBase, int))
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT x, y, processor, content, memory_used FROM core
                    WHERE content IS NOT NULL AND is_system = 0
                    """):
                yield ((row["x"], row["y"], row["processor"]),
                       io.BytesIO(row["content"]), row["memory_used"])

    def ds_n_cores(self):
        """
        Returns the number of cores there is a ds saved for.

        :rtype: int
        :raises DsDatabaseException: If the count query yields no row
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT COUNT(*) as count FROM core
                    WHERE content IS NOT NULL
                    LIMIT 1
                    """):
                return row["count"]
        raise DsDatabaseException("Count query failed")

    def ds_n_app_cores(self):
        """
        Returns the number of application cores there is a ds saved for.

        :rtype: int
        :raises DsDatabaseException: If the count query yields no row
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT COUNT(*) as count FROM core
                    WHERE content IS NOT NULL AND is_system = 0
                    LIMIT 1
                    """):
                return row["count"]
        raise DsDatabaseException("Count query failed")

    def ds_n_system_cores(self):
        """
        Returns the number of system cores there is a ds saved for.

        :rtype: int
        :raises DsDatabaseException: If the count query yields no row
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT COUNT(*) as count FROM core
                    WHERE content IS NOT NULL AND is_system = 1
                    LIMIT 1
                    """):
                return row["count"]
        raise DsDatabaseException("Count query failed")

    def set_app_id(self, app_id):
        """
        Sets the same app_id for all rows that have ds content.

        :param int app_id: value to set
        """
        with self.transaction() as cursor:
            cursor.execute(
                """
                UPDATE core SET
                    app_id = ?
                WHERE content IS NOT NULL
                """, (app_id,))

    def ds_get_app_id(self, x, y, p):
        """
        Gets the app_id set for this core.

        :param int x: core x
        :param int y: core y
        :param int p: core p
        :return: the app_id, or `None` if there is no row for this core
        :rtype: int or None
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT app_id FROM core
                    WHERE x = ? AND y = ? AND processor = ?
                    LIMIT 1
                    """, (x, y, p)):
                return row["app_id"]
        return None

    def mark_system_cores(self, core_subsets):
        """
        Flags every processor in the given core subsets as running a
        system binary.

        :param ~spinn_machine.CoreSubsets core_subsets:
        """
        cores_to_mark = [
            (subset.x, subset.y, p)
            for subset in core_subsets
            for p in subset.processor_ids]
        self.ds_mark_as_system(cores_to_mark)

    def ds_mark_as_system(self, core_list):
        """
        Flags a list of processors as running system binaries.

        :param iterable(tuple(int,int,int)) core_list:
            list of (core x, core y, core p)
        """
        with self.transaction() as cursor:
            cursor.executemany(
                """
                UPDATE core SET
                    is_system = 1
                WHERE x = ? AND y = ? AND processor = ?
                """, core_list)

    def get_write_info(self, x, y, p):
        """
        Gets the provenance returned by the Data Spec executor.

        :param int x: core x
        :param int y: core y
        :param int p: core p
        :return: start_address, memory_used, memory_written
        :rtype: tuple(int, int, int)
        :raises ValueError: If there is no row for this core
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT start_address, memory_used, memory_written
                    FROM core
                    WHERE x = ? AND y = ? AND processor = ?
                    LIMIT 1
                    """, (x, y, p)):
                return (row["start_address"], row["memory_used"],
                        row["memory_written"])
        raise ValueError(f"No info for {x}:{y}:{p}")

    def set_write_info(
            self, x, y, p, start, used, written):
        """
        Sets the provenance returned by the Data Spec executor.

        Updates the row for this core, or creates one if there is none.

        :param int x: core x
        :param int y: core y
        :param int p: core p
        :param int start: base address
        :param int used: size allocated
        :param int written: bytes written
        """
        with self.transaction() as cursor:
            cursor.execute(
                """
                UPDATE core SET
                    start_address = ?,
                    memory_used = ?,
                    memory_written = ?
                WHERE x = ? AND y = ? AND processor = ?
                """, (start, used, written, x, y, p))
            # No row was updated so there is none yet for this core
            if cursor.rowcount == 0:
                chip = FecDataView().get_chip_at(x, y)
                cursor.execute(
                    """
                    INSERT INTO core(
                        x, y, processor, start_address,
                        memory_used, memory_written, ethernet_id)
                    VALUES(?, ?, ?, ?, ?, ?, (
                        SELECT COALESCE((
                            SELECT ethernet_id FROM ethernet
                            WHERE ethernet_x = ? AND ethernet_y = ?
                        ), ?)))
                    """, (
                        x, y, p, start, used, written,
                        chip.nearest_ethernet_x, chip.nearest_ethernet_y,
                        self._root_ethernet_id))

    def set_size_info(self, x, y, p, memory_used):
        """
        Sets the memory used by this core.

        Updates the row for this core, or creates one if there is none.

        :param int x: core x
        :param int y: core y
        :param int p: core p
        :param int memory_used:
            size in bytes; converted with `int` before being stored
        """
        # Convert once so that the UPDATE and the INSERT both bind a plain
        # int; sqlite3 only maps the built-in Python types to SQLite types.
        size = int(memory_used)
        with self.transaction() as cursor:
            cursor.execute(
                """
                UPDATE core SET
                    memory_used = ?
                WHERE x = ? AND y = ? AND processor = ?
                """, (size, x, y, p))
            # No row was updated so there is none yet for this core
            if cursor.rowcount == 0:
                chip = FecDataView().get_chip_at(x, y)
                cursor.execute(
                    """
                    INSERT INTO core(
                        x, y, processor, memory_used, ethernet_id)
                    VALUES(?, ?, ?, ?, (
                        SELECT COALESCE((
                            SELECT ethernet_id FROM ethernet
                            WHERE ethernet_x = ? AND ethernet_y = ?
                        ), ?)))
                    """, (
                        x, y, p, size,
                        chip.nearest_ethernet_x, chip.nearest_ethernet_y,
                        self._root_ethernet_id))

    def clear_write_info(self):
        """
        Clears the provenance for all rows.
        """
        with self.transaction() as cursor:
            cursor.execute(
                """
                UPDATE core SET
                    start_address = NULL,
                    memory_used = NULL,
                    memory_written = NULL
                """)

    def info_n_cores(self):
        """
        Returns the number of cores there is info saved for.

        :rtype: int
        :raises DsDatabaseException: If the count query yields no row
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT count(*) as count FROM core
                    WHERE start_address IS NOT NULL
                    LIMIT 1
                    """):
                return row["count"]
        raise DsDatabaseException("Count query failed")

    def info_iteritems(self):
        """
        Yields the keys and values for the Info data.

        .. note::
            A DB transaction may be held while this iterator is processing.
            Reentrant use of this class is not supported.

        :return: Yields the (x, y, p), start_address, memory_used
            and memory_written
        :rtype: iterable(tuple(tuple(int, int, int), int, int, int))
        """
        with self.transaction() as cursor:
            for row in cursor.execute(
                    """
                    SELECT
                        x, y, processor,
                        start_address, memory_used, memory_written
                    FROM core
                    WHERE start_address IS NOT NULL
                    """):
                yield ((row["x"], row["y"], row["processor"]),
                       row["start_address"], row["memory_used"],
                       row["memory_written"])

    def create_data_spec(self, x, y, p):
        """
        Creates a writer for the data spec of this core, backed by this
        database.

        :param int x: core x
        :param int y: core y
        :param int p: core p
        :rtype: DataRowWriter
        """
        return DataRowWriter(x, y, p, self)

    def write_session_credentials_to_db(self):
        """
        Write Spalloc session credentials to the database if in use.

        Does nothing unless there is an allocation controller that is
        proxying and whose job is a SpallocJob.
        """
        # pylint: disable=protected-access
        if not FecDataView.has_allocation_controller():
            return
        mac = FecDataView.get_allocation_controller()
        if mac.proxying:
            # This is now assumed to be a SpallocJobController;
            # can't check that because of import circularity.
            job = mac._job
            if isinstance(job, SpallocJob):
                with self.transaction() as cur:
                    job._write_session_credentials_to_db(cur)