Compare commits

...

5 Commits

Author SHA1 Message Date
Clemens Schwaighofer
79d1ccae9a Readme file update 2025-07-09 14:48:54 +09:00
Clemens Schwaighofer
6e69af4aa8 Update pyproject.toml with new version 2025-07-09 14:45:53 +09:00
Clemens Schwaighofer
d500b7d473 Log update with listener Queues and color highlight for console
enables Queues if multiprocessing.Queue() is set in the "log_queue" setting

Now a logger "Log.init_worker_logging" can be attached to the ProcessPoolExecutor,
the init args must be the log_queue set

example:

```py
with concurrent.futures.ProcessPoolExecutor(
    max_workers=max_forks,
    initializer=Log.init_worker_logging,
    initargs=(log_queue,)
) as executor:
```

Move all settings into a settings argument, the structure is defined in LogSettings.
Default settings are in Log.DEFAULT_LOG_SETTINGS

Only log path and log name are parameters

Color output for console is on default enabled, disable via "console_color_output_enabled"
The complete console output can be stopped with "console_enabled"
2025-07-09 14:41:53 +09:00
Clemens Schwaighofer
ef599a1aad Version update in uv.lock 2025-07-09 09:45:35 +09:00
Clemens Schwaighofer
2d197134f1 log splitter output make length configurable 2025-07-09 09:44:55 +09:00
8 changed files with 491 additions and 41 deletions

View File

@@ -71,18 +71,34 @@ uv run pytest --cov=corelibs
In the test-run folder usage and run tests are located In the test-run folder usage and run tests are located
#### Progress
```sh ```sh
uv run test-run/progress/progress_test.py uv run test-run/progress/progress_test.py
``` ```
#### Double byte string format
```sh ```sh
uv run test-run/double_byte_string_format/double_byte_string_format.py uv run test-run/double_byte_string_format/double_byte_string_format.py
``` ```
#### Strings helpers
```sh ```sh
uv run test-run/timestamp_strings/timestamp_strings.py uv run test-run/timestamp_strings/timestamp_strings.py
``` ```
```sh
uv run test-run/string_handling/string_helpers.py
```
#### Log
```sh
uv run test-run/logging_handling/log.py
```
## How to install in another project ## How to install in another project
This will also add the index entry This will also add the index entry
@@ -93,13 +109,7 @@ uv add corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pyp
## Python venv setup ## Python venv setup
In the folder where the script will be located After clone, run the command below to install all dependencies
```sh
uv venv --python 3.13
```
Install all neded dependencies
```sh ```sh
uv sync uv sync

View File

@@ -1,7 +1,7 @@
# MARK: Project info # MARK: Project info
[project] [project]
name = "corelibs" name = "corelibs"
version = "0.8.0" version = "0.9.0"
description = "Collection of utils for Python scripts" description = "Collection of utils for Python scripts"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"

View File

