Source code for mosaic.utils.logger


import os
import sys
import asyncio
import logging
from cached_property import cached_property

import mosaic


__all__ = ['LoggerManager', 'clear_logger', 'default_logger', 'log_level']


log_level = 'perf'


_stdout = sys.stdout
_stderr = sys.stderr


_local_log_levels = {
    'info': logging.INFO,
    'perf': 19,
    'debug': logging.DEBUG,
    'error': logging.ERROR,
    'warning': logging.WARNING,
}


_remote_log_levels = {
    'info': 'log_info',
    'perf': 'log_perf',
    'debug': 'log_debug',
    'error': 'log_error',
    'warning': 'log_warning',
}


logging.addLevelName(_local_log_levels['perf'], 'PERF')


class LoggerBase:

    @property
    def runtime(self):
        return mosaic.runtime()

    @property
    def comms(self):
        return mosaic.get_comms()

    @property
    def loop(self):
        return mosaic.get_event_loop()

    def isatty(self):
        return False


class LocalLogger(LoggerBase):
    def __init__(self, logger, log_level='info'):
        self._logger = logger
        self._log_level = _local_log_levels[log_level]
        self._linebuf = ''

    def write(self, buf, uid=None):
        if uid is None:
            if self.runtime is not None:
                uid = self.runtime.uid
            else:
                uid = ''
        uid = uid.upper()

        temp_linebuf = self._linebuf + buf
        self._linebuf = ''
        for line in temp_linebuf.splitlines(True):
            # From the io.TextIOWrapper docs:
            #   On output, if newline is None, any '\n' characters written
            #   are translated to the system default line separator.
            # By default sys.stdout.write() expects '\n' newlines and then
            # translates them so this is still cross platform.
            if line[-1] == '\n':
                self._logger.log(self._log_level, line.rstrip(), extra={'runtime_id': uid})
            else:
                self._linebuf += line

    def flush(self, uid=None):
        if uid is None:
            if self.runtime is not None:
                uid = self.runtime.uid
            else:
                uid = ''
        uid = uid.upper()

        if self._linebuf != '':
            self._logger.log(self._log_level, self._linebuf.rstrip(), extra={'runtime_id': uid})
        self._linebuf = ''

    def log(self, msg, uid=None):
        if uid is None:
            if self.runtime is not None:
                uid = self.runtime.uid
            else:
                uid = ''
            uid = uid.upper()
            self._logger.log(self._log_level, msg, extra={'runtime_id': uid})
        else:
            self.write(msg, uid=uid)
            self.flush(uid=uid)

    async def send(self):
        pass


class RemoteLogger(LoggerBase):
    def __init__(self, logger, runtime_id, log_level='info'):
        self._logger = logger
        self._runtime_id = runtime_id
        self._local_log_level = _local_log_levels[log_level]
        self._remote_log_level = _remote_log_levels[log_level]
        self._linebuf = ''
        self._queuebuf = ''

        loop = mosaic.get_event_loop()
        loop.interval(self.send, interval=0.01)

    @cached_property
    def remote_runtime(self):
        return self.runtime.proxy(self._runtime_id)

    def write(self, buf, uid=None):
        if buf == '\n':
            return

        temp_linebuf = self._linebuf + buf
        self._linebuf = ''
        for line in temp_linebuf.splitlines(True):
            # From the io.TextIOWrapper docs:
            #   On output, if newline is None, any '\n' characters written
            #   are translated to the system default line separator.
            # By default sys.stdout.write() expects '\n' newlines and then
            # translates them so this is still cross platform.
            if line[-1] == '\n':
                continue

            elif line.rstrip() == '':
                continue

            else:
                self._linebuf += line
                self._linebuf += '\n'

        self.queue(self._linebuf)
        self._linebuf = ''

    def flush(self):
        if len(self._linebuf):
            self.queue(self._linebuf)
            self._linebuf = ''

    def log(self, buf, uid=None):
        self.queue(buf)

    def queue(self, buf):
        if not self.comms.shaken(self._runtime_id):
            _stdout.write(buf)
            _stdout.flush()

        else:
            if len(self._queuebuf):
                self._queuebuf += '\n'
            self._queuebuf += buf

        if self.runtime is not None:
            uid = self.runtime.uid
        else:
            uid = ''
        uid = uid.upper()

        self._logger.log(self._local_log_level, buf.rstrip(), extra={'runtime_id': uid})

    async def send(self):
        if len(self._queuebuf):
            await self.remote_runtime[self._remote_log_level](buf=self._queuebuf)
            self._queuebuf = ''


