Enables queue-based logging if a multiprocessing.Queue() is set in the "log_queue" setting.
The initializer "Log.init_worker_logging" can then be attached to the ProcessPoolExecutor;
its init args must contain the configured log_queue.
Example:
```py
with concurrent.futures.ProcessPoolExecutor(
    max_workers=max_forks,
    initializer=Log.init_worker_logging,
    initargs=(log_queue,)
) as executor:
```
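
For context, a minimal end-to-end sketch of the main process side. The import path `corelibs.logging_handle`, the `worker` function, and the paths are illustrative assumptions, not part of this module; the relevant pieces are passing the queue through the "log_queue" setting (which starts the QueueListener) and letting workers log via the root logger that `init_worker_logging` configures.

```py
import concurrent.futures
import logging
import multiprocessing
from pathlib import Path

# hypothetical import path for this module, adjust to the real package layout
from corelibs.logging_handle import Log


def worker(item: int) -> int:
    # inside the pool the root logger already carries the QueueHandler
    logging.getLogger().info("processing item %d", item)
    return item * 2


if __name__ == "__main__":
    log_queue = multiprocessing.Queue()
    log = Log(
        Path("/tmp/logs"),
        "queue_example",
        log_settings={"log_queue": log_queue},
    )
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=4,
        initializer=Log.init_worker_logging,
        initargs=(log_queue,),
    ) as executor:
        results = list(executor.map(worker, range(5)))
    log.logger.info("all workers done: %s", results)
```

Only the QueueListener in the main process touches the file and console handlers; the worker processes just push records onto the queue.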
Moves all settings into a single settings argument; the structure is defined in LogSettings.
Default settings are in Log.DEFAULT_LOG_SETTINGS.
Only the log path and the log name remain plain parameters.
Colored console output is enabled by default and can be disabled via "console_color_output_enabled".
Console output can be switched off completely with "console_enabled".
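
As an illustration, a minimal sketch of a call with a partial settings dict; the import path and the paths are assumptions. Keys that are left out keep their Log.DEFAULT_LOG_SETTINGS values:

```py
from pathlib import Path

# hypothetical import path, adjust to the real package layout
from corelibs.logging_handle import Log

log = Log(
    Path("/var/log/myapp"),  # assuming this directory exists, the file name is derived from the log name
    "my-app",
    log_settings={
        "log_level_console": "INFO",            # default: WARNING
        "console_color_output_enabled": False,  # plain console output
        "add_end_info": True,                   # write an END break line on shutdown
        # everything not listed stays at its default
    },
)
log.logger.info("startup done")
```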
"""
|
|
A log handler wrapper
|
|
if log_settings['log_queue'] is set to multiprocessing.Queue it will launch with listeners
|
|
attach "init_worker_logging" with the set log_queue
|
|
"""
|
|
|
|
import re
|
|
import logging.handlers
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Mapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
|
|
from corelibs.string_handling.text_colors import Colors
|
|
|
|
if TYPE_CHECKING:
|
|
from multiprocessing import Queue
|
|
|
|
|
|
class LogSettings(TypedDict):
    """
    log settings

    structure of the log_settings argument for Log, defaults are in
    Log.DEFAULT_LOG_SETTINGS
    """
    log_level_console: str
    log_level_file: str
    console_enabled: bool
    console_color_output_enabled: bool
    add_start_info: bool
    add_end_info: bool
    log_queue: 'Queue[str] | None'


class CustomConsoleFormatter(logging.Formatter):
    """
    Custom formatter with colors for console output
    """

    COLORS = {
        "DEBUG": Colors.cyan,
        "INFO": Colors.green,
        "WARNING": Colors.yellow,
        "ERROR": Colors.red,
        "CRITICAL": Colors.magenta,
        "EXCEPTION": Colors.magenta_bold,
        "RESET": Colors.reset,  # Reset
    }

    def format(self, record: logging.LogRecord) -> str:
        """
        set the color highlight

        Arguments:
            record {logging.LogRecord} -- the log record to format

        Returns:
            str -- the formatted, color highlighted log line
        """
        # Add color to levelname for console output
        reset = self.COLORS["RESET"]
        color = self.COLORS.get(record.levelname, reset)
        # only highlight the level name for DEBUG and INFO
        if record.levelname in ['DEBUG', 'INFO']:
            record.levelname = f"{color}{record.levelname}{reset}"
            return super().format(record)
        # highlight the whole line for everything else
        message = super().format(record)
        return f"{color}{message}{reset}"


class Log:
    """
    logger setup
    """

    # exception level
    EXCEPTION: int = 60
    # spacer length in characters and the spacer character
    SPACER_CHAR: str = '='
    SPACER_LENGTH: int = 32
    # default logging level
    DEFAULT_LOG_LEVEL: str = 'WARNING'
    # default settings
    DEFAULT_LOG_SETTINGS: LogSettings = {
        "log_level_console": "WARNING",
        "log_level_file": "DEBUG",
        "console_enabled": True,
        "console_color_output_enabled": True,
        "add_start_info": True,
        "add_end_info": False,
        "log_queue": None,
    }

    def __init__(
        self,
        log_path: Path,
        log_name: str,
        log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None = None,
    ):
        # add new level for EXCEPTION
        logging.addLevelName(Log.EXCEPTION, 'EXCEPTION')
        # parse the logging settings
        self.log_settings = self.__parse_log_settings(log_settings)
        # if the path is a directory, build the log file name from the log name with .log
        # if the log name ends in .log, strip .log for the logger name further down
        if log_path.is_dir():
            __log_file_name = re.sub(r'[^a-zA-Z0-9]', '', log_name)
            if not log_name.endswith('.log'):
                log_path = log_path.joinpath(Path(__log_file_name).with_suffix('.log'))
            else:
                log_path = log_path.joinpath(__log_file_name)
        elif not log_path.suffix == '.log':
            # add .log if the path is a file but without .log
            log_path = log_path.with_suffix('.log')
        # strip .log from the log name if set
        if log_name.endswith('.log'):
            log_name = Path(log_name).stem
        # general log name
        self.log_name = log_name

        self.log_queue: 'Queue[str] | None' = None
        self.listener: logging.handlers.QueueListener | None = None

        # setup handlers
        # NOTE if console with color is set first, some of the color formatting is set
        # in the file writer too, for the ones where color is set BEFORE the format
        self.handlers: list[logging.StreamHandler[TextIO] | logging.handlers.TimedRotatingFileHandler] = [
            # file handler, always
            self.__file_handler(self.log_settings['log_level_file'], log_path)
        ]
        if self.log_settings['console_enabled']:
            # console
            self.handlers.append(self.__console_handler(self.log_settings['log_level_console']))
        # init listener if we have a log_queue set
        self.__init_listener(self.log_settings['log_queue'])

        # overall logger start
        self.__init_log(log_name)
        # if requested, write a start marker
        if self.log_settings['add_start_info'] is True:
            self.break_line('START')

    def __del__(self):
        """
        Called when the class is destroyed, make sure the listener is closed or else we throw a thread error
        """
        if self.log_settings['add_end_info']:
            self.break_line('END')
        self.stop_listener()

    def __parse_log_settings(
        self,
        log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None
    ) -> LogSettings:
        # fall back to the defaults if nothing is set
        if log_settings is None:
            return __class__.DEFAULT_LOG_SETTINGS
        # check entries
        default_log_settings = __class__.DEFAULT_LOG_SETTINGS
        # check log levels
        for __log_entry in ['log_level_console', 'log_level_file']:
            if log_settings.get(__log_entry) is None:
                continue
            # if not valid reset to default, if not in default set to WARNING
            if not self.validate_log_level(_log_level := log_settings.get(__log_entry, '')):
                _log_level = __class__.DEFAULT_LOG_SETTINGS.get(
                    __log_entry, __class__.DEFAULT_LOG_LEVEL
                )
            default_log_settings[__log_entry] = str(_log_level)
        # check bool flags
        for __log_entry in [
            "console_enabled",
            "console_color_output_enabled",
            "add_start_info",
            "add_end_info",
        ]:
            if log_settings.get(__log_entry) is None:
                continue
            if not isinstance(__setting := log_settings.get(__log_entry, ''), bool):
                __setting = __class__.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
            default_log_settings[__log_entry] = __setting
        # check log queue
        __setting = log_settings.get('log_queue', __class__.DEFAULT_LOG_SETTINGS['log_queue'])
        if __setting is not None:
            __setting = cast('Queue[str]', __setting)
            default_log_settings['log_queue'] = __setting
        return default_log_settings

    def __filter_exceptions(self, record: logging.LogRecord) -> bool:
        return record.levelname != "EXCEPTION"

    def __console_handler(self, log_level_console: str = 'WARNING') -> logging.StreamHandler[TextIO]:
        # console logger
        if not isinstance(getattr(logging, log_level_console.upper(), None), int):
            log_level_console = 'WARNING'
        console_handler = logging.StreamHandler()
        # format layouts
        format_string = (
            '[%(asctime)s.%(msecs)03d] '
            '[%(name)s] '
            '[%(filename)s:%(funcName)s:%(lineno)d] '
            '<%(levelname)s> '
            '%(message)s'
        )
        format_date = "%Y-%m-%d %H:%M:%S"
        # color or not
        if self.log_settings['console_color_output_enabled']:
            formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
        else:
            formatter_console = logging.Formatter(format_string, datefmt=format_date)
        console_handler.setLevel(log_level_console)
        # do not show exception logs on the console
        console_handler.addFilter(self.__filter_exceptions)
        console_handler.setFormatter(formatter_console)
        return console_handler

    def __file_handler(self, log_level_file: str, log_path: Path) -> logging.handlers.TimedRotatingFileHandler:
        # file logger
        if not isinstance(getattr(logging, log_level_file.upper(), None), int):
            log_level_file = 'DEBUG'
        file_handler = logging.handlers.TimedRotatingFileHandler(
            filename=log_path,
            encoding="utf-8",
            when="D",
            interval=1
        )
        formatter_file_handler = logging.Formatter(
            (
                # time stamp
                '[%(asctime)s.%(msecs)03d] '
                # log name
                '[%(name)s] '
                # filename + pid
                '[%(filename)s:%(process)d] '
                # path + func + line number
                '[%(pathname)s:%(funcName)s:%(lineno)d] '
                # error level
                '<%(levelname)s> '
                # message
                '%(message)s'
            ),
            datefmt="%Y-%m-%dT%H:%M:%S",
        )
        file_handler.setLevel(log_level_file)
        file_handler.setFormatter(formatter_file_handler)
        return file_handler

    def __init_listener(self, log_queue: 'Queue[str] | None' = None):
        """
        If we have a queue option, start the logging queue listener

        Keyword Arguments:
            log_queue {Queue[str] | None} -- queue shared with the worker processes (default: {None})
        """
        if log_queue is None:
            return
        self.log_queue = log_queue
        self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers)
        self.listener.start()

    def __init_log(self, log_name: str) -> None:
        """
        Initialize the main logger
        """
        queue_handler: logging.handlers.QueueHandler | None = None
        if self.log_queue is not None:
            queue_handler = logging.handlers.QueueHandler(self.log_queue)
        # overall logger settings
        self.logger = logging.getLogger(log_name)
        # add all the handlers, or only the queue handler if a queue is set
        if queue_handler is None:
            for handler in self.handlers:
                self.logger.addHandler(handler)
        else:
            self.logger.addHandler(queue_handler)
        # set maximum logging level for all logging output
        # log level filtering is done per handler
        self.logger.setLevel(logging.DEBUG)

    @staticmethod
    def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger:
        """
        This initializes a logger that can be used in pool/thread queue calls
        """
        queue_handler = logging.handlers.QueueHandler(log_queue)
        # the getLogger call MUST be WITHOUT a logger name
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)
        root_logger.handlers.clear()
        root_logger.addHandler(queue_handler)

        # for debug only
        root_logger.debug('[LOGGER] Init log: %s - %s', log_queue, root_logger.handlers)

        return root_logger

    def stop_listener(self):
        """
        stop the listener
        """
        if self.listener is not None:
            self.listener.stop()

    def break_line(self, info: str = "BREAK"):
        """
        add a break line at info level

        Keyword Arguments:
            info {str} -- label written in front of the spacer line (default: {"BREAK"})
        """
        self.logger.info("[%s] %s>", info, __class__.SPACER_CHAR * __class__.SPACER_LENGTH)

    def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None:
        """
        log on exception level

        Args:
            msg (object): message to log
            *args (object): arguments for msg
            extra: Mapping[str, object] | None: extra arguments for the formatting if needed
        """
        self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra)

    def validate_log_level(self, log_level: Any) -> bool:
        """
        returns False if the log level is invalid

        Args:
            log_level (Any): log level name or numeric log level to check

        Returns:
            bool: True if the log level exists in the logging module, else False
        """
        if isinstance(log_level, int):
            return any(getattr(logging, attr) == log_level for attr in dir(logging))
        elif isinstance(log_level, str):
            return isinstance(getattr(logging, log_level.upper(), None), int)
        else:
            return False


# __END__