@@ -1,11 +1,71 @@
""" """
A log handler wrapper A log handler wrapper
if log_settings['log_queue'] is set to multiprocessing.Queue it will launch with listeners
attach "init_worker_logging" with the set log_queue
""" """
import re
import logging.handlers import logging.handlers
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Mapping from typing import Mapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
from corelibs.string_handling.text_colors import Colors
if TYPE_CHECKING:
from multiprocessing import Queue
class LogSettings(TypedDict):
"""
log settings
Arguments:
TypedDict {_type_} -- _description_
"""
log_level_console: str
log_level_file: str
console_enabled: bool
console_color_output_enabled: bool
add_start_info: bool
add_end_info: bool
log_queue: 'Queue[str] | None'
class CustomConsoleFormatter(logging.Formatter):
"""
Custom formatter with colors for console output
"""
COLORS = {
"DEBUG": Colors.cyan,
"INFO": Colors.green,
"WARNING": Colors.yellow,
"ERROR": Colors.red,
"CRITICAL": Colors.magenta,
"EXCEPTION": Colors.magenta_bold,
"RESET": Colors.reset, # Reset
}
def format(self, record: logging.LogRecord) -> str:
"""
set the color highlight
Arguments:
record {logging.LogRecord} -- _description_
Returns:
str -- _description_
"""
# Add color to levelname for console output
reset = self.COLORS["RESET"]
color = self.COLORS.get(record.levelname, reset)
# only highlight level for basic
if record.levelname in ['DEBUG', 'INFO']:
record.levelname = f"{color}{record.levelname}{reset}"
return super().format(record)
# highlight whole line
message = super().format(record)
return f"{color}{message}{reset}"
class Log: class Log:
@@ -13,57 +73,148 @@ class Log:
logger setup logger setup
""" """
# exception level
EXCEPTION: int = 60 EXCEPTION: int = 60
# spacer lenght characters and the character
SPACER_CHAR: str = '='
SPACER_LENGTH: int = 32
# default logging level
DEFAULT_LOG_LEVEL: str = 'WARNING'
# default settings
DEFAULT_LOG_SETTINGS: LogSettings = {
"log_level_console": "WARNING",
"log_level_file": "DEBUG",
"console_enabled": True,
"console_color_output_enabled": True,
"add_start_info": True,
"add_end_info": False,
"log_queue": None,
}
def __init__( def __init__(
self, self,
log_path: Path, log_path: Path,
log_name: str, log_name: str,
log_level_console: str = 'WARNING', log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None = None,
log_level_file: str = 'DEBUG',
add_start_info: bool = True
): ):
# add new level for EXCEPTION
logging.addLevelName(Log.EXCEPTION, 'EXCEPTION') logging.addLevelName(Log.EXCEPTION, 'EXCEPTION')
if not log_name.endswith('.log'): # parse the logging settings
self.log_settings = self.__parse_log_settings(log_settings)
# if path, set log name with .log
# if log name with .log, strip .log for naming
if log_path.is_dir():
__log_file_name = re.sub(r'[^a-zA-Z0-9]', '', log_name)
if not log_name.endswith('.log'):
log_path = log_path.joinpath(Path(__log_file_name).with_suffix('.log'))
else:
log_path = log_path.joinpath(__log_file_name)
elif not log_path.suffix == '.log':
# add .log if the path is a file but without .log
log_path = log_path.with_suffix('.log') log_path = log_path.with_suffix('.log')
# overall logger settings # stip .log from the log name if set
self.logger = logging.getLogger(log_name) if not log_name.endswith('.log'):
# set maximum logging level for all logging output log_name = Path(log_name).stem
self.logger.setLevel(logging.DEBUG) # general log name
self.log_name = log_name
# self.handlers = [] self.log_queue: 'Queue[str] | None' = None
# console logger self.listener: logging.handlers.QueueListener | None = None
self.__console_handler(log_level_console)
# file logger # setup handlers
self.__file_handler(log_level_file, log_path) # NOTE if console with color is set first, some of the color formatting is set
# in the file writer too, for the ones where color is set BEFORE the format
self.handlers: list[logging.StreamHandler[TextIO] | logging.handlers.TimedRotatingFileHandler] = [
# file handler, always
self.__file_handler(self.log_settings['log_level_file'], log_path)
]
if self.log_settings['console_enabled']:
# console
self.handlers.append(self.__console_handler(self.log_settings['log_level_console']))
# init listener if we have a log_queue set
self.__init_listener(self.log_settings['log_queue'])
# overall logger start
self.__init_log(log_name)
# if requests set a start log # if requests set a start log
if add_start_info is True: if self.log_settings['add_start_info'] is True:
self.break_line('START') self.break_line('START')
def __del__(self):
"""
Call when class is destroyed, make sure the listender is closed or else we throw a thread error
"""
if self.log_settings['add_end_info']:
self.break_line('END')
self.stop_listener()
def __parse_log_settings(
self,
log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None
) -> LogSettings:
# skip with defaul it not set
if log_settings is None:
return __class__.DEFAULT_LOG_SETTINGS
# check entries
default_log_settings = __class__.DEFAULT_LOG_SETTINGS
# check log levels
for __log_entry in ['log_level_console', 'log_level_file']:
if log_settings.get(__log_entry) is None:
continue
# if not valid reset to default, if not in default set to WARNING
if not self.validate_log_level(_log_level := log_settings.get(__log_entry, '')):
_log_level = __class__.DEFAULT_LOG_SETTINGS.get(
__log_entry, __class__.DEFAULT_LOG_LEVEL
)
default_log_settings[__log_entry] = str(_log_level)
# check bool
for __log_entry in [
"console_enabled",
"console_color_output_enabled",
"add_start_info",
"add_end_info",
]:
if log_settings.get(__log_entry) is None:
continue
if not isinstance(__setting := log_settings.get(__log_entry, ''), bool):
__setting = __class__.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
default_log_settings[__log_entry] = __setting
# check log queue
__setting = log_settings.get('log_queue', __class__.DEFAULT_LOG_SETTINGS['log_queue'])
if __setting is not None:
__setting = cast('Queue[str]', __setting)
default_log_settings['log_queue'] = __setting
return default_log_settings
def __filter_exceptions(self, record: logging.LogRecord) -> bool: def __filter_exceptions(self, record: logging.LogRecord) -> bool:
return record.levelname != "EXCEPTION" return record.levelname != "EXCEPTION"
def __console_handler(self, log_level_console: str = 'WARNING'): def __console_handler(self, log_level_console: str = 'WARNING') -> logging.StreamHandler[TextIO]:
# console logger # console logger
if not isinstance(getattr(logging, log_level_console.upper(), None), int): if not isinstance(getattr(logging, log_level_console.upper(), None), int):
log_level_console = 'WARNING' log_level_console = 'WARNING'
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler()
formatter_console = logging.Formatter( # format layouts
( format_string = (
'[%(asctime)s.%(msecs)03d] ' '[%(asctime)s.%(msecs)03d] '
'[%(filename)s:%(funcName)s:%(lineno)d] ' '[%(name)s] '
'<%(levelname)s> ' '[%(filename)s:%(funcName)s:%(lineno)d] '
'%(message)s' '<%(levelname)s> '
), '%(message)s'
datefmt="%Y-%m-%d %H:%M:%S",
) )
format_date = "%Y-%m-%d %H:%M:%S"
# color or not
if self.log_settings['console_color_output_enabled']:
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
else:
formatter_console = logging.Formatter(format_string, datefmt=format_date)
console_handler.setLevel(log_level_console) console_handler.setLevel(log_level_console)
# do not show exceptions logs on console # do not show exceptions logs on console
console_handler.addFilter(self.__filter_exceptions) console_handler.addFilter(self.__filter_exceptions)
console_handler.setFormatter(formatter_console) console_handler.setFormatter(formatter_console)
self.logger.addHandler(console_handler) return console_handler
def __file_handler(self, log_level_file: str, log_path: Path) -> None: def __file_handler(self, log_level_file: str, log_path: Path) -> logging.handlers.TimedRotatingFileHandler:
# file logger # file logger
if not isinstance(getattr(logging, log_level_file.upper(), None), int): if not isinstance(getattr(logging, log_level_file.upper(), None), int):
log_level_file = 'DEBUG' log_level_file = 'DEBUG'
@@ -75,17 +226,80 @@ class Log:
) )
formatter_file_handler = logging.Formatter( formatter_file_handler = logging.Formatter(
( (
# time stamp
'[%(asctime)s.%(msecs)03d] ' '[%(asctime)s.%(msecs)03d] '
'[%(name)s:%(process)d] ' # log name
'[%(name)s] '
# filename + pid
'[%(filename)s:%(process)d] '
# path + func + line number
'[%(pathname)s:%(funcName)s:%(lineno)d] ' '[%(pathname)s:%(funcName)s:%(lineno)d] '
# error level
'<%(levelname)s> ' '<%(levelname)s> '
# message
'%(message)s' '%(message)s'
), ),
datefmt="%Y-%m-%dT%H:%M:%S", datefmt="%Y-%m-%dT%H:%M:%S",
) )
file_handler.setLevel(log_level_file) file_handler.setLevel(log_level_file)
file_handler.setFormatter(formatter_file_handler) file_handler.setFormatter(formatter_file_handler)
self.logger.addHandler(file_handler) return file_handler
def __init_listener(self, log_queue: 'Queue[str] | None' = None):
"""
If we have a Queue option start the logging queue
Keyword Arguments:
log_queue {Queue[str] | None} -- _description_ (default: {None})
"""
if log_queue is None:
return
self.log_queue = log_queue
self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers)
self.listener.start()
def __init_log(self, log_name: str) -> None:
"""
Initialize the main loggger
"""
queue_handler: logging.handlers.QueueHandler | None = None
if self.log_queue is not None:
queue_handler = logging.handlers.QueueHandler(self.log_queue)
# overall logger settings
self.logger = logging.getLogger(log_name)
# add all the handlers
if queue_handler is None:
for handler in self.handlers:
self.logger.addHandler(handler)
else:
self.logger.addHandler(queue_handler)
# set maximum logging level for all logging output
# log level filtering is done per handler
self.logger.setLevel(logging.DEBUG)
@staticmethod
def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger:
"""
This initalizes a logger that can be used in pool/thread queue calls
"""
queue_handler = logging.handlers.QueueHandler(log_queue)
# getLogger call MUST be WITHOUT and logger name
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.handlers.clear()
root_logger.addHandler(queue_handler)
# for debug only
root_logger.debug('[LOGGER] Init log: %s - %s', log_queue, root_logger.handlers)
return root_logger
def stop_listener(self):
"""
stop the listener
"""
if self.listener is not None:
self.listener.stop()
def break_line(self, info: str = "BREAK"): def break_line(self, info: str = "BREAK"):
""" """
@@ -94,7 +308,7 @@ class Log:
Keyword Arguments: Keyword Arguments:
info {str} -- _description_ (default: {"BREAK"}) info {str} -- _description_ (default: {"BREAK"})
""" """
self.logger.info("[%s] ================================>", info) self.logger.info("[%s] %s>", info, __class__.SPACER_CHAR * __class__.SPACER_LENGTH)
def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None: def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None:
""" """
@@ -107,16 +321,21 @@ class Log:
""" """
self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra) self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra)
def validate_log_level(self, log_level: str) -> bool: def validate_log_level(self, log_level: Any) -> bool:
""" """
if the log level is invalid, will erturn false if the log level is invalid, will erturn false
Args: Args:
log_level (str): _description_ log_level (Any): _description_
Returns: Returns:
bool: _description_ bool: _description_
""" """
return isinstance(getattr(logging, log_level.upper(), None), int) if isinstance(log_level, int):
return any(getattr(logging, attr) == log_level for attr in dir(logging))
elif isinstance(log_level, str):
return isinstance(getattr(logging, log_level.upper(), None), int)
else:
return False
# __END__ # __END__

