Compare commits

...

5 Commits

Author SHA1 Message Date
Clemens Schwaighofer
79d1ccae9a Readme file update 2025-07-09 14:48:54 +09:00
Clemens Schwaighofer
6e69af4aa8 Update pyproject.toml with new version 2025-07-09 14:45:53 +09:00
Clemens Schwaighofer
d500b7d473 Log update with listener Queues and color highlight for console
enables Queues if multiprocessing.Queue() is set in the "log_queue" setting

Now a logger "Log.init_worker_logging" can be attached to the ProcessPoolExecutor,
the init args must be the log_queue set

example:

```py
with concurrent.futures.ProcessPoolExecutor(
    max_workers=max_forks,
    initializer=Log.init_worker_logging,
    initargs=(log_queue,)
) as executor:
````

Move all settings into a settings argument, the structure is defined in LogSettings.
Default settings are in Log.DEFAULT_LOG_SETTINGS

Only log path and log name are parameters

Color output for console is on default enabled, disable via "console_color_output_enabled"
The complete console output can be stopped with "console_enabled"
2025-07-09 14:41:53 +09:00
Clemens Schwaighofer
ef599a1aad Version update in uv.lock 2025-07-09 09:45:35 +09:00
Clemens Schwaighofer
2d197134f1 log splitter output make length configurable 2025-07-09 09:44:55 +09:00
8 changed files with 491 additions and 41 deletions

View File

@@ -71,18 +71,34 @@ uv run pytest --cov=corelibs
In the test-run folder usage and run tests are located
#### Progress
```sh
uv run test-run/progress/progress_test.py
```
#### Double byte string format
```sh
uv run test-run/double_byte_string_format/double_byte_string_format.py
```
#### Strings helpers
```sh
uv run test-run/timestamp_strings/timestamp_strings.py
```
```sh
uv run test-run/string_handling/string_helpers.py
```
#### Log
```sh
uv run test-run/logging_handling/log.py
```
## How to install in another project
This will also add the index entry
@@ -93,13 +109,7 @@ uv add corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pyp
## Python venv setup
In the folder where the script will be located
```sh
uv venv --python 3.13
```
Install all neded dependencies
After clone, run the command below to install all dependenciss
```sh
uv sync

View File

@@ -1,7 +1,7 @@
# MARK: Project info
[project]
name = "corelibs"
version = "0.8.0"
version = "0.9.0"
description = "Collection of utils for Python scripts"
readme = "README.md"
requires-python = ">=3.13"

View File

