""" A log handler wrapper if log_settings['log_queue'] is set to multiprocessing.Queue it will launch with listeners attach "init_worker_logging" with the set log_queue """ import re import logging.handlers import logging from datetime import datetime import time from pathlib import Path import atexit from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel from corelibs.string_handling.text_colors import Colors from corelibs.debug_handling.debug_helpers import call_stack, exception_stack if TYPE_CHECKING: from multiprocessing import Queue # MARK: Log settings TypedDict class LogSettings(TypedDict): """log settings, for Log setup""" log_level_console: LoggingLevel log_level_file: LoggingLevel per_run_log: bool console_enabled: bool console_color_output_enabled: bool add_start_info: bool add_end_info: bool log_queue: 'Queue[str] | None' class LoggerInit(TypedDict): """for Logger init""" logger: logging.Logger log_queue: 'Queue[str] | None' # MARK: Custom color filter class CustomConsoleFormatter(logging.Formatter): """ Custom formatter with colors for console output """ COLORS = { LoggingLevel.DEBUG.name: Colors.cyan, LoggingLevel.INFO.name: Colors.green, LoggingLevel.WARNING.name: Colors.yellow, LoggingLevel.ERROR.name: Colors.red, LoggingLevel.CRITICAL.name: Colors.red_bold, LoggingLevel.ALERT.name: Colors.yellow_bold, LoggingLevel.EMERGENCY.name: Colors.magenta_bold, LoggingLevel.EXCEPTION.name: Colors.magenta_bright, # will never be written to console } def format(self, record: logging.LogRecord) -> str: """ set the color highlight Arguments: record {logging.LogRecord} -- _description_ Returns: str -- _description_ """ # Add color to levelname for console output reset = Colors.reset color = self.COLORS.get(record.levelname, reset) # only highlight level for basic if record.levelname in [LoggingLevel.DEBUG.name, LoggingLevel.INFO.name]: record.levelname = f"{color}{record.levelname}{reset}" return super().format(record) # highlight whole line message = super().format(record) return f"{color}{message}{reset}" # TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d # hasattr(record, 'stack_trace') # also for something like "context" where we add an array of anything to a message class CustomHandlerFilter(logging.Filter): """ Add a custom handler for filtering """ HANDLER_NAME_FILTER_EXCEPTION: str = 'console' def __init__(self, handler_name: str, filter_exceptions: bool = False): super().__init__(name=handler_name) self.handler_name = handler_name self.filter_exceptions = filter_exceptions def filter(self, record: logging.LogRecord) -> bool: # if console and exception do not show if self.handler_name == self.HANDLER_NAME_FILTER_EXCEPTION and self.filter_exceptions: return record.levelname != "EXCEPTION" # if cnosole entry is true and traget file filter if hasattr(record, 'console') and getattr(record, 'console') is True and self.handler_name == 'file': return False return True # def __filter_exceptions(self, record: logging.LogRecord) -> bool: # return record.levelname != "EXCEPTION" # MARK: Parent class class LogParent: """ Parent class with general methods used by Log and Logger """ # spacer lenght characters and the character SPACER_CHAR: str = '=' SPACER_LENGTH: int = 32 def __init__(self): self.logger: logging.Logger self.log_queue: 'Queue[str] | None' = None self.handlers: dict[str, Any] = {} # FIXME: we need to add a custom formater to add stack level listing if we want to # Important note, although they exist, it is recommended to use self.logger.NAME directly # so that the correct filename, method and row number is set # for > 50 use logger.log(LoggingLevel..value, ...) # for exception logger.log(LoggingLevel.EXCEPTION.value, ..., execInfo=True) # MARK: log message def log(self, level: int, msg: object, *args: object, extra: MutableMapping[str, object] | None = None): """log general""" if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) self.logger.log(level, msg, *args, extra=extra, stacklevel=2) # MARK: DEBUG 10 def debug(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None: """debug""" if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) self.logger.debug(msg, *args, extra=extra, stacklevel=2) # MARK: INFO 20 def info(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None: """info""" if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) self.logger.info(msg, *args, extra=extra, stacklevel=2) # MARK: WARNING 30 def warning(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None: """warning""" if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) self.logger.warning(msg, *args, extra=extra, stacklevel=2) # MARK: ERROR 40 def error(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None: """error""" if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) self.logger.error(msg, *args, extra=extra, stacklevel=2) # MARK: CRITICAL 50 def critical(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None: """critcal""" if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) self.logger.critical(msg, *args, extra=extra, stacklevel=2) # MARK: ALERT 55 def alert(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None: """alert""" if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') # extra_dict = dict(extra) if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) self.logger.log(LoggingLevel.ALERT.value, msg, *args, extra=extra, stacklevel=2) # MARK: EMERGECNY: 60 def emergency(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None: """emergency""" if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2) # MARK: EXCEPTION: 70 def exception( self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None, log_error: bool = True ) -> None: """ log on exceotion level, this is log.exception, but logs with a new level Args: msg (object): _description_ *args (object): arguments for msg extra: Mapping[str, object] | None: extra arguments for the formatting if needed log_error: (bool): If set to false will not write additional error message for console (Default True) """ if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') if extra is None: extra = {} extra['stack_trace'] = call_stack(skip_last=2) extra['exception_trace'] = exception_stack() # write to console first with extra flag for filtering in file if log_error: self.logger.log( LoggingLevel.ERROR.value, f"<=EXCEPTION={extra['exception_trace']}> {msg} [{extra['stack_trace']}]", *args, extra=dict(extra) | {'console': True}, stacklevel=2 ) self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2) def break_line(self, info: str = "BREAK"): """ add a break line as info level Keyword Arguments: info {str} -- _description_ (default: {"BREAK"}) """ if not hasattr(self, 'logger'): raise ValueError('Logger is not yet initialized') self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH) # MARK: queue handling def flush(self, handler_name: str | None = None, timeout: float = 2.0) -> bool: """ Flush all pending messages Keyword Arguments: handler_name {str | None} -- _description_ (default: {None}) timeout {float} -- _description_ (default: {2.0}) Returns: bool -- _description_ """ if not self.log_queue: return False try: # Wait for queue to be processed start_time = time.time() while not self.log_queue.empty() and (time.time() - start_time) < timeout: time.sleep(0.01) # Flush all handlers or handler given if handler_name: try: self.handlers[handler_name].flush() except IndexError: pass else: for handler in self.handlers.values(): handler.flush() except OSError: return False return True def cleanup(self): """ cleanup for any open queues in case we have an abort """ if not self.log_queue: return self.flush() # Close the queue properly self.log_queue.close() self.log_queue.join_thread() # MARK: log level handling def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool: """ set the logging level for a handler Arguments: handler {str} -- _description_ log_level {LoggingLevel} -- _description_ Returns: bool -- _description_ """ try: # flush queue befoe changing logging level self.flush(handler_name) self.handlers[handler_name].setLevel(log_level.name) return True except IndexError: if self.logger: self.logger.error('Handler %s not found, cannot change log level', handler_name) return False except AttributeError: if self.logger: self.logger.error( 'Cannot change to log level %s for handler %s, log level invalid', LoggingLevel.name, handler_name ) return False def get_log_level(self, handler_name: str) -> LoggingLevel: """ gettthe logging level for a handler Arguments: handler_name {str} -- _description_ Returns: LoggingLevel -- _description_ """ try: return LoggingLevel.from_any(self.handlers[handler_name].level) except IndexError: return LoggingLevel.NOTSET @staticmethod def validate_log_level(log_level: Any) -> bool: """ if the log level is invalid will return false, else return true Args: log_level (Any): _description_ Returns: bool: _description_ """ try: _ = LoggingLevel.from_any(log_level).value return True except ValueError: return False @staticmethod def get_log_level_int(log_level: Any) -> int: """ Return log level as INT If invalid returns the default log level Arguments: log_level {Any} -- _description_ Returns: int -- _description_ """ try: return LoggingLevel.from_any(log_level).value except ValueError: return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value # MARK: Logger class Logger(LogParent): """ The class we can pass on to other clases without re-init the class itself NOTE: if no queue object is handled over the logging level change might not take immediate effect """ def __init__(self, logger_settings: LoggerInit): LogParent.__init__(self) self.logger = logger_settings['logger'] self.lg = self.logger self.l = self.logger self.handlers = {str(_handler.name): _handler for _handler in self.logger.handlers} self.log_queue = logger_settings['log_queue'] # MARK: LogSetup class class Log(LogParent): """ logger setup """ # spacer lenght characters and the character SPACER_CHAR: str = '=' SPACER_LENGTH: int = 32 # default logging level DEFAULT_LOG_LEVEL: LoggingLevel = LoggingLevel.WARNING DEFAULT_LOG_LEVEL_FILE: LoggingLevel = LoggingLevel.DEBUG DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING # default settings DEFAULT_LOG_SETTINGS: LogSettings = { "log_level_console": DEFAULT_LOG_LEVEL_CONSOLE, "log_level_file": DEFAULT_LOG_LEVEL_FILE, "per_run_log": False, "console_enabled": True, "console_color_output_enabled": True, "add_start_info": True, "add_end_info": False, "log_queue": None, } # MARK: constructor def __init__( self, log_path: Path, log_name: str, log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None, other_handlers: dict[str, Any] | None = None ): LogParent.__init__(self) # add new level for alert, emergecny and exception logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name) logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name) logging.addLevelName(LoggingLevel.EXCEPTION.value, LoggingLevel.EXCEPTION.name) # parse the logging settings self.log_settings = self.__parse_log_settings(log_settings) # if path, set log name with .log # if log name with .log, strip .log for naming if log_path.is_dir(): __log_file_name = re.sub(r'[^a-zA-Z0-9]', '', log_name) if not log_name.endswith('.log'): log_path = log_path.joinpath(Path(__log_file_name).with_suffix('.log')) else: log_path = log_path.joinpath(__log_file_name) elif not log_path.suffix == '.log': # add .log if the path is a file but without .log log_path = log_path.with_suffix('.log') # stip .log from the log name if set if not log_name.endswith('.log'): log_name = Path(log_name).stem # general log name self.log_name = log_name self.log_queue: 'Queue[str] | None' = None self.listener: logging.handlers.QueueListener | None = None self.logger: logging.Logger # setup handlers # NOTE if console with color is set first, some of the color formatting is set # in the file writer too, for the ones where color is set BEFORE the format # Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.* self.handlers: dict[str, Any] = {} self.add_handler('file_handler', self.__create_file_handler( 'file_handler', self.log_settings['log_level_file'], log_path) ) if self.log_settings['console_enabled']: # console self.add_handler('stream_handler', self.__create_console_handler( 'stream_handler', self.log_settings['log_level_console']) ) # add other handlers, if other_handlers is not None: for handler_key, handler in other_handlers.items(): self.add_handler(handler_key, handler) # init listener if we have a log_queue set self.__init_listener(self.log_settings['log_queue']) # overall logger start self.__init_log(log_name) # if requests set a start log if self.log_settings['add_start_info'] is True: self.break_line('START') # MARK: deconstructor def __del__(self): """ Call when class is destroyed, make sure the listender is closed or else we throw a thread error """ if hasattr(self, 'log_settings') and self.log_settings.get('add_end_info'): self.break_line('END') self.stop_listener() # MARK: parse log settings def __parse_log_settings( self, log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None ) -> LogSettings: # skip with defaul it not set if log_settings is None: return self.DEFAULT_LOG_SETTINGS # check entries default_log_settings = self.DEFAULT_LOG_SETTINGS # check log levels for __log_entry in ['log_level_console', 'log_level_file']: if log_settings.get(__log_entry) is None: continue # if not valid reset to default, if not in default set to WARNING if not self.validate_log_level(__log_level := log_settings.get(__log_entry, '')): __log_level = self.DEFAULT_LOG_SETTINGS.get( __log_entry, self.DEFAULT_LOG_LEVEL ) default_log_settings[__log_entry] = LoggingLevel.from_any(__log_level) # check bool for __log_entry in [ "per_run_log", "console_enabled", "console_color_output_enabled", "add_start_info", "add_end_info", ]: if log_settings.get(__log_entry) is None: continue if not isinstance(__setting := log_settings.get(__log_entry, ''), bool): __setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True) default_log_settings[__log_entry] = __setting # check log queue __setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue']) if __setting is not None: __setting = cast('Queue[str]', __setting) default_log_settings['log_queue'] = __setting return default_log_settings # def __filter_exceptions(self, record: logging.LogRecord) -> bool: # return record.levelname != "EXCEPTION" # MARK: add a handler def add_handler( self, handler_name: str, handler: Any ) -> bool: """ Add a log handler to the handlers dict Arguments: handler_name {str} -- _description_ handler {Any} -- _description_ """ if self.handlers.get(handler_name): return False if self.listener is not None or hasattr(self, 'logger'): raise ValueError( f"Cannot add handler {handler_name}: {handler.get_name()} because logger is already running" ) # TODO: handler must be some handler type, how to check? self.handlers[handler_name] = handler return True # MARK: console handler def __create_console_handler( self, handler_name: str, log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True ) -> logging.StreamHandler[TextIO]: # console logger if not self.validate_log_level(log_level_console): log_level_console = self.DEFAULT_LOG_LEVEL_CONSOLE console_handler = logging.StreamHandler() # format layouts format_string = ( '[%(asctime)s.%(msecs)03d] ' '[%(name)s] ' '[%(filename)s:%(funcName)s:%(lineno)d] ' '<%(levelname)s> ' '%(message)s' ) format_date = "%Y-%m-%d %H:%M:%S" # color or not if self.log_settings['console_color_output_enabled']: formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date) else: formatter_console = logging.Formatter(format_string, datefmt=format_date) console_handler.set_name(handler_name) console_handler.setLevel(log_level_console.name) # do not show exceptions logs on console console_handler.addFilter(CustomHandlerFilter('console', filter_exceptions)) console_handler.setFormatter(formatter_console) return console_handler # MARK: file handler def __create_file_handler( self, handler_name: str, log_level_file: LoggingLevel, log_path: Path, # for TimedRotating, if per_run_log is off when: str = "D", interval: int = 1, backup_count: int = 0 ) -> logging.handlers.TimedRotatingFileHandler | logging.FileHandler: # file logger # when: S/M/H/D/W0-W6/midnight # interval: how many, 1D = every day # backup_count: how many old to keep, 0 = all if not self.validate_log_level(log_level_file): log_level_file = self.DEFAULT_LOG_LEVEL_FILE if self.log_settings['per_run_log']: # log path, remove them stem (".log"), then add the datetime and add .log again now = datetime.now() # we add microseconds part to get milli seconds new_stem = f"{log_path.stem}.{now.strftime('%Y-%m-%d_%H-%M-%S')}.{str(now.microsecond)[:3]}" file_handler = logging.FileHandler( filename=log_path.with_name(f"{new_stem}{log_path.suffix}"), encoding="utf-8", ) else: file_handler = logging.handlers.TimedRotatingFileHandler( filename=log_path, encoding="utf-8", when=when, interval=interval, backupCount=backup_count ) formatter_file_handler = logging.Formatter( ( # time stamp '[%(asctime)s.%(msecs)03d] ' # log name '[%(name)s] ' # filename + pid '[%(filename)s:%(process)d] ' # path + func + line number '[%(pathname)s:%(funcName)s:%(lineno)d] ' # error level '<%(levelname)s> ' # message '%(message)s' ), datefmt="%Y-%m-%dT%H:%M:%S", ) file_handler.set_name(handler_name) file_handler.setLevel(log_level_file.name) # do not show errors flagged with console (they are from exceptions) file_handler.addFilter(CustomHandlerFilter('file')) file_handler.setFormatter(formatter_file_handler) return file_handler # MARK: init listener def __init_listener(self, log_queue: 'Queue[str] | None' = None): """ If we have a Queue option start the logging queue Keyword Arguments: log_queue {Queue[str] | None} -- _description_ (default: {None}) """ if log_queue is None: return self.log_queue = log_queue atexit.register(self.stop_listener) self.listener = logging.handlers.QueueListener( self.log_queue, *self.handlers.values(), respect_handler_level=True ) self.listener.start() def stop_listener(self): """ stop the listener """ if self.listener is not None: self.flush() self.listener.stop() # MARK: init main log def __init_log(self, log_name: str) -> None: """ Initialize the main loggger """ queue_handler: logging.handlers.QueueHandler | None = None if self.log_queue is not None: queue_handler = logging.handlers.QueueHandler(self.log_queue) # overall logger settings self.logger = logging.getLogger(log_name) # add all the handlers if queue_handler is None: for handler in self.handlers.values(): self.logger.addHandler(handler) else: self.logger.addHandler(queue_handler) # set maximum logging level for all logging output # log level filtering is done per handler self.logger.setLevel(logging.DEBUG) # short name self.lg = self.logger self.l = self.logger # MARK: init logger for Fork/Thread @staticmethod def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger: """ This initalizes a logger that can be used in pool/thread queue calls call in worker initializer as "Log.init_worker_logging(Queue[str]) """ queue_handler = logging.handlers.QueueHandler(log_queue) # getLogger call MUST be WITHOUT and logger name root_logger = logging.getLogger() # base logging level, filtering is done in the handlers root_logger.setLevel(logging.DEBUG) root_logger.handlers.clear() root_logger.addHandler(queue_handler) # for debug only root_logger.debug('[LOGGER] Init log: %s - %s', log_queue, root_logger.handlers) return root_logger def get_logger_settings(self) -> LoggerInit: """ get the logger settings we need to init the Logger class Returns: LoggerInit -- _description_ """ return { "logger": self.logger, "log_queue": self.log_queue } # __END__