View File

@@ -0,0 +1,48 @@
"""
Log logging_handling.log testing
"""
# import atexit
from pathlib import Path
from multiprocessing import Queue
# this is for testing only
from queue_logger.log_queue import QueueLogger
from corelibs.logging_handling.log import Log
def main():
    """
    Exercise the Log wrapper and the standalone QueueLogger.

    Writes into a 'log' sub-folder next to this script; emits one message
    per standard level plus the custom EXCEPTION level, then repeats a
    single message through the queue-based logger.
    """
    base_dir: Path = Path(__file__).resolve().parent
    settings = {
        "log_level_console": 'DEBUG',
        "log_level_file": 'DEBUG',
        # "console_color_output_enabled": False,
    }
    log = Log(
        log_path=base_dir.joinpath('log', 'test.log'),
        log_name="Test Log",
        log_settings=settings
    )
    # one message per level, including the custom EXCEPTION level
    log.logger.debug('Debug test: %s', log.logger.name)
    log.logger.info('Info test: %s', log.logger.name)
    log.logger.warning('Warning test: %s', log.logger.name)
    log.logger.error('Error test: %s', log.logger.name)
    log.logger.critical('Critical test: %s', log.logger.name)
    log.exception('Exception test: %s', log.logger.name)
    # queue-based logger variant (testing interface only)
    log_queue: 'Queue[str]' = Queue()
    log_q = QueueLogger(
        log_file=base_dir.joinpath('log', 'test_queue.log'),
        log_name="Test Log Queue",
        log_queue=log_queue
    )
    log_q.mlog.info('Log test: %s', log.logger.name)
    # log_q.stop_listener()


