CoreLibs-PyPI-All/test-run/logging_handling/log_queue.py
Clemens Schwaighofer d500b7d473 Log update with listener Queues and color highlight for console
Enables queue-based logging if a multiprocessing.Queue() is set in the "log_queue" setting.

Log.init_worker_logging can now be attached to a ProcessPoolExecutor as its initializer;
the initargs must contain the configured log_queue.

example:

```py
with concurrent.futures.ProcessPoolExecutor(
    max_workers=max_forks,
    initializer=Log.init_worker_logging,
    initargs=(log_queue,)
) as executor:
```
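
On the worker side nothing special is needed once the initializer has run; the worker just requests a standard logger. A minimal sketch, taken from the test file further below (the logger name scheme is only an example):

```py
import logging

def work_function(log_name: str, worker_id: int) -> None:
    # plain stdlib logger; Log.init_worker_logging has already routed all
    # records of this worker process through the shared log_queue
    log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}')
    log.info('Starting worker: %s', worker_id)
```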

All settings are moved into a single log_settings argument; the structure is defined in LogSettings.
Default settings are in Log.DEFAULT_LOG_SETTINGS.

Only the log path and the log name remain direct parameters.

Colored console output is enabled by default and can be disabled via "console_color_output_enabled".
Console output can be switched off entirely with "console_enabled".
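
Putting the settings together, a constructor call could look like this. A sketch based on the test file below; the boolean values for the two console keys are assumptions, and anything not given falls back to Log.DEFAULT_LOG_SETTINGS:

```py
from multiprocessing import Queue
from pathlib import Path

from corelibs.logging_handling.log import Log

log_queue: 'Queue[str]' = Queue()
log = Log(
    # only the log path and the log name are direct parameters
    log_path=Path('log', 'test.log'),
    log_name="Test Log",
    # everything else goes through log_settings (structure: LogSettings)
    log_settings={
        "log_level_console": 'INFO',
        "log_level_file": 'INFO',
        "log_queue": log_queue,                  # enables the listener queue
        "console_color_output_enabled": False,   # assumed boolean: no color highlighting
        # "console_enabled": False,              # assumed boolean: no console output at all
    }
)
```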
2025-07-09 14:41:53 +09:00

"""
Pool Queue log handling
Thread Queue log handling
"""
import random
import time
from multiprocessing import Queue
import concurrent.futures
import logging
from pathlib import Path
from corelibs.logging_handling.log import Log


def work_function(log_name: str, worker_id: int, data: list[int]) -> int:
    """
    simulate worker

    Arguments:
        log_name {str} -- name of the parent logger
        worker_id {int} -- worker number
        data {list[int]} -- numbers to process

    Returns:
        int -- sum of data multiplied by worker_id
    """
    log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}')
    log.info('Starting worker: %s', worker_id)
    time.sleep(random.uniform(1, 3))
    result = sum(data) * worker_id
    return result


def main():
    """
    Queue log tester
    """
    print("[START] Queue logger test")
    log_queue: 'Queue[str]' = Queue()
    script_path: Path = Path(__file__).resolve().parent
    log = Log(
        log_path=script_path.joinpath('log', 'test.log'),
        log_name="Test Log",
        log_settings={
            "log_level_console": 'INFO',
            "log_level_file": 'INFO',
            "log_queue": log_queue,
        }
    )
    log.logger.info('Pool Fork logging test')
    max_forks = 2
    data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=max_forks,
        initializer=Log.init_worker_logging,
        initargs=(log_queue,)
    ) as executor:
        log.logger.info('Start workers')
        futures = [
            executor.submit(work_function, log.log_name, worker_id, data)
            for worker_id, data in enumerate(data_sets, 1)
        ]
        log.logger.info('Workers started')
        for future in concurrent.futures.as_completed(futures):
            log.logger.info('Processing result: %s', future.result())
            print(f"Processing result: {future.result()}")
    log.logger.info('[END] Queue logger test')
    log.stop_listener()


if __name__ == "__main__":
    main()

# __END__