# Source code for spinn_utilities.log

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
from datetime import datetime
import logging
import re
import sys
from inspect import getfullargspec
from .log_store import LogStore
from .overrides import overrides

_LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}


class ConfiguredFilter:
    """
    Allow a parent logger to filter a child logger.

    The per-module thresholds and the fallback threshold are both read from
    the "Logging" section of the configuration given at construction.
    """

    __slots__ = [
        "_default_level",
        "_levels"]

    def __init__(self, conf):
        """
        :param conf:
            Configuration object; it is queried with
            ``conf.get("Logging", "default")`` and passed on to
            :py:meth:`ConfiguredFormatter.construct_logging_parents`.
        """
        default_name = conf.get("Logging", "default")
        self._levels = ConfiguredFormatter.construct_logging_parents(conf)
        self._default_level = _LEVELS[default_name]

    def filter(self, record):
        """
        Get the level for the deepest parent, and filter appropriately.

        :param record: The log record; its ``name`` and ``levelno`` are read.
        :return: True if the record is at or above the applicable threshold.
        """
        threshold = ConfiguredFormatter.level_of_deepest_parent(
            self._levels, record.name)
        if threshold is None:
            # No configured parent matched, so use the default level
            threshold = self._default_level
        return record.levelno >= threshold
class ConfiguredFormatter(logging.Formatter):
    """
    Defines the logging format for the SpiNNaker host software.
    """

    # Precompiled RE matching the last dot and everything after it.  The
    # final component is allowed to be empty so that names ending in a dot
    # (or containing "..") are still shortened on every pass.
    __last_component = re.compile(r'\.[^.]*$')

    def __init__(self, conf):
        """
        :param conf:
            Configuration object; ``conf.get("Logging", "default")`` selects
            the format.  When it is ``"debug"`` the path of the source file
            is included in every line.
        """
        if conf.get("Logging", "default") == "debug":
            fmt = "%(asctime)-15s %(levelname)s: %(pathname)s: %(message)s"
        else:
            fmt = "%(asctime)-15s %(levelname)s: %(message)s"
        super().__init__(fmt=fmt, datefmt="%Y-%m-%d %H:%M:%S")

    @staticmethod
    def construct_logging_parents(conf):
        """
        Create a dictionary of module names and logging levels.

        Every option of the "Logging" section that is named after a level
        (see ``_LEVELS``) is read as a comma-separated list of module names.

        :param conf:
            Configuration object providing ``has_section``, ``has_option``
            and ``get``.
        :return: Mapping of module name to numeric logging level.
        :rtype: dict
        """
        levels = {}
        if not conf.has_section("Logging"):
            return levels

        for label, level in _LEVELS.items():
            if not conf.has_option("Logging", label):
                continue
            modules = [
                s.strip() for s in conf.get('Logging', label).split(',')]
            # NOTE(review): one empty entry (an empty option, but also a
            # trailing comma) discards the whole list -- confirm that the
            # trailing-comma case is intended.
            if '' not in modules:
                levels.update({m: level for m in modules})
        return levels

    @staticmethod
    def deepest_parent(parents, child):
        """
        Greediest match between child and parent.

        :param parents: Container of known (dotted) parent names.
        :param str child: Dotted name to find the closest ancestor of.
        :return:
            ``child`` itself or its longest dotted prefix found in
            ``parents``; None if there is no such name.
        """
        # Repeatedly strip the last component off the child until we match
        # an item in parents or run out of components.  Each pass removes at
        # least the final dot, so the loop always terminates.
        match = child
        while '.' in match and match not in parents:
            match = ConfiguredFormatter.__last_component.sub('', match)

        # If no match then return None, there is no deepest parent
        return match if match in parents else None

    @staticmethod
    def level_of_deepest_parent(parents, child):
        """
        The logging level of the greediest match between child and parent.

        :param dict parents: Mapping of parent name to logging level.
        :param str child: Dotted name to look up.
        :return: The level of the closest configured ancestor, or None.
        """
        parent = ConfiguredFormatter.deepest_parent(parents, child)
        if parent is None:
            return None
        return parents[parent]