if __name__ == "__main__":
    main()
# __END__

View File

@@ -0,0 +1,2 @@
*
!.gitignore

View File

@@ -0,0 +1,75 @@
"""
Pool Queue log handling
Thread Queue log handling
"""
import random
import time
from multiprocessing import Queue
import concurrent.futures
import logging
from pathlib import Path
from corelibs.logging_handling.log import Log
def work_function(log_name: str, worker_id: int, data: list[int]) -> int:
    """
    Simulate one unit of worker work: log a start line, wait a random
    moment, then compute a result.

    Arguments:
        log_name {str} -- base name used to build the per-worker logger name
        worker_id {int} -- numeric id of this worker (also scales the result)
        data {list[int]} -- numbers to sum up

    Returns:
        int -- sum of data multiplied by the worker id
    """
    worker_log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}')
    worker_log.info('Starting worker: %s', worker_id)
    # pretend the work takes between 1 and 3 seconds
    time.sleep(random.uniform(1, 3))
    return sum(data) * worker_id
def main():
    """
    Queue log tester: run a ProcessPoolExecutor whose workers log through
    a shared multiprocessing queue back to the main Log instance.

    Fixes over the original:
    - fetch each future.result() once instead of calling it twice
    - typo in the log message ('Workders' -> 'Workers')
    - stop the queue listener in a finally block so the listener thread
      does not leak if a worker raises
    """
    print("[START] Queue logger test")
    log_queue: 'Queue[str]' = Queue()
    script_path: Path = Path(__file__).resolve().parent
    log = Log(
        log_path=script_path.joinpath('log', 'test.log'),
        log_name="Test Log",
        log_settings={
            "log_level_console": 'INFO',
            "log_level_file": 'INFO',
            "log_queue": log_queue,
        }
    )
    log.logger.info('Pool Fork logging test')
    max_forks = 2
    data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    try:
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=max_forks,
            initializer=Log.init_worker_logging,
            initargs=(log_queue,)
        ) as executor:
            log.logger.info('Start workers')
            futures = [
                executor.submit(work_function, log.log_name, worker_id, data)
                for worker_id, data in enumerate(data_sets, 1)
            ]
            log.logger.info('Workers started')
            for future in concurrent.futures.as_completed(futures):
                # result() re-raises worker exceptions; fetch it only once
                result = future.result()
                log.logger.info('Processing result: %s', result)
                print(f"Processing result: {result}")
        log.logger.info('[END] Queue logger test')
    finally:
        # always shut the listener thread down, even if a worker raised
        log.stop_listener()


