From d500b7d473f8f372292463f38c62361c4c00c4cc Mon Sep 17 00:00:00 2001 From: Clemens Schwaighofer Date: Wed, 9 Jul 2025 14:41:53 +0900 Subject: [PATCH] Log update with listener Queues and color highlight for console enables Queues if multiprocessing.Queue() is set in the "log_queue" setting Now a logger "Log.init_worker_logging" can be attached to the ProcessPoolExecutor, the init args must be the log_queue set example: ```py with concurrent.futures.ProcessPoolExecutor( max_workers=max_forks, initializer=Log.init_worker_logging, initargs=(log_queue,) ) as executor: ```` Move all settings into a settings argument, the structure is defined in LogSettings. Default settings are in Log.DEFAULT_LOG_SETTINGS Only log path and log name are parameters Color output for console is on default enabled, disable via "console_color_output_enabled" The complete console output can be stopped with "console_enabled" --- src/corelibs/logging_handling/log.py | 284 +++++++++++++++--- test-run/logging_handling/log.py | 48 +++ test-run/logging_handling/log/.gitignore | 2 + test-run/logging_handling/log_queue.py | 75 +++++ .../queue_logger/log_queue.py | 96 ++++++ 5 files changed, 471 insertions(+), 34 deletions(-) create mode 100644 test-run/logging_handling/log.py create mode 100644 test-run/logging_handling/log/.gitignore create mode 100644 test-run/logging_handling/log_queue.py create mode 100644 test-run/logging_handling/queue_logger/log_queue.py diff --git a/src/corelibs/logging_handling/log.py b/src/corelibs/logging_handling/log.py index 53e2d7a..b5a10ab 100644 --- a/src/corelibs/logging_handling/log.py +++ b/src/corelibs/logging_handling/log.py @@ -1,11 +1,71 @@ """ A log handler wrapper +if log_settings['log_queue'] is set to multiprocessing.Queue it will launch with listeners +attach "init_worker_logging" with the set log_queue """ +import re import logging.handlers import logging from pathlib import Path -from typing import Mapping +from typing import Mapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast +from corelibs.string_handling.text_colors import Colors + +if TYPE_CHECKING: + from multiprocessing import Queue + + +class LogSettings(TypedDict): + """ + log settings + + Arguments: + TypedDict {_type_} -- _description_ + """ + log_level_console: str + log_level_file: str + console_enabled: bool + console_color_output_enabled: bool + add_start_info: bool + add_end_info: bool + log_queue: 'Queue[str] | None' + + +class CustomConsoleFormatter(logging.Formatter): + """ + Custom formatter with colors for console output + """ + + COLORS = { + "DEBUG": Colors.cyan, + "INFO": Colors.green, + "WARNING": Colors.yellow, + "ERROR": Colors.red, + "CRITICAL": Colors.magenta, + "EXCEPTION": Colors.magenta_bold, + "RESET": Colors.reset, # Reset + } + + def format(self, record: logging.LogRecord) -> str: + """ + set the color highlight + + Arguments: + record {logging.LogRecord} -- _description_ + + Returns: + str -- _description_ + """ + # Add color to levelname for console output + reset = self.COLORS["RESET"] + color = self.COLORS.get(record.levelname, reset) + # only highlight level for basic + if record.levelname in ['DEBUG', 'INFO']: + record.levelname = f"{color}{record.levelname}{reset}" + return super().format(record) + # highlight whole line + message = super().format(record) + return f"{color}{message}{reset}" class Log: @@ -13,60 +73,148 @@ class Log: logger setup """ + # exception level EXCEPTION: int = 60 - + # spacer lenght characters and the character + SPACER_CHAR: str = '=' SPACER_LENGTH: int = 32 + # default logging level + DEFAULT_LOG_LEVEL: str = 'WARNING' + # default settings + DEFAULT_LOG_SETTINGS: LogSettings = { + "log_level_console": "WARNING", + "log_level_file": "DEBUG", + "console_enabled": True, + "console_color_output_enabled": True, + "add_start_info": True, + "add_end_info": False, + "log_queue": None, + } def __init__( self, log_path: Path, log_name: str, - log_level_console: str = 'WARNING', - log_level_file: str = 'DEBUG', - add_start_info: bool = True + log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None = None, ): + # add new level for EXCEPTION logging.addLevelName(Log.EXCEPTION, 'EXCEPTION') - if not log_name.endswith('.log'): + # parse the logging settings + self.log_settings = self.__parse_log_settings(log_settings) + # if path, set log name with .log + # if log name with .log, strip .log for naming + if log_path.is_dir(): + __log_file_name = re.sub(r'[^a-zA-Z0-9]', '', log_name) + if not log_name.endswith('.log'): + log_path = log_path.joinpath(Path(__log_file_name).with_suffix('.log')) + else: + log_path = log_path.joinpath(__log_file_name) + elif not log_path.suffix == '.log': + # add .log if the path is a file but without .log log_path = log_path.with_suffix('.log') - # overall logger settings - self.logger = logging.getLogger(log_name) - # set maximum logging level for all logging output - self.logger.setLevel(logging.DEBUG) + # stip .log from the log name if set + if not log_name.endswith('.log'): + log_name = Path(log_name).stem + # general log name + self.log_name = log_name - # self.handlers = [] - # console logger - self.__console_handler(log_level_console) - # file logger - self.__file_handler(log_level_file, log_path) + self.log_queue: 'Queue[str] | None' = None + self.listener: logging.handlers.QueueListener | None = None + + # setup handlers + # NOTE if console with color is set first, some of the color formatting is set + # in the file writer too, for the ones where color is set BEFORE the format + self.handlers: list[logging.StreamHandler[TextIO] | logging.handlers.TimedRotatingFileHandler] = [ + # file handler, always + self.__file_handler(self.log_settings['log_level_file'], log_path) + ] + if self.log_settings['console_enabled']: + # console + self.handlers.append(self.__console_handler(self.log_settings['log_level_console'])) + # init listener if we have a log_queue set + self.__init_listener(self.log_settings['log_queue']) + + # overall logger start + self.__init_log(log_name) # if requests set a start log - if add_start_info is True: + if self.log_settings['add_start_info'] is True: self.break_line('START') + def __del__(self): + """ + Call when class is destroyed, make sure the listender is closed or else we throw a thread error + """ + if self.log_settings['add_end_info']: + self.break_line('END') + self.stop_listener() + + def __parse_log_settings( + self, + log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None + ) -> LogSettings: + # skip with defaul it not set + if log_settings is None: + return __class__.DEFAULT_LOG_SETTINGS + # check entries + default_log_settings = __class__.DEFAULT_LOG_SETTINGS + # check log levels + for __log_entry in ['log_level_console', 'log_level_file']: + if log_settings.get(__log_entry) is None: + continue + # if not valid reset to default, if not in default set to WARNING + if not self.validate_log_level(_log_level := log_settings.get(__log_entry, '')): + _log_level = __class__.DEFAULT_LOG_SETTINGS.get( + __log_entry, __class__.DEFAULT_LOG_LEVEL + ) + default_log_settings[__log_entry] = str(_log_level) + # check bool + for __log_entry in [ + "console_enabled", + "console_color_output_enabled", + "add_start_info", + "add_end_info", + ]: + if log_settings.get(__log_entry) is None: + continue + if not isinstance(__setting := log_settings.get(__log_entry, ''), bool): + __setting = __class__.DEFAULT_LOG_SETTINGS.get(__log_entry, True) + default_log_settings[__log_entry] = __setting + # check log queue + __setting = log_settings.get('log_queue', __class__.DEFAULT_LOG_SETTINGS['log_queue']) + if __setting is not None: + __setting = cast('Queue[str]', __setting) + default_log_settings['log_queue'] = __setting + return default_log_settings + def __filter_exceptions(self, record: logging.LogRecord) -> bool: return record.levelname != "EXCEPTION" - def __console_handler(self, log_level_console: str = 'WARNING'): + def __console_handler(self, log_level_console: str = 'WARNING') -> logging.StreamHandler[TextIO]: # console logger if not isinstance(getattr(logging, log_level_console.upper(), None), int): log_level_console = 'WARNING' console_handler = logging.StreamHandler() - # TODO Add flag based color handler - formatter_console = logging.Formatter( - ( - '[%(asctime)s.%(msecs)03d] ' - '[%(filename)s:%(funcName)s:%(lineno)d] ' - '<%(levelname)s> ' - '%(message)s' - ), - datefmt="%Y-%m-%d %H:%M:%S", + # format layouts + format_string = ( + '[%(asctime)s.%(msecs)03d] ' + '[%(name)s] ' + '[%(filename)s:%(funcName)s:%(lineno)d] ' + '<%(levelname)s> ' + '%(message)s' ) + format_date = "%Y-%m-%d %H:%M:%S" + # color or not + if self.log_settings['console_color_output_enabled']: + formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date) + else: + formatter_console = logging.Formatter(format_string, datefmt=format_date) console_handler.setLevel(log_level_console) # do not show exceptions logs on console console_handler.addFilter(self.__filter_exceptions) console_handler.setFormatter(formatter_console) - self.logger.addHandler(console_handler) + return console_handler - def __file_handler(self, log_level_file: str, log_path: Path) -> None: + def __file_handler(self, log_level_file: str, log_path: Path) -> logging.handlers.TimedRotatingFileHandler: # file logger if not isinstance(getattr(logging, log_level_file.upper(), None), int): log_level_file = 'DEBUG' @@ -78,17 +226,80 @@ class Log: ) formatter_file_handler = logging.Formatter( ( + # time stamp '[%(asctime)s.%(msecs)03d] ' - '[%(name)s:%(process)d] ' + # log name + '[%(name)s] ' + # filename + pid + '[%(filename)s:%(process)d] ' + # path + func + line number '[%(pathname)s:%(funcName)s:%(lineno)d] ' + # error level '<%(levelname)s> ' + # message '%(message)s' ), datefmt="%Y-%m-%dT%H:%M:%S", ) file_handler.setLevel(log_level_file) file_handler.setFormatter(formatter_file_handler) - self.logger.addHandler(file_handler) + return file_handler + + def __init_listener(self, log_queue: 'Queue[str] | None' = None): + """ + If we have a Queue option start the logging queue + + Keyword Arguments: + log_queue {Queue[str] | None} -- _description_ (default: {None}) + """ + if log_queue is None: + return + self.log_queue = log_queue + self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers) + self.listener.start() + + def __init_log(self, log_name: str) -> None: + """ + Initialize the main loggger + """ + queue_handler: logging.handlers.QueueHandler | None = None + if self.log_queue is not None: + queue_handler = logging.handlers.QueueHandler(self.log_queue) + # overall logger settings + self.logger = logging.getLogger(log_name) + # add all the handlers + if queue_handler is None: + for handler in self.handlers: + self.logger.addHandler(handler) + else: + self.logger.addHandler(queue_handler) + # set maximum logging level for all logging output + # log level filtering is done per handler + self.logger.setLevel(logging.DEBUG) + + @staticmethod + def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger: + """ + This initalizes a logger that can be used in pool/thread queue calls + """ + queue_handler = logging.handlers.QueueHandler(log_queue) + # getLogger call MUST be WITHOUT and logger name + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + root_logger.handlers.clear() + root_logger.addHandler(queue_handler) + + # for debug only + root_logger.debug('[LOGGER] Init log: %s - %s', log_queue, root_logger.handlers) + + return root_logger + + def stop_listener(self): + """ + stop the listener + """ + if self.listener is not None: + self.listener.stop() def break_line(self, info: str = "BREAK"): """ @@ -97,7 +308,7 @@ class Log: Keyword Arguments: info {str} -- _description_ (default: {"BREAK"}) """ - self.logger.info("[%s] %s>", info, '=' * __class__.SPACER_LENGTH) + self.logger.info("[%s] %s>", info, __class__.SPACER_CHAR * __class__.SPACER_LENGTH) def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None: """ @@ -110,16 +321,21 @@ class Log: """ self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra) - def validate_log_level(self, log_level: str) -> bool: + def validate_log_level(self, log_level: Any) -> bool: """ if the log level is invalid, will erturn false Args: - log_level (str): _description_ + log_level (Any): _description_ Returns: bool: _description_ """ - return isinstance(getattr(logging, log_level.upper(), None), int) + if isinstance(log_level, int): + return any(getattr(logging, attr) == log_level for attr in dir(logging)) + elif isinstance(log_level, str): + return isinstance(getattr(logging, log_level.upper(), None), int) + else: + return False # __END__ diff --git a/test-run/logging_handling/log.py b/test-run/logging_handling/log.py new file mode 100644 index 0000000..6de68c9 --- /dev/null +++ b/test-run/logging_handling/log.py @@ -0,0 +1,48 @@ +""" +Log logging_handling.log testing +""" + +# import atexit +from pathlib import Path +from multiprocessing import Queue +# this is for testing only +from queue_logger.log_queue import QueueLogger +from corelibs.logging_handling.log import Log + + +def main(): + """ + Log testing + """ + script_path: Path = Path(__file__).resolve().parent + log = Log( + log_path=script_path.joinpath('log', 'test.log'), + log_name="Test Log", + log_settings={ + "log_level_console": 'DEBUG', + "log_level_file": 'DEBUG', + # "console_color_output_enabled": False, + } + ) + + log.logger.debug('Debug test: %s', log.logger.name) + log.logger.info('Info test: %s', log.logger.name) + log.logger.warning('Warning test: %s', log.logger.name) + log.logger.error('Error test: %s', log.logger.name) + log.logger.critical('Critical test: %s', log.logger.name) + log.exception('Exception test: %s', log.logger.name) + + log_queue: 'Queue[str]' = Queue() + log_q = QueueLogger( + log_file=script_path.joinpath('log', 'test_queue.log'), + log_name="Test Log Queue", + log_queue=log_queue + ) + log_q.mlog.info('Log test: %s', log.logger.name) + # log_q.stop_listener() + + +if __name__ == "__main__": + main() + +# __END__ diff --git a/test-run/logging_handling/log/.gitignore b/test-run/logging_handling/log/.gitignore new file mode 100644 index 0000000..d6b7ef3 --- /dev/null +++ b/test-run/logging_handling/log/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/test-run/logging_handling/log_queue.py b/test-run/logging_handling/log_queue.py new file mode 100644 index 0000000..963e71c --- /dev/null +++ b/test-run/logging_handling/log_queue.py @@ -0,0 +1,75 @@ +""" +Pool Queue log handling +Thread Queue log handling +""" + +import random +import time +from multiprocessing import Queue +import concurrent.futures +import logging +from pathlib import Path +from corelibs.logging_handling.log import Log + + +def work_function(log_name: str, worker_id: int, data: list[int]) -> int: + """ + simulate worker + + Arguments: + worker_id {int} -- _description_ + data {list[int]} -- _description_ + + Returns: + int -- _description_ + """ + log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}') + log.info('Starting worker: %s', worker_id) + time.sleep(random.uniform(1, 3)) + result = sum(data) * worker_id + return result + + +def main(): + """ + Queue log tester + """ + print("[START] Queue logger test") + log_queue: 'Queue[str]' = Queue() + script_path: Path = Path(__file__).resolve().parent + log = Log( + log_path=script_path.joinpath('log', 'test.log'), + log_name="Test Log", + log_settings={ + "log_level_console": 'INFO', + "log_level_file": 'INFO', + "log_queue": log_queue, + } + ) + log.logger.info('Pool Fork logging test') + max_forks = 2 + data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] + with concurrent.futures.ProcessPoolExecutor( + max_workers=max_forks, + initializer=Log.init_worker_logging, + initargs=(log_queue,) + ) as executor: + log.logger.info('Start workers') + futures = [ + executor.submit(work_function, log.log_name, worker_id, data) + for worker_id, data in enumerate(data_sets, 1) + ] + log.logger.info('Workders started') + + for future in concurrent.futures.as_completed(futures): + log.logger.info('Processing result: %s', future.result()) + print(f"Processing result: {future.result()}") + + log.logger.info('[END] Queue logger test') + log.stop_listener() + + +if __name__ == "__main__": + main() + +# __END__ diff --git a/test-run/logging_handling/queue_logger/log_queue.py b/test-run/logging_handling/queue_logger/log_queue.py new file mode 100644 index 0000000..2e3062e --- /dev/null +++ b/test-run/logging_handling/queue_logger/log_queue.py @@ -0,0 +1,96 @@ +""" +test queue logger interface +NOTE: this has all moved to the default log interface +""" + +import logging +import logging.handlers +from pathlib import Path +from multiprocessing import Queue + + +class QueueLogger: + """ + Queue logger + """ + + def __init__(self, log_file: Path, log_name: str, log_queue: 'Queue[str] | None' = None): + self.log_file = log_file + self.log_name = log_name + self.handlers = self.setup_logging() + self.log_queue: 'Queue[str]' = log_queue if log_queue is not None else Queue() + self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers) + self.listener.start() + + self.mlog: logging.Logger = self.main_log(log_name) + + def __del__(self): + self.mlog.info("[%s] ================================>", "END") + self.listener.stop() + + def setup_logging(self): + """ + setup basic logging + """ + + # Create formatters + file_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - [PID:%(process)d] [%(filename)s:%(lineno)d] - %(message)s' + ) + + console_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + # Create handlers + file_handler = logging.FileHandler(self.log_file) + file_handler.setFormatter(file_formatter) + file_handler.setLevel(logging.DEBUG) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(console_formatter) + console_handler.setLevel(logging.DEBUG) + + return [file_handler, console_handler] + + def main_log(self, log_name: str) -> logging.Logger: + """ + main logger + + Arguments: + log_name {str} -- _description_ + + Returns: + logging.Logger -- _description_ + """ + mlog_handler = logging.handlers.QueueHandler(self.log_queue) + mlog = logging.getLogger(f'{log_name}-MainProcess') + mlog.addHandler(mlog_handler) + mlog.setLevel(logging.DEBUG) + return mlog + + @staticmethod + def init_worker_logging(log_queue: 'Queue[str]', log_name: str, ): + """ + Initialize logging for worker processes + """ + + # Create QueueHandler + queue_handler = logging.handlers.QueueHandler(log_queue) + + # Setup root logger for this process + # NOTE: This must be EMPTY or new SINGLE NEW logger is created, we need one for EACH fork + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + root_logger.handlers.clear() + root_logger.addHandler(queue_handler) + + root_logger.info('[LOGGER] Init log: %s - %s', log_queue, log_name) + + return root_logger + + def stop_listener(self): + """ + stop the listener + """ + self.listener.stop()