Compare commits

...

2 Commits

Author SHA1 Message Date
Clemens Schwaighofer
084ecc01e0 Bug fix for logging queue listener and ignoring of per handler set levels
in the QueueLister launcher add "respect_handler_level=True" so it respects the previous set log levels per handler

Also split all logging tests into their own file
2025-07-09 17:14:48 +09:00
Clemens Schwaighofer
08cb994d8d PyProject version update, testing logging 2025-07-09 16:16:02 +09:00
7 changed files with 140 additions and 74 deletions

View File

@@ -1,7 +1,7 @@
# MARK: Project info
[project]
name = "corelibs"
version = "0.9.0"
version = "0.10.0"
description = "Collection of utils for Python scripts"
readme = "README.md"
requires-python = ">=3.13"

View File

@@ -254,7 +254,11 @@ class Log:
if log_queue is None:
return
self.log_queue = log_queue
self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers)
self.listener = logging.handlers.QueueListener(
self.log_queue,
*self.handlers,
respect_handler_level=True
)
self.listener.start()
def __init_log(self, log_name: str) -> None:

View File

@@ -4,9 +4,7 @@ Log logging_handling.log testing
# import atexit
from pathlib import Path
from multiprocessing import Queue
# this is for testing only
from queue_logger.log_queue import QueueLogger
from corelibs.logging_handling.log import Log
@@ -19,27 +17,18 @@ def main():
log_path=script_path.joinpath('log', 'test.log'),
log_name="Test Log",
log_settings={
"log_level_console": 'DEBUG',
"log_level_console": 'WARNING',
"log_level_file": 'DEBUG',
# "console_color_output_enabled": False,
}
)
log.logger.debug('Debug test: %s', log.logger.name)
log.logger.info('Info test: %s', log.logger.name)
log.logger.warning('Warning test: %s', log.logger.name)
log.logger.error('Error test: %s', log.logger.name)
log.logger.critical('Critical test: %s', log.logger.name)
log.exception('Exception test: %s', log.logger.name)
log_queue: 'Queue[str]' = Queue()
log_q = QueueLogger(
log_file=script_path.joinpath('log', 'test_queue.log'),
log_name="Test Log Queue",
log_queue=log_queue
)
log_q.mlog.info('Log test: %s', log.logger.name)
# log_q.stop_listener()
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
log.logger.error('[NORMAL] Error test: %s', log.logger.name)
log.logger.critical('[NORMAL] Critical test: %s', log.logger.name)
log.exception('[NORMAL] Exception test: %s', log.logger.name)
if __name__ == "__main__":

View File

@@ -0,0 +1,75 @@
"""
Pool Queue log handling
Thread Queue log handling
"""
import random
import time
from multiprocessing import Queue
import concurrent.futures
import logging
from pathlib import Path
from corelibs.logging_handling.log import Log
def work_function(log_name: str, worker_id: int, data: list[int]) -> int:
"""
simulate worker
Arguments:
worker_id {int} -- _description_
data {list[int]} -- _description_
Returns:
int -- _description_
"""
log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}')
log.info('Starting worker: %s', worker_id)
time.sleep(random.uniform(1, 3))
result = sum(data) * worker_id
return result
def main():
"""
Queue log tester
"""
print("[START] Queue logger test")
log_queue: 'Queue[str]' = Queue()
script_path: Path = Path(__file__).resolve().parent
log = Log(
log_path=script_path.joinpath('log', 'test.log'),
log_name="Test Log",
log_settings={
"log_level_console": 'INFO',
"log_level_file": 'INFO',
"log_queue": log_queue,
}
)
log.logger.info('Pool Fork logging test')
max_forks = 2
data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
with concurrent.futures.ProcessPoolExecutor(
max_workers=max_forks,
initializer=Log.init_worker_logging,
initargs=(log_queue,)
) as executor:
log.logger.info('Start workers')
futures = [
executor.submit(work_function, log.log_name, worker_id, data)
for worker_id, data in enumerate(data_sets, 1)
]
log.logger.info('Workders started')
for future in concurrent.futures.as_completed(futures):
log.logger.info('Processing result: %s', future.result())
print(f"Processing result: {future.result()}")
log.logger.info('[END] Queue logger test')
log.stop_listener()
if __name__ == "__main__":
main()
# __END__