class _BraceMessage(object): """ A message that converts a Python format string to a string. """ __slots__ = [ "args", "fmt", "kwargs"] def __init__(self, fmt, args, kwargs): self.fmt = fmt self.args = args self.kwargs = kwargs def __str__(self): try: return str(self.fmt).format(*self.args, **self.kwargs) except KeyError: try: if (not self.args and not self.kwargs and isinstance(self.fmt, str)): # nothing to format with so assume brackets not formatting return self.fmt return "KeyError: " + str(self.fmt) except KeyError: return "Double KeyError" except IndexError: try: if self.args or self.kwargs: return "IndexError: " + str(self.fmt) else: # nothing to format with so assume brackets not formatting return str(self.fmt) except IndexError: return "Double IndexError"
class LogLevelTooHighException(Exception):
    """
    An Exception thrown when the System tries to log at a level where an
    Exception is a better option.
    """
class FormatAdapter(logging.LoggerAdapter):
    """
    An adaptor for logging with messages that uses Python format strings.

    Example::

        log = FormatAdapter(logging.getLogger(__name__))
        log.info("this message has {} inside {}", 123, 'itself')
        # --> INFO: this message has 123 inside itself

    Messages are also passed to the log store, if one has been set; until
    then they are buffered in memory.  All of that state is held at class
    level and so is shared by every adapter.
    """
    # Logging at or above this level raises an exception instead
    __kill_level = logging.CRITICAL + 1
    # Messages at or above this level are repeated when the program exits
    __repeat_at_end = logging.WARNING
    # (timestamp, level, text) of messages not (yet) given to a log store
    __not_stored_messages = []
    __log_store = None
    # Parameters that log() always passes to do_log positionally
    __positional_names = ("level", "msg", "args")

    @classmethod
    def set_kill_level(cls, level=None):
        """
        Allow system to change the level at which a log is changed to an
        Exception.

        .. note::
            This is a static method; it affects all log messages.

        :param int level:
            The level to set. The values in :py:mod:`logging` are recommended.
            None restores the initial setting of one above CRITICAL.
        """
        if level is None:
            cls.__kill_level = logging.CRITICAL + 1
        else:
            cls.__kill_level = level

    @classmethod
    def set_log_store(cls, log_store):
        """
        Set (or remove) the store that log messages are passed to.

        When a store is given, the messages buffered so far are moved into
        it.  When the store is removed, the buffered messages are kept so
        they can still be stored later or reported on exit.

        :param log_store: The store to use, or None to use no store.
        :type log_store: LogStore or None
        :raises TypeError: If `log_store` is neither None nor a LogStore
        """
        if not isinstance(log_store, (type(None), LogStore)):
            raise TypeError("log_store must be a LogStore")
        cls.__log_store = log_store
        if log_store is None:
            # There is nothing to move the buffered messages into.
            return
        for timestamp, level, message in cls._pop_not_stored_messages():
            log_store.store_log(level, message, timestamp)

    def __init__(self, logger, extra=None):
        """
        :param logging.Logger logger: The logger to wrap.
        :param extra: Passed on to :py:class:`logging.LoggerAdapter`;
            None is replaced by an empty dict.
        """
        if extra is None:
            extra = {}
        super().__init__(logger, extra)
        self.do_log = logger._log  # pylint: disable=protected-access

    @overrides(logging.LoggerAdapter.log, extend_doc=False)
    def log(self, level, msg, *args, **kwargs):
        """
        Delegate a log call to the underlying logger, applying appropriate
        transformations to allow the log message to be written using
        Python format string, rather than via `%`-substitutions.

        :param int level: The level to log at.
        :param msg: The message, as a Python format string.
        :param args: Positional values for the format string.
        :param kwargs:
            Named values for the format string; those that the underlying
            logger accepts as options (such as ``exc_info``) are passed on
            to it as well.
        :raises LogLevelTooHighException:
            If `level` is at or above the kill level.
        :raises Exception:
            Whatever the log store raised, if storing the message failed.
            The store is then removed and the message kept in the buffer.
        """
        if level >= FormatAdapter.__kill_level:
            raise LogLevelTooHighException(_BraceMessage(msg, args, kwargs))
        if self.isEnabledFor(level):
            message = _BraceMessage(msg, args, kwargs)
            if self.__log_store:
                try:
                    FormatAdapter.__log_store.store_log(level, str(message))
                except Exception as ex:
                    # Avoid an endless loop of log store errors being logged
                    self.__not_stored_messages.append((
                        datetime.now(), level,
                        "Unable to store log messages in database due to"
                        f" {ex}"))
                    self.__not_stored_messages.append(
                        (datetime.now(), level, str(message)))
                    FormatAdapter.__log_store = None
                    raise
            else:
                self.__not_stored_messages.append(
                    (datetime.now(), level, str(message)))
            _, log_kwargs = self.process(msg, kwargs)
            if "exc_info" in kwargs:
                log_kwargs["exc_info"] = kwargs["exc_info"]
            self.do_log(level, message, (), **log_kwargs)

    @overrides(logging.LoggerAdapter.process, extend_doc=False)
    def process(self, msg, kwargs):
        """
        Process the logging message and keyword arguments passed in to a
        logging call to insert contextual information. You can either
        manipulate the message itself, the keyword arguments or both.
        Return the message and *kwargs* modified (or not) to suit your needs.

        :param msg: The message; returned unchanged.
        :param dict kwargs: The keyword arguments of the logging call.
        :return:
            The message and the subset of `kwargs` that ``do_log`` accepts
            as keywords.  ``level``, ``msg`` and ``args`` are never included
            as :py:meth:`log` already passes those positionally, so a format
            value with one of those names must not be passed a second time.
        :rtype: tuple(object, dict)
        """
        accepted = getfullargspec(self.do_log).args[1:]
        return msg, {
            key: kwargs[key]
            for key in accepted
            if key in kwargs and key not in FormatAdapter.__positional_names}

    @classmethod
    def atexit_handler(cls):
        """
        Print to stderr a summary of the messages logged at the repeat level
        (WARNING) or above; prints nothing if there were none.
        """
        messages = []
        if cls.__log_store:
            try:
                messages = list(cls.__log_store.retreive_log_messages(
                    cls.__repeat_at_end))
            except Exception:  # pylint: disable=broad-except
                # Best effort only: we are shutting down, so a broken store
                # must not stop the buffered messages being reported.
                pass
        # Only the text of the buffered records is reported.
        # NOTE(review): assumes the store also returns printable messages
        # -- confirm against the LogStore implementations.
        messages.extend(
            message for _, _, message
            in cls._pop_not_stored_messages(cls.__repeat_at_end))
        if messages:
            level = logging.getLevelName(cls.__repeat_at_end)
            print(f"\n!WARNING: {len(messages)} log messages were "
                  f"generated at level {level} or above.", file=sys.stderr)
            print("This may mean that the results are invalid.",
                  file=sys.stderr)
            if cls.__log_store:
                print(f"You are advised to check the details of these in "
                      f"the p_log_view of : "
                      f"{cls.__log_store.get_location()}", file=sys.stderr)
            if len(messages) < 10:
                print("These are:", file=sys.stderr)
            else:
                print("The first 10 are:", file=sys.stderr)
            for message in messages[0:10]:
                print(message, file=sys.stderr)

    @classmethod
    def _pop_not_stored_messages(cls, min_level=0):
        """
        Returns the log of messages to print on exit and
        *clears that log*.

        .. note::
            Should only be called externally from test code!

        :param int min_level:
            Only messages at or above this level are returned; the others
            are discarded along with the rest of the buffer.
        :return: The selected (timestamp, level, message) records.
        :rtype: list(tuple)
        """
        result = []
        try:
            for timestamp, level, message in cls.__not_stored_messages:
                if level >= min_level:
                    result.append((timestamp, level, message))
            return result
        except Exception:  # pylint: disable=broad-except
            # Return whatever was collected before the problem
            return result
        finally:
            cls.__not_stored_messages = []
# Have the summary of the warnings and errors logged during the run printed
# when the interpreter exits.
atexit.register(FormatAdapter.atexit_handler)