Move all debug handling into their own packages
dump data: corelibs_dump_data stack trace: corelibs_stack_trace profiling, timing, etc: corelibs_debug
This commit is contained in:
@@ -7,8 +7,11 @@ readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"corelibs-datetime>=1.0.1",
|
||||
"corelibs-debug>=1.0.0",
|
||||
"corelibs-dump-data>=1.0.0",
|
||||
"corelibs-enum-base>=1.0.0",
|
||||
"corelibs-regex-checks>=1.0.0",
|
||||
"corelibs-stack-trace>=1.0.0",
|
||||
"corelibs-text-colors>=1.0.0",
|
||||
"corelibs-var>=1.0.0",
|
||||
"cryptography>=46.0.3",
|
||||
|
||||
@@ -12,7 +12,7 @@ TODO: adapt more CoreLibs DB IO class flow here
|
||||
"""
|
||||
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
from corelibs.debug_handling.debug_helpers import call_stack
|
||||
from corelibs_stack_trace.stack import call_stack
|
||||
from corelibs.db_handling.sqlite_io import SQLiteIO
|
||||
if TYPE_CHECKING:
|
||||
from corelibs.logging_handling.log import Logger
|
||||
|
||||
@@ -8,7 +8,7 @@ also method names are subject to change
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal, TYPE_CHECKING
|
||||
import sqlite3
|
||||
from corelibs.debug_handling.debug_helpers import call_stack
|
||||
from corelibs_stack_trace.stack import call_stack
|
||||
if TYPE_CHECKING:
|
||||
from corelibs.logging_handling.log import Logger
|
||||
|
||||
|
||||
@@ -2,16 +2,16 @@
|
||||
Various debug helpers
|
||||
"""
|
||||
|
||||
import traceback
|
||||
import os
|
||||
import sys
|
||||
from warnings import deprecated
|
||||
from typing import Tuple, Type
|
||||
from types import TracebackType
|
||||
from corelibs_stack_trace.stack import call_stack as call_stack_ng, exception_stack as exception_stack_ng
|
||||
|
||||
# _typeshed.OptExcInfo
|
||||
OptExcInfo = Tuple[None, None, None] | Tuple[Type[BaseException], BaseException, TracebackType]
|
||||
|
||||
|
||||
@deprecated("Use corelibs_stack_trace.stack.call_stack instead")
|
||||
def call_stack(
|
||||
start: int = 0,
|
||||
skip_last: int = -1,
|
||||
@@ -31,23 +31,15 @@ def call_stack(
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
# stack = traceback.extract_stack()[start:depth]
|
||||
# how many of the last entries we skip (so we do not get self), default is -1
|
||||
# start cannot be negative
|
||||
if skip_last > 0:
|
||||
skip_last = skip_last * -1
|
||||
stack = traceback.extract_stack()
|
||||
__stack = stack[start:skip_last]
|
||||
# start possible to high, reset start to 0
|
||||
if not __stack and reset_start_if_empty:
|
||||
start = 0
|
||||
__stack = stack[start:skip_last]
|
||||
if not separator:
|
||||
separator = ' -> '
|
||||
# print(f"* HERE: {dump_data(stack)}")
|
||||
return f"{separator}".join(f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}" for f in __stack)
|
||||
return call_stack_ng(
|
||||
start=start,
|
||||
skip_last=skip_last,
|
||||
separator=separator,
|
||||
reset_start_if_empty=reset_start_if_empty
|
||||
)
|
||||
|
||||
|
||||
@deprecated("Use corelibs_stack_trace.stack.exception_stack instead")
|
||||
def exception_stack(
|
||||
exc_stack: OptExcInfo | None = None,
|
||||
separator: str = ' -> '
|
||||
@@ -62,15 +54,9 @@ def exception_stack(
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
if exc_stack is not None:
|
||||
_, _, exc_traceback = exc_stack
|
||||
else:
|
||||
exc_traceback = None
|
||||
_, _, exc_traceback = sys.exc_info()
|
||||
stack = traceback.extract_tb(exc_traceback)
|
||||
if not separator:
|
||||
separator = ' -> '
|
||||
# print(f"* HERE: {dump_data(stack)}")
|
||||
return f"{separator}".join(f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}" for f in stack)
|
||||
return exception_stack_ng(
|
||||
exc_stack=exc_stack,
|
||||
separator=separator
|
||||
)
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -2,10 +2,12 @@
|
||||
dict dump as JSON formatted
|
||||
"""
|
||||
|
||||
import json
|
||||
from warnings import deprecated
|
||||
from typing import Any
|
||||
from corelibs_dump_data.dump_data import dump_data as dump_data_ng
|
||||
|
||||
|
||||
@deprecated("Use corelibs_dump_data.dump_data.dump_data instead")
|
||||
def dump_data(data: Any, use_indent: bool = True) -> str:
|
||||
"""
|
||||
dump formated output from dict/list
|
||||
@@ -16,7 +18,6 @@ def dump_data(data: Any, use_indent: bool = True) -> str:
|
||||
Returns:
|
||||
str: _description_
|
||||
"""
|
||||
indent = 4 if use_indent else None
|
||||
return json.dumps(data, indent=indent, ensure_ascii=False, default=str)
|
||||
return dump_data_ng(data=data, use_indent=use_indent)
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -4,123 +4,40 @@ Profile memory usage in Python
|
||||
|
||||
# https://docs.python.org/3/library/tracemalloc.html
|
||||
|
||||
import os
|
||||
import time
|
||||
import tracemalloc
|
||||
import linecache
|
||||
from typing import Tuple
|
||||
from tracemalloc import Snapshot
|
||||
import psutil
|
||||
from warnings import warn, deprecated
|
||||
from typing import TYPE_CHECKING
|
||||
from corelibs_debug.profiling import display_top as display_top_ng, display_top_str, Profiling as CoreLibsProfiling
|
||||
if TYPE_CHECKING:
|
||||
from tracemalloc import Snapshot
|
||||
|
||||
|
||||
def display_top(snapshot: Snapshot, key_type: str = 'lineno', limit: int = 10) -> str:
|
||||
@deprecated("Use corelibs_debug.profiling.display_top_str with data from display_top instead")
|
||||
def display_top(snapshot: 'Snapshot', key_type: str = 'lineno', limit: int = 10) -> str:
|
||||
"""
|
||||
Print tracmalloc stats
|
||||
https://docs.python.org/3/library/tracemalloc.html#pretty-top
|
||||
|
||||
Args:
|
||||
snapshot (Snapshot): _description_
|
||||
snapshot ('Snapshot'): _description_
|
||||
key_type (str, optional): _description_. Defaults to 'lineno'.
|
||||
limit (int, optional): _description_. Defaults to 10.
|
||||
"""
|
||||
snapshot = snapshot.filter_traces((
|
||||
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
|
||||
tracemalloc.Filter(False, "<unknown>"),
|
||||
))
|
||||
top_stats = snapshot.statistics(key_type)
|
||||
|
||||
profiler_msg = f"Top {limit} lines"
|
||||
for index, stat in enumerate(top_stats[:limit], 1):
|
||||
frame = stat.traceback[0]
|
||||
# replace "/path/to/module/file.py" with "module/file.py"
|
||||
filename = os.sep.join(frame.filename.split(os.sep)[-2:])
|
||||
profiler_msg += f"#{index}: {filename}:{frame.lineno}: {(stat.size / 1024):.1f} KiB"
|
||||
line = linecache.getline(frame.filename, frame.lineno).strip()
|
||||
if line:
|
||||
profiler_msg += f" {line}"
|
||||
|
||||
other = top_stats[limit:]
|
||||
if other:
|
||||
size = sum(stat.size for stat in other)
|
||||
profiler_msg += f"{len(other)} other: {(size / 1024):.1f} KiB"
|
||||
total = sum(stat.size for stat in top_stats)
|
||||
profiler_msg += f"Total allocated size: {(total / 1024):.1f} KiB"
|
||||
return profiler_msg
|
||||
return display_top_str(
|
||||
display_top_ng(
|
||||
snapshot=snapshot,
|
||||
key_type=key_type,
|
||||
limit=limit
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class Profiling:
|
||||
class Profiling(CoreLibsProfiling):
|
||||
"""
|
||||
Profile memory usage and elapsed time for some block
|
||||
Based on: https://stackoverflow.com/a/53301648
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# profiling id
|
||||
self.__ident: str = ''
|
||||
# memory
|
||||
self.__rss_before: int = 0
|
||||
self.__vms_before: int = 0
|
||||
# self.shared_before: int = 0
|
||||
self.__rss_used: int = 0
|
||||
self.__vms_used: int = 0
|
||||
# self.shared_used: int = 0
|
||||
# time
|
||||
self.__call_start: float = 0
|
||||
self.__elapsed = 0
|
||||
|
||||
def __get_process_memory(self) -> Tuple[int, int]:
|
||||
process = psutil.Process(os.getpid())
|
||||
mi = process.memory_info()
|
||||
# macos does not have mi.shared
|
||||
return mi.rss, mi.vms
|
||||
|
||||
def __elapsed_since(self) -> str:
|
||||
elapsed = time.time() - self.__call_start
|
||||
if elapsed < 1:
|
||||
return str(round(elapsed * 1000, 2)) + "ms"
|
||||
if elapsed < 60:
|
||||
return str(round(elapsed, 2)) + "s"
|
||||
if elapsed < 3600:
|
||||
return str(round(elapsed / 60, 2)) + "min"
|
||||
return str(round(elapsed / 3600, 2)) + "hrs"
|
||||
|
||||
def __format_bytes(self, bytes_data: int) -> str:
|
||||
if abs(bytes_data) < 1000:
|
||||
return str(bytes_data) + "B"
|
||||
if abs(bytes_data) < 1e6:
|
||||
return str(round(bytes_data / 1e3, 2)) + "kB"
|
||||
if abs(bytes_data) < 1e9:
|
||||
return str(round(bytes_data / 1e6, 2)) + "MB"
|
||||
return str(round(bytes_data / 1e9, 2)) + "GB"
|
||||
|
||||
def start_profiling(self, ident: str) -> None:
|
||||
"""
|
||||
start the profiling
|
||||
"""
|
||||
self.__ident = ident
|
||||
self.__rss_before, self.__vms_before = self.__get_process_memory()
|
||||
self.__call_start = time.time()
|
||||
|
||||
def end_profiling(self) -> None:
|
||||
"""
|
||||
end the profiling
|
||||
"""
|
||||
if self.__rss_before == 0 and self.__vms_before == 0:
|
||||
print("start_profile() was not called, output will be negative")
|
||||
self.__elapsed = self.__elapsed_since()
|
||||
__rss_after, __vms_after = self.__get_process_memory()
|
||||
self.__rss_used = __rss_after - self.__rss_before
|
||||
self.__vms_used = __vms_after - self.__vms_before
|
||||
|
||||
def print_profiling(self) -> str:
|
||||
"""
|
||||
print the profiling time
|
||||
"""
|
||||
return (
|
||||
f"Profiling: {self.__ident:>20} "
|
||||
f"RSS: {self.__format_bytes(self.__rss_used):>8} | "
|
||||
f"VMS: {self.__format_bytes(self.__vms_used):>8} | "
|
||||
f"time: {self.__elapsed:>8}"
|
||||
)
|
||||
warn("Use corelibs_debug.profiling.Profiling instead", DeprecationWarning, stacklevel=2)
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -5,109 +5,16 @@ Returns:
|
||||
Timer: class timer for basic time run calculations
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from warnings import warn
|
||||
from corelibs_debug.timer import Timer as CorelibsTimer
|
||||
|
||||
|
||||
class Timer:
|
||||
class Timer(CorelibsTimer):
|
||||
"""
|
||||
get difference between start and end date/time
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
init new start time and set end time to None
|
||||
"""
|
||||
self._overall_start_time = datetime.now()
|
||||
self._overall_end_time = None
|
||||
self._overall_run_time = None
|
||||
self._start_time = datetime.now()
|
||||
self._end_time = None
|
||||
self._run_time = None
|
||||
|
||||
# MARK: overall run time
|
||||
def overall_run_time(self) -> timedelta:
|
||||
"""
|
||||
overall run time difference from class launch to call of this function
|
||||
|
||||
Returns:
|
||||
timedelta: _description_
|
||||
"""
|
||||
self._overall_end_time = datetime.now()
|
||||
self._overall_run_time = self._overall_end_time - self._overall_start_time
|
||||
return self._overall_run_time
|
||||
|
||||
def get_overall_start_time(self) -> datetime:
|
||||
"""
|
||||
get set start time
|
||||
|
||||
Returns:
|
||||
datetime: _description_
|
||||
"""
|
||||
return self._overall_start_time
|
||||
|
||||
def get_overall_end_time(self) -> datetime | None:
|
||||
"""
|
||||
get set end time or None for not set
|
||||
|
||||
Returns:
|
||||
datetime|None: _description_
|
||||
"""
|
||||
return self._overall_end_time
|
||||
|
||||
def get_overall_run_time(self) -> timedelta | None:
|
||||
"""
|
||||
get run time or None if run time was not called
|
||||
|
||||
Returns:
|
||||
datetime|None: _description_
|
||||
"""
|
||||
return self._overall_run_time
|
||||
|
||||
# MARK: set run time
|
||||
def run_time(self) -> timedelta:
|
||||
"""
|
||||
difference between start time and current time
|
||||
|
||||
Returns:
|
||||
datetime: _description_
|
||||
"""
|
||||
self._end_time = datetime.now()
|
||||
self._run_time = self._end_time - self._start_time
|
||||
return self._run_time
|
||||
|
||||
def reset_run_time(self):
|
||||
"""
|
||||
reset start/end and run tine
|
||||
"""
|
||||
self._start_time = datetime.now()
|
||||
self._end_time = None
|
||||
self._run_time = None
|
||||
|
||||
def get_start_time(self) -> datetime:
|
||||
"""
|
||||
get set start time
|
||||
|
||||
Returns:
|
||||
datetime: _description_
|
||||
"""
|
||||
return self._start_time
|
||||
|
||||
def get_end_time(self) -> datetime | None:
|
||||
"""
|
||||
get set end time or None for not set
|
||||
|
||||
Returns:
|
||||
datetime|None: _description_
|
||||
"""
|
||||
return self._end_time
|
||||
|
||||
def get_run_time(self) -> timedelta | None:
|
||||
"""
|
||||
get run time or None if run time was not called
|
||||
|
||||
Returns:
|
||||
datetime|None: _description_
|
||||
"""
|
||||
return self._run_time
|
||||
warn("Use corelibs_debug.timer.Timer instead", DeprecationWarning, stacklevel=2)
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -2,11 +2,18 @@
|
||||
Various small helpers for data writing
|
||||
"""
|
||||
|
||||
from warnings import deprecated
|
||||
from typing import TYPE_CHECKING
|
||||
from corelibs_debug.writeline import (
|
||||
write_l as write_l_ng, pr_header as pr_header_ng,
|
||||
pr_title as pr_title_ng, pr_open as pr_open_ng,
|
||||
pr_close as pr_close_ng, pr_act as pr_act_ng
|
||||
)
|
||||
if TYPE_CHECKING:
|
||||
from io import TextIOWrapper, StringIO
|
||||
|
||||
|
||||
@deprecated("Use corelibs_debug.writeline.write_l instead")
|
||||
def write_l(line: str, fpl: 'TextIOWrapper | StringIO | None' = None, print_line: bool = False):
|
||||
"""
|
||||
Write a line to screen and to output file
|
||||
@@ -15,23 +22,30 @@ def write_l(line: str, fpl: 'TextIOWrapper | StringIO | None' = None, print_line
|
||||
line (String): Line to write
|
||||
fpl (Resource): file handler resource, if none write only to console
|
||||
"""
|
||||
if print_line is True:
|
||||
print(line)
|
||||
if fpl is not None:
|
||||
fpl.write(line + "\n")
|
||||
return write_l_ng(
|
||||
line=line,
|
||||
fpl=fpl,
|
||||
print_line=print_line
|
||||
)
|
||||
|
||||
|
||||
# progress printers
|
||||
|
||||
@deprecated("Use corelibs_debug.writeline.pr_header instead")
|
||||
def pr_header(tag: str, marker_string: str = '#', width: int = 35):
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
tag (str): _description_
|
||||
"""
|
||||
print(f" {marker_string} {tag:^{width}} {marker_string}")
|
||||
return pr_header_ng(
|
||||
tag=tag,
|
||||
marker_string=marker_string,
|
||||
width=width
|
||||
)
|
||||
|
||||
|
||||
@deprecated("Use corelibs_debug.writeline.pr_title instead")
|
||||
def pr_title(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35):
|
||||
"""_summary_
|
||||
|
||||
@@ -39,9 +53,15 @@ def pr_title(tag: str, prefix_string: str = '|', space_filler: str = '.', width:
|
||||
tag (str): _description_
|
||||
prefix_string (str, optional): _description_. Defaults to '|'.
|
||||
"""
|
||||
print(f" {prefix_string} {tag:{space_filler}<{width}}:", flush=True)
|
||||
return pr_title_ng(
|
||||
tag=tag,
|
||||
prefix_string=prefix_string,
|
||||
space_filler=space_filler,
|
||||
width=width
|
||||
)
|
||||
|
||||
|
||||
@deprecated("Use corelibs_debug.writeline.pr_open instead")
|
||||
def pr_open(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35):
|
||||
"""
|
||||
writen progress open line with tag
|
||||
@@ -50,9 +70,15 @@ def pr_open(tag: str, prefix_string: str = '|', space_filler: str = '.', width:
|
||||
tag (str): _description_
|
||||
prefix_string (str): prefix string. Default: '|'
|
||||
"""
|
||||
print(f" {prefix_string} {tag:{space_filler}<{width}} [", end="", flush=True)
|
||||
return pr_open_ng(
|
||||
tag=tag,
|
||||
prefix_string=prefix_string,
|
||||
space_filler=space_filler,
|
||||
width=width
|
||||
)
|
||||
|
||||
|
||||
@deprecated("Use corelibs_debug.writeline.pr_close instead")
|
||||
def pr_close(tag: str = ''):
|
||||
"""
|
||||
write the close tag with new line
|
||||
@@ -60,9 +86,10 @@ def pr_close(tag: str = ''):
|
||||
Args:
|
||||
tag (str, optional): _description_. Defaults to ''.
|
||||
"""
|
||||
print(f"{tag}]", flush=True)
|
||||
return pr_close_ng(tag=tag)
|
||||
|
||||
|
||||
@deprecated("Use corelibs_debug.writeline.pr_act instead")
|
||||
def pr_act(act: str = "."):
|
||||
"""
|
||||
write progress character
|
||||
@@ -70,6 +97,6 @@ def pr_act(act: str = "."):
|
||||
Args:
|
||||
act (str, optional): _description_. Defaults to ".".
|
||||
"""
|
||||
print(f"{act}", end="", flush=True)
|
||||
return pr_act_ng(act=act)
|
||||
|
||||
# __EMD__
|
||||
|
||||
0
src/corelibs/exceptions/__init__.py
Normal file
0
src/corelibs/exceptions/__init__.py
Normal file
@@ -13,9 +13,9 @@ from pathlib import Path
|
||||
import atexit
|
||||
from enum import Flag, auto
|
||||
from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
|
||||
from corelibs_stack_trace.stack import call_stack, exception_stack
|
||||
from corelibs_text_colors.text_colors import Colors
|
||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||
from corelibs.debug_handling.debug_helpers import call_stack, exception_stack
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from multiprocessing import Queue
|
||||
|
||||
@@ -4,7 +4,7 @@ Settings loader test
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.logging_handling.log import Log
|
||||
from corelibs.config_handling.settings_loader import SettingsLoader
|
||||
from corelibs.config_handling.settings_loader_handling.settings_loader_check import SettingsLoaderCheck
|
||||
|
||||
@@ -5,7 +5,7 @@ SQL Main wrapper test
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
import json
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.logging_handling.log import Log, Logger
|
||||
from corelibs.db_handling.sql_main import SQLMain
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from pathlib import Path
|
||||
from uuid import uuid4
|
||||
import json
|
||||
import sqlite3
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.logging_handling.log import Log, Logger
|
||||
from corelibs.db_handling.sqlite_io import SQLiteIO
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ Symmetric encryption test
|
||||
"""
|
||||
|
||||
import json
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.encryption_handling.symmetric_encryption import SymmetricEncryption
|
||||
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ BOM check for files
|
||||
|
||||
from pathlib import Path
|
||||
from corelibs.file_handling.file_bom_encoding import is_bom_encoded, is_bom_encoded_info
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
||||
@@ -5,7 +5,7 @@ Search data tests
|
||||
iterator_handling.data_search
|
||||
"""
|
||||
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.iterator_handling.data_search import find_in_array_from_list, ArraySearchList
|
||||
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ Iterator helper testing
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.iterator_handling.dict_mask import mask
|
||||
from corelibs.iterator_handling.dict_helpers import set_entry
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ test list helpers
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list, make_unique_list_of_dicts
|
||||
from corelibs.iterator_handling.fingerprint import dict_hash_crc
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
jmes path testing
|
||||
"""
|
||||
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.json_handling.jmespath_helper import jmespath_search
|
||||
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ JSON content replace tets
|
||||
"""
|
||||
|
||||
from deepdiff import DeepDiff
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs_dump_data.dump_data import dump_data
|
||||
from corelibs.json_handling.json_helper import modify_with_jsonpath
|
||||
|
||||
|
||||
|
||||
@@ -6,8 +6,8 @@ Log logging_handling.log testing
|
||||
import sys
|
||||
from pathlib import Path
|
||||
# this is for testing only
|
||||
from corelibs_stack_trace.stack import exception_stack, call_stack
|
||||
from corelibs.logging_handling.log import Log, Logger, ConsoleFormat, ConsoleFormatSettings
|
||||
from corelibs.debug_handling.debug_helpers import exception_stack, call_stack
|
||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||
|
||||
|
||||
|
||||
@@ -1,639 +0,0 @@
|
||||
"""
|
||||
Unit tests for debug_handling.debug_helpers module
|
||||
"""
|
||||
|
||||
import sys
|
||||
import pytest
|
||||
|
||||
from corelibs.debug_handling.debug_helpers import (
|
||||
call_stack,
|
||||
exception_stack,
|
||||
OptExcInfo
|
||||
)
|
||||
|
||||
|
||||
class TestCallStack:
|
||||
"""Test cases for call_stack function"""
|
||||
|
||||
def test_call_stack_basic(self):
|
||||
"""Test basic call_stack functionality"""
|
||||
result = call_stack()
|
||||
assert isinstance(result, str)
|
||||
assert "test_debug_helpers.py" in result
|
||||
assert "test_call_stack_basic" in result
|
||||
|
||||
def test_call_stack_with_default_separator(self):
|
||||
"""Test call_stack with default separator"""
|
||||
result = call_stack()
|
||||
assert " -> " in result
|
||||
|
||||
def test_call_stack_with_custom_separator(self):
|
||||
"""Test call_stack with custom separator"""
|
||||
result = call_stack(separator=" | ")
|
||||
assert " | " in result
|
||||
assert " -> " not in result
|
||||
|
||||
def test_call_stack_with_empty_separator(self):
|
||||
"""Test call_stack with empty separator (should default to ' -> ')"""
|
||||
result = call_stack(separator="")
|
||||
assert " -> " in result
|
||||
|
||||
def test_call_stack_format(self):
|
||||
"""Test call_stack output format (filename:function:lineno)"""
|
||||
result = call_stack()
|
||||
parts = result.split(" -> ")
|
||||
for part in parts:
|
||||
# Each part should have format: filename:function:lineno
|
||||
assert part.count(":") >= 2
|
||||
# Most parts should contain .py but some system frames might not
|
||||
# Just check that we have some .py files in the trace
|
||||
assert ".py" in result or "test_debug_helpers" in result
|
||||
|
||||
def test_call_stack_with_start_offset(self):
|
||||
"""Test call_stack with start offset"""
|
||||
result_no_offset = call_stack(start=0)
|
||||
result_with_offset = call_stack(start=2)
|
||||
|
||||
# With offset, we should get fewer frames
|
||||
parts_no_offset = result_no_offset.split(" -> ")
|
||||
parts_with_offset = result_with_offset.split(" -> ")
|
||||
|
||||
assert len(parts_with_offset) <= len(parts_no_offset)
|
||||
|
||||
def test_call_stack_with_skip_last(self):
|
||||
"""Test call_stack with skip_last parameter"""
|
||||
result_skip_default = call_stack(skip_last=-1)
|
||||
result_skip_more = call_stack(skip_last=-3)
|
||||
|
||||
# Skipping more should result in fewer frames
|
||||
parts_default = result_skip_default.split(" -> ")
|
||||
parts_more = result_skip_more.split(" -> ")
|
||||
|
||||
assert len(parts_more) <= len(parts_default)
|
||||
|
||||
def test_call_stack_skip_last_positive_converts_to_negative(self):
|
||||
"""Test that positive skip_last is converted to negative"""
|
||||
# Both should produce same result
|
||||
result_negative = call_stack(skip_last=-2)
|
||||
result_positive = call_stack(skip_last=2)
|
||||
|
||||
assert result_negative == result_positive
|
||||
|
||||
def test_call_stack_nested_calls(self):
|
||||
"""Test call_stack in nested function calls"""
|
||||
def level_one():
|
||||
return level_two()
|
||||
|
||||
def level_two():
|
||||
return level_three()
|
||||
|
||||
def level_three():
|
||||
return call_stack()
|
||||
|
||||
result = level_one()
|
||||
assert "level_one" in result
|
||||
assert "level_two" in result
|
||||
assert "level_three" in result
|
||||
|
||||
def test_call_stack_reset_start_if_empty_false(self):
|
||||
"""Test call_stack with high start value and reset_start_if_empty=False"""
|
||||
# Using a very high start value should result in empty stack
|
||||
result = call_stack(start=1000, reset_start_if_empty=False)
|
||||
assert result == ""
|
||||
|
||||
def test_call_stack_reset_start_if_empty_true(self):
|
||||
"""Test call_stack with high start value and reset_start_if_empty=True"""
|
||||
# Using a very high start value with reset should give non-empty result
|
||||
result = call_stack(start=1000, reset_start_if_empty=True)
|
||||
assert result != ""
|
||||
assert "test_debug_helpers.py" in result
|
||||
|
||||
def test_call_stack_contains_line_numbers(self):
|
||||
"""Test that call_stack includes line numbers"""
|
||||
result = call_stack()
|
||||
# Extract parts and check for numbers
|
||||
parts = result.split(" -> ")
|
||||
for part in parts:
|
||||
# Line numbers should be present (digits at the end)
|
||||
assert any(char.isdigit() for char in part)
|
||||
|
||||
def test_call_stack_separator_none(self):
|
||||
"""Test call_stack with None separator"""
|
||||
result = call_stack(separator="") # Use empty string instead of None
|
||||
# Empty string should be converted to default ' -> '
|
||||
assert " -> " in result
|
||||
|
||||
def test_call_stack_multiple_separators(self):
|
||||
"""Test call_stack with various custom separators"""
|
||||
separators = [" | ", " >> ", " => ", " / ", "\n"]
|
||||
|
||||
for sep in separators:
|
||||
result = call_stack(separator=sep)
|
||||
assert sep in result or result == "" # May be empty based on stack depth
|
||||
|
||||
|
||||
class TestExceptionStack:
|
||||
"""Test cases for exception_stack function"""
|
||||
|
||||
def test_exception_stack_with_active_exception(self):
|
||||
"""Test exception_stack when an exception is active"""
|
||||
try:
|
||||
raise ValueError("Test exception")
|
||||
except ValueError:
|
||||
result = exception_stack()
|
||||
assert isinstance(result, str)
|
||||
assert "test_debug_helpers.py" in result
|
||||
assert "test_exception_stack_with_active_exception" in result
|
||||
|
||||
def test_exception_stack_format(self):
|
||||
"""Test exception_stack output format"""
|
||||
try:
|
||||
raise RuntimeError("Test error")
|
||||
except RuntimeError:
|
||||
result = exception_stack()
|
||||
parts = result.split(" -> ")
|
||||
for part in parts:
|
||||
# Each part should have format: filename:function:lineno
|
||||
assert part.count(":") >= 2
|
||||
|
||||
def test_exception_stack_with_custom_separator(self):
|
||||
"""Test exception_stack with custom separator"""
|
||||
def nested_call():
|
||||
def inner_call():
|
||||
raise TypeError("Test type error")
|
||||
inner_call()
|
||||
|
||||
try:
|
||||
nested_call()
|
||||
except TypeError:
|
||||
result = exception_stack(separator=" | ")
|
||||
# Only check separator if there are multiple frames
|
||||
if " | " in result or result.count(":") == 2:
|
||||
# Single frame or has separator
|
||||
assert isinstance(result, str)
|
||||
assert " -> " not in result
|
||||
|
||||
def test_exception_stack_with_empty_separator(self):
|
||||
"""Test exception_stack with empty separator (should default to ' -> ')"""
|
||||
def nested_call():
|
||||
def inner_call():
|
||||
raise KeyError("Test key error")
|
||||
inner_call()
|
||||
|
||||
try:
|
||||
nested_call()
|
||||
except KeyError:
|
||||
result = exception_stack(separator="")
|
||||
# Should use default separator if multiple frames exist
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_exception_stack_separator_none(self):
|
||||
"""Test exception_stack with empty separator"""
|
||||
def nested_call():
|
||||
def inner_call():
|
||||
raise IndexError("Test index error")
|
||||
inner_call()
|
||||
|
||||
try:
|
||||
nested_call()
|
||||
except IndexError:
|
||||
result = exception_stack(separator="") # Use empty string instead of None
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_exception_stack_nested_exceptions(self):
|
||||
"""Test exception_stack with nested function calls"""
|
||||
def level_one():
|
||||
level_two()
|
||||
|
||||
def level_two():
|
||||
level_three()
|
||||
|
||||
def level_three():
|
||||
raise ValueError("Nested exception")
|
||||
|
||||
try:
|
||||
level_one()
|
||||
except ValueError:
|
||||
result = exception_stack()
|
||||
# Should contain all levels in the stack
|
||||
assert "level_one" in result or "level_two" in result or "level_three" in result
|
||||
|
||||
def test_exception_stack_with_provided_exc_info(self):
|
||||
"""Test exception_stack with explicitly provided exc_info"""
|
||||
try:
|
||||
raise AttributeError("Test attribute error")
|
||||
except AttributeError:
|
||||
exc_info = sys.exc_info()
|
||||
result = exception_stack(exc_stack=exc_info)
|
||||
assert isinstance(result, str)
|
||||
assert len(result) > 0
|
||||
|
||||
def test_exception_stack_no_active_exception(self):
|
||||
"""Test exception_stack when no exception is active"""
|
||||
# This should handle the case gracefully
|
||||
# When no exception is active, sys.exc_info() returns (None, None, None)
|
||||
result = exception_stack()
|
||||
# With no traceback, should return empty string or handle gracefully
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_exception_stack_contains_line_numbers(self):
|
||||
"""Test that exception_stack includes line numbers"""
|
||||
try:
|
||||
raise OSError("Test OS error")
|
||||
except OSError:
|
||||
result = exception_stack()
|
||||
if result: # May be empty
|
||||
parts = result.split(" -> ")
|
||||
for part in parts:
|
||||
# Line numbers should be present
|
||||
assert any(char.isdigit() for char in part)
|
||||
|
||||
def test_exception_stack_multiple_exceptions(self):
|
||||
"""Test exception_stack captures the current exception only"""
|
||||
first_result = None
|
||||
second_result = None
|
||||
|
||||
try:
|
||||
raise ValueError("First exception")
|
||||
except ValueError:
|
||||
first_result = exception_stack()
|
||||
|
||||
try:
|
||||
raise TypeError("Second exception")
|
||||
except TypeError:
|
||||
second_result = exception_stack()
|
||||
|
||||
# Both should be valid but may differ
|
||||
assert isinstance(first_result, str)
|
||||
assert isinstance(second_result, str)
|
||||
|
||||
def test_exception_stack_with_multiple_separators(self):
|
||||
"""Test exception_stack with various custom separators"""
|
||||
separators = [" | ", " >> ", " => ", " / ", "\n"]
|
||||
|
||||
def nested_call():
|
||||
def inner_call():
|
||||
raise ValueError("Test exception")
|
||||
inner_call()
|
||||
|
||||
for sep in separators:
|
||||
try:
|
||||
nested_call()
|
||||
except ValueError:
|
||||
result = exception_stack(separator=sep)
|
||||
assert isinstance(result, str)
|
||||
# Separator only appears if there are multiple frames
|
||||
|
||||
|
||||
class TestOptExcInfo:
|
||||
"""Test cases for OptExcInfo type definition"""
|
||||
|
||||
def test_opt_exc_info_type_none_tuple(self):
|
||||
"""Test OptExcInfo can be None tuple"""
|
||||
exc_info: OptExcInfo = (None, None, None)
|
||||
assert exc_info == (None, None, None)
|
||||
|
||||
def test_opt_exc_info_type_exception_tuple(self):
|
||||
"""Test OptExcInfo can be exception tuple"""
|
||||
try:
|
||||
raise ValueError("Test")
|
||||
except ValueError:
|
||||
exc_info: OptExcInfo = sys.exc_info()
|
||||
assert exc_info[0] is not None
|
||||
assert exc_info[1] is not None
|
||||
assert exc_info[2] is not None
|
||||
|
||||
def test_opt_exc_info_with_exception_stack(self):
|
||||
"""Test that OptExcInfo works with exception_stack function"""
|
||||
try:
|
||||
raise RuntimeError("Test runtime error")
|
||||
except RuntimeError:
|
||||
exc_info = sys.exc_info()
|
||||
result = exception_stack(exc_stack=exc_info)
|
||||
assert isinstance(result, str)
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests combining multiple scenarios"""
|
||||
|
||||
def test_call_stack_and_exception_stack_together(self):
|
||||
"""Test using both call_stack and exception_stack in error handling"""
|
||||
def faulty_function():
|
||||
_ = call_stack() # Get call stack before exception
|
||||
raise ValueError("Intentional error")
|
||||
|
||||
try:
|
||||
faulty_function()
|
||||
except ValueError:
|
||||
exception_trace = exception_stack()
|
||||
|
||||
assert isinstance(exception_trace, str)
|
||||
assert "faulty_function" in exception_trace or "test_debug_helpers.py" in exception_trace
|
||||
|
||||
def test_nested_exception_with_call_stack(self):
|
||||
"""Test call_stack within exception handling"""
|
||||
def outer():
|
||||
return inner()
|
||||
|
||||
def inner():
|
||||
try:
|
||||
raise RuntimeError("Inner error")
|
||||
except RuntimeError:
|
||||
return {
|
||||
'call_stack': call_stack(),
|
||||
'exception_stack': exception_stack()
|
||||
}
|
||||
|
||||
result = outer()
|
||||
assert 'call_stack' in result
|
||||
assert 'exception_stack' in result
|
||||
assert isinstance(result['call_stack'], str)
|
||||
assert isinstance(result['exception_stack'], str)
|
||||
|
||||
def test_multiple_nested_levels(self):
|
||||
"""Test with multiple nested function levels"""
|
||||
def level_a():
|
||||
return level_b()
|
||||
|
||||
def level_b():
|
||||
return level_c()
|
||||
|
||||
def level_c():
|
||||
return level_d()
|
||||
|
||||
def level_d():
|
||||
try:
|
||||
raise ValueError("Deep error")
|
||||
except ValueError:
|
||||
return {
|
||||
'call': call_stack(),
|
||||
'exception': exception_stack()
|
||||
}
|
||||
|
||||
result = level_a()
|
||||
# Should contain information about the call chain
|
||||
assert result['call']
|
||||
assert result['exception']
|
||||
|
||||
def test_different_separators_consistency(self):
|
||||
"""Test that different separators work consistently"""
|
||||
separators = [" -> ", " | ", " / ", " >> "]
|
||||
|
||||
def nested_call():
|
||||
def inner_call():
|
||||
raise ValueError("Test")
|
||||
inner_call()
|
||||
|
||||
for sep in separators:
|
||||
try:
|
||||
nested_call()
|
||||
except ValueError:
|
||||
exc_result = exception_stack(separator=sep)
|
||||
call_result = call_stack(separator=sep)
|
||||
|
||||
assert isinstance(exc_result, str)
|
||||
assert isinstance(call_result, str)
|
||||
# Both should be valid strings (separator check only if multiple frames)
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and boundary conditions"""
|
||||
|
||||
def test_call_stack_with_zero_start(self):
|
||||
"""Test call_stack with start=0 (should include all frames)"""
|
||||
result = call_stack(start=0)
|
||||
assert isinstance(result, str)
|
||||
assert len(result) > 0
|
||||
|
||||
def test_call_stack_with_large_skip_last(self):
|
||||
"""Test call_stack with very large skip_last value"""
|
||||
result = call_stack(skip_last=-100)
|
||||
# Should handle gracefully, may be empty
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_exception_stack_none_exc_info(self):
|
||||
"""Test exception_stack with None as exc_stack"""
|
||||
result = exception_stack(exc_stack=None)
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_exception_stack_empty_tuple(self):
|
||||
"""Test exception_stack with empty exception info"""
|
||||
exc_info: OptExcInfo = (None, None, None)
|
||||
result = exception_stack(exc_stack=exc_info)
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_call_stack_special_characters_in_separator(self):
|
||||
"""Test call_stack with special characters in separator"""
|
||||
special_separators = ["\n", "\t", "->", "||", "//"]
|
||||
|
||||
for sep in special_separators:
|
||||
result = call_stack(separator=sep)
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_very_deep_call_stack(self):
|
||||
"""Test call_stack with very deep recursion (up to a limit)"""
|
||||
def recursive_call(depth: int, max_depth: int = 5) -> str:
|
||||
if depth >= max_depth:
|
||||
return call_stack()
|
||||
return recursive_call(depth + 1, max_depth)
|
||||
|
||||
result = recursive_call(0)
|
||||
assert isinstance(result, str)
|
||||
# Should contain multiple recursive_call entries
|
||||
assert result.count("recursive_call") > 0
|
||||
|
||||
def test_exception_stack_different_exception_types(self):
|
||||
"""Test exception_stack with various exception types"""
|
||||
exception_types = [
|
||||
ValueError("value"),
|
||||
TypeError("type"),
|
||||
KeyError("key"),
|
||||
IndexError("index"),
|
||||
AttributeError("attr"),
|
||||
RuntimeError("runtime"),
|
||||
]
|
||||
|
||||
for exc in exception_types:
|
||||
try:
|
||||
raise exc
|
||||
except (ValueError, TypeError, KeyError, IndexError, AttributeError, RuntimeError):
|
||||
result = exception_stack()
|
||||
assert isinstance(result, str)
|
||||
|
||||
|
||||
class TestRealWorldScenarios:
|
||||
"""Test real-world debugging scenarios"""
|
||||
|
||||
def test_debugging_workflow(self):
|
||||
"""Test typical debugging workflow with both functions"""
|
||||
def process_data(data: str) -> str:
|
||||
_ = call_stack() # Capture call stack for debugging
|
||||
if not data:
|
||||
raise ValueError("No data provided")
|
||||
return data.upper()
|
||||
|
||||
# Success case
|
||||
result = process_data("test")
|
||||
assert result == "TEST"
|
||||
|
||||
# Error case
|
||||
try:
|
||||
process_data("")
|
||||
except ValueError:
|
||||
exc_trace = exception_stack()
|
||||
assert isinstance(exc_trace, str)
|
||||
|
||||
def test_logging_context(self):
|
||||
"""Test using call_stack for logging context"""
|
||||
def get_logging_context():
|
||||
return {
|
||||
'timestamp': 'now',
|
||||
'stack': call_stack(start=1, separator=" > "),
|
||||
'function': 'get_logging_context'
|
||||
}
|
||||
|
||||
context = get_logging_context()
|
||||
assert 'stack' in context
|
||||
assert 'timestamp' in context
|
||||
assert isinstance(context['stack'], str)
|
||||
|
||||
def test_error_reporting(self):
|
||||
"""Test comprehensive error reporting"""
|
||||
def dangerous_operation() -> dict[str, str]:
|
||||
try:
|
||||
# Simulate some operation
|
||||
_ = 1 / 0
|
||||
except ZeroDivisionError:
|
||||
return {
|
||||
'error': 'Division by zero',
|
||||
'call_stack': call_stack(),
|
||||
'exception_stack': exception_stack(),
|
||||
}
|
||||
return {} # Fallback return
|
||||
|
||||
error_report = dangerous_operation()
|
||||
assert error_report is not None
|
||||
assert 'error' in error_report
|
||||
assert 'call_stack' in error_report
|
||||
assert 'exception_stack' in error_report
|
||||
assert error_report['error'] == 'Division by zero'
|
||||
|
||||
def test_function_tracing(self):
|
||||
"""Test function call tracing"""
|
||||
traces: list[str] = []
|
||||
|
||||
def traced_function_a() -> str:
|
||||
traces.append(call_stack())
|
||||
return traced_function_b()
|
||||
|
||||
def traced_function_b() -> str:
|
||||
traces.append(call_stack())
|
||||
return traced_function_c()
|
||||
|
||||
def traced_function_c() -> str:
|
||||
traces.append(call_stack())
|
||||
return "done"
|
||||
|
||||
result = traced_function_a()
|
||||
assert result == "done"
|
||||
assert len(traces) == 3
|
||||
# Each trace should be different (different call depths)
|
||||
assert all(isinstance(t, str) for t in traces)
|
||||
|
||||
def test_exception_chain_tracking(self):
|
||||
"""Test tracking exception chains"""
|
||||
exception_traces: list[str] = []
|
||||
|
||||
def operation_one() -> None:
|
||||
try:
|
||||
operation_two()
|
||||
except ValueError:
|
||||
exception_traces.append(exception_stack())
|
||||
raise
|
||||
|
||||
def operation_two() -> None:
|
||||
try:
|
||||
operation_three()
|
||||
except TypeError as exc:
|
||||
exception_traces.append(exception_stack())
|
||||
raise ValueError("Wrapped error") from exc
|
||||
|
||||
def operation_three() -> None:
|
||||
raise TypeError("Original error")
|
||||
|
||||
try:
|
||||
operation_one()
|
||||
except ValueError:
|
||||
exception_traces.append(exception_stack())
|
||||
|
||||
# Should have captured multiple exception stacks
|
||||
assert len(exception_traces) > 0
|
||||
assert all(isinstance(t, str) for t in exception_traces)
|
||||
|
||||
|
||||
class TestParametrized:
|
||||
"""Parametrized tests for comprehensive coverage"""
|
||||
|
||||
@pytest.mark.parametrize("start", [0, 1, 2, 5, 10])
|
||||
def test_call_stack_various_starts(self, start: int) -> None:
|
||||
"""Test call_stack with various start values"""
|
||||
result = call_stack(start=start)
|
||||
assert isinstance(result, str)
|
||||
|
||||
@pytest.mark.parametrize("skip_last", [-1, -2, -3, -5, 1, 2, 3, 5])
|
||||
def test_call_stack_various_skip_lasts(self, skip_last: int) -> None:
|
||||
"""Test call_stack with various skip_last values"""
|
||||
result = call_stack(skip_last=skip_last)
|
||||
assert isinstance(result, str)
|
||||
|
||||
@pytest.mark.parametrize("separator", [" -> ", " | ", " / ", " >> ", " => ", "\n", "\t"])
|
||||
def test_call_stack_various_separators(self, separator: str) -> None:
|
||||
"""Test call_stack with various separators"""
|
||||
result = call_stack(separator=separator)
|
||||
assert isinstance(result, str)
|
||||
if result:
|
||||
assert separator in result
|
||||
|
||||
@pytest.mark.parametrize("reset_start", [True, False])
|
||||
def test_call_stack_reset_start_variations(self, reset_start: bool) -> None:
|
||||
"""Test call_stack with reset_start_if_empty variations"""
|
||||
result = call_stack(start=100, reset_start_if_empty=reset_start)
|
||||
assert isinstance(result, str)
|
||||
if reset_start:
|
||||
assert len(result) > 0 # Should have content after reset
|
||||
else:
|
||||
assert len(result) == 0 # Should be empty
|
||||
|
||||
@pytest.mark.parametrize("separator", [" -> ", " | ", " / ", " >> ", "\n"])
|
||||
def test_exception_stack_various_separators(self, separator: str) -> None:
|
||||
"""Test exception_stack with various separators"""
|
||||
def nested_call():
|
||||
def inner_call():
|
||||
raise ValueError("Test")
|
||||
inner_call()
|
||||
|
||||
try:
|
||||
nested_call()
|
||||
except ValueError:
|
||||
result = exception_stack(separator=separator)
|
||||
assert isinstance(result, str)
|
||||
# Check that result is valid (separator only if multiple frames exist)
|
||||
|
||||
@pytest.mark.parametrize("exception_type", [
|
||||
ValueError,
|
||||
TypeError,
|
||||
KeyError,
|
||||
IndexError,
|
||||
AttributeError,
|
||||
RuntimeError,
|
||||
OSError,
|
||||
])
|
||||
def test_exception_stack_various_exception_types(self, exception_type: type[Exception]) -> None:
|
||||
"""Test exception_stack with various exception types"""
|
||||
try:
|
||||
raise exception_type("Test exception")
|
||||
except (ValueError, TypeError, KeyError, IndexError, AttributeError, RuntimeError, OSError):
|
||||
result = exception_stack()
|
||||
assert isinstance(result, str)
|
||||
|
||||
# __END__
|
||||
@@ -1,288 +0,0 @@
|
||||
"""
|
||||
Unit tests for debug_handling.dump_data module
|
||||
"""
|
||||
|
||||
import json
|
||||
from datetime import datetime, date
|
||||
from decimal import Decimal
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
|
||||
|
||||
class TestDumpData:
|
||||
"""Test cases for dump_data function"""
|
||||
|
||||
def test_dump_simple_dict(self):
|
||||
"""Test dumping a simple dictionary"""
|
||||
data = {"name": "John", "age": 30}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
|
||||
def test_dump_simple_list(self):
|
||||
"""Test dumping a simple list"""
|
||||
data = [1, 2, 3, 4, 5]
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
|
||||
def test_dump_nested_dict(self):
|
||||
"""Test dumping a nested dictionary"""
|
||||
data = {
|
||||
"user": {
|
||||
"name": "Alice",
|
||||
"address": {
|
||||
"city": "Tokyo",
|
||||
"country": "Japan"
|
||||
}
|
||||
}
|
||||
}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
|
||||
def test_dump_mixed_types(self):
|
||||
"""Test dumping data with mixed types"""
|
||||
data = {
|
||||
"string": "test",
|
||||
"number": 42,
|
||||
"float": 3.14,
|
||||
"boolean": True,
|
||||
"null": None,
|
||||
"list": [1, 2, 3]
|
||||
}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
|
||||
def test_dump_with_indent_default(self):
|
||||
"""Test that indent is applied by default"""
|
||||
data = {"a": 1, "b": 2}
|
||||
result = dump_data(data)
|
||||
|
||||
# With indent, result should contain newlines
|
||||
assert "\n" in result
|
||||
assert " " in result # 4 spaces for indent
|
||||
|
||||
def test_dump_with_indent_true(self):
|
||||
"""Test explicit indent=True"""
|
||||
data = {"a": 1, "b": 2}
|
||||
result = dump_data(data, use_indent=True)
|
||||
|
||||
# With indent, result should contain newlines
|
||||
assert "\n" in result
|
||||
assert " " in result # 4 spaces for indent
|
||||
|
||||
def test_dump_without_indent(self):
|
||||
"""Test dumping without indentation"""
|
||||
data = {"a": 1, "b": 2}
|
||||
result = dump_data(data, use_indent=False)
|
||||
|
||||
# Without indent, result should be compact
|
||||
assert "\n" not in result
|
||||
assert result == '{"a": 1, "b": 2}'
|
||||
|
||||
def test_dump_unicode_characters(self):
|
||||
"""Test that unicode characters are preserved (ensure_ascii=False)"""
|
||||
data = {"message": "こんにちは", "emoji": "😀", "german": "Müller"}
|
||||
result = dump_data(data)
|
||||
|
||||
# Unicode characters should be preserved, not escaped
|
||||
assert "こんにちは" in result
|
||||
assert "😀" in result
|
||||
assert "Müller" in result
|
||||
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
|
||||
def test_dump_datetime_object(self):
|
||||
"""Test dumping data with datetime objects (using default=str)"""
|
||||
now = datetime(2023, 10, 15, 14, 30, 0)
|
||||
data = {"timestamp": now}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
# datetime should be converted to string
|
||||
assert "2023-10-15" in result
|
||||
|
||||
def test_dump_date_object(self):
|
||||
"""Test dumping data with date objects"""
|
||||
today = date(2023, 10, 15)
|
||||
data = {"date": today}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "2023-10-15" in result
|
||||
|
||||
def test_dump_decimal_object(self):
|
||||
"""Test dumping data with Decimal objects"""
|
||||
data = {"amount": Decimal("123.45")}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "123.45" in result
|
||||
|
||||
def test_dump_empty_dict(self):
|
||||
"""Test dumping an empty dictionary"""
|
||||
data = {}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == {}
|
||||
|
||||
def test_dump_empty_list(self):
|
||||
"""Test dumping an empty list"""
|
||||
data = []
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == []
|
||||
|
||||
def test_dump_string_directly(self):
|
||||
"""Test dumping a string directly"""
|
||||
data = "Hello, World!"
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
|
||||
def test_dump_number_directly(self):
|
||||
"""Test dumping a number directly"""
|
||||
data = 42
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
|
||||
def test_dump_boolean_directly(self):
|
||||
"""Test dumping a boolean directly"""
|
||||
data = True
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed is True
|
||||
|
||||
def test_dump_none_directly(self):
|
||||
"""Test dumping None directly"""
|
||||
data = None
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result == "null"
|
||||
parsed = json.loads(result)
|
||||
assert parsed is None
|
||||
|
||||
def test_dump_complex_nested_structure(self):
|
||||
"""Test dumping a complex nested structure"""
|
||||
data = {
|
||||
"users": [
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Alice",
|
||||
"tags": ["admin", "user"],
|
||||
"metadata": {
|
||||
"created": datetime(2023, 1, 1),
|
||||
"active": True
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Bob",
|
||||
"tags": ["user"],
|
||||
"metadata": {
|
||||
"created": datetime(2023, 6, 15),
|
||||
"active": False
|
||||
}
|
||||
}
|
||||
],
|
||||
"total": 2
|
||||
}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
# Check that it's valid JSON
|
||||
parsed = json.loads(result)
|
||||
assert len(parsed["users"]) == 2
|
||||
assert parsed["total"] == 2
|
||||
|
||||
def test_dump_special_characters(self):
|
||||
"""Test dumping data with special characters"""
|
||||
data = {
|
||||
"quote": 'He said "Hello"',
|
||||
"backslash": "path\\to\\file",
|
||||
"newline": "line1\nline2",
|
||||
"tab": "col1\tcol2"
|
||||
}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
|
||||
def test_dump_large_numbers(self):
|
||||
"""Test dumping large numbers"""
|
||||
data = {
|
||||
"big_int": 123456789012345678901234567890,
|
||||
"big_float": 1.23456789e100
|
||||
}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed["big_int"] == data["big_int"]
|
||||
|
||||
def test_dump_list_of_dicts(self):
|
||||
"""Test dumping a list of dictionaries"""
|
||||
data = [
|
||||
{"id": 1, "name": "Item 1"},
|
||||
{"id": 2, "name": "Item 2"},
|
||||
{"id": 3, "name": "Item 3"}
|
||||
]
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
parsed = json.loads(result)
|
||||
assert parsed == data
|
||||
assert len(parsed) == 3
|
||||
|
||||
|
||||
class CustomObject:
|
||||
"""Custom class for testing default=str conversion"""
|
||||
def __init__(self, value: Any):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return f"CustomObject({self.value})"
|
||||
|
||||
|
||||
class TestDumpDataWithCustomObjects:
|
||||
"""Test cases for dump_data with custom objects"""
|
||||
|
||||
def test_dump_custom_object(self):
|
||||
"""Test that custom objects are converted using str()"""
|
||||
obj = CustomObject("test")
|
||||
data = {"custom": obj}
|
||||
result = dump_data(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "CustomObject(test)" in result
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -1,560 +0,0 @@
|
||||
"""
|
||||
Unit tests for corelibs.debug_handling.profiling module
|
||||
"""
|
||||
|
||||
import time
|
||||
import tracemalloc
|
||||
|
||||
from corelibs.debug_handling.profiling import display_top, Profiling
|
||||
|
||||
|
||||
class TestDisplayTop:
|
||||
"""Test display_top function"""
|
||||
|
||||
def test_display_top_basic(self):
|
||||
"""Test that display_top returns a string with basic stats"""
|
||||
tracemalloc.start()
|
||||
|
||||
# Allocate some memory
|
||||
data = [0] * 10000
|
||||
|
||||
snapshot = tracemalloc.take_snapshot()
|
||||
tracemalloc.stop()
|
||||
|
||||
result = display_top(snapshot)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "Top 10 lines" in result
|
||||
assert "KiB" in result
|
||||
assert "Total allocated size:" in result
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
def test_display_top_with_custom_limit(self):
|
||||
"""Test display_top with custom limit parameter"""
|
||||
tracemalloc.start()
|
||||
|
||||
# Allocate some memory
|
||||
data = [0] * 10000
|
||||
|
||||
snapshot = tracemalloc.take_snapshot()
|
||||
tracemalloc.stop()
|
||||
|
||||
result = display_top(snapshot, limit=5)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "Top 5 lines" in result
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
def test_display_top_with_different_key_type(self):
|
||||
"""Test display_top with different key_type parameter"""
|
||||
tracemalloc.start()
|
||||
|
||||
# Allocate some memory
|
||||
data = [0] * 10000
|
||||
|
||||
snapshot = tracemalloc.take_snapshot()
|
||||
tracemalloc.stop()
|
||||
|
||||
result = display_top(snapshot, key_type='filename')
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "Top 10 lines" in result
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
def test_display_top_filters_traces(self):
|
||||
"""Test that display_top filters out bootstrap and unknown traces"""
|
||||
tracemalloc.start()
|
||||
|
||||
# Allocate some memory
|
||||
data = [0] * 10000
|
||||
|
||||
snapshot = tracemalloc.take_snapshot()
|
||||
tracemalloc.stop()
|
||||
|
||||
result = display_top(snapshot)
|
||||
|
||||
# Should not contain filtered traces
|
||||
assert "<frozen importlib._bootstrap>" not in result
|
||||
assert "<unknown>" not in result
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
def test_display_top_with_limit_larger_than_stats(self):
|
||||
"""Test display_top when limit is larger than available stats"""
|
||||
tracemalloc.start()
|
||||
|
||||
# Allocate some memory
|
||||
data = [0] * 100
|
||||
|
||||
snapshot = tracemalloc.take_snapshot()
|
||||
tracemalloc.stop()
|
||||
|
||||
result = display_top(snapshot, limit=1000)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "Top 1000 lines" in result
|
||||
assert "Total allocated size:" in result
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
def test_display_top_empty_snapshot(self):
|
||||
"""Test display_top with a snapshot that has minimal traces"""
|
||||
tracemalloc.start()
|
||||
snapshot = tracemalloc.take_snapshot()
|
||||
tracemalloc.stop()
|
||||
|
||||
result = display_top(snapshot, limit=1)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "Top 1 lines" in result
|
||||
|
||||
|
||||
class TestProfilingInitialization:
|
||||
"""Test Profiling class initialization"""
|
||||
|
||||
def test_profiling_initialization(self):
|
||||
"""Test that Profiling initializes correctly"""
|
||||
profiler = Profiling()
|
||||
|
||||
# Should be able to create instance
|
||||
assert isinstance(profiler, Profiling)
|
||||
|
||||
def test_profiling_initial_state(self):
|
||||
"""Test that Profiling starts in a clean state"""
|
||||
profiler = Profiling()
|
||||
|
||||
# Should not raise an error when calling end_profiling
|
||||
# even though start_profiling wasn't called
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
assert isinstance(result, str)
|
||||
|
||||
|
||||
class TestProfilingStartEnd:
|
||||
"""Test start_profiling and end_profiling functionality"""
|
||||
|
||||
def test_start_profiling(self):
|
||||
"""Test that start_profiling can be called"""
|
||||
profiler = Profiling()
|
||||
|
||||
# Should not raise an error
|
||||
profiler.start_profiling("test_operation")
|
||||
|
||||
def test_end_profiling(self):
|
||||
"""Test that end_profiling can be called"""
|
||||
profiler = Profiling()
|
||||
profiler.start_profiling("test_operation")
|
||||
|
||||
# Should not raise an error
|
||||
profiler.end_profiling()
|
||||
|
||||
def test_start_profiling_with_different_idents(self):
|
||||
"""Test start_profiling with different identifier strings"""
|
||||
profiler = Profiling()
|
||||
|
||||
identifiers = ["short", "longer_identifier", "very_long_identifier_with_many_chars"]
|
||||
|
||||
for ident in identifiers:
|
||||
profiler.start_profiling(ident)
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert ident in result
|
||||
|
||||
def test_end_profiling_without_start(self):
|
||||
"""Test that end_profiling can be called without start_profiling"""
|
||||
profiler = Profiling()
|
||||
|
||||
# Should not raise an error but internal state should indicate warning
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_profiling_measures_time(self):
|
||||
"""Test that profiling measures elapsed time"""
|
||||
profiler = Profiling()
|
||||
profiler.start_profiling("time_test")
|
||||
|
||||
sleep_duration = 0.05 # 50ms
|
||||
time.sleep(sleep_duration)
|
||||
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "time:" in result
|
||||
# Should have some time measurement
|
||||
assert "ms" in result or "s" in result
|
||||
|
||||
def test_profiling_measures_memory(self):
|
||||
"""Test that profiling measures memory usage"""
|
||||
profiler = Profiling()
|
||||
profiler.start_profiling("memory_test")
|
||||
|
||||
# Allocate some memory
|
||||
data = [0] * 100000
|
||||
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "RSS:" in result
|
||||
assert "VMS:" in result
|
||||
assert "time:" in result
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
|
||||
class TestProfilingPrintProfiling:
|
||||
"""Test print_profiling functionality"""
|
||||
|
||||
def test_print_profiling_returns_string(self):
|
||||
"""Test that print_profiling returns a string"""
|
||||
profiler = Profiling()
|
||||
profiler.start_profiling("test")
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_print_profiling_contains_identifier(self):
|
||||
"""Test that print_profiling includes the identifier"""
|
||||
profiler = Profiling()
|
||||
identifier = "my_test_operation"
|
||||
|
||||
profiler.start_profiling(identifier)
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert identifier in result
|
||||
|
||||
def test_print_profiling_format(self):
|
||||
"""Test that print_profiling has expected format"""
|
||||
profiler = Profiling()
|
||||
profiler.start_profiling("test")
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
# Check for expected components
|
||||
assert "Profiling:" in result
|
||||
assert "RSS:" in result
|
||||
assert "VMS:" in result
|
||||
assert "time:" in result
|
||||
|
||||
def test_print_profiling_multiple_calls(self):
|
||||
"""Test that print_profiling can be called multiple times"""
|
||||
profiler = Profiling()
|
||||
profiler.start_profiling("test")
|
||||
profiler.end_profiling()
|
||||
|
||||
result1 = profiler.print_profiling()
|
||||
result2 = profiler.print_profiling()
|
||||
|
||||
# Should return the same result
|
||||
assert result1 == result2
|
||||
|
||||
def test_print_profiling_time_formats(self):
|
||||
"""Test different time format outputs"""
|
||||
profiler = Profiling()
|
||||
|
||||
# Very short duration (milliseconds)
|
||||
profiler.start_profiling("ms_test")
|
||||
time.sleep(0.001)
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
assert "ms" in result
|
||||
|
||||
# Slightly longer duration (seconds)
|
||||
profiler.start_profiling("s_test")
|
||||
time.sleep(0.1)
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
# Could be ms or s depending on timing
|
||||
assert ("ms" in result or "s" in result)
|
||||
|
||||
def test_print_profiling_memory_formats(self):
|
||||
"""Test different memory format outputs"""
|
||||
profiler = Profiling()
|
||||
profiler.start_profiling("memory_format_test")
|
||||
|
||||
# Allocate some memory
|
||||
data = [0] * 50000
|
||||
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
|
||||
# Should have some memory unit (B, kB, MB, GB)
|
||||
assert any(unit in result for unit in ["B", "kB", "MB", "GB"])
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
|
||||
class TestProfilingIntegration:
|
||||
"""Integration tests for Profiling class"""
|
||||
|
||||
def test_complete_profiling_cycle(self):
|
||||
"""Test a complete profiling cycle from start to print"""
|
||||
profiler = Profiling()
|
||||
|
||||
profiler.start_profiling("complete_cycle")
|
||||
|
||||
# Do some work
|
||||
data = [i for i in range(10000)]
|
||||
time.sleep(0.01)
|
||||
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "complete_cycle" in result
|
||||
assert "RSS:" in result
|
||||
assert "VMS:" in result
|
||||
assert "time:" in result
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
def test_multiple_profiling_sessions(self):
|
||||
"""Test running multiple profiling sessions"""
|
||||
profiler = Profiling()
|
||||
|
||||
# First session
|
||||
profiler.start_profiling("session_1")
|
||||
time.sleep(0.01)
|
||||
profiler.end_profiling()
|
||||
result1 = profiler.print_profiling()
|
||||
|
||||
# Second session (same profiler instance)
|
||||
profiler.start_profiling("session_2")
|
||||
data = [0] * 100000
|
||||
time.sleep(0.01)
|
||||
profiler.end_profiling()
|
||||
result2 = profiler.print_profiling()
|
||||
|
||||
# Results should be different
|
||||
assert "session_1" in result1
|
||||
assert "session_2" in result2
|
||||
assert result1 != result2
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
def test_profiling_with_zero_work(self):
|
||||
"""Test profiling with minimal work"""
|
||||
profiler = Profiling()
|
||||
|
||||
profiler.start_profiling("zero_work")
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "zero_work" in result
|
||||
|
||||
def test_profiling_with_heavy_computation(self):
|
||||
"""Test profiling with heavier computation"""
|
||||
profiler = Profiling()
|
||||
|
||||
profiler.start_profiling("heavy_computation")
|
||||
|
||||
# Do some computation
|
||||
result_data: list[list[int]] = []
|
||||
for _ in range(1000):
|
||||
result_data.append([j * 2 for j in range(100)])
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "heavy_computation" in result
|
||||
# Should show measurable time and memory
|
||||
assert "time:" in result
|
||||
|
||||
# Clean up
|
||||
del result_data
|
||||
|
||||
def test_independent_profilers(self):
|
||||
"""Test that multiple Profiling instances are independent"""
|
||||
profiler1 = Profiling()
|
||||
profiler2 = Profiling()
|
||||
|
||||
profiler1.start_profiling("profiler_1")
|
||||
time.sleep(0.01)
|
||||
|
||||
profiler2.start_profiling("profiler_2")
|
||||
data = [0] * 100000
|
||||
time.sleep(0.01)
|
||||
|
||||
profiler1.end_profiling()
|
||||
profiler2.end_profiling()
|
||||
|
||||
result1 = profiler1.print_profiling()
|
||||
result2 = profiler2.print_profiling()
|
||||
|
||||
# Should have different identifiers
|
||||
assert "profiler_1" in result1
|
||||
assert "profiler_2" in result2
|
||||
|
||||
# Results should be different
|
||||
assert result1 != result2
|
||||
|
||||
# Clean up
|
||||
del data
|
||||
|
||||
|
||||
class TestProfilingEdgeCases:
|
||||
"""Test edge cases and boundary conditions"""
|
||||
|
||||
def test_empty_identifier(self):
|
||||
"""Test profiling with empty identifier"""
|
||||
profiler = Profiling()
|
||||
|
||||
profiler.start_profiling("")
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "Profiling:" in result
|
||||
|
||||
def test_very_long_identifier(self):
|
||||
"""Test profiling with very long identifier"""
|
||||
profiler = Profiling()
|
||||
|
||||
long_ident = "a" * 100
|
||||
|
||||
profiler.start_profiling(long_ident)
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert long_ident in result
|
||||
|
||||
def test_special_characters_in_identifier(self):
|
||||
"""Test profiling with special characters in identifier"""
|
||||
profiler = Profiling()
|
||||
|
||||
special_ident = "test_@#$%_operation"
|
||||
|
||||
profiler.start_profiling(special_ident)
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert special_ident in result
|
||||
|
||||
def test_rapid_consecutive_profiling(self):
|
||||
"""Test rapid consecutive profiling cycles"""
|
||||
profiler = Profiling()
|
||||
|
||||
for i in range(5):
|
||||
profiler.start_profiling(f"rapid_{i}")
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert f"rapid_{i}" in result
|
||||
|
||||
def test_profiling_negative_memory_change(self):
|
||||
"""Test profiling when memory usage decreases"""
|
||||
profiler = Profiling()
|
||||
|
||||
# Allocate some memory before profiling
|
||||
pre_data = [0] * 1000000
|
||||
|
||||
profiler.start_profiling("memory_decrease")
|
||||
|
||||
# Free the memory
|
||||
del pre_data
|
||||
|
||||
profiler.end_profiling()
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "memory_decrease" in result
|
||||
# Should handle negative memory change gracefully
|
||||
|
||||
def test_very_short_duration(self):
|
||||
"""Test profiling with extremely short duration"""
|
||||
profiler = Profiling()
|
||||
|
||||
profiler.start_profiling("instant")
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "instant" in result
|
||||
assert "ms" in result # Should show milliseconds for very short duration
|
||||
|
||||
|
||||
class TestProfilingContextManager:
|
||||
"""Test profiling usage patterns similar to context managers"""
|
||||
|
||||
def test_typical_usage_pattern(self):
|
||||
"""Test typical usage pattern for profiling"""
|
||||
profiler = Profiling()
|
||||
|
||||
# Typical pattern
|
||||
profiler.start_profiling("typical_operation")
|
||||
|
||||
# Perform operation
|
||||
result_list: list[int] = []
|
||||
for _ in range(1000):
|
||||
result_list.append(_ * 2)
|
||||
|
||||
profiler.end_profiling()
|
||||
|
||||
# Get results
|
||||
output = profiler.print_profiling()
|
||||
|
||||
assert isinstance(output, str)
|
||||
assert "typical_operation" in output
|
||||
|
||||
# Clean up
|
||||
del result_list
|
||||
|
||||
def test_profiling_without_end(self):
|
||||
"""Test what happens when end_profiling is not called"""
|
||||
profiler = Profiling()
|
||||
|
||||
profiler.start_profiling("no_end")
|
||||
|
||||
# Don't call end_profiling
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
# Should still return a string (though data might be incomplete)
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_profiling_end_without_start(self):
|
||||
"""Test calling end_profiling multiple times without start"""
|
||||
profiler = Profiling()
|
||||
|
||||
profiler.end_profiling()
|
||||
profiler.end_profiling()
|
||||
|
||||
result = profiler.print_profiling()
|
||||
|
||||
assert isinstance(result, str)
|
||||
|
||||
# __END__
|
||||
@@ -1,405 +0,0 @@
|
||||
"""
|
||||
Unit tests for corelibs.debug_handling.timer module
|
||||
"""
|
||||
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from corelibs.debug_handling.timer import Timer
|
||||
|
||||
|
||||
class TestTimerInitialization:
|
||||
"""Test Timer class initialization"""
|
||||
|
||||
def test_timer_initialization(self):
|
||||
"""Test that Timer initializes with correct default values"""
|
||||
timer = Timer()
|
||||
|
||||
# Check that start times are set
|
||||
assert isinstance(timer.get_overall_start_time(), datetime)
|
||||
assert isinstance(timer.get_start_time(), datetime)
|
||||
|
||||
# Check that end times are None
|
||||
assert timer.get_overall_end_time() is None
|
||||
assert timer.get_end_time() is None
|
||||
|
||||
# Check that run times are None
|
||||
assert timer.get_overall_run_time() is None
|
||||
assert timer.get_run_time() is None
|
||||
|
||||
def test_timer_start_times_are_recent(self):
|
||||
"""Test that start times are set to current time on initialization"""
|
||||
before_init = datetime.now()
|
||||
timer = Timer()
|
||||
after_init = datetime.now()
|
||||
|
||||
overall_start = timer.get_overall_start_time()
|
||||
start = timer.get_start_time()
|
||||
|
||||
assert before_init <= overall_start <= after_init
|
||||
assert before_init <= start <= after_init
|
||||
|
||||
def test_timer_start_times_are_same(self):
|
||||
"""Test that overall_start_time and start_time are initialized to the same time"""
|
||||
timer = Timer()
|
||||
|
||||
overall_start = timer.get_overall_start_time()
|
||||
start = timer.get_start_time()
|
||||
|
||||
# They should be very close (within a few microseconds)
|
||||
time_diff = abs((overall_start - start).total_seconds())
|
||||
assert time_diff < 0.001 # Less than 1 millisecond
|
||||
|
||||
|
||||
class TestOverallRunTime:
|
||||
"""Test overall run time functionality"""
|
||||
|
||||
def test_overall_run_time_returns_timedelta(self):
|
||||
"""Test that overall_run_time returns a timedelta object"""
|
||||
timer = Timer()
|
||||
time.sleep(0.01) # Sleep for 10ms
|
||||
|
||||
result = timer.overall_run_time()
|
||||
|
||||
assert isinstance(result, timedelta)
|
||||
|
||||
def test_overall_run_time_sets_end_time(self):
|
||||
"""Test that calling overall_run_time sets the end time"""
|
||||
timer = Timer()
|
||||
|
||||
assert timer.get_overall_end_time() is None
|
||||
|
||||
timer.overall_run_time()
|
||||
|
||||
assert isinstance(timer.get_overall_end_time(), datetime)
|
||||
|
||||
def test_overall_run_time_sets_run_time(self):
|
||||
"""Test that calling overall_run_time sets the run time"""
|
||||
timer = Timer()
|
||||
|
||||
assert timer.get_overall_run_time() is None
|
||||
|
||||
timer.overall_run_time()
|
||||
|
||||
assert isinstance(timer.get_overall_run_time(), timedelta)
|
||||
|
||||
def test_overall_run_time_accuracy(self):
|
||||
"""Test that overall_run_time calculates time difference accurately"""
|
||||
timer = Timer()
|
||||
sleep_duration = 0.05 # 50ms
|
||||
time.sleep(sleep_duration)
|
||||
|
||||
result = timer.overall_run_time()
|
||||
|
||||
# Allow for some variance (10ms tolerance)
|
||||
assert sleep_duration - 0.01 <= result.total_seconds() <= sleep_duration + 0.01
|
||||
|
||||
def test_overall_run_time_multiple_calls(self):
|
||||
"""Test that calling overall_run_time multiple times updates the values"""
|
||||
timer = Timer()
|
||||
time.sleep(0.01)
|
||||
|
||||
first_result = timer.overall_run_time()
|
||||
first_end_time = timer.get_overall_end_time()
|
||||
|
||||
time.sleep(0.01)
|
||||
|
||||
second_result = timer.overall_run_time()
|
||||
second_end_time = timer.get_overall_end_time()
|
||||
|
||||
# Second call should have longer runtime
|
||||
assert second_result > first_result
|
||||
assert second_end_time is not None
|
||||
assert first_end_time is not None
|
||||
# End time should be updated
|
||||
assert second_end_time > first_end_time
|
||||
|
||||
def test_overall_run_time_consistency(self):
|
||||
"""Test that get_overall_run_time returns the same value as overall_run_time"""
|
||||
timer = Timer()
|
||||
time.sleep(0.01)
|
||||
|
||||
calculated_time = timer.overall_run_time()
|
||||
retrieved_time = timer.get_overall_run_time()
|
||||
|
||||
assert calculated_time == retrieved_time
|
||||
|
||||
|
||||
class TestRunTime:
|
||||
"""Test run time functionality"""
|
||||
|
||||
def test_run_time_returns_timedelta(self):
|
||||
"""Test that run_time returns a timedelta object"""
|
||||
timer = Timer()
|
||||
time.sleep(0.01)
|
||||
|
||||
result = timer.run_time()
|
||||
|
||||
assert isinstance(result, timedelta)
|
||||
|
||||
def test_run_time_sets_end_time(self):
|
||||
"""Test that calling run_time sets the end time"""
|
||||
timer = Timer()
|
||||
|
||||
assert timer.get_end_time() is None
|
||||
|
||||
timer.run_time()
|
||||
|
||||
assert isinstance(timer.get_end_time(), datetime)
|
||||
|
||||
def test_run_time_sets_run_time(self):
|
||||
"""Test that calling run_time sets the run time"""
|
||||
timer = Timer()
|
||||
|
||||
assert timer.get_run_time() is None
|
||||
|
||||
timer.run_time()
|
||||
|
||||
assert isinstance(timer.get_run_time(), timedelta)
|
||||
|
||||
def test_run_time_accuracy(self):
|
||||
"""Test that run_time calculates time difference accurately"""
|
||||
timer = Timer()
|
||||
sleep_duration = 0.05 # 50ms
|
||||
time.sleep(sleep_duration)
|
||||
|
||||
result = timer.run_time()
|
||||
|
||||
# Allow for some variance (10ms tolerance)
|
||||
assert sleep_duration - 0.01 <= result.total_seconds() <= sleep_duration + 0.01
|
||||
|
||||
def test_run_time_multiple_calls(self):
|
||||
"""Test that calling run_time multiple times updates the values"""
|
||||
timer = Timer()
|
||||
time.sleep(0.01)
|
||||
|
||||
first_result = timer.run_time()
|
||||
first_end_time = timer.get_end_time()
|
||||
|
||||
time.sleep(0.01)
|
||||
|
||||
second_result = timer.run_time()
|
||||
second_end_time = timer.get_end_time()
|
||||
|
||||
# Second call should have longer runtime
|
||||
assert second_result > first_result
|
||||
assert second_end_time is not None
|
||||
assert first_end_time is not None
|
||||
# End time should be updated
|
||||
assert second_end_time > first_end_time
|
||||
|
||||
def test_run_time_consistency(self):
|
||||
"""Test that get_run_time returns the same value as run_time"""
|
||||
timer = Timer()
|
||||
time.sleep(0.01)
|
||||
|
||||
calculated_time = timer.run_time()
|
||||
retrieved_time = timer.get_run_time()
|
||||
|
||||
assert calculated_time == retrieved_time
|
||||
|
||||
|
||||
class TestResetRunTime:
|
||||
"""Test reset_run_time functionality"""
|
||||
|
||||
def test_reset_run_time_resets_start_time(self):
|
||||
"""Test that reset_run_time updates the start time"""
|
||||
timer = Timer()
|
||||
original_start = timer.get_start_time()
|
||||
|
||||
time.sleep(0.02)
|
||||
timer.reset_run_time()
|
||||
|
||||
new_start = timer.get_start_time()
|
||||
|
||||
assert new_start > original_start
|
||||
|
||||
def test_reset_run_time_clears_end_time(self):
|
||||
"""Test that reset_run_time clears the end time"""
|
||||
timer = Timer()
|
||||
timer.run_time()
|
||||
|
||||
assert timer.get_end_time() is not None
|
||||
|
||||
timer.reset_run_time()
|
||||
|
||||
assert timer.get_end_time() is None
|
||||
|
||||
def test_reset_run_time_clears_run_time(self):
|
||||
"""Test that reset_run_time clears the run time"""
|
||||
timer = Timer()
|
||||
timer.run_time()
|
||||
|
||||
assert timer.get_run_time() is not None
|
||||
|
||||
timer.reset_run_time()
|
||||
|
||||
assert timer.get_run_time() is None
|
||||
|
||||
def test_reset_run_time_does_not_affect_overall_times(self):
|
||||
"""Test that reset_run_time does not affect overall times"""
|
||||
timer = Timer()
|
||||
|
||||
overall_start = timer.get_overall_start_time()
|
||||
timer.overall_run_time()
|
||||
overall_end = timer.get_overall_end_time()
|
||||
overall_run = timer.get_overall_run_time()
|
||||
|
||||
timer.reset_run_time()
|
||||
|
||||
# Overall times should remain unchanged
|
||||
assert timer.get_overall_start_time() == overall_start
|
||||
assert timer.get_overall_end_time() == overall_end
|
||||
assert timer.get_overall_run_time() == overall_run
|
||||
|
||||
def test_reset_run_time_allows_new_measurement(self):
|
||||
"""Test that reset_run_time allows for new time measurements"""
|
||||
timer = Timer()
|
||||
time.sleep(0.02)
|
||||
timer.run_time()
|
||||
|
||||
first_run_time = timer.get_run_time()
|
||||
|
||||
timer.reset_run_time()
|
||||
time.sleep(0.01)
|
||||
timer.run_time()
|
||||
|
||||
second_run_time = timer.get_run_time()
|
||||
|
||||
assert second_run_time is not None
|
||||
assert first_run_time is not None
|
||||
# Second measurement should be shorter since we reset
|
||||
assert second_run_time < first_run_time
|
||||
|
||||
|
||||
class TestTimerIntegration:
|
||||
"""Integration tests for Timer class"""
|
||||
|
||||
def test_independent_timers(self):
|
||||
"""Test that multiple Timer instances are independent"""
|
||||
timer1 = Timer()
|
||||
time.sleep(0.01)
|
||||
timer2 = Timer()
|
||||
|
||||
# timer1 should have earlier start time
|
||||
assert timer1.get_start_time() < timer2.get_start_time()
|
||||
assert timer1.get_overall_start_time() < timer2.get_overall_start_time()
|
||||
|
||||
def test_overall_and_run_time_independence(self):
|
||||
"""Test that overall time and run time are independent"""
|
||||
timer = Timer()
|
||||
time.sleep(0.02)
|
||||
|
||||
# Reset run time but not overall
|
||||
timer.reset_run_time()
|
||||
time.sleep(0.01)
|
||||
|
||||
run_time = timer.run_time()
|
||||
overall_time = timer.overall_run_time()
|
||||
|
||||
# Overall time should be longer than run time
|
||||
assert overall_time > run_time
|
||||
|
||||
def test_typical_usage_pattern(self):
|
||||
"""Test a typical usage pattern of the Timer class"""
|
||||
timer = Timer()
|
||||
|
||||
# Measure first operation
|
||||
time.sleep(0.01)
|
||||
first_operation = timer.run_time()
|
||||
assert first_operation.total_seconds() > 0
|
||||
|
||||
# Reset and measure second operation
|
||||
timer.reset_run_time()
|
||||
time.sleep(0.01)
|
||||
second_operation = timer.run_time()
|
||||
assert second_operation.total_seconds() > 0
|
||||
|
||||
# Get overall time
|
||||
overall = timer.overall_run_time()
|
||||
|
||||
# Overall should be greater than individual operations
|
||||
assert overall > first_operation
|
||||
assert overall > second_operation
|
||||
|
||||
def test_zero_sleep_timer(self):
|
||||
"""Test timer with minimal sleep (edge case)"""
|
||||
timer = Timer()
|
||||
|
||||
# Call run_time immediately
|
||||
result = timer.run_time()
|
||||
|
||||
# Should still return a valid timedelta (very small)
|
||||
assert isinstance(result, timedelta)
|
||||
assert result.total_seconds() >= 0
|
||||
|
||||
def test_getter_methods_before_calculation(self):
|
||||
"""Test that getter methods return None before calculation methods are called"""
|
||||
timer = Timer()
|
||||
|
||||
# Before calling run_time()
|
||||
assert timer.get_end_time() is None
|
||||
assert timer.get_run_time() is None
|
||||
|
||||
# Before calling overall_run_time()
|
||||
assert timer.get_overall_end_time() is None
|
||||
assert timer.get_overall_run_time() is None
|
||||
|
||||
# But start times should always be set
|
||||
assert timer.get_start_time() is not None
|
||||
assert timer.get_overall_start_time() is not None
|
||||
|
||||
|
||||
class TestTimerEdgeCases:
|
||||
"""Test edge cases and boundary conditions"""
|
||||
|
||||
def test_rapid_consecutive_calls(self):
|
||||
"""Test rapid consecutive calls to run_time"""
|
||||
timer = Timer()
|
||||
|
||||
results: list[timedelta] = []
|
||||
for _ in range(5):
|
||||
results.append(timer.run_time())
|
||||
|
||||
# Each result should be greater than or equal to the previous
|
||||
for i in range(1, len(results)):
|
||||
assert results[i] >= results[i - 1]
|
||||
|
||||
def test_very_short_duration(self):
|
||||
"""Test timer with very short duration"""
|
||||
timer = Timer()
|
||||
result = timer.run_time()
|
||||
|
||||
# Should be a very small positive timedelta
|
||||
assert isinstance(result, timedelta)
|
||||
assert result.total_seconds() >= 0
|
||||
assert result.total_seconds() < 0.1 # Less than 100ms
|
||||
|
||||
def test_reset_multiple_times(self):
|
||||
"""Test resetting the timer multiple times"""
|
||||
timer = Timer()
|
||||
|
||||
for _ in range(3):
|
||||
timer.reset_run_time()
|
||||
time.sleep(0.01)
|
||||
result = timer.run_time()
|
||||
|
||||
assert isinstance(result, timedelta)
|
||||
assert result.total_seconds() > 0
|
||||
|
||||
def test_overall_time_persists_through_resets(self):
|
||||
"""Test that overall time continues even when run_time is reset"""
|
||||
timer = Timer()
|
||||
|
||||
time.sleep(0.01)
|
||||
timer.reset_run_time()
|
||||
|
||||
time.sleep(0.01)
|
||||
timer.reset_run_time()
|
||||
|
||||
overall = timer.overall_run_time()
|
||||
|
||||
# Overall time should reflect total elapsed time
|
||||
assert overall.total_seconds() >= 0.02
|
||||
|
||||
# __END__
|
||||
@@ -1,975 +0,0 @@
|
||||
"""
|
||||
Unit tests for debug_handling.writeline module
|
||||
"""
|
||||
|
||||
import io
|
||||
import pytest
|
||||
from pytest import CaptureFixture
|
||||
|
||||
from corelibs.debug_handling.writeline import (
|
||||
write_l,
|
||||
pr_header,
|
||||
pr_title,
|
||||
pr_open,
|
||||
pr_close,
|
||||
pr_act
|
||||
)
|
||||
|
||||
|
||||
class TestWriteL:
|
||||
"""Test cases for write_l function"""
|
||||
|
||||
def test_write_l_print_only(self, capsys: CaptureFixture[str]):
|
||||
"""Test write_l with print_line=True and no file"""
|
||||
write_l("Test line", print_line=True)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "Test line\n"
|
||||
|
||||
def test_write_l_no_print_no_file(self, capsys: CaptureFixture[str]):
|
||||
"""Test write_l with print_line=False and no file (should do nothing)"""
|
||||
write_l("Test line", print_line=False)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == ""
|
||||
|
||||
def test_write_l_file_only(self, capsys: CaptureFixture[str]):
|
||||
"""Test write_l with file handler only (no print)"""
|
||||
fpl = io.StringIO()
|
||||
write_l("Test line", fpl=fpl, print_line=False)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == ""
|
||||
assert fpl.getvalue() == "Test line\n"
|
||||
fpl.close()
|
||||
|
||||
def test_write_l_both_print_and_file(self, capsys: CaptureFixture[str]):
|
||||
"""Test write_l with both print and file output"""
|
||||
fpl = io.StringIO()
|
||||
write_l("Test line", fpl=fpl, print_line=True)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "Test line\n"
|
||||
assert fpl.getvalue() == "Test line\n"
|
||||
fpl.close()
|
||||
|
||||
def test_write_l_multiple_lines_to_file(self):
|
||||
"""Test write_l writing multiple lines to file"""
|
||||
fpl = io.StringIO()
|
||||
write_l("Line 1", fpl=fpl, print_line=False)
|
||||
write_l("Line 2", fpl=fpl, print_line=False)
|
||||
write_l("Line 3", fpl=fpl, print_line=False)
|
||||
assert fpl.getvalue() == "Line 1\nLine 2\nLine 3\n"
|
||||
fpl.close()
|
||||
|
||||
def test_write_l_empty_string(self, capsys: CaptureFixture[str]):
|
||||
"""Test write_l with empty string"""
|
||||
fpl = io.StringIO()
|
||||
write_l("", fpl=fpl, print_line=True)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "\n"
|
||||
assert fpl.getvalue() == "\n"
|
||||
fpl.close()
|
||||
|
||||
def test_write_l_special_characters(self):
|
||||
"""Test write_l with special characters"""
|
||||
fpl = io.StringIO()
|
||||
special_line = "Special: \t\n\r\\ 特殊文字 €"
|
||||
write_l(special_line, fpl=fpl, print_line=False)
|
||||
assert special_line + "\n" in fpl.getvalue()
|
||||
fpl.close()
|
||||
|
||||
def test_write_l_long_string(self):
|
||||
"""Test write_l with long string"""
|
||||
fpl = io.StringIO()
|
||||
long_line = "A" * 1000
|
||||
write_l(long_line, fpl=fpl, print_line=False)
|
||||
assert fpl.getvalue() == long_line + "\n"
|
||||
fpl.close()
|
||||
|
||||
def test_write_l_unicode_content(self):
|
||||
"""Test write_l with unicode content"""
|
||||
fpl = io.StringIO()
|
||||
unicode_line = "Hello 世界 🌍 Привет"
|
||||
write_l(unicode_line, fpl=fpl, print_line=False)
|
||||
assert fpl.getvalue() == unicode_line + "\n"
|
||||
fpl.close()
|
||||
|
||||
def test_write_l_default_parameters(self, capsys: CaptureFixture[str]):
|
||||
"""Test write_l with default parameters"""
|
||||
write_l("Test")
|
||||
captured = capsys.readouterr()
|
||||
# Default print_line is False
|
||||
assert captured.out == ""
|
||||
|
||||
def test_write_l_with_newline_in_string(self):
|
||||
"""Test write_l with newline characters in the string"""
|
||||
fpl = io.StringIO()
|
||||
write_l("Line with\nnewline", fpl=fpl, print_line=False)
|
||||
assert fpl.getvalue() == "Line with\nnewline\n"
|
||||
fpl.close()
|
||||
|
||||
|
||||
class TestPrHeader:
|
||||
"""Test cases for pr_header function"""
|
||||
|
||||
def test_pr_header_default(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with default parameters"""
|
||||
pr_header("TEST")
|
||||
captured = capsys.readouterr()
|
||||
assert "#" in captured.out
|
||||
assert "TEST" in captured.out
|
||||
|
||||
def test_pr_header_custom_marker(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with custom marker string"""
|
||||
pr_header("TEST", marker_string="*")
|
||||
captured = capsys.readouterr()
|
||||
assert "*" in captured.out
|
||||
assert "TEST" in captured.out
|
||||
assert "#" not in captured.out
|
||||
|
||||
def test_pr_header_custom_width(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with custom width"""
|
||||
pr_header("TEST", width=50)
|
||||
captured = capsys.readouterr()
|
||||
# Check that output is formatted
|
||||
assert "TEST" in captured.out
|
||||
|
||||
def test_pr_header_short_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with short tag"""
|
||||
pr_header("X")
|
||||
captured = capsys.readouterr()
|
||||
assert "X" in captured.out
|
||||
assert "#" in captured.out
|
||||
|
||||
def test_pr_header_long_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with long tag"""
|
||||
pr_header("This is a very long header tag")
|
||||
captured = capsys.readouterr()
|
||||
assert "This is a very long header tag" in captured.out
|
||||
|
||||
def test_pr_header_empty_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with empty tag"""
|
||||
pr_header("")
|
||||
captured = capsys.readouterr()
|
||||
assert "#" in captured.out
|
||||
|
||||
def test_pr_header_special_characters(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with special characters in tag"""
|
||||
pr_header("TEST: 123! @#$")
|
||||
captured = capsys.readouterr()
|
||||
assert "TEST: 123! @#$" in captured.out
|
||||
|
||||
def test_pr_header_unicode(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with unicode characters"""
|
||||
pr_header("テスト 🎉")
|
||||
captured = capsys.readouterr()
|
||||
assert "テスト 🎉" in captured.out
|
||||
|
||||
def test_pr_header_various_markers(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with various marker strings"""
|
||||
markers = ["*", "=", "-", "+", "~", "@"]
|
||||
for marker in markers:
|
||||
pr_header("TEST", marker_string=marker)
|
||||
captured = capsys.readouterr()
|
||||
assert marker in captured.out
|
||||
assert "TEST" in captured.out
|
||||
|
||||
def test_pr_header_zero_width(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with width of 0"""
|
||||
pr_header("TEST", width=0)
|
||||
captured = capsys.readouterr()
|
||||
assert "TEST" in captured.out
|
||||
|
||||
def test_pr_header_large_width(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with large width"""
|
||||
pr_header("TEST", width=100)
|
||||
captured = capsys.readouterr()
|
||||
assert "TEST" in captured.out
|
||||
assert "#" in captured.out
|
||||
|
||||
def test_pr_header_format(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header output format"""
|
||||
pr_header("CENTER", marker_string="#", width=20)
|
||||
captured = capsys.readouterr()
|
||||
# Should have spaces around centered text
|
||||
assert " CENTER " in captured.out or "CENTER" in captured.out
|
||||
|
||||
|
||||
class TestPrTitle:
|
||||
"""Test cases for pr_title function"""
|
||||
|
||||
def test_pr_title_default(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with default parameters"""
|
||||
pr_title("Test Title")
|
||||
captured = capsys.readouterr()
|
||||
assert "Test Title" in captured.out
|
||||
assert "|" in captured.out
|
||||
assert "." in captured.out
|
||||
assert ":" in captured.out
|
||||
|
||||
def test_pr_title_custom_prefix(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with custom prefix string"""
|
||||
pr_title("Test", prefix_string=">")
|
||||
captured = capsys.readouterr()
|
||||
assert ">" in captured.out
|
||||
assert "Test" in captured.out
|
||||
assert "|" not in captured.out
|
||||
|
||||
def test_pr_title_custom_space_filler(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with custom space filler"""
|
||||
pr_title("Test", space_filler="-")
|
||||
captured = capsys.readouterr()
|
||||
assert "Test" in captured.out
|
||||
assert "-" in captured.out
|
||||
assert "." not in captured.out
|
||||
|
||||
def test_pr_title_custom_width(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with custom width"""
|
||||
pr_title("Test", width=50)
|
||||
captured = capsys.readouterr()
|
||||
assert "Test" in captured.out
|
||||
|
||||
def test_pr_title_short_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with short tag"""
|
||||
pr_title("X")
|
||||
captured = capsys.readouterr()
|
||||
assert "X" in captured.out
|
||||
assert "." in captured.out
|
||||
|
||||
def test_pr_title_long_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with long tag"""
|
||||
pr_title("This is a very long title tag")
|
||||
captured = capsys.readouterr()
|
||||
assert "This is a very long title tag" in captured.out
|
||||
|
||||
def test_pr_title_empty_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with empty tag"""
|
||||
pr_title("")
|
||||
captured = capsys.readouterr()
|
||||
assert "|" in captured.out
|
||||
assert ":" in captured.out
|
||||
|
||||
def test_pr_title_special_characters(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with special characters"""
|
||||
pr_title("Task #123!")
|
||||
captured = capsys.readouterr()
|
||||
assert "Task #123!" in captured.out
|
||||
|
||||
def test_pr_title_unicode(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with unicode characters"""
|
||||
pr_title("タイトル 📝")
|
||||
captured = capsys.readouterr()
|
||||
assert "タイトル 📝" in captured.out
|
||||
|
||||
def test_pr_title_various_fillers(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with various space fillers"""
|
||||
fillers = [".", "-", "_", "*", " ", "~"]
|
||||
for filler in fillers:
|
||||
pr_title("Test", space_filler=filler)
|
||||
captured = capsys.readouterr()
|
||||
assert "Test" in captured.out
|
||||
|
||||
def test_pr_title_zero_width(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with width of 0"""
|
||||
pr_title("Test", width=0)
|
||||
captured = capsys.readouterr()
|
||||
assert "Test" in captured.out
|
||||
|
||||
def test_pr_title_large_width(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with large width"""
|
||||
pr_title("Test", width=100)
|
||||
captured = capsys.readouterr()
|
||||
assert "Test" in captured.out
|
||||
|
||||
def test_pr_title_format_left_align(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title output format (should be left-aligned with filler)"""
|
||||
pr_title("Start", space_filler=".", width=10)
|
||||
captured = capsys.readouterr()
|
||||
# Should have the tag followed by dots
|
||||
assert "Start" in captured.out
|
||||
assert ":" in captured.out
|
||||
|
||||
|
||||
class TestPrOpen:
|
||||
"""Test cases for pr_open function"""
|
||||
|
||||
def test_pr_open_default(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with default parameters"""
|
||||
pr_open("Processing")
|
||||
captured = capsys.readouterr()
|
||||
assert "Processing" in captured.out
|
||||
assert "|" in captured.out
|
||||
assert "." in captured.out
|
||||
assert "[" in captured.out
|
||||
# Should not have newline at the end
|
||||
assert not captured.out.endswith("\n")
|
||||
|
||||
def test_pr_open_custom_prefix(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with custom prefix string"""
|
||||
pr_open("Task", prefix_string=">")
|
||||
captured = capsys.readouterr()
|
||||
assert ">" in captured.out
|
||||
assert "Task" in captured.out
|
||||
assert "|" not in captured.out
|
||||
|
||||
def test_pr_open_custom_space_filler(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with custom space filler"""
|
||||
pr_open("Task", space_filler="-")
|
||||
captured = capsys.readouterr()
|
||||
assert "Task" in captured.out
|
||||
assert "-" in captured.out
|
||||
assert "." not in captured.out
|
||||
|
||||
def test_pr_open_custom_width(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with custom width"""
|
||||
pr_open("Task", width=50)
|
||||
captured = capsys.readouterr()
|
||||
assert "Task" in captured.out
|
||||
assert "[" in captured.out
|
||||
|
||||
def test_pr_open_short_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with short tag"""
|
||||
pr_open("X")
|
||||
captured = capsys.readouterr()
|
||||
assert "X" in captured.out
|
||||
assert "[" in captured.out
|
||||
|
||||
def test_pr_open_long_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with long tag"""
|
||||
pr_open("This is a very long task tag")
|
||||
captured = capsys.readouterr()
|
||||
assert "This is a very long task tag" in captured.out
|
||||
|
||||
def test_pr_open_empty_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with empty tag"""
|
||||
pr_open("")
|
||||
captured = capsys.readouterr()
|
||||
assert "[" in captured.out
|
||||
assert "|" in captured.out
|
||||
|
||||
def test_pr_open_no_newline(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open doesn't end with newline"""
|
||||
pr_open("Test")
|
||||
captured = capsys.readouterr()
|
||||
# Output should not end with newline (uses end="")
|
||||
assert not captured.out.endswith("\n")
|
||||
|
||||
def test_pr_open_special_characters(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with special characters"""
|
||||
pr_open("Loading: 50%")
|
||||
captured = capsys.readouterr()
|
||||
assert "Loading: 50%" in captured.out
|
||||
|
||||
def test_pr_open_unicode(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open with unicode characters"""
|
||||
pr_open("処理中 ⏳")
|
||||
captured = capsys.readouterr()
|
||||
assert "処理中 ⏳" in captured.out
|
||||
|
||||
def test_pr_open_format(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_open output format"""
|
||||
pr_open("Task", prefix_string="|", space_filler=".", width=20)
|
||||
captured = capsys.readouterr()
|
||||
assert "|" in captured.out
|
||||
assert "Task" in captured.out
|
||||
assert "[" in captured.out
|
||||
|
||||
|
||||
class TestPrClose:
|
||||
"""Test cases for pr_close function"""
|
||||
|
||||
def test_pr_close_default(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close with default (empty) tag"""
|
||||
pr_close()
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "]\n"
|
||||
|
||||
def test_pr_close_with_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close with custom tag"""
|
||||
pr_close("DONE")
|
||||
captured = capsys.readouterr()
|
||||
assert "DONE" in captured.out
|
||||
assert "]" in captured.out
|
||||
assert captured.out.endswith("\n")
|
||||
|
||||
def test_pr_close_with_space(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close with space in tag"""
|
||||
pr_close(" OK ")
|
||||
captured = capsys.readouterr()
|
||||
assert " OK " in captured.out
|
||||
assert "]" in captured.out
|
||||
|
||||
def test_pr_close_empty_string(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close with empty string (same as default)"""
|
||||
pr_close("")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "]\n"
|
||||
|
||||
def test_pr_close_special_characters(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close with special characters"""
|
||||
pr_close("✓")
|
||||
captured = capsys.readouterr()
|
||||
assert "✓" in captured.out
|
||||
assert "]" in captured.out
|
||||
|
||||
def test_pr_close_unicode(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close with unicode characters"""
|
||||
pr_close("完了")
|
||||
captured = capsys.readouterr()
|
||||
assert "完了" in captured.out
|
||||
assert "]" in captured.out
|
||||
|
||||
def test_pr_close_newline(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close ends with newline"""
|
||||
pr_close("OK")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out.endswith("\n")
|
||||
|
||||
def test_pr_close_various_tags(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close with various tags"""
|
||||
tags = ["OK", "DONE", "✓", "✗", "SKIP", "PASS", "FAIL"]
|
||||
for tag in tags:
|
||||
pr_close(tag)
|
||||
captured = capsys.readouterr()
|
||||
assert tag in captured.out
|
||||
assert "]" in captured.out
|
||||
|
||||
|
||||
class TestPrAct:
|
||||
"""Test cases for pr_act function"""
|
||||
|
||||
def test_pr_act_default(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with default dot"""
|
||||
pr_act()
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "."
|
||||
assert not captured.out.endswith("\n")
|
||||
|
||||
def test_pr_act_custom_character(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with custom character"""
|
||||
pr_act("#")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "#"
|
||||
|
||||
def test_pr_act_multiple_calls(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with multiple calls"""
|
||||
pr_act(".")
|
||||
pr_act(".")
|
||||
pr_act(".")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "..."
|
||||
|
||||
def test_pr_act_various_characters(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with various characters"""
|
||||
characters = [".", "#", "*", "+", "-", "=", ">", "~"]
|
||||
for char in characters:
|
||||
pr_act(char)
|
||||
captured = capsys.readouterr()
|
||||
assert "".join(characters) in captured.out
|
||||
|
||||
def test_pr_act_empty_string(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with empty string"""
|
||||
pr_act("")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == ""
|
||||
|
||||
def test_pr_act_special_character(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with special characters"""
|
||||
pr_act("✓")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "✓"
|
||||
|
||||
def test_pr_act_unicode(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with unicode character"""
|
||||
pr_act("●")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "●"
|
||||
|
||||
def test_pr_act_no_newline(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act doesn't add newline"""
|
||||
pr_act("x")
|
||||
captured = capsys.readouterr()
|
||||
assert not captured.out.endswith("\n")
|
||||
|
||||
def test_pr_act_multiple_characters(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with multiple characters in string"""
|
||||
pr_act("...")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "..."
|
||||
|
||||
def test_pr_act_whitespace(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with whitespace"""
|
||||
pr_act(" ")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == " "
|
||||
|
||||
|
||||
class TestProgressCombinations:
|
||||
"""Test combinations of progress printer functions"""
|
||||
|
||||
def test_complete_progress_flow(self, capsys: CaptureFixture[str]):
|
||||
"""Test complete progress output flow"""
|
||||
pr_header("PROCESS")
|
||||
pr_title("Task 1")
|
||||
pr_open("Subtask")
|
||||
pr_act(".")
|
||||
pr_act(".")
|
||||
pr_act(".")
|
||||
pr_close(" OK")
|
||||
captured = capsys.readouterr()
|
||||
|
||||
assert "PROCESS" in captured.out
|
||||
assert "Task 1" in captured.out
|
||||
assert "Subtask" in captured.out
|
||||
assert "..." in captured.out
|
||||
assert " OK]" in captured.out
|
||||
|
||||
def test_multiple_tasks_progress(self, capsys: CaptureFixture[str]):
|
||||
"""Test multiple tasks with progress"""
|
||||
pr_header("BATCH PROCESS")
|
||||
for i in range(3):
|
||||
pr_open(f"Task {i + 1}")
|
||||
for _ in range(5):
|
||||
pr_act(".")
|
||||
pr_close(" DONE")
|
||||
captured = capsys.readouterr()
|
||||
|
||||
assert "BATCH PROCESS" in captured.out
|
||||
assert "Task 1" in captured.out
|
||||
assert "Task 2" in captured.out
|
||||
assert "Task 3" in captured.out
|
||||
assert " DONE]" in captured.out
|
||||
|
||||
def test_nested_progress(self, capsys: CaptureFixture[str]):
|
||||
"""Test nested progress indicators"""
|
||||
pr_header("MAIN TASK", marker_string="=")
|
||||
pr_title("Subtask A", prefix_string=">")
|
||||
pr_open("Processing")
|
||||
pr_act("#")
|
||||
pr_act("#")
|
||||
pr_close()
|
||||
pr_title("Subtask B", prefix_string=">")
|
||||
pr_open("Processing")
|
||||
pr_act("*")
|
||||
pr_act("*")
|
||||
pr_close(" OK")
|
||||
captured = capsys.readouterr()
|
||||
|
||||
assert "MAIN TASK" in captured.out
|
||||
assert "Subtask A" in captured.out
|
||||
assert "Subtask B" in captured.out
|
||||
assert "##" in captured.out
|
||||
assert "**" in captured.out
|
||||
|
||||
def test_progress_with_different_markers(self, capsys: CaptureFixture[str]):
|
||||
"""Test progress with different marker styles"""
|
||||
pr_header("Process", marker_string="*")
|
||||
pr_title("Step 1", prefix_string=">>", space_filler="-")
|
||||
pr_open("Work", prefix_string=">>", space_filler="-")
|
||||
pr_act("+")
|
||||
pr_close(" ✓")
|
||||
captured = capsys.readouterr()
|
||||
|
||||
assert "*" in captured.out
|
||||
assert ">>" in captured.out
|
||||
assert "-" in captured.out
|
||||
assert "+" in captured.out
|
||||
assert "✓" in captured.out
|
||||
|
||||
def test_empty_progress_sequence(self, capsys: CaptureFixture[str]):
|
||||
"""Test progress sequence with no actual progress"""
|
||||
pr_open("Quick task")
|
||||
pr_close(" SKIP")
|
||||
captured = capsys.readouterr()
|
||||
|
||||
assert "Quick task" in captured.out
|
||||
assert " SKIP]" in captured.out
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests combining multiple scenarios"""
|
||||
|
||||
def test_file_and_console_logging(self, capsys: CaptureFixture[str]):
|
||||
"""Test logging to both file and console"""
|
||||
fpl = io.StringIO()
|
||||
|
||||
write_l("Starting process", fpl=fpl, print_line=True)
|
||||
write_l("Processing item 1", fpl=fpl, print_line=True)
|
||||
write_l("Processing item 2", fpl=fpl, print_line=True)
|
||||
write_l("Complete", fpl=fpl, print_line=True)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
file_content = fpl.getvalue()
|
||||
|
||||
# Check console output
|
||||
assert "Starting process\n" in captured.out
|
||||
assert "Processing item 1\n" in captured.out
|
||||
assert "Processing item 2\n" in captured.out
|
||||
assert "Complete\n" in captured.out
|
||||
|
||||
# Check file output
|
||||
assert "Starting process\n" in file_content
|
||||
assert "Processing item 1\n" in file_content
|
||||
assert "Processing item 2\n" in file_content
|
||||
assert "Complete\n" in file_content
|
||||
|
||||
fpl.close()
|
||||
|
||||
def test_progress_with_logging(self, capsys: CaptureFixture[str]):
|
||||
"""Test combining progress output with file logging"""
|
||||
fpl = io.StringIO()
|
||||
|
||||
write_l("=== Process Start ===", fpl=fpl, print_line=True)
|
||||
pr_header("MAIN PROCESS")
|
||||
write_l("Header shown", fpl=fpl, print_line=False)
|
||||
|
||||
pr_open("Task 1")
|
||||
pr_act(".")
|
||||
pr_act(".")
|
||||
pr_close(" OK")
|
||||
write_l("Task 1 completed", fpl=fpl, print_line=False)
|
||||
|
||||
write_l("=== Process End ===", fpl=fpl, print_line=True)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
file_content = fpl.getvalue()
|
||||
|
||||
assert "=== Process Start ===" in captured.out
|
||||
assert "MAIN PROCESS" in captured.out
|
||||
assert "Task 1" in captured.out
|
||||
assert "=== Process End ===" in captured.out
|
||||
|
||||
assert "=== Process Start ===\n" in file_content
|
||||
assert "Header shown\n" in file_content
|
||||
assert "Task 1 completed\n" in file_content
|
||||
assert "=== Process End ===\n" in file_content
|
||||
|
||||
fpl.close()
|
||||
|
||||
def test_complex_workflow(self, capsys: CaptureFixture[str]):
|
||||
"""Test complex workflow with all functions"""
|
||||
fpl = io.StringIO()
|
||||
|
||||
write_l("Log: Starting batch process", fpl=fpl, print_line=False)
|
||||
pr_header("BATCH PROCESSOR", marker_string="=", width=40)
|
||||
|
||||
for i in range(2):
|
||||
write_l(f"Log: Processing batch {i + 1}", fpl=fpl, print_line=False)
|
||||
pr_title(f"Batch {i + 1}", prefix_string="|", space_filler=".")
|
||||
|
||||
pr_open(f"Item {i + 1}", prefix_string="|", space_filler=".")
|
||||
for j in range(3):
|
||||
pr_act("*")
|
||||
write_l(f"Log: Progress {j + 1}/3", fpl=fpl, print_line=False)
|
||||
pr_close(" ✓")
|
||||
|
||||
write_l(f"Log: Batch {i + 1} complete", fpl=fpl, print_line=False)
|
||||
|
||||
write_l("Log: All batches complete", fpl=fpl, print_line=False)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
file_content = fpl.getvalue()
|
||||
|
||||
# Check console has progress indicators
|
||||
assert "BATCH PROCESSOR" in captured.out
|
||||
assert "Batch 1" in captured.out
|
||||
assert "Batch 2" in captured.out
|
||||
assert "***" in captured.out
|
||||
assert "✓" in captured.out
|
||||
|
||||
# Check file has all log entries
|
||||
assert "Log: Starting batch process\n" in file_content
|
||||
assert "Log: Processing batch 1\n" in file_content
|
||||
assert "Log: Processing batch 2\n" in file_content
|
||||
assert "Log: Progress 1/3\n" in file_content
|
||||
assert "Log: Batch 1 complete\n" in file_content
|
||||
assert "Log: All batches complete\n" in file_content
|
||||
|
||||
fpl.close()
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and boundary conditions"""
|
||||
|
||||
def test_write_l_none_file_handler(self, capsys: CaptureFixture[str]):
|
||||
"""Test write_l explicitly with None file handler"""
|
||||
write_l("Test", fpl=None, print_line=True)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "Test\n"
|
||||
|
||||
def test_pr_header_negative_width(self):
|
||||
"""Test pr_header with negative width raises ValueError"""
|
||||
with pytest.raises(ValueError):
|
||||
pr_header("Test", width=-10)
|
||||
|
||||
def test_pr_title_negative_width(self):
|
||||
"""Test pr_title with negative width raises ValueError"""
|
||||
with pytest.raises(ValueError):
|
||||
pr_title("Test", width=-10)
|
||||
|
||||
def test_pr_open_negative_width(self):
|
||||
"""Test pr_open with negative width raises ValueError"""
|
||||
with pytest.raises(ValueError):
|
||||
pr_open("Test", width=-10)
|
||||
|
||||
def test_multiple_pr_act_no_close(self, capsys: CaptureFixture[str]):
|
||||
"""Test multiple pr_act calls without pr_close"""
|
||||
pr_act(".")
|
||||
pr_act(".")
|
||||
pr_act(".")
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == "..."
|
||||
|
||||
def test_pr_close_without_pr_open(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close without prior pr_open (should still work)"""
|
||||
pr_close(" OK")
|
||||
captured = capsys.readouterr()
|
||||
assert " OK]" in captured.out
|
||||
|
||||
def test_very_long_strings(self):
|
||||
"""Test with very long strings"""
|
||||
fpl = io.StringIO()
|
||||
long_str = "A" * 10000
|
||||
write_l(long_str, fpl=fpl, print_line=False)
|
||||
assert len(fpl.getvalue()) == 10001 # string + newline
|
||||
fpl.close()
|
||||
|
||||
def test_pr_header_very_long_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with tag longer than width"""
|
||||
pr_header("This is a very long tag that exceeds the width", width=10)
|
||||
captured = capsys.readouterr()
|
||||
assert "This is a very long tag that exceeds the width" in captured.out
|
||||
|
||||
def test_pr_title_very_long_tag(self, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with tag longer than width"""
|
||||
pr_title("This is a very long tag that exceeds the width", width=10)
|
||||
captured = capsys.readouterr()
|
||||
assert "This is a very long tag that exceeds the width" in captured.out
|
||||
|
||||
def test_write_l_closed_file(self):
|
||||
"""Test write_l with closed file should raise error"""
|
||||
fpl = io.StringIO()
|
||||
fpl.close()
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
write_l("Test", fpl=fpl, print_line=False)
|
||||
|
||||
|
||||
class TestParametrized:
|
||||
"""Parametrized tests for comprehensive coverage"""
|
||||
|
||||
@pytest.mark.parametrize("print_line", [True, False])
|
||||
def test_write_l_print_line_variations(self, print_line: bool, capsys: CaptureFixture[str]):
|
||||
"""Test write_l with different print_line values"""
|
||||
write_l("Test", print_line=print_line)
|
||||
captured = capsys.readouterr()
|
||||
if print_line:
|
||||
assert captured.out == "Test\n"
|
||||
else:
|
||||
assert captured.out == ""
|
||||
|
||||
@pytest.mark.parametrize("marker", ["#", "*", "=", "-", "+", "~", "@", "^"])
|
||||
def test_pr_header_various_markers_param(self, marker: str, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with various markers"""
|
||||
pr_header("TEST", marker_string=marker)
|
||||
captured = capsys.readouterr()
|
||||
assert marker in captured.out
|
||||
assert "TEST" in captured.out
|
||||
|
||||
@pytest.mark.parametrize("width", [0, 5, 10, 20, 35, 50, 100])
|
||||
def test_pr_header_various_widths(self, width: int, capsys: CaptureFixture[str]):
|
||||
"""Test pr_header with various widths"""
|
||||
pr_header("TEST", width=width)
|
||||
captured = capsys.readouterr()
|
||||
assert "TEST" in captured.out
|
||||
|
||||
@pytest.mark.parametrize("filler", [".", "-", "_", "*", " ", "~", "="])
|
||||
def test_pr_title_various_fillers_param(self, filler: str, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with various space fillers"""
|
||||
pr_title("Test", space_filler=filler)
|
||||
captured = capsys.readouterr()
|
||||
assert "Test" in captured.out
|
||||
|
||||
@pytest.mark.parametrize("prefix", ["|", ">", ">>", "*", "-", "+"])
|
||||
def test_pr_title_various_prefixes(self, prefix: str, capsys: CaptureFixture[str]):
|
||||
"""Test pr_title with various prefix strings"""
|
||||
pr_title("Test", prefix_string=prefix)
|
||||
captured = capsys.readouterr()
|
||||
assert prefix in captured.out
|
||||
assert "Test" in captured.out
|
||||
|
||||
@pytest.mark.parametrize("act_char", [".", "#", "*", "+", "-", "=", ">", "~", "✓", "●"])
|
||||
def test_pr_act_various_characters_param(self, act_char: str, capsys: CaptureFixture[str]):
|
||||
"""Test pr_act with various characters"""
|
||||
pr_act(act_char)
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == act_char
|
||||
|
||||
@pytest.mark.parametrize("close_tag", ["", " OK", " DONE", " ✓", " ✗", " SKIP", " PASS"])
|
||||
def test_pr_close_various_tags_param(self, close_tag: str, capsys: CaptureFixture[str]):
|
||||
"""Test pr_close with various tags"""
|
||||
pr_close(close_tag)
|
||||
captured = capsys.readouterr()
|
||||
assert f"{close_tag}]" in captured.out
|
||||
|
||||
@pytest.mark.parametrize("content", [
|
||||
"Simple text",
|
||||
"Text with 特殊文字",
|
||||
"Text with emoji 🎉",
|
||||
"Text\twith\ttabs",
|
||||
"Multiple\n\nNewlines",
|
||||
"",
|
||||
"A" * 100,
|
||||
])
|
||||
def test_write_l_various_content(self, content: str, capsys: CaptureFixture[str]):
|
||||
"""Test write_l with various content types"""
|
||||
fpl = io.StringIO()
|
||||
write_l(content, fpl=fpl, print_line=True)
|
||||
captured = capsys.readouterr()
|
||||
assert content in captured.out
|
||||
assert content + "\n" in fpl.getvalue()
|
||||
fpl.close()
|
||||
|
||||
|
||||
class TestRealWorldScenarios:
|
||||
"""Test real-world usage scenarios"""
|
||||
|
||||
def test_batch_processing_output(self, capsys: CaptureFixture[str]):
|
||||
"""Test typical batch processing output"""
|
||||
pr_header("BATCH PROCESSOR", marker_string="=", width=50)
|
||||
|
||||
items = ["file1.txt", "file2.txt", "file3.txt"]
|
||||
for item in items:
|
||||
pr_open(f"Processing {item}")
|
||||
for _ in range(10):
|
||||
pr_act(".")
|
||||
pr_close(" ✓")
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "BATCH PROCESSOR" in captured.out
|
||||
for item in items:
|
||||
assert item in captured.out
|
||||
assert "✓" in captured.out
|
||||
|
||||
def test_logging_workflow(self, capsys: CaptureFixture[str]):
|
||||
"""Test typical logging workflow"""
|
||||
log_file = io.StringIO()
|
||||
|
||||
# Simulate a workflow with logging
|
||||
write_l("[INFO] Starting process", fpl=log_file, print_line=True)
|
||||
write_l("[INFO] Initializing components", fpl=log_file, print_line=True)
|
||||
write_l("[DEBUG] Component A loaded", fpl=log_file, print_line=False)
|
||||
write_l("[DEBUG] Component B loaded", fpl=log_file, print_line=False)
|
||||
write_l("[INFO] Processing data", fpl=log_file, print_line=True)
|
||||
write_l("[INFO] Process complete", fpl=log_file, print_line=True)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
log_content = log_file.getvalue()
|
||||
|
||||
# Console should only have INFO messages
|
||||
assert "[INFO] Starting process" in captured.out
|
||||
assert "[DEBUG] Component A loaded" not in captured.out
|
||||
|
||||
# Log file should have all messages
|
||||
assert "[INFO] Starting process\n" in log_content
|
||||
assert "[DEBUG] Component A loaded\n" in log_content
|
||||
assert "[DEBUG] Component B loaded\n" in log_content
|
||||
|
||||
log_file.close()
|
||||
|
||||
def test_progress_indicator_for_long_task(self, capsys: CaptureFixture[str]):
|
||||
"""Test progress indicator for a long-running task"""
|
||||
pr_header("DATA PROCESSING")
|
||||
pr_open("Loading data", width=50)
|
||||
|
||||
# Simulate progress
|
||||
for i in range(20):
|
||||
if i % 5 == 0:
|
||||
pr_act(str(i // 5))
|
||||
else:
|
||||
pr_act(".")
|
||||
|
||||
pr_close(" COMPLETE")
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "DATA PROCESSING" in captured.out
|
||||
assert "Loading data" in captured.out
|
||||
assert "COMPLETE" in captured.out
|
||||
|
||||
def test_multi_stage_process(self, capsys: CaptureFixture[str]):
|
||||
"""Test multi-stage process with titles and progress"""
|
||||
pr_header("DEPLOYMENT PIPELINE", marker_string="=")
|
||||
|
||||
stages = ["Build", "Test", "Deploy"]
|
||||
for stage in stages:
|
||||
pr_title(stage)
|
||||
pr_open(f"Running {stage.lower()}")
|
||||
pr_act("#")
|
||||
pr_act("#")
|
||||
pr_act("#")
|
||||
pr_close(" OK")
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "DEPLOYMENT PIPELINE" in captured.out
|
||||
for stage in stages:
|
||||
assert stage in captured.out
|
||||
assert "###" in captured.out
|
||||
|
||||
def test_error_reporting_with_logging(self, capsys: CaptureFixture[str]):
|
||||
"""Test error reporting workflow"""
|
||||
error_log = io.StringIO()
|
||||
|
||||
pr_header("VALIDATION", marker_string="!")
|
||||
pr_open("Checking files")
|
||||
|
||||
write_l("[ERROR] File not found: data.csv", fpl=error_log, print_line=False)
|
||||
pr_act("✗")
|
||||
|
||||
write_l("[ERROR] Permission denied: output.txt", fpl=error_log, print_line=False)
|
||||
pr_act("✗")
|
||||
|
||||
pr_close(" FAILED")
|
||||
|
||||
captured = capsys.readouterr()
|
||||
log_content = error_log.getvalue()
|
||||
|
||||
assert "VALIDATION" in captured.out
|
||||
assert "Checking files" in captured.out
|
||||
assert "✗✗" in captured.out
|
||||
assert "FAILED" in captured.out
|
||||
|
||||
assert "[ERROR] File not found: data.csv\n" in log_content
|
||||
assert "[ERROR] Permission denied: output.txt\n" in log_content
|
||||
|
||||
error_log.close()
|
||||
|
||||
def test_detailed_reporting(self, capsys: CaptureFixture[str]):
|
||||
"""Test detailed reporting with mixed output"""
|
||||
report_file = io.StringIO()
|
||||
|
||||
pr_header("SYSTEM REPORT", marker_string="#", width=60)
|
||||
write_l("=== System Report Generated ===", fpl=report_file, print_line=False)
|
||||
|
||||
pr_title("Database Status", prefix_string=">>")
|
||||
write_l("Database: Connected", fpl=report_file, print_line=False)
|
||||
write_l("Tables: 15", fpl=report_file, print_line=False)
|
||||
write_l("Records: 1,234,567", fpl=report_file, print_line=False)
|
||||
|
||||
pr_title("API Status", prefix_string=">>")
|
||||
write_l("API: Online", fpl=report_file, print_line=False)
|
||||
write_l("Requests/min: 1,500", fpl=report_file, print_line=False)
|
||||
|
||||
write_l("=== Report Complete ===", fpl=report_file, print_line=False)
|
||||
|
||||
captured = capsys.readouterr()
|
||||
report_content = report_file.getvalue()
|
||||
|
||||
assert "SYSTEM REPORT" in captured.out
|
||||
assert "Database Status" in captured.out
|
||||
assert "API Status" in captured.out
|
||||
|
||||
assert "=== System Report Generated ===\n" in report_content
|
||||
assert "Database: Connected\n" in report_content
|
||||
assert "API: Online\n" in report_content
|
||||
assert "=== Report Complete ===\n" in report_content
|
||||
|
||||
report_file.close()
|
||||
|
||||
# __END__
|
||||
Reference in New Issue
Block a user