View File

@@ -1,72 +1,39 @@
"""
Pool Queue log handling
Thread Queue log handling
Log logging_handling.log testing
"""
import random
import time
from multiprocessing import Queue
import concurrent.futures
import logging
# import atexit
from pathlib import Path
from multiprocessing import Queue
# this is for testing only
from corelibs.logging_handling.log import Log
def work_function(log_name: str, worker_id: int, data: list[int]) -> int:
"""
simulate worker
Arguments:
worker_id {int} -- _description_
data {list[int]} -- _description_
Returns:
int -- _description_
"""
log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}')
log.info('Starting worker: %s', worker_id)
time.sleep(random.uniform(1, 3))
result = sum(data) * worker_id
return result
def main():
"""
Queue log tester
Log testing
"""
print("[START] Queue logger test")
log_queue: 'Queue[str]' = Queue()
script_path: Path = Path(__file__).resolve().parent
log = Log(
log_path=script_path.joinpath('log', 'test.log'),
log_queue: 'Queue[str]' = Queue()
log_q = Log(
log_path=script_path.joinpath('log', 'test_queue.log'),
log_name="Test Log",
log_settings={
"log_level_console": 'INFO',
"log_level_file": 'INFO',
"log_queue": log_queue,
"log_level_console": 'WARNING',
"log_level_file": 'ERROR',
"log_queue": log_queue
# "console_color_output_enabled": False,
}
)
log.logger.info('Pool Fork logging test')
max_forks = 2
data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
with concurrent.futures.ProcessPoolExecutor(
max_workers=max_forks,
initializer=Log.init_worker_logging,
initargs=(log_queue,)
) as executor:
log.logger.info('Start workers')
futures = [
executor.submit(work_function, log.log_name, worker_id, data)
for worker_id, data in enumerate(data_sets, 1)
]
log.logger.info('Workders started')
for future in concurrent.futures.as_completed(futures):
log.logger.info('Processing result: %s', future.result())
print(f"Processing result: {future.result()}")
log.logger.info('[END] Queue logger test')
log.stop_listener()
log_q.logger.debug('[QUEUE] Debug test: %s', log_q.logger.name)
log_q.logger.info('[QUEUE] Info test: %s', log_q.logger.name)
log_q.logger.warning('[QUEUE] Warning test: %s', log_q.logger.name)
log_q.logger.error('[QUEUE] Error test: %s', log_q.logger.name)
log_q.logger.critical('[QUEUE] Critical test: %s', log_q.logger.name)
log_q.exception('[QUEUE] Exception test: %s', log_q.logger.name)
log_q.stop_listener()
if __name__ == "__main__":

View File

@@ -0,0 +1,31 @@
"""
Log logging_handling.log testing
"""
# import atexit
from pathlib import Path
from multiprocessing import Queue
# this is for testing only
from queue_logger.log_queue import QueueLogger
def main():
"""
Log testing
"""
script_path: Path = Path(__file__).resolve().parent
log_queue: 'Queue[str]' = Queue()
log_q_legacy = QueueLogger(
log_file=script_path.joinpath('log', 'test_queue_legacy.log'),
log_name="Test Log Queue",
log_queue=log_queue
)
log_q_legacy.mlog.info('Log test: %s', 'Queue Legacy')
# log_q.stop_listener()
if __name__ == "__main__":
main()
# __END__

2
uv.lock generated
View File

@@ -44,7 +44,7 @@ wheels = [
[[package]]
name = "corelibs"
version = "0.9.0"
version = "0.10.0"
source = { editable = "." }
dependencies = [
{ name = "jmespath" },