"""
Pool Queue log handling
Thread Queue log handling
"""
|
|
|
|
import random
|
|
import time
|
|
from multiprocessing import Queue
|
|
import concurrent.futures
|
|
import logging
|
|
from pathlib import Path
|
|
from corelibs.logging_handling.log import Log
|
|
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
|
|
|
|
|
def work_function(
    log_name: str,
    worker_id: int,
    data: list[int],
    *,
    min_delay: float = 1.0,
    max_delay: float = 3.0,
) -> int:
    """
    simulate worker: log a start message, wait a random time, return a result

    Arguments:
        log_name {str} -- prefix of the logger name, the logger used is
            "<log_name>-WorkerFn-<worker_id>"
        worker_id {int} -- worker number, written to the log and used as
            multiplier for the result
        data {list[int]} -- values that get summed up

    Keyword Arguments:
        min_delay {float} -- lower bound in seconds for the simulated work
            time, must not be negative (default: {1.0})
        max_delay {float} -- upper bound in seconds for the simulated work
            time, must not be negative (default: {3.0})

    Returns:
        int -- sum of data multiplied by worker_id
    """
    log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}')
    log.info('Starting worker: %s', worker_id)
    # random wait so the workers finish at different times; the bounds are
    # parameters so a caller can shorten or disable the wait
    time.sleep(random.uniform(min_delay, max_delay))
    return sum(data) * worker_id
|
|
|
|
|
|
def main() -> None:
    """
    Queue log tester

    Creates a Log that is given a multiprocessing Queue, then runs the same
    data sets twice through a process pool whose workers are initialised with
    Log.init_worker_logging and the same queue. Between the two rounds the
    'stream_handler' level is raised to ERROR, so the INFO message marked
    [INVISIBLE] is expected to be missing on the console while the ERROR
    messages still show. The level is set to DEBUG again at the end.

    The listener is stopped in a finally block so it is also stopped when a
    worker raised and future.result() re-raises that exception here.
    """
    print("[START] Queue logger test")
    log_queue: 'Queue[str]' = Queue()
    script_path: Path = Path(__file__).resolve().parent
    log = Log(
        log_path=script_path.joinpath('log', 'test.log'),
        log_name="Test Log",
        log_settings={
            "log_level_console": 'INFO',
            "log_level_file": 'INFO',
            "log_queue": log_queue,
        }
    )
    if log.logger is None:
        print("logger not yet started")
        return

    max_forks = 2
    data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    try:
        # below the configured INFO level, presumably not written anywhere
        log.logger.debug('Pool Fork logging test')
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=max_forks,
            initializer=Log.init_worker_logging,
            initargs=(log_queue,)
        ) as executor:
            # first round: console level as configured (INFO)
            log.logger.info('Start workers')
            futures = [
                executor.submit(work_function, log.log_name, worker_id, data)
                for worker_id, data in enumerate(data_sets, 1)
            ]
            log.logger.info('Workders started')

            for future in concurrent.futures.as_completed(futures):
                # result() re-raises any exception from the worker
                result = future.result()
                log.logger.warning('Processing result: %s', result)
                print(f"Processing result: {result}")

            # second round: only ERROR and above on the stream handler
            log.set_log_level('stream_handler', LoggingLevel.ERROR)
            log.logger.error('SECOND Start workers')
            futures = [
                executor.submit(work_function, log.log_name, worker_id, data)
                for worker_id, data in enumerate(data_sets, 1)
            ]
            log.logger.info('[INVISIBLE] Workders started')
            log.logger.error('[VISIBLE] Second Workders started')

            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                log.logger.error('Processing result: %s', result)
                print(f"Processing result: {result}")

        # NOTE(review): the original indentation was lost, these two calls are
        # placed after the pool has shut down -- confirm against the original
        log.set_log_level('stream_handler', LoggingLevel.DEBUG)
        log.logger.info('[END] Queue logger test')
    finally:
        # always release the listener, also on errors
        # presumably this stops the queue listener started by Log -- confirm
        log.stop_listener()
|
|
|
|
|
|
# run the queue logger test only when started as a script, not on import;
# the guard is also needed because ProcessPoolExecutor workers may re-import
# this module
if __name__ == "__main__":
    main()
|
|
|
|
# __END__
|