[docs] class LoggerManager: """ Class that manages the creation loggers and the interface with them. It creates local or remote loggers and handles the communication with loggers at different levels ``info``, ``debug``, ``error`` and ``warning``. """ def __init__(self): self._info_logger = None self._perf_logger = None self._debug_logger = None self._error_logger = None self._warn_logger = None self._stdout = _stdout self._stderr = _stderr self._log_level = 'perf' self._log_location = None self._log_path = os.path.join(os.getcwd(), 'mosaic-workspace') if not os.path.exists(self._log_path): os.makedirs(self._log_path)
[docs] def set_default(self, format='interactive'): """ Set up default loggers. Returns ------- """ self._log_location = 'local' logging._srcfile = None logging.logThreads = False logging.logProcesses = False logging.logMultiprocessing = False sys.stdout = self._stdout sys.stderr = self._stderr handler = logging.StreamHandler(self._stdout) handler.setFormatter(CustomFormatter('%(message)s')) logger = logging.getLogger('mosaic') logger.setLevel(_local_log_levels[log_level]) logger.propagate = False if logger.hasHandlers(): logger.handlers.clear() logger.addHandler(handler) self._info_logger = LocalLogger(logger, log_level='info') self._perf_logger = LocalLogger(logger, log_level='perf') self._debug_logger = LocalLogger(logger, log_level='debug') self._error_logger = LocalLogger(logger, log_level='error') self._warn_logger = LocalLogger(logger, log_level='warning') sys.stdout.flush() logging.basicConfig( stream=self._info_logger, level=_local_log_levels[log_level], format='%(message)s', )
[docs] def set_local(self, format='remote'): """ Set up local loggers. Returns ------- """ self._log_location = 'local' logging._srcfile = None logging.logThreads = False logging.logProcesses = False logging.logMultiprocessing = False sys.stdout = self._stdout sys.stderr = self._stderr runtime = mosaic.runtime() log_file = f'{runtime.uid}.log'.replace(':', '-') file_handler = logging.FileHandler(os.path.join(self._log_path, log_file), mode='w') handler = logging.StreamHandler(self._stdout) if format == 'interactive': handler.setFormatter(CustomFormatter('%(runtime_id)-15s %(message)s')) file_handler.setFormatter(CustomFormatter('%(runtime_id)-15s %(message)s')) else: handler.setFormatter(CustomFormatter('%(asctime)s - %(levelname)-10s %(runtime_id)-15s %(message)s')) file_handler.setFormatter(CustomFormatter('%(asctime)s - %(levelname)-10s %(runtime_id)-15s %(message)s')) logger = logging.getLogger('mosaic') logger.setLevel(_local_log_levels[log_level]) logger.propagate = False if logger.hasHandlers(): logger.handlers.clear() logger.addHandler(handler) logger.addHandler(file_handler) self._info_logger = LocalLogger(logger, log_level='info') self._perf_logger = LocalLogger(logger, log_level='perf') self._debug_logger = LocalLogger(logger, log_level='debug') self._error_logger = LocalLogger(logger, log_level='error') self._warn_logger = LocalLogger(logger, log_level='warning') sys.stdout.flush() logging.basicConfig( # stream=self._info_logger, level=_local_log_levels[log_level], format='%(message)s', )
[docs] def set_remote(self, runtime_id='monitor', format='remote'): """ Set up remote loggers. Parameters ---------- runtime_id : str, optional Runtime to which logging will be directed, defaults to ``monitor``. Returns ------- """ self._log_location = 'remote' logging._srcfile = None logging.logThreads = False logging.logProcesses = False logging.logMultiprocessing = False sys.stdout = self._stdout sys.stderr = self._stderr runtime = mosaic.runtime() log_file = f'{runtime.uid}.log'.replace(':', '-') file_handler = logging.FileHandler(os.path.join(self._log_path, log_file), mode='w') if format == 'interactive': file_handler.setFormatter(CustomFormatter('%(runtime_id)-15s %(message)s')) else: file_handler.setFormatter(CustomFormatter('%(asctime)s - %(levelname)-10s %(runtime_id)-15s %(message)s')) logger = logging.getLogger('mosaic') logger.setLevel(_local_log_levels[log_level]) logger.propagate = False if logger.hasHandlers(): logger.handlers.clear() logger.addHandler(file_handler) self._info_logger = RemoteLogger(logger, runtime_id=runtime_id, log_level='info') self._perf_logger = RemoteLogger(logger, runtime_id=runtime_id, log_level='perf') self._debug_logger = RemoteLogger(logger, runtime_id=runtime_id, log_level='debug') self._error_logger = RemoteLogger(logger, runtime_id=runtime_id, log_level='error') self._warn_logger = RemoteLogger(logger, runtime_id=runtime_id, log_level='warning') sys.stdout.flush() sys.stdout = self._info_logger sys.stderr = self._error_logger logging.basicConfig( stream=sys.stdout, level=_local_log_levels[log_level], format='%(message)s', )
[docs] @staticmethod def set_level(level): """ Set log level from options ``info``, ``debug``, ``error`` and ``warning``. Parameters ---------- level : str Log level Returns ------- """ global log_level log_level = level logger = logging.getLogger('mosaic') logger.setLevel(_local_log_levels[level])
[docs] def info(self, buf, uid=None): """ Log message with level ``info``. Parameters ---------- buf : str Message to log. uid : str, optional UID of the runtime from which the message originates, defaults to current runtime. Returns ------- """ if self._info_logger is None: return if log_level in ['error']: return self._info_logger.log(buf, uid=uid)
[docs] def perf(self, buf, uid=None): """ Log message with level ``perf``. Parameters ---------- buf : str Message to log. uid : str, optional UID of the runtime from which the message originates, defaults to current runtime. Returns ------- """ if self._perf_logger is None: return if log_level in ['info', 'error']: return self._perf_logger.log(buf, uid=uid)
[docs] def debug(self, buf, uid=None): """ Log message with level ``debug``. Parameters ---------- buf : str Message to log. uid : str, optional UID of the runtime from which the message originates, defaults to current runtime. Returns ------- """ if self._debug_logger is None: return if log_level in ['info', 'error', 'perf']: return self._debug_logger.log(buf, uid=uid)
[docs] def error(self, buf, uid=None): """ Log message with level ``error``. Parameters ---------- buf : str Message to log. uid : str, optional UID of the runtime from which the message originates, defaults to current runtime. Returns ------- """ if self._error_logger is None: return self._error_logger.log(buf, uid=uid)
[docs] def warning(self, buf, uid=None): """ Log message with level ``warning``. Parameters ---------- buf : str Message to log. uid : str, optional UID of the runtime from which the message originates, defaults to current runtime. Returns ------- """ if self._warn_logger is None: return self._warn_logger.log(buf, uid=uid)
[docs] def warn(self, buf, uid=None): """ Log message with level ``warning``. Parameters ---------- buf : str Message to log. uid : str, optional UID of the runtime from which the message originates, defaults to current runtime. Returns ------- """ self.warning(buf, uid=uid)
[docs] async def send(self): await asyncio.gather( self._info_logger.send(), self._perf_logger.send(), self._debug_logger.send(), self._error_logger.send(), self._warn_logger.send(), )
class CustomFormatter(logging.Formatter): def format(self, record): if not hasattr(record, 'runtime_id'): record.runtime_id = '' return super().format(record)
[docs] def clear_logger(): sys.stdout.flush() sys.stderr.flush() sys.stdout = _stdout sys.stderr = _stderr
default_logger = LoggerManager() default_logger.set_default()