if __name__ == "__main__":
    main()
# __END__

View File

@@ -0,0 +1,96 @@
"""
test queue logger interface
NOTE: this has all moved to the default log interface
"""
import logging
import logging.handlers
from pathlib import Path
from multiprocessing import Queue
class QueueLogger:
    """
    Queue based logger: handlers are serviced by a QueueListener thread,
    loggers only push records onto a (multiprocessing) queue.

    NOTE: this has all moved to the default log interface

    Fix over the original: QueueListener.stop() is not idempotent — a
    second call raises AttributeError on the joined thread. stop_listener()
    is now guarded, so an explicit stop followed by __del__ (or a double
    explicit stop) is safe, and __del__ only writes its END marker while
    the listener is still draining the queue.
    """

    def __init__(self, log_file: Path, log_name: str, log_queue: 'Queue[str] | None' = None):
        """
        Set up handlers, start the listener thread and create the main logger.

        Arguments:
            log_file {Path} -- file the FileHandler writes to
            log_name {str} -- base name for the main logger
            log_queue {Queue[str] | None} -- shared queue; a new one is
                created when not given (default: {None})
        """
        self.log_file = log_file
        self.log_name = log_name
        self.handlers = self.setup_logging()
        self.log_queue: 'Queue[str]' = log_queue if log_queue is not None else Queue()
        self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers)
        # guard flag: the listener must never be stopped twice
        self.__listener_stopped = False
        self.listener.start()
        self.mlog: logging.Logger = self.main_log(log_name)

    def __del__(self):
        # only log the END marker while the listener still drains the queue
        if not self.__listener_stopped:
            self.mlog.info("[%s] ================================>", "END")
        self.stop_listener()

    def setup_logging(self):
        """
        Build the file and console handlers with their formatters.

        Returns:
            list -- [file_handler, console_handler], both at DEBUG level
        """
        # Create formatters
        file_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - [PID:%(process)d] [%(filename)s:%(lineno)d] - %(message)s'
        )
        console_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        # Create handlers
        file_handler = logging.FileHandler(self.log_file)
        file_handler.setFormatter(file_formatter)
        file_handler.setLevel(logging.DEBUG)
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(console_formatter)
        console_handler.setLevel(logging.DEBUG)
        return [file_handler, console_handler]

    def main_log(self, log_name: str) -> logging.Logger:
        """
        Create the main-process logger that feeds the queue.

        Arguments:
            log_name {str} -- base name; '-MainProcess' is appended

        Returns:
            logging.Logger -- logger with a QueueHandler attached
        """
        mlog_handler = logging.handlers.QueueHandler(self.log_queue)
        mlog = logging.getLogger(f'{log_name}-MainProcess')
        mlog.addHandler(mlog_handler)
        mlog.setLevel(logging.DEBUG)
        return mlog

    @staticmethod
    def init_worker_logging(log_queue: 'Queue[str]', log_name: str):
        """
        Initialize logging for worker processes.

        Arguments:
            log_queue {Queue[str]} -- shared queue the worker logs into
            log_name {str} -- name echoed in the init log line

        Returns:
            logging.Logger -- the (root) logger of the worker process
        """
        # Create QueueHandler
        queue_handler = logging.handlers.QueueHandler(log_queue)
        # Setup root logger for this process
        # NOTE: This must be EMPTY or new SINGLE NEW logger is created, we need one for EACH fork
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)
        root_logger.handlers.clear()
        root_logger.addHandler(queue_handler)
        root_logger.info('[LOGGER] Init log: %s - %s', log_queue, log_name)
        return root_logger

    def stop_listener(self):
        """
        stop the listener; safe to call more than once
        """
        if not self.__listener_stopped:
            self.listener.stop()
            self.__listener_stopped = True

2
uv.lock generated
View File

@@ -44,7 +44,7 @@ wheels = [
[[package]] [[package]]
name = "corelibs" name = "corelibs"
version = "0.7.0" version = "0.9.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "jmespath" }, { name = "jmespath" },