@@ -1,11 +1,71 @@
"""
A log handler wrapper
if log_settings['log_queue'] is set to multiprocessing.Queue it will launch with listeners
attach "init_worker_logging" with the set log_queue
"""
import re
import logging.handlers
import logging
from pathlib import Path
from typing import Mapping
from typing import Mapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
from corelibs.string_handling.text_colors import Colors
if TYPE_CHECKING:
from multiprocessing import Queue
class LogSettings(TypedDict):
"""
log settings
Arguments:
TypedDict {_type_} -- _description_
"""
log_level_console: str
log_level_file: str
console_enabled: bool
console_color_output_enabled: bool
add_start_info: bool
add_end_info: bool
log_queue: 'Queue[str] | None'
class CustomConsoleFormatter(logging.Formatter):
"""
Custom formatter with colors for console output
"""
COLORS = {
"DEBUG": Colors.cyan,
"INFO": Colors.green,
"WARNING": Colors.yellow,
"ERROR": Colors.red,
"CRITICAL": Colors.magenta,
"EXCEPTION": Colors.magenta_bold,
"RESET": Colors.reset, # Reset
}
def format(self, record: logging.LogRecord) -> str:
"""
set the color highlight
Arguments:
record {logging.LogRecord} -- _description_
Returns:
str -- _description_
"""
# Add color to levelname for console output
reset = self.COLORS["RESET"]
color = self.COLORS.get(record.levelname, reset)
# only highlight level for basic
if record.levelname in ['DEBUG', 'INFO']:
record.levelname = f"{color}{record.levelname}{reset}"
return super().format(record)
# highlight whole line
message = super().format(record)
return f"{color}{message}{reset}"
class Log:
@@ -13,57 +73,148 @@ class Log:
logger setup
"""
# exception level
EXCEPTION: int = 60
# spacer lenght characters and the character
SPACER_CHAR: str = '='
SPACER_LENGTH: int = 32
# default logging level
DEFAULT_LOG_LEVEL: str = 'WARNING'
# default settings
DEFAULT_LOG_SETTINGS: LogSettings = {
"log_level_console": "WARNING",
"log_level_file": "DEBUG",
"console_enabled": True,
"console_color_output_enabled": True,
"add_start_info": True,
"add_end_info": False,
"log_queue": None,
}
def __init__(
self,
log_path: Path,
log_name: str,
log_level_console: str = 'WARNING',
log_level_file: str = 'DEBUG',
add_start_info: bool = True
log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None = None,
):
# add new level for EXCEPTION
logging.addLevelName(Log.EXCEPTION, 'EXCEPTION')
if not log_name.endswith('.log'):
# parse the logging settings
self.log_settings = self.__parse_log_settings(log_settings)
# if path, set log name with .log
# if log name with .log, strip .log for naming
if log_path.is_dir():
__log_file_name = re.sub(r'[^a-zA-Z0-9]', '', log_name)
if not log_name.endswith('.log'):
log_path = log_path.joinpath(Path(__log_file_name).with_suffix('.log'))
else:
log_path = log_path.joinpath(__log_file_name)
elif not log_path.suffix == '.log':
# add .log if the path is a file but without .log
log_path = log_path.with_suffix('.log')
# overall logger settings
self.logger = logging.getLogger(log_name)
# set maximum logging level for all logging output
self.logger.setLevel(logging.DEBUG)
# stip .log from the log name if set
if not log_name.endswith('.log'):
log_name = Path(log_name).stem
# general log name
self.log_name = log_name
# self.handlers = []
# console logger
self.__console_handler(log_level_console)
# file logger
self.__file_handler(log_level_file, log_path)
self.log_queue: 'Queue[str] | None' = None
self.listener: logging.handlers.QueueListener | None = None
# setup handlers
# NOTE if console with color is set first, some of the color formatting is set
# in the file writer too, for the ones where color is set BEFORE the format
self.handlers: list[logging.StreamHandler[TextIO] | logging.handlers.TimedRotatingFileHandler] = [
# file handler, always
self.__file_handler(self.log_settings['log_level_file'], log_path)
]
if self.log_settings['console_enabled']:
# console
self.handlers.append(self.__console_handler(self.log_settings['log_level_console']))
# init listener if we have a log_queue set
self.__init_listener(self.log_settings['log_queue'])
# overall logger start
self.__init_log(log_name)
# if requests set a start log
if add_start_info is True:
if self.log_settings['add_start_info'] is True:
self.break_line('START')
def __del__(self):
"""
Call when class is destroyed, make sure the listender is closed or else we throw a thread error
"""
if self.log_settings['add_end_info']:
self.break_line('END')
self.stop_listener()
def __parse_log_settings(
self,
log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None
) -> LogSettings:
# skip with defaul it not set
if log_settings is None:
return __class__.DEFAULT_LOG_SETTINGS
# check entries
default_log_settings = __class__.DEFAULT_LOG_SETTINGS
# check log levels
for __log_entry in ['log_level_console', 'log_level_file']:
if log_settings.get(__log_entry) is None:
continue
# if not valid reset to default, if not in default set to WARNING
if not self.validate_log_level(_log_level := log_settings.get(__log_entry, '')):
_log_level = __class__.DEFAULT_LOG_SETTINGS.get(
__log_entry, __class__.DEFAULT_LOG_LEVEL
)
default_log_settings[__log_entry] = str(_log_level)
# check bool
for __log_entry in [
"console_enabled",
"console_color_output_enabled",
"add_start_info",
"add_end_info",
]:
if log_settings.get(__log_entry) is None:
continue
if not isinstance(__setting := log_settings.get(__log_entry, ''), bool):
__setting = __class__.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
default_log_settings[__log_entry] = __setting
# check log queue
__setting = log_settings.get('log_queue', __class__.DEFAULT_LOG_SETTINGS['log_queue'])
if __setting is not None:
__setting = cast('Queue[str]', __setting)
default_log_settings['log_queue'] = __setting
return default_log_settings
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
return record.levelname != "EXCEPTION"
def __console_handler(self, log_level_console: str = 'WARNING'):
def __console_handler(self, log_level_console: str = 'WARNING') -> logging.StreamHandler[TextIO]:
# console logger
if not isinstance(getattr(logging, log_level_console.upper(), None), int):
log_level_console = 'WARNING'
console_handler = logging.StreamHandler()
formatter_console = logging.Formatter(
(
'[%(asctime)s.%(msecs)03d] '
'[%(filename)s:%(funcName)s:%(lineno)d] '
'<%(levelname)s> '
'%(message)s'
),
datefmt="%Y-%m-%d %H:%M:%S",
# format layouts
format_string = (
'[%(asctime)s.%(msecs)03d] '
'[%(name)s] '
'[%(filename)s:%(funcName)s:%(lineno)d] '
'<%(levelname)s> '
'%(message)s'
)
format_date = "%Y-%m-%d %H:%M:%S"
# color or not
if self.log_settings['console_color_output_enabled']:
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
else:
formatter_console = logging.Formatter(format_string, datefmt=format_date)
console_handler.setLevel(log_level_console)
# do not show exceptions logs on console
console_handler.addFilter(self.__filter_exceptions)
console_handler.setFormatter(formatter_console)
self.logger.addHandler(console_handler)
return console_handler
def __file_handler(self, log_level_file: str, log_path: Path) -> None:
def __file_handler(self, log_level_file: str, log_path: Path) -> logging.handlers.TimedRotatingFileHandler:
# file logger
if not isinstance(getattr(logging, log_level_file.upper(), None), int):
log_level_file = 'DEBUG'
@@ -75,17 +226,80 @@ class Log:
)
formatter_file_handler = logging.Formatter(
(
# time stamp
'[%(asctime)s.%(msecs)03d] '
'[%(name)s:%(process)d] '
# log name
'[%(name)s] '
# filename + pid
'[%(filename)s:%(process)d] '
# path + func + line number
'[%(pathname)s:%(funcName)s:%(lineno)d] '
# error level
'<%(levelname)s> '
# message
'%(message)s'
),
datefmt="%Y-%m-%dT%H:%M:%S",
)
file_handler.setLevel(log_level_file)
file_handler.setFormatter(formatter_file_handler)
self.logger.addHandler(file_handler)
return file_handler
def __init_listener(self, log_queue: 'Queue[str] | None' = None):
"""
If we have a Queue option start the logging queue
Keyword Arguments:
log_queue {Queue[str] | None} -- _description_ (default: {None})
"""
if log_queue is None:
return
self.log_queue = log_queue
self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers)
self.listener.start()
def __init_log(self, log_name: str) -> None:
"""
Initialize the main loggger
"""
queue_handler: logging.handlers.QueueHandler | None = None
if self.log_queue is not None:
queue_handler = logging.handlers.QueueHandler(self.log_queue)
# overall logger settings
self.logger = logging.getLogger(log_name)
# add all the handlers
if queue_handler is None:
for handler in self.handlers:
self.logger.addHandler(handler)
else:
self.logger.addHandler(queue_handler)
# set maximum logging level for all logging output
# log level filtering is done per handler
self.logger.setLevel(logging.DEBUG)
@staticmethod
def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger:
"""
This initalizes a logger that can be used in pool/thread queue calls
"""
queue_handler = logging.handlers.QueueHandler(log_queue)
# getLogger call MUST be WITHOUT and logger name
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.handlers.clear()
root_logger.addHandler(queue_handler)
# for debug only
root_logger.debug('[LOGGER] Init log: %s - %s', log_queue, root_logger.handlers)
return root_logger
def stop_listener(self):
"""
stop the listener
"""
if self.listener is not None:
self.listener.stop()
def break_line(self, info: str = "BREAK"):
"""
@@ -94,7 +308,7 @@ class Log:
Keyword Arguments:
info {str} -- _description_ (default: {"BREAK"})
"""
self.logger.info("[%s] ================================>", info)
self.logger.info("[%s] %s>", info, __class__.SPACER_CHAR * __class__.SPACER_LENGTH)
def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None:
"""
@@ -107,16 +321,21 @@ class Log:
"""
self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra)
def validate_log_level(self, log_level: str) -> bool:
def validate_log_level(self, log_level: Any) -> bool:
"""
if the log level is invalid, will erturn false
Args:
log_level (str): _description_
log_level (Any): _description_
Returns:
bool: _description_
"""
return isinstance(getattr(logging, log_level.upper(), None), int)
if isinstance(log_level, int):
return any(getattr(logging, attr) == log_level for attr in dir(logging))
elif isinstance(log_level, str):
return isinstance(getattr(logging, log_level.upper(), None), int)
else:
return False
# __END__

View File

@@ -0,0 +1,48 @@
"""
Log logging_handling.log testing
"""
# import atexit
from pathlib import Path
from multiprocessing import Queue
# this is for testing only
from queue_logger.log_queue import QueueLogger
from corelibs.logging_handling.log import Log
def main():
"""
Log testing
"""
script_path: Path = Path(__file__).resolve().parent
log = Log(
log_path=script_path.joinpath('log', 'test.log'),
log_name="Test Log",
log_settings={
"log_level_console": 'DEBUG',
"log_level_file": 'DEBUG',
# "console_color_output_enabled": False,
}
)
log.logger.debug('Debug test: %s', log.logger.name)
log.logger.info('Info test: %s', log.logger.name)
log.logger.warning('Warning test: %s', log.logger.name)
log.logger.error('Error test: %s', log.logger.name)
log.logger.critical('Critical test: %s', log.logger.name)
log.exception('Exception test: %s', log.logger.name)
log_queue: 'Queue[str]' = Queue()
log_q = QueueLogger(
log_file=script_path.joinpath('log', 'test_queue.log'),
log_name="Test Log Queue",
log_queue=log_queue
)
log_q.mlog.info('Log test: %s', log.logger.name)
# log_q.stop_listener()
if __name__ == "__main__":
main()
# __END__

View File

@@ -0,0 +1,2 @@
*
!.gitignore

View File

@@ -0,0 +1,75 @@
"""
Pool Queue log handling
Thread Queue log handling
"""
import random
import time
from multiprocessing import Queue
import concurrent.futures
import logging
from pathlib import Path
from corelibs.logging_handling.log import Log
def work_function(log_name: str, worker_id: int, data: list[int]) -> int:
"""
simulate worker
Arguments:
worker_id {int} -- _description_
data {list[int]} -- _description_
Returns:
int -- _description_
"""
log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}')
log.info('Starting worker: %s', worker_id)
time.sleep(random.uniform(1, 3))
result = sum(data) * worker_id
return result
def main():
"""
Queue log tester
"""
print("[START] Queue logger test")
log_queue: 'Queue[str]' = Queue()
script_path: Path = Path(__file__).resolve().parent
log = Log(
log_path=script_path.joinpath('log', 'test.log'),
log_name="Test Log",
log_settings={
"log_level_console": 'INFO',
"log_level_file": 'INFO',
"log_queue": log_queue,
}
)
log.logger.info('Pool Fork logging test')
max_forks = 2
data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
with concurrent.futures.ProcessPoolExecutor(
max_workers=max_forks,
initializer=Log.init_worker_logging,
initargs=(log_queue,)
) as executor:
log.logger.info('Start workers')
futures = [
executor.submit(work_function, log.log_name, worker_id, data)
for worker_id, data in enumerate(data_sets, 1)
]
log.logger.info('Workders started')
for future in concurrent.futures.as_completed(futures):
log.logger.info('Processing result: %s', future.result())
print(f"Processing result: {future.result()}")
log.logger.info('[END] Queue logger test')
log.stop_listener()
if __name__ == "__main__":
main()
# __END__

View File

@@ -0,0 +1,96 @@
"""
test queue logger interface
NOTE: this has all moved to the default log interface
"""
import logging
import logging.handlers
from pathlib import Path
from multiprocessing import Queue
class QueueLogger:
"""
Queue logger
"""
def __init__(self, log_file: Path, log_name: str, log_queue: 'Queue[str] | None' = None):
self.log_file = log_file
self.log_name = log_name
self.handlers = self.setup_logging()
self.log_queue: 'Queue[str]' = log_queue if log_queue is not None else Queue()
self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers)
self.listener.start()
self.mlog: logging.Logger = self.main_log(log_name)
def __del__(self):
self.mlog.info("[%s] ================================>", "END")
self.listener.stop()
def setup_logging(self):
"""
setup basic logging
"""
# Create formatters
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - [PID:%(process)d] [%(filename)s:%(lineno)d] - %(message)s'
)
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create handlers
file_handler = logging.FileHandler(self.log_file)
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setFormatter(console_formatter)
console_handler.setLevel(logging.DEBUG)
return [file_handler, console_handler]
def main_log(self, log_name: str) -> logging.Logger:
"""
main logger
Arguments:
log_name {str} -- _description_
Returns:
logging.Logger -- _description_
"""
mlog_handler = logging.handlers.QueueHandler(self.log_queue)
mlog = logging.getLogger(f'{log_name}-MainProcess')
mlog.addHandler(mlog_handler)
mlog.setLevel(logging.DEBUG)
return mlog
@staticmethod
def init_worker_logging(log_queue: 'Queue[str]', log_name: str, ):
"""
Initialize logging for worker processes
"""
# Create QueueHandler
queue_handler = logging.handlers.QueueHandler(log_queue)
# Setup root logger for this process
# NOTE: This must be EMPTY or new SINGLE NEW logger is created, we need one for EACH fork
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.handlers.clear()
root_logger.addHandler(queue_handler)
root_logger.info('[LOGGER] Init log: %s - %s', log_queue, log_name)
return root_logger
def stop_listener(self):
"""
stop the listener
"""
self.listener.stop()

2
uv.lock generated
View File

@@ -44,7 +44,7 @@ wheels = [
[[package]]
name = "corelibs"
version = "0.7.0"
version = "0.9.0"
source = { editable = "." }
dependencies = [
{ name = "jmespath" },