diff --git a/pyproject.toml b/pyproject.toml index 695aa49..3c55761 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,8 +7,11 @@ readme = "README.md" requires-python = ">=3.13" dependencies = [ "corelibs-datetime>=1.0.1", + "corelibs-debug>=1.0.0", + "corelibs-dump-data>=1.0.0", "corelibs-enum-base>=1.0.0", "corelibs-regex-checks>=1.0.0", + "corelibs-stack-trace>=1.0.0", "corelibs-text-colors>=1.0.0", "corelibs-var>=1.0.0", "cryptography>=46.0.3", diff --git a/src/corelibs/db_handling/sql_main.py b/src/corelibs/db_handling/sql_main.py index a8d5a25..da96d4f 100644 --- a/src/corelibs/db_handling/sql_main.py +++ b/src/corelibs/db_handling/sql_main.py @@ -12,7 +12,7 @@ TODO: adapt more CoreLibs DB IO class flow here """ from typing import TYPE_CHECKING, Any, Literal -from corelibs.debug_handling.debug_helpers import call_stack +from corelibs_stack_trace.stack import call_stack from corelibs.db_handling.sqlite_io import SQLiteIO if TYPE_CHECKING: from corelibs.logging_handling.log import Logger diff --git a/src/corelibs/db_handling/sqlite_io.py b/src/corelibs/db_handling/sqlite_io.py index 3c11ef2..8c048b4 100644 --- a/src/corelibs/db_handling/sqlite_io.py +++ b/src/corelibs/db_handling/sqlite_io.py @@ -8,7 +8,7 @@ also method names are subject to change from pathlib import Path from typing import Any, Literal, TYPE_CHECKING import sqlite3 -from corelibs.debug_handling.debug_helpers import call_stack +from corelibs_stack_trace.stack import call_stack if TYPE_CHECKING: from corelibs.logging_handling.log import Logger diff --git a/src/corelibs/debug_handling/debug_helpers.py b/src/corelibs/debug_handling/debug_helpers.py index 30d0c51..4f34a95 100644 --- a/src/corelibs/debug_handling/debug_helpers.py +++ b/src/corelibs/debug_handling/debug_helpers.py @@ -2,16 +2,16 @@ Various debug helpers """ -import traceback -import os -import sys +from warnings import deprecated from typing import Tuple, Type from types import TracebackType +from corelibs_stack_trace.stack import call_stack as call_stack_ng, exception_stack as exception_stack_ng # _typeshed.OptExcInfo OptExcInfo = Tuple[None, None, None] | Tuple[Type[BaseException], BaseException, TracebackType] +@deprecated("Use corelibs_stack_trace.stack.call_stack instead") def call_stack( start: int = 0, skip_last: int = -1, @@ -31,23 +31,15 @@ def call_stack( Returns: str -- _description_ """ - # stack = traceback.extract_stack()[start:depth] - # how many of the last entries we skip (so we do not get self), default is -1 - # start cannot be negative - if skip_last > 0: - skip_last = skip_last * -1 - stack = traceback.extract_stack() - __stack = stack[start:skip_last] - # start possible to high, reset start to 0 - if not __stack and reset_start_if_empty: - start = 0 - __stack = stack[start:skip_last] - if not separator: - separator = ' -> ' - # print(f"* HERE: {dump_data(stack)}") - return f"{separator}".join(f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}" for f in __stack) + return call_stack_ng( + start=start, + skip_last=skip_last, + separator=separator, + reset_start_if_empty=reset_start_if_empty + ) +@deprecated("Use corelibs_stack_trace.stack.exception_stack instead") def exception_stack( exc_stack: OptExcInfo | None = None, separator: str = ' -> ' @@ -62,15 +54,9 @@ def exception_stack( Returns: str -- _description_ """ - if exc_stack is not None: - _, _, exc_traceback = exc_stack - else: - exc_traceback = None - _, _, exc_traceback = sys.exc_info() - stack = traceback.extract_tb(exc_traceback) - if not separator: - separator = ' -> ' - 
# print(f"* HERE: {dump_data(stack)}") - return f"{separator}".join(f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}" for f in stack) + return exception_stack_ng( + exc_stack=exc_stack, + separator=separator + ) # __END__ diff --git a/src/corelibs/debug_handling/dump_data.py b/src/corelibs/debug_handling/dump_data.py index cc769cd..c69ae56 100644 --- a/src/corelibs/debug_handling/dump_data.py +++ b/src/corelibs/debug_handling/dump_data.py @@ -2,10 +2,12 @@ dict dump as JSON formatted """ -import json +from warnings import deprecated from typing import Any +from corelibs_dump_data.dump_data import dump_data as dump_data_ng +@deprecated("Use corelibs_dump_data.dump_data.dump_data instead") def dump_data(data: Any, use_indent: bool = True) -> str: """ dump formated output from dict/list @@ -16,7 +18,6 @@ def dump_data(data: Any, use_indent: bool = True) -> str: Returns: str: _description_ """ - indent = 4 if use_indent else None - return json.dumps(data, indent=indent, ensure_ascii=False, default=str) + return dump_data_ng(data=data, use_indent=use_indent) # __END__ diff --git a/src/corelibs/debug_handling/profiling.py b/src/corelibs/debug_handling/profiling.py index 75f80da..c0924da 100644 --- a/src/corelibs/debug_handling/profiling.py +++ b/src/corelibs/debug_handling/profiling.py @@ -4,123 +4,40 @@ Profile memory usage in Python # https://docs.python.org/3/library/tracemalloc.html -import os -import time -import tracemalloc -import linecache -from typing import Tuple -from tracemalloc import Snapshot -import psutil +from warnings import warn, deprecated +from typing import TYPE_CHECKING +from corelibs_debug.profiling import display_top as display_top_ng, display_top_str, Profiling as CoreLibsProfiling +if TYPE_CHECKING: + from tracemalloc import Snapshot -def display_top(snapshot: Snapshot, key_type: str = 'lineno', limit: int = 10) -> str: +@deprecated("Use corelibs_debug.profiling.display_top_str with data from display_top instead") +def display_top(snapshot: 'Snapshot', key_type: str = 'lineno', limit: int = 10) -> str: """ Print tracmalloc stats https://docs.python.org/3/library/tracemalloc.html#pretty-top Args: - snapshot (Snapshot): _description_ + snapshot ('Snapshot'): _description_ key_type (str, optional): _description_. Defaults to 'lineno'. limit (int, optional): _description_. Defaults to 10. 
""" - snapshot = snapshot.filter_traces(( - tracemalloc.Filter(False, ""), - tracemalloc.Filter(False, ""), - )) - top_stats = snapshot.statistics(key_type) - - profiler_msg = f"Top {limit} lines" - for index, stat in enumerate(top_stats[:limit], 1): - frame = stat.traceback[0] - # replace "/path/to/module/file.py" with "module/file.py" - filename = os.sep.join(frame.filename.split(os.sep)[-2:]) - profiler_msg += f"#{index}: {filename}:{frame.lineno}: {(stat.size / 1024):.1f} KiB" - line = linecache.getline(frame.filename, frame.lineno).strip() - if line: - profiler_msg += f" {line}" - - other = top_stats[limit:] - if other: - size = sum(stat.size for stat in other) - profiler_msg += f"{len(other)} other: {(size / 1024):.1f} KiB" - total = sum(stat.size for stat in top_stats) - profiler_msg += f"Total allocated size: {(total / 1024):.1f} KiB" - return profiler_msg + return display_top_str( + display_top_ng( + snapshot=snapshot, + key_type=key_type, + limit=limit + ) + ) -class Profiling: +class Profiling(CoreLibsProfiling): """ Profile memory usage and elapsed time for some block Based on: https://stackoverflow.com/a/53301648 """ - def __init__(self): - # profiling id - self.__ident: str = '' - # memory - self.__rss_before: int = 0 - self.__vms_before: int = 0 - # self.shared_before: int = 0 - self.__rss_used: int = 0 - self.__vms_used: int = 0 - # self.shared_used: int = 0 - # time - self.__call_start: float = 0 - self.__elapsed = 0 - def __get_process_memory(self) -> Tuple[int, int]: - process = psutil.Process(os.getpid()) - mi = process.memory_info() - # macos does not have mi.shared - return mi.rss, mi.vms - - def __elapsed_since(self) -> str: - elapsed = time.time() - self.__call_start - if elapsed < 1: - return str(round(elapsed * 1000, 2)) + "ms" - if elapsed < 60: - return str(round(elapsed, 2)) + "s" - if elapsed < 3600: - return str(round(elapsed / 60, 2)) + "min" - return str(round(elapsed / 3600, 2)) + "hrs" - - def __format_bytes(self, bytes_data: int) -> str: - if abs(bytes_data) < 1000: - return str(bytes_data) + "B" - if abs(bytes_data) < 1e6: - return str(round(bytes_data / 1e3, 2)) + "kB" - if abs(bytes_data) < 1e9: - return str(round(bytes_data / 1e6, 2)) + "MB" - return str(round(bytes_data / 1e9, 2)) + "GB" - - def start_profiling(self, ident: str) -> None: - """ - start the profiling - """ - self.__ident = ident - self.__rss_before, self.__vms_before = self.__get_process_memory() - self.__call_start = time.time() - - def end_profiling(self) -> None: - """ - end the profiling - """ - if self.__rss_before == 0 and self.__vms_before == 0: - print("start_profile() was not called, output will be negative") - self.__elapsed = self.__elapsed_since() - __rss_after, __vms_after = self.__get_process_memory() - self.__rss_used = __rss_after - self.__rss_before - self.__vms_used = __vms_after - self.__vms_before - - def print_profiling(self) -> str: - """ - print the profiling time - """ - return ( - f"Profiling: {self.__ident:>20} " - f"RSS: {self.__format_bytes(self.__rss_used):>8} | " - f"VMS: {self.__format_bytes(self.__vms_used):>8} | " - f"time: {self.__elapsed:>8}" - ) +warn("Use corelibs_debug.profiling.Profiling instead", DeprecationWarning, stacklevel=2) # __END__ diff --git a/src/corelibs/debug_handling/timer.py b/src/corelibs/debug_handling/timer.py index a5dcc3c..72f65ac 100644 --- a/src/corelibs/debug_handling/timer.py +++ b/src/corelibs/debug_handling/timer.py @@ -5,109 +5,16 @@ Returns: Timer: class timer for basic time run calculations """ -from datetime import 
datetime, timedelta +from warnings import warn +from corelibs_debug.timer import Timer as CorelibsTimer -class Timer: +class Timer(CorelibsTimer): """ get difference between start and end date/time """ - def __init__(self): - """ - init new start time and set end time to None - """ - self._overall_start_time = datetime.now() - self._overall_end_time = None - self._overall_run_time = None - self._start_time = datetime.now() - self._end_time = None - self._run_time = None - # MARK: overall run time - def overall_run_time(self) -> timedelta: - """ - overall run time difference from class launch to call of this function - - Returns: - timedelta: _description_ - """ - self._overall_end_time = datetime.now() - self._overall_run_time = self._overall_end_time - self._overall_start_time - return self._overall_run_time - - def get_overall_start_time(self) -> datetime: - """ - get set start time - - Returns: - datetime: _description_ - """ - return self._overall_start_time - - def get_overall_end_time(self) -> datetime | None: - """ - get set end time or None for not set - - Returns: - datetime|None: _description_ - """ - return self._overall_end_time - - def get_overall_run_time(self) -> timedelta | None: - """ - get run time or None if run time was not called - - Returns: - datetime|None: _description_ - """ - return self._overall_run_time - - # MARK: set run time - def run_time(self) -> timedelta: - """ - difference between start time and current time - - Returns: - datetime: _description_ - """ - self._end_time = datetime.now() - self._run_time = self._end_time - self._start_time - return self._run_time - - def reset_run_time(self): - """ - reset start/end and run tine - """ - self._start_time = datetime.now() - self._end_time = None - self._run_time = None - - def get_start_time(self) -> datetime: - """ - get set start time - - Returns: - datetime: _description_ - """ - return self._start_time - - def get_end_time(self) -> datetime | None: - """ - get set end time or None for not set - - Returns: - datetime|None: _description_ - """ - return self._end_time - - def get_run_time(self) -> timedelta | None: - """ - get run time or None if run time was not called - - Returns: - datetime|None: _description_ - """ - return self._run_time +warn("Use corelibs_debug.timer.Timer instead", DeprecationWarning, stacklevel=2) # __END__ diff --git a/src/corelibs/debug_handling/writeline.py b/src/corelibs/debug_handling/writeline.py index ff031f2..92f7933 100644 --- a/src/corelibs/debug_handling/writeline.py +++ b/src/corelibs/debug_handling/writeline.py @@ -2,11 +2,18 @@ Various small helpers for data writing """ +from warnings import deprecated from typing import TYPE_CHECKING +from corelibs_debug.writeline import ( + write_l as write_l_ng, pr_header as pr_header_ng, + pr_title as pr_title_ng, pr_open as pr_open_ng, + pr_close as pr_close_ng, pr_act as pr_act_ng +) if TYPE_CHECKING: from io import TextIOWrapper, StringIO +@deprecated("Use corelibs_debug.writeline.write_l instead") def write_l(line: str, fpl: 'TextIOWrapper | StringIO | None' = None, print_line: bool = False): """ Write a line to screen and to output file @@ -15,23 +22,30 @@ def write_l(line: str, fpl: 'TextIOWrapper | StringIO | None' = None, print_line line (String): Line to write fpl (Resource): file handler resource, if none write only to console """ - if print_line is True: - print(line) - if fpl is not None: - fpl.write(line + "\n") + return write_l_ng( + line=line, + fpl=fpl, + print_line=print_line + ) # progress printers 
+@deprecated("Use corelibs_debug.writeline.pr_header instead") def pr_header(tag: str, marker_string: str = '#', width: int = 35): """_summary_ Args: tag (str): _description_ """ - print(f" {marker_string} {tag:^{width}} {marker_string}") + return pr_header_ng( + tag=tag, + marker_string=marker_string, + width=width + ) +@deprecated("Use corelibs_debug.writeline.pr_title instead") def pr_title(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35): """_summary_ @@ -39,9 +53,15 @@ def pr_title(tag: str, prefix_string: str = '|', space_filler: str = '.', width: tag (str): _description_ prefix_string (str, optional): _description_. Defaults to '|'. """ - print(f" {prefix_string} {tag:{space_filler}<{width}}:", flush=True) + return pr_title_ng( + tag=tag, + prefix_string=prefix_string, + space_filler=space_filler, + width=width + ) +@deprecated("Use corelibs_debug.writeline.pr_open instead") def pr_open(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35): """ writen progress open line with tag @@ -50,9 +70,15 @@ def pr_open(tag: str, prefix_string: str = '|', space_filler: str = '.', width: tag (str): _description_ prefix_string (str): prefix string. Default: '|' """ - print(f" {prefix_string} {tag:{space_filler}<{width}} [", end="", flush=True) + return pr_open_ng( + tag=tag, + prefix_string=prefix_string, + space_filler=space_filler, + width=width + ) +@deprecated("Use corelibs_debug.writeline.pr_close instead") def pr_close(tag: str = ''): """ write the close tag with new line @@ -60,9 +86,10 @@ def pr_close(tag: str = ''): Args: tag (str, optional): _description_. Defaults to ''. """ - print(f"{tag}]", flush=True) + return pr_close_ng(tag=tag) +@deprecated("Use corelibs_debug.writeline.pr_act instead") def pr_act(act: str = "."): """ write progress character @@ -70,6 +97,6 @@ def pr_act(act: str = "."): Args: act (str, optional): _description_. Defaults to ".". 
""" - print(f"{act}", end="", flush=True) + return pr_act_ng(act=act) # __EMD__ diff --git a/src/corelibs/exceptions/__init__.py b/src/corelibs/exceptions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/corelibs/logging_handling/log.py b/src/corelibs/logging_handling/log.py index f0a5542..0e97a86 100644 --- a/src/corelibs/logging_handling/log.py +++ b/src/corelibs/logging_handling/log.py @@ -13,9 +13,9 @@ from pathlib import Path import atexit from enum import Flag, auto from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast +from corelibs_stack_trace.stack import call_stack, exception_stack from corelibs_text_colors.text_colors import Colors from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel -from corelibs.debug_handling.debug_helpers import call_stack, exception_stack if TYPE_CHECKING: from multiprocessing import Queue diff --git a/test-run/config_handling/settings_loader.py b/test-run/config_handling/settings_loader.py index 82a8575..a21518b 100644 --- a/test-run/config_handling/settings_loader.py +++ b/test-run/config_handling/settings_loader.py @@ -4,7 +4,7 @@ Settings loader test import re from pathlib import Path -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.logging_handling.log import Log from corelibs.config_handling.settings_loader import SettingsLoader from corelibs.config_handling.settings_loader_handling.settings_loader_check import SettingsLoaderCheck diff --git a/test-run/db_handling/sql_main.py b/test-run/db_handling/sql_main.py index 747da3d..3444b77 100644 --- a/test-run/db_handling/sql_main.py +++ b/test-run/db_handling/sql_main.py @@ -5,7 +5,7 @@ SQL Main wrapper test from pathlib import Path from uuid import uuid4 import json -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.logging_handling.log import Log, Logger from corelibs.db_handling.sql_main import SQLMain diff --git a/test-run/db_handling/sqlite_io.py b/test-run/db_handling/sqlite_io.py index 51f538f..92710b8 100644 --- a/test-run/db_handling/sqlite_io.py +++ b/test-run/db_handling/sqlite_io.py @@ -6,7 +6,7 @@ from pathlib import Path from uuid import uuid4 import json import sqlite3 -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.logging_handling.log import Log, Logger from corelibs.db_handling.sqlite_io import SQLiteIO diff --git a/test-run/encryption/symmetric_encryption.py b/test-run/encryption/symmetric_encryption.py index a7fcb76..8badd08 100644 --- a/test-run/encryption/symmetric_encryption.py +++ b/test-run/encryption/symmetric_encryption.py @@ -5,7 +5,7 @@ Symmetric encryption test """ import json -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.encryption_handling.symmetric_encryption import SymmetricEncryption diff --git a/test-run/file_handling/file_bom_check.py b/test-run/file_handling/file_bom_check.py index 01213ef..c481a70 100644 --- a/test-run/file_handling/file_bom_check.py +++ b/test-run/file_handling/file_bom_check.py @@ -6,7 +6,7 @@ BOM check for files from pathlib import Path from corelibs.file_handling.file_bom_encoding import is_bom_encoded, is_bom_encoded_info -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data def main() -> None: diff --git 
a/test-run/iterator_handling/data_search.py b/test-run/iterator_handling/data_search.py index 501c55d..82ea182 100644 --- a/test-run/iterator_handling/data_search.py +++ b/test-run/iterator_handling/data_search.py @@ -5,7 +5,7 @@ Search data tests iterator_handling.data_search """ -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.iterator_handling.data_search import find_in_array_from_list, ArraySearchList diff --git a/test-run/iterator_handling/dict_helpers.py b/test-run/iterator_handling/dict_helpers.py index 6ad5462..dd19cd8 100644 --- a/test-run/iterator_handling/dict_helpers.py +++ b/test-run/iterator_handling/dict_helpers.py @@ -3,7 +3,7 @@ Iterator helper testing """ from typing import Any -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.iterator_handling.dict_mask import mask from corelibs.iterator_handling.dict_helpers import set_entry diff --git a/test-run/iterator_handling/list_helpers.py b/test-run/iterator_handling/list_helpers.py index b2997bf..d3fcce4 100644 --- a/test-run/iterator_handling/list_helpers.py +++ b/test-run/iterator_handling/list_helpers.py @@ -3,7 +3,7 @@ test list helpers """ from typing import Any -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list, make_unique_list_of_dicts from corelibs.iterator_handling.fingerprint import dict_hash_crc diff --git a/test-run/json_handling/jmespath_helper.py b/test-run/json_handling/jmespath_helper.py index 969a45c..6612b12 100644 --- a/test-run/json_handling/jmespath_helper.py +++ b/test-run/json_handling/jmespath_helper.py @@ -4,7 +4,7 @@ jmes path testing """ -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.json_handling.jmespath_helper import jmespath_search diff --git a/test-run/json_handling/json_replace.py b/test-run/json_handling/json_replace.py index cb9c3fa..b4989f0 100644 --- a/test-run/json_handling/json_replace.py +++ b/test-run/json_handling/json_replace.py @@ -5,7 +5,7 @@ JSON content replace tets """ from deepdiff import DeepDiff -from corelibs.debug_handling.dump_data import dump_data +from corelibs_dump_data.dump_data import dump_data from corelibs.json_handling.json_helper import modify_with_jsonpath diff --git a/test-run/logging_handling/log.py b/test-run/logging_handling/log.py index 6379538..4d156ac 100644 --- a/test-run/logging_handling/log.py +++ b/test-run/logging_handling/log.py @@ -6,8 +6,8 @@ Log logging_handling.log testing import sys from pathlib import Path # this is for testing only +from corelibs_stack_trace.stack import exception_stack, call_stack from corelibs.logging_handling.log import Log, Logger, ConsoleFormat, ConsoleFormatSettings -from corelibs.debug_handling.debug_helpers import exception_stack, call_stack from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel diff --git a/tests/unit/debug_handling/test_debug_helpers.py b/tests/unit/debug_handling/test_debug_helpers.py deleted file mode 100644 index 9e06d56..0000000 --- a/tests/unit/debug_handling/test_debug_helpers.py +++ /dev/null @@ -1,639 +0,0 @@ -""" -Unit tests for debug_handling.debug_helpers module -""" - -import sys -import pytest - -from corelibs.debug_handling.debug_helpers import ( - call_stack, - exception_stack, - OptExcInfo -) - 
- -class TestCallStack: - """Test cases for call_stack function""" - - def test_call_stack_basic(self): - """Test basic call_stack functionality""" - result = call_stack() - assert isinstance(result, str) - assert "test_debug_helpers.py" in result - assert "test_call_stack_basic" in result - - def test_call_stack_with_default_separator(self): - """Test call_stack with default separator""" - result = call_stack() - assert " -> " in result - - def test_call_stack_with_custom_separator(self): - """Test call_stack with custom separator""" - result = call_stack(separator=" | ") - assert " | " in result - assert " -> " not in result - - def test_call_stack_with_empty_separator(self): - """Test call_stack with empty separator (should default to ' -> ')""" - result = call_stack(separator="") - assert " -> " in result - - def test_call_stack_format(self): - """Test call_stack output format (filename:function:lineno)""" - result = call_stack() - parts = result.split(" -> ") - for part in parts: - # Each part should have format: filename:function:lineno - assert part.count(":") >= 2 - # Most parts should contain .py but some system frames might not - # Just check that we have some .py files in the trace - assert ".py" in result or "test_debug_helpers" in result - - def test_call_stack_with_start_offset(self): - """Test call_stack with start offset""" - result_no_offset = call_stack(start=0) - result_with_offset = call_stack(start=2) - - # With offset, we should get fewer frames - parts_no_offset = result_no_offset.split(" -> ") - parts_with_offset = result_with_offset.split(" -> ") - - assert len(parts_with_offset) <= len(parts_no_offset) - - def test_call_stack_with_skip_last(self): - """Test call_stack with skip_last parameter""" - result_skip_default = call_stack(skip_last=-1) - result_skip_more = call_stack(skip_last=-3) - - # Skipping more should result in fewer frames - parts_default = result_skip_default.split(" -> ") - parts_more = result_skip_more.split(" -> ") - - assert len(parts_more) <= len(parts_default) - - def test_call_stack_skip_last_positive_converts_to_negative(self): - """Test that positive skip_last is converted to negative""" - # Both should produce same result - result_negative = call_stack(skip_last=-2) - result_positive = call_stack(skip_last=2) - - assert result_negative == result_positive - - def test_call_stack_nested_calls(self): - """Test call_stack in nested function calls""" - def level_one(): - return level_two() - - def level_two(): - return level_three() - - def level_three(): - return call_stack() - - result = level_one() - assert "level_one" in result - assert "level_two" in result - assert "level_three" in result - - def test_call_stack_reset_start_if_empty_false(self): - """Test call_stack with high start value and reset_start_if_empty=False""" - # Using a very high start value should result in empty stack - result = call_stack(start=1000, reset_start_if_empty=False) - assert result == "" - - def test_call_stack_reset_start_if_empty_true(self): - """Test call_stack with high start value and reset_start_if_empty=True""" - # Using a very high start value with reset should give non-empty result - result = call_stack(start=1000, reset_start_if_empty=True) - assert result != "" - assert "test_debug_helpers.py" in result - - def test_call_stack_contains_line_numbers(self): - """Test that call_stack includes line numbers""" - result = call_stack() - # Extract parts and check for numbers - parts = result.split(" -> ") - for part in parts: - # Line numbers should be 
present (digits at the end) - assert any(char.isdigit() for char in part) - - def test_call_stack_separator_none(self): - """Test call_stack with None separator""" - result = call_stack(separator="") # Use empty string instead of None - # Empty string should be converted to default ' -> ' - assert " -> " in result - - def test_call_stack_multiple_separators(self): - """Test call_stack with various custom separators""" - separators = [" | ", " >> ", " => ", " / ", "\n"] - - for sep in separators: - result = call_stack(separator=sep) - assert sep in result or result == "" # May be empty based on stack depth - - -class TestExceptionStack: - """Test cases for exception_stack function""" - - def test_exception_stack_with_active_exception(self): - """Test exception_stack when an exception is active""" - try: - raise ValueError("Test exception") - except ValueError: - result = exception_stack() - assert isinstance(result, str) - assert "test_debug_helpers.py" in result - assert "test_exception_stack_with_active_exception" in result - - def test_exception_stack_format(self): - """Test exception_stack output format""" - try: - raise RuntimeError("Test error") - except RuntimeError: - result = exception_stack() - parts = result.split(" -> ") - for part in parts: - # Each part should have format: filename:function:lineno - assert part.count(":") >= 2 - - def test_exception_stack_with_custom_separator(self): - """Test exception_stack with custom separator""" - def nested_call(): - def inner_call(): - raise TypeError("Test type error") - inner_call() - - try: - nested_call() - except TypeError: - result = exception_stack(separator=" | ") - # Only check separator if there are multiple frames - if " | " in result or result.count(":") == 2: - # Single frame or has separator - assert isinstance(result, str) - assert " -> " not in result - - def test_exception_stack_with_empty_separator(self): - """Test exception_stack with empty separator (should default to ' -> ')""" - def nested_call(): - def inner_call(): - raise KeyError("Test key error") - inner_call() - - try: - nested_call() - except KeyError: - result = exception_stack(separator="") - # Should use default separator if multiple frames exist - assert isinstance(result, str) - - def test_exception_stack_separator_none(self): - """Test exception_stack with empty separator""" - def nested_call(): - def inner_call(): - raise IndexError("Test index error") - inner_call() - - try: - nested_call() - except IndexError: - result = exception_stack(separator="") # Use empty string instead of None - assert isinstance(result, str) - - def test_exception_stack_nested_exceptions(self): - """Test exception_stack with nested function calls""" - def level_one(): - level_two() - - def level_two(): - level_three() - - def level_three(): - raise ValueError("Nested exception") - - try: - level_one() - except ValueError: - result = exception_stack() - # Should contain all levels in the stack - assert "level_one" in result or "level_two" in result or "level_three" in result - - def test_exception_stack_with_provided_exc_info(self): - """Test exception_stack with explicitly provided exc_info""" - try: - raise AttributeError("Test attribute error") - except AttributeError: - exc_info = sys.exc_info() - result = exception_stack(exc_stack=exc_info) - assert isinstance(result, str) - assert len(result) > 0 - - def test_exception_stack_no_active_exception(self): - """Test exception_stack when no exception is active""" - # This should handle the case gracefully - # When no 
exception is active, sys.exc_info() returns (None, None, None) - result = exception_stack() - # With no traceback, should return empty string or handle gracefully - assert isinstance(result, str) - - def test_exception_stack_contains_line_numbers(self): - """Test that exception_stack includes line numbers""" - try: - raise OSError("Test OS error") - except OSError: - result = exception_stack() - if result: # May be empty - parts = result.split(" -> ") - for part in parts: - # Line numbers should be present - assert any(char.isdigit() for char in part) - - def test_exception_stack_multiple_exceptions(self): - """Test exception_stack captures the current exception only""" - first_result = None - second_result = None - - try: - raise ValueError("First exception") - except ValueError: - first_result = exception_stack() - - try: - raise TypeError("Second exception") - except TypeError: - second_result = exception_stack() - - # Both should be valid but may differ - assert isinstance(first_result, str) - assert isinstance(second_result, str) - - def test_exception_stack_with_multiple_separators(self): - """Test exception_stack with various custom separators""" - separators = [" | ", " >> ", " => ", " / ", "\n"] - - def nested_call(): - def inner_call(): - raise ValueError("Test exception") - inner_call() - - for sep in separators: - try: - nested_call() - except ValueError: - result = exception_stack(separator=sep) - assert isinstance(result, str) - # Separator only appears if there are multiple frames - - -class TestOptExcInfo: - """Test cases for OptExcInfo type definition""" - - def test_opt_exc_info_type_none_tuple(self): - """Test OptExcInfo can be None tuple""" - exc_info: OptExcInfo = (None, None, None) - assert exc_info == (None, None, None) - - def test_opt_exc_info_type_exception_tuple(self): - """Test OptExcInfo can be exception tuple""" - try: - raise ValueError("Test") - except ValueError: - exc_info: OptExcInfo = sys.exc_info() - assert exc_info[0] is not None - assert exc_info[1] is not None - assert exc_info[2] is not None - - def test_opt_exc_info_with_exception_stack(self): - """Test that OptExcInfo works with exception_stack function""" - try: - raise RuntimeError("Test runtime error") - except RuntimeError: - exc_info = sys.exc_info() - result = exception_stack(exc_stack=exc_info) - assert isinstance(result, str) - - -class TestIntegration: - """Integration tests combining multiple scenarios""" - - def test_call_stack_and_exception_stack_together(self): - """Test using both call_stack and exception_stack in error handling""" - def faulty_function(): - _ = call_stack() # Get call stack before exception - raise ValueError("Intentional error") - - try: - faulty_function() - except ValueError: - exception_trace = exception_stack() - - assert isinstance(exception_trace, str) - assert "faulty_function" in exception_trace or "test_debug_helpers.py" in exception_trace - - def test_nested_exception_with_call_stack(self): - """Test call_stack within exception handling""" - def outer(): - return inner() - - def inner(): - try: - raise RuntimeError("Inner error") - except RuntimeError: - return { - 'call_stack': call_stack(), - 'exception_stack': exception_stack() - } - - result = outer() - assert 'call_stack' in result - assert 'exception_stack' in result - assert isinstance(result['call_stack'], str) - assert isinstance(result['exception_stack'], str) - - def test_multiple_nested_levels(self): - """Test with multiple nested function levels""" - def level_a(): - return level_b() - - def 
level_b(): - return level_c() - - def level_c(): - return level_d() - - def level_d(): - try: - raise ValueError("Deep error") - except ValueError: - return { - 'call': call_stack(), - 'exception': exception_stack() - } - - result = level_a() - # Should contain information about the call chain - assert result['call'] - assert result['exception'] - - def test_different_separators_consistency(self): - """Test that different separators work consistently""" - separators = [" -> ", " | ", " / ", " >> "] - - def nested_call(): - def inner_call(): - raise ValueError("Test") - inner_call() - - for sep in separators: - try: - nested_call() - except ValueError: - exc_result = exception_stack(separator=sep) - call_result = call_stack(separator=sep) - - assert isinstance(exc_result, str) - assert isinstance(call_result, str) - # Both should be valid strings (separator check only if multiple frames) - - -class TestEdgeCases: - """Test edge cases and boundary conditions""" - - def test_call_stack_with_zero_start(self): - """Test call_stack with start=0 (should include all frames)""" - result = call_stack(start=0) - assert isinstance(result, str) - assert len(result) > 0 - - def test_call_stack_with_large_skip_last(self): - """Test call_stack with very large skip_last value""" - result = call_stack(skip_last=-100) - # Should handle gracefully, may be empty - assert isinstance(result, str) - - def test_exception_stack_none_exc_info(self): - """Test exception_stack with None as exc_stack""" - result = exception_stack(exc_stack=None) - assert isinstance(result, str) - - def test_exception_stack_empty_tuple(self): - """Test exception_stack with empty exception info""" - exc_info: OptExcInfo = (None, None, None) - result = exception_stack(exc_stack=exc_info) - assert isinstance(result, str) - - def test_call_stack_special_characters_in_separator(self): - """Test call_stack with special characters in separator""" - special_separators = ["\n", "\t", "->", "||", "//"] - - for sep in special_separators: - result = call_stack(separator=sep) - assert isinstance(result, str) - - def test_very_deep_call_stack(self): - """Test call_stack with very deep recursion (up to a limit)""" - def recursive_call(depth: int, max_depth: int = 5) -> str: - if depth >= max_depth: - return call_stack() - return recursive_call(depth + 1, max_depth) - - result = recursive_call(0) - assert isinstance(result, str) - # Should contain multiple recursive_call entries - assert result.count("recursive_call") > 0 - - def test_exception_stack_different_exception_types(self): - """Test exception_stack with various exception types""" - exception_types = [ - ValueError("value"), - TypeError("type"), - KeyError("key"), - IndexError("index"), - AttributeError("attr"), - RuntimeError("runtime"), - ] - - for exc in exception_types: - try: - raise exc - except (ValueError, TypeError, KeyError, IndexError, AttributeError, RuntimeError): - result = exception_stack() - assert isinstance(result, str) - - -class TestRealWorldScenarios: - """Test real-world debugging scenarios""" - - def test_debugging_workflow(self): - """Test typical debugging workflow with both functions""" - def process_data(data: str) -> str: - _ = call_stack() # Capture call stack for debugging - if not data: - raise ValueError("No data provided") - return data.upper() - - # Success case - result = process_data("test") - assert result == "TEST" - - # Error case - try: - process_data("") - except ValueError: - exc_trace = exception_stack() - assert isinstance(exc_trace, str) - - def 
test_logging_context(self): - """Test using call_stack for logging context""" - def get_logging_context(): - return { - 'timestamp': 'now', - 'stack': call_stack(start=1, separator=" > "), - 'function': 'get_logging_context' - } - - context = get_logging_context() - assert 'stack' in context - assert 'timestamp' in context - assert isinstance(context['stack'], str) - - def test_error_reporting(self): - """Test comprehensive error reporting""" - def dangerous_operation() -> dict[str, str]: - try: - # Simulate some operation - _ = 1 / 0 - except ZeroDivisionError: - return { - 'error': 'Division by zero', - 'call_stack': call_stack(), - 'exception_stack': exception_stack(), - } - return {} # Fallback return - - error_report = dangerous_operation() - assert error_report is not None - assert 'error' in error_report - assert 'call_stack' in error_report - assert 'exception_stack' in error_report - assert error_report['error'] == 'Division by zero' - - def test_function_tracing(self): - """Test function call tracing""" - traces: list[str] = [] - - def traced_function_a() -> str: - traces.append(call_stack()) - return traced_function_b() - - def traced_function_b() -> str: - traces.append(call_stack()) - return traced_function_c() - - def traced_function_c() -> str: - traces.append(call_stack()) - return "done" - - result = traced_function_a() - assert result == "done" - assert len(traces) == 3 - # Each trace should be different (different call depths) - assert all(isinstance(t, str) for t in traces) - - def test_exception_chain_tracking(self): - """Test tracking exception chains""" - exception_traces: list[str] = [] - - def operation_one() -> None: - try: - operation_two() - except ValueError: - exception_traces.append(exception_stack()) - raise - - def operation_two() -> None: - try: - operation_three() - except TypeError as exc: - exception_traces.append(exception_stack()) - raise ValueError("Wrapped error") from exc - - def operation_three() -> None: - raise TypeError("Original error") - - try: - operation_one() - except ValueError: - exception_traces.append(exception_stack()) - - # Should have captured multiple exception stacks - assert len(exception_traces) > 0 - assert all(isinstance(t, str) for t in exception_traces) - - -class TestParametrized: - """Parametrized tests for comprehensive coverage""" - - @pytest.mark.parametrize("start", [0, 1, 2, 5, 10]) - def test_call_stack_various_starts(self, start: int) -> None: - """Test call_stack with various start values""" - result = call_stack(start=start) - assert isinstance(result, str) - - @pytest.mark.parametrize("skip_last", [-1, -2, -3, -5, 1, 2, 3, 5]) - def test_call_stack_various_skip_lasts(self, skip_last: int) -> None: - """Test call_stack with various skip_last values""" - result = call_stack(skip_last=skip_last) - assert isinstance(result, str) - - @pytest.mark.parametrize("separator", [" -> ", " | ", " / ", " >> ", " => ", "\n", "\t"]) - def test_call_stack_various_separators(self, separator: str) -> None: - """Test call_stack with various separators""" - result = call_stack(separator=separator) - assert isinstance(result, str) - if result: - assert separator in result - - @pytest.mark.parametrize("reset_start", [True, False]) - def test_call_stack_reset_start_variations(self, reset_start: bool) -> None: - """Test call_stack with reset_start_if_empty variations""" - result = call_stack(start=100, reset_start_if_empty=reset_start) - assert isinstance(result, str) - if reset_start: - assert len(result) > 0 # Should have content after 
reset - else: - assert len(result) == 0 # Should be empty - - @pytest.mark.parametrize("separator", [" -> ", " | ", " / ", " >> ", "\n"]) - def test_exception_stack_various_separators(self, separator: str) -> None: - """Test exception_stack with various separators""" - def nested_call(): - def inner_call(): - raise ValueError("Test") - inner_call() - - try: - nested_call() - except ValueError: - result = exception_stack(separator=separator) - assert isinstance(result, str) - # Check that result is valid (separator only if multiple frames exist) - - @pytest.mark.parametrize("exception_type", [ - ValueError, - TypeError, - KeyError, - IndexError, - AttributeError, - RuntimeError, - OSError, - ]) - def test_exception_stack_various_exception_types(self, exception_type: type[Exception]) -> None: - """Test exception_stack with various exception types""" - try: - raise exception_type("Test exception") - except (ValueError, TypeError, KeyError, IndexError, AttributeError, RuntimeError, OSError): - result = exception_stack() - assert isinstance(result, str) - -# __END__ diff --git a/tests/unit/debug_handling/test_dump_data.py b/tests/unit/debug_handling/test_dump_data.py deleted file mode 100644 index 9168005..0000000 --- a/tests/unit/debug_handling/test_dump_data.py +++ /dev/null @@ -1,288 +0,0 @@ -""" -Unit tests for debug_handling.dump_data module -""" - -import json -from datetime import datetime, date -from decimal import Decimal -from typing import Any - -import pytest - -from corelibs.debug_handling.dump_data import dump_data - - -class TestDumpData: - """Test cases for dump_data function""" - - def test_dump_simple_dict(self): - """Test dumping a simple dictionary""" - data = {"name": "John", "age": 30} - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == data - - def test_dump_simple_list(self): - """Test dumping a simple list""" - data = [1, 2, 3, 4, 5] - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == data - - def test_dump_nested_dict(self): - """Test dumping a nested dictionary""" - data = { - "user": { - "name": "Alice", - "address": { - "city": "Tokyo", - "country": "Japan" - } - } - } - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == data - - def test_dump_mixed_types(self): - """Test dumping data with mixed types""" - data = { - "string": "test", - "number": 42, - "float": 3.14, - "boolean": True, - "null": None, - "list": [1, 2, 3] - } - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == data - - def test_dump_with_indent_default(self): - """Test that indent is applied by default""" - data = {"a": 1, "b": 2} - result = dump_data(data) - - # With indent, result should contain newlines - assert "\n" in result - assert " " in result # 4 spaces for indent - - def test_dump_with_indent_true(self): - """Test explicit indent=True""" - data = {"a": 1, "b": 2} - result = dump_data(data, use_indent=True) - - # With indent, result should contain newlines - assert "\n" in result - assert " " in result # 4 spaces for indent - - def test_dump_without_indent(self): - """Test dumping without indentation""" - data = {"a": 1, "b": 2} - result = dump_data(data, use_indent=False) - - # Without indent, result should be compact - assert "\n" not in result - assert result == '{"a": 1, "b": 2}' - - def test_dump_unicode_characters(self): - """Test that unicode characters 
are preserved (ensure_ascii=False)""" - data = {"message": "こんにちは", "emoji": "😀", "german": "Müller"} - result = dump_data(data) - - # Unicode characters should be preserved, not escaped - assert "こんにちは" in result - assert "😀" in result - assert "Müller" in result - - parsed = json.loads(result) - assert parsed == data - - def test_dump_datetime_object(self): - """Test dumping data with datetime objects (using default=str)""" - now = datetime(2023, 10, 15, 14, 30, 0) - data = {"timestamp": now} - result = dump_data(data) - - assert isinstance(result, str) - # datetime should be converted to string - assert "2023-10-15" in result - - def test_dump_date_object(self): - """Test dumping data with date objects""" - today = date(2023, 10, 15) - data = {"date": today} - result = dump_data(data) - - assert isinstance(result, str) - assert "2023-10-15" in result - - def test_dump_decimal_object(self): - """Test dumping data with Decimal objects""" - data = {"amount": Decimal("123.45")} - result = dump_data(data) - - assert isinstance(result, str) - assert "123.45" in result - - def test_dump_empty_dict(self): - """Test dumping an empty dictionary""" - data = {} - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == {} - - def test_dump_empty_list(self): - """Test dumping an empty list""" - data = [] - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == [] - - def test_dump_string_directly(self): - """Test dumping a string directly""" - data = "Hello, World!" - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == data - - def test_dump_number_directly(self): - """Test dumping a number directly""" - data = 42 - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == data - - def test_dump_boolean_directly(self): - """Test dumping a boolean directly""" - data = True - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed is True - - def test_dump_none_directly(self): - """Test dumping None directly""" - data = None - result = dump_data(data) - - assert isinstance(result, str) - assert result == "null" - parsed = json.loads(result) - assert parsed is None - - def test_dump_complex_nested_structure(self): - """Test dumping a complex nested structure""" - data = { - "users": [ - { - "id": 1, - "name": "Alice", - "tags": ["admin", "user"], - "metadata": { - "created": datetime(2023, 1, 1), - "active": True - } - }, - { - "id": 2, - "name": "Bob", - "tags": ["user"], - "metadata": { - "created": datetime(2023, 6, 15), - "active": False - } - } - ], - "total": 2 - } - result = dump_data(data) - - assert isinstance(result, str) - # Check that it's valid JSON - parsed = json.loads(result) - assert len(parsed["users"]) == 2 - assert parsed["total"] == 2 - - def test_dump_special_characters(self): - """Test dumping data with special characters""" - data = { - "quote": 'He said "Hello"', - "backslash": "path\\to\\file", - "newline": "line1\nline2", - "tab": "col1\tcol2" - } - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == data - - def test_dump_large_numbers(self): - """Test dumping large numbers""" - data = { - "big_int": 123456789012345678901234567890, - "big_float": 1.23456789e100 - } - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - 
assert parsed["big_int"] == data["big_int"] - - def test_dump_list_of_dicts(self): - """Test dumping a list of dictionaries""" - data = [ - {"id": 1, "name": "Item 1"}, - {"id": 2, "name": "Item 2"}, - {"id": 3, "name": "Item 3"} - ] - result = dump_data(data) - - assert isinstance(result, str) - parsed = json.loads(result) - assert parsed == data - assert len(parsed) == 3 - - -class CustomObject: - """Custom class for testing default=str conversion""" - def __init__(self, value: Any): - self.value = value - - def __str__(self): - return f"CustomObject({self.value})" - - -class TestDumpDataWithCustomObjects: - """Test cases for dump_data with custom objects""" - - def test_dump_custom_object(self): - """Test that custom objects are converted using str()""" - obj = CustomObject("test") - data = {"custom": obj} - result = dump_data(data) - - assert isinstance(result, str) - assert "CustomObject(test)" in result - - -if __name__ == "__main__": - pytest.main([__file__, "-v"]) diff --git a/tests/unit/debug_handling/test_profiling.py b/tests/unit/debug_handling/test_profiling.py deleted file mode 100644 index 4f19928..0000000 --- a/tests/unit/debug_handling/test_profiling.py +++ /dev/null @@ -1,560 +0,0 @@ -""" -Unit tests for corelibs.debug_handling.profiling module -""" - -import time -import tracemalloc - -from corelibs.debug_handling.profiling import display_top, Profiling - - -class TestDisplayTop: - """Test display_top function""" - - def test_display_top_basic(self): - """Test that display_top returns a string with basic stats""" - tracemalloc.start() - - # Allocate some memory - data = [0] * 10000 - - snapshot = tracemalloc.take_snapshot() - tracemalloc.stop() - - result = display_top(snapshot) - - assert isinstance(result, str) - assert "Top 10 lines" in result - assert "KiB" in result - assert "Total allocated size:" in result - - # Clean up - del data - - def test_display_top_with_custom_limit(self): - """Test display_top with custom limit parameter""" - tracemalloc.start() - - # Allocate some memory - data = [0] * 10000 - - snapshot = tracemalloc.take_snapshot() - tracemalloc.stop() - - result = display_top(snapshot, limit=5) - - assert isinstance(result, str) - assert "Top 5 lines" in result - - # Clean up - del data - - def test_display_top_with_different_key_type(self): - """Test display_top with different key_type parameter""" - tracemalloc.start() - - # Allocate some memory - data = [0] * 10000 - - snapshot = tracemalloc.take_snapshot() - tracemalloc.stop() - - result = display_top(snapshot, key_type='filename') - - assert isinstance(result, str) - assert "Top 10 lines" in result - - # Clean up - del data - - def test_display_top_filters_traces(self): - """Test that display_top filters out bootstrap and unknown traces""" - tracemalloc.start() - - # Allocate some memory - data = [0] * 10000 - - snapshot = tracemalloc.take_snapshot() - tracemalloc.stop() - - result = display_top(snapshot) - - # Should not contain filtered traces - assert "" not in result - assert "" not in result - - # Clean up - del data - - def test_display_top_with_limit_larger_than_stats(self): - """Test display_top when limit is larger than available stats""" - tracemalloc.start() - - # Allocate some memory - data = [0] * 100 - - snapshot = tracemalloc.take_snapshot() - tracemalloc.stop() - - result = display_top(snapshot, limit=1000) - - assert isinstance(result, str) - assert "Top 1000 lines" in result - assert "Total allocated size:" in result - - # Clean up - del data - - def 
test_display_top_empty_snapshot(self): - """Test display_top with a snapshot that has minimal traces""" - tracemalloc.start() - snapshot = tracemalloc.take_snapshot() - tracemalloc.stop() - - result = display_top(snapshot, limit=1) - - assert isinstance(result, str) - assert "Top 1 lines" in result - - -class TestProfilingInitialization: - """Test Profiling class initialization""" - - def test_profiling_initialization(self): - """Test that Profiling initializes correctly""" - profiler = Profiling() - - # Should be able to create instance - assert isinstance(profiler, Profiling) - - def test_profiling_initial_state(self): - """Test that Profiling starts in a clean state""" - profiler = Profiling() - - # Should not raise an error when calling end_profiling - # even though start_profiling wasn't called - profiler.end_profiling() - - result = profiler.print_profiling() - assert isinstance(result, str) - - -class TestProfilingStartEnd: - """Test start_profiling and end_profiling functionality""" - - def test_start_profiling(self): - """Test that start_profiling can be called""" - profiler = Profiling() - - # Should not raise an error - profiler.start_profiling("test_operation") - - def test_end_profiling(self): - """Test that end_profiling can be called""" - profiler = Profiling() - profiler.start_profiling("test_operation") - - # Should not raise an error - profiler.end_profiling() - - def test_start_profiling_with_different_idents(self): - """Test start_profiling with different identifier strings""" - profiler = Profiling() - - identifiers = ["short", "longer_identifier", "very_long_identifier_with_many_chars"] - - for ident in identifiers: - profiler.start_profiling(ident) - profiler.end_profiling() - result = profiler.print_profiling() - - assert ident in result - - def test_end_profiling_without_start(self): - """Test that end_profiling can be called without start_profiling""" - profiler = Profiling() - - # Should not raise an error but internal state should indicate warning - profiler.end_profiling() - - result = profiler.print_profiling() - assert isinstance(result, str) - - def test_profiling_measures_time(self): - """Test that profiling measures elapsed time""" - profiler = Profiling() - profiler.start_profiling("time_test") - - sleep_duration = 0.05 # 50ms - time.sleep(sleep_duration) - - profiler.end_profiling() - result = profiler.print_profiling() - - assert isinstance(result, str) - assert "time:" in result - # Should have some time measurement - assert "ms" in result or "s" in result - - def test_profiling_measures_memory(self): - """Test that profiling measures memory usage""" - profiler = Profiling() - profiler.start_profiling("memory_test") - - # Allocate some memory - data = [0] * 100000 - - profiler.end_profiling() - result = profiler.print_profiling() - - assert isinstance(result, str) - assert "RSS:" in result - assert "VMS:" in result - assert "time:" in result - - # Clean up - del data - - -class TestProfilingPrintProfiling: - """Test print_profiling functionality""" - - def test_print_profiling_returns_string(self): - """Test that print_profiling returns a string""" - profiler = Profiling() - profiler.start_profiling("test") - profiler.end_profiling() - - result = profiler.print_profiling() - - assert isinstance(result, str) - - def test_print_profiling_contains_identifier(self): - """Test that print_profiling includes the identifier""" - profiler = Profiling() - identifier = "my_test_operation" - - profiler.start_profiling(identifier) - profiler.end_profiling() - - 
result = profiler.print_profiling() - - assert identifier in result - - def test_print_profiling_format(self): - """Test that print_profiling has expected format""" - profiler = Profiling() - profiler.start_profiling("test") - profiler.end_profiling() - - result = profiler.print_profiling() - - # Check for expected components - assert "Profiling:" in result - assert "RSS:" in result - assert "VMS:" in result - assert "time:" in result - - def test_print_profiling_multiple_calls(self): - """Test that print_profiling can be called multiple times""" - profiler = Profiling() - profiler.start_profiling("test") - profiler.end_profiling() - - result1 = profiler.print_profiling() - result2 = profiler.print_profiling() - - # Should return the same result - assert result1 == result2 - - def test_print_profiling_time_formats(self): - """Test different time format outputs""" - profiler = Profiling() - - # Very short duration (milliseconds) - profiler.start_profiling("ms_test") - time.sleep(0.001) - profiler.end_profiling() - result = profiler.print_profiling() - assert "ms" in result - - # Slightly longer duration (seconds) - profiler.start_profiling("s_test") - time.sleep(0.1) - profiler.end_profiling() - result = profiler.print_profiling() - # Could be ms or s depending on timing - assert ("ms" in result or "s" in result) - - def test_print_profiling_memory_formats(self): - """Test different memory format outputs""" - profiler = Profiling() - profiler.start_profiling("memory_format_test") - - # Allocate some memory - data = [0] * 50000 - - profiler.end_profiling() - result = profiler.print_profiling() - - # Should have some memory unit (B, kB, MB, GB) - assert any(unit in result for unit in ["B", "kB", "MB", "GB"]) - - # Clean up - del data - - -class TestProfilingIntegration: - """Integration tests for Profiling class""" - - def test_complete_profiling_cycle(self): - """Test a complete profiling cycle from start to print""" - profiler = Profiling() - - profiler.start_profiling("complete_cycle") - - # Do some work - data = [i for i in range(10000)] - time.sleep(0.01) - - profiler.end_profiling() - result = profiler.print_profiling() - - assert isinstance(result, str) - assert "complete_cycle" in result - assert "RSS:" in result - assert "VMS:" in result - assert "time:" in result - - # Clean up - del data - - def test_multiple_profiling_sessions(self): - """Test running multiple profiling sessions""" - profiler = Profiling() - - # First session - profiler.start_profiling("session_1") - time.sleep(0.01) - profiler.end_profiling() - result1 = profiler.print_profiling() - - # Second session (same profiler instance) - profiler.start_profiling("session_2") - data = [0] * 100000 - time.sleep(0.01) - profiler.end_profiling() - result2 = profiler.print_profiling() - - # Results should be different - assert "session_1" in result1 - assert "session_2" in result2 - assert result1 != result2 - - # Clean up - del data - - def test_profiling_with_zero_work(self): - """Test profiling with minimal work""" - profiler = Profiling() - - profiler.start_profiling("zero_work") - profiler.end_profiling() - - result = profiler.print_profiling() - - assert isinstance(result, str) - assert "zero_work" in result - - def test_profiling_with_heavy_computation(self): - """Test profiling with heavier computation""" - profiler = Profiling() - - profiler.start_profiling("heavy_computation") - - # Do some computation - result_data: list[list[int]] = [] - for _ in range(1000): - result_data.append([j * 2 for j in range(100)]) - - 
time.sleep(0.05) - - profiler.end_profiling() - result = profiler.print_profiling() - - assert isinstance(result, str) - assert "heavy_computation" in result - # Should show measurable time and memory - assert "time:" in result - - # Clean up - del result_data - - def test_independent_profilers(self): - """Test that multiple Profiling instances are independent""" - profiler1 = Profiling() - profiler2 = Profiling() - - profiler1.start_profiling("profiler_1") - time.sleep(0.01) - - profiler2.start_profiling("profiler_2") - data = [0] * 100000 - time.sleep(0.01) - - profiler1.end_profiling() - profiler2.end_profiling() - - result1 = profiler1.print_profiling() - result2 = profiler2.print_profiling() - - # Should have different identifiers - assert "profiler_1" in result1 - assert "profiler_2" in result2 - - # Results should be different - assert result1 != result2 - - # Clean up - del data - - -class TestProfilingEdgeCases: - """Test edge cases and boundary conditions""" - - def test_empty_identifier(self): - """Test profiling with empty identifier""" - profiler = Profiling() - - profiler.start_profiling("") - profiler.end_profiling() - - result = profiler.print_profiling() - - assert isinstance(result, str) - assert "Profiling:" in result - - def test_very_long_identifier(self): - """Test profiling with very long identifier""" - profiler = Profiling() - - long_ident = "a" * 100 - - profiler.start_profiling(long_ident) - profiler.end_profiling() - - result = profiler.print_profiling() - - assert isinstance(result, str) - assert long_ident in result - - def test_special_characters_in_identifier(self): - """Test profiling with special characters in identifier""" - profiler = Profiling() - - special_ident = "test_@#$%_operation" - - profiler.start_profiling(special_ident) - profiler.end_profiling() - - result = profiler.print_profiling() - - assert isinstance(result, str) - assert special_ident in result - - def test_rapid_consecutive_profiling(self): - """Test rapid consecutive profiling cycles""" - profiler = Profiling() - - for i in range(5): - profiler.start_profiling(f"rapid_{i}") - profiler.end_profiling() - result = profiler.print_profiling() - - assert isinstance(result, str) - assert f"rapid_{i}" in result - - def test_profiling_negative_memory_change(self): - """Test profiling when memory usage decreases""" - profiler = Profiling() - - # Allocate some memory before profiling - pre_data = [0] * 1000000 - - profiler.start_profiling("memory_decrease") - - # Free the memory - del pre_data - - profiler.end_profiling() - result = profiler.print_profiling() - - assert isinstance(result, str) - assert "memory_decrease" in result - # Should handle negative memory change gracefully - - def test_very_short_duration(self): - """Test profiling with extremely short duration""" - profiler = Profiling() - - profiler.start_profiling("instant") - profiler.end_profiling() - - result = profiler.print_profiling() - - assert isinstance(result, str) - assert "instant" in result - assert "ms" in result # Should show milliseconds for very short duration - - -class TestProfilingContextManager: - """Test profiling usage patterns similar to context managers""" - - def test_typical_usage_pattern(self): - """Test typical usage pattern for profiling""" - profiler = Profiling() - - # Typical pattern - profiler.start_profiling("typical_operation") - - # Perform operation - result_list: list[int] = [] - for _ in range(1000): - result_list.append(_ * 2) - - profiler.end_profiling() - - # Get results - output = 
profiler.print_profiling()
-
-        assert isinstance(output, str)
-        assert "typical_operation" in output
-
-        # Clean up
-        del result_list
-
-    def test_profiling_without_end(self):
-        """Test what happens when end_profiling is not called"""
-        profiler = Profiling()
-
-        profiler.start_profiling("no_end")
-
-        # Don't call end_profiling
-
-        result = profiler.print_profiling()
-
-        # Should still return a string (though data might be incomplete)
-        assert isinstance(result, str)
-
-    def test_profiling_end_without_start(self):
-        """Test calling end_profiling multiple times without start"""
-        profiler = Profiling()
-
-        profiler.end_profiling()
-        profiler.end_profiling()
-
-        result = profiler.print_profiling()
-
-        assert isinstance(result, str)
-
-# __END__
diff --git a/tests/unit/debug_handling/test_timer.py b/tests/unit/debug_handling/test_timer.py
deleted file mode 100644
index d13384b..0000000
--- a/tests/unit/debug_handling/test_timer.py
+++ /dev/null
@@ -1,405 +0,0 @@
-"""
-Unit tests for corelibs.debug_handling.timer module
-"""
-
-import time
-from datetime import datetime, timedelta
-
-from corelibs.debug_handling.timer import Timer
-
-
-class TestTimerInitialization:
-    """Test Timer class initialization"""
-
-    def test_timer_initialization(self):
-        """Test that Timer initializes with correct default values"""
-        timer = Timer()
-
-        # Check that start times are set
-        assert isinstance(timer.get_overall_start_time(), datetime)
-        assert isinstance(timer.get_start_time(), datetime)
-
-        # Check that end times are None
-        assert timer.get_overall_end_time() is None
-        assert timer.get_end_time() is None
-
-        # Check that run times are None
-        assert timer.get_overall_run_time() is None
-        assert timer.get_run_time() is None
-
-    def test_timer_start_times_are_recent(self):
-        """Test that start times are set to current time on initialization"""
-        before_init = datetime.now()
-        timer = Timer()
-        after_init = datetime.now()
-
-        overall_start = timer.get_overall_start_time()
-        start = timer.get_start_time()
-
-        assert before_init <= overall_start <= after_init
-        assert before_init <= start <= after_init
-
-    def test_timer_start_times_are_same(self):
-        """Test that overall_start_time and start_time are initialized to the same time"""
-        timer = Timer()
-
-        overall_start = timer.get_overall_start_time()
-        start = timer.get_start_time()
-
-        # They should be very close (within a few microseconds)
-        time_diff = abs((overall_start - start).total_seconds())
-        assert time_diff < 0.001 # Less than 1 millisecond
-
-
-class TestOverallRunTime:
-    """Test overall run time functionality"""
-
-    def test_overall_run_time_returns_timedelta(self):
-        """Test that overall_run_time returns a timedelta object"""
-        timer = Timer()
-        time.sleep(0.01) # Sleep for 10ms
-
-        result = timer.overall_run_time()
-
-        assert isinstance(result, timedelta)
-
-    def test_overall_run_time_sets_end_time(self):
-        """Test that calling overall_run_time sets the end time"""
-        timer = Timer()
-
-        assert timer.get_overall_end_time() is None
-
-        timer.overall_run_time()
-
-        assert isinstance(timer.get_overall_end_time(), datetime)
-
-    def test_overall_run_time_sets_run_time(self):
-        """Test that calling overall_run_time sets the run time"""
-        timer = Timer()
-
-        assert timer.get_overall_run_time() is None
-
-        timer.overall_run_time()
-
-        assert isinstance(timer.get_overall_run_time(), timedelta)
-
-    def test_overall_run_time_accuracy(self):
-        """Test that overall_run_time calculates time difference accurately"""
-        timer = Timer()
-        sleep_duration = 0.05 #
50ms - time.sleep(sleep_duration) - - result = timer.overall_run_time() - - # Allow for some variance (10ms tolerance) - assert sleep_duration - 0.01 <= result.total_seconds() <= sleep_duration + 0.01 - - def test_overall_run_time_multiple_calls(self): - """Test that calling overall_run_time multiple times updates the values""" - timer = Timer() - time.sleep(0.01) - - first_result = timer.overall_run_time() - first_end_time = timer.get_overall_end_time() - - time.sleep(0.01) - - second_result = timer.overall_run_time() - second_end_time = timer.get_overall_end_time() - - # Second call should have longer runtime - assert second_result > first_result - assert second_end_time is not None - assert first_end_time is not None - # End time should be updated - assert second_end_time > first_end_time - - def test_overall_run_time_consistency(self): - """Test that get_overall_run_time returns the same value as overall_run_time""" - timer = Timer() - time.sleep(0.01) - - calculated_time = timer.overall_run_time() - retrieved_time = timer.get_overall_run_time() - - assert calculated_time == retrieved_time - - -class TestRunTime: - """Test run time functionality""" - - def test_run_time_returns_timedelta(self): - """Test that run_time returns a timedelta object""" - timer = Timer() - time.sleep(0.01) - - result = timer.run_time() - - assert isinstance(result, timedelta) - - def test_run_time_sets_end_time(self): - """Test that calling run_time sets the end time""" - timer = Timer() - - assert timer.get_end_time() is None - - timer.run_time() - - assert isinstance(timer.get_end_time(), datetime) - - def test_run_time_sets_run_time(self): - """Test that calling run_time sets the run time""" - timer = Timer() - - assert timer.get_run_time() is None - - timer.run_time() - - assert isinstance(timer.get_run_time(), timedelta) - - def test_run_time_accuracy(self): - """Test that run_time calculates time difference accurately""" - timer = Timer() - sleep_duration = 0.05 # 50ms - time.sleep(sleep_duration) - - result = timer.run_time() - - # Allow for some variance (10ms tolerance) - assert sleep_duration - 0.01 <= result.total_seconds() <= sleep_duration + 0.01 - - def test_run_time_multiple_calls(self): - """Test that calling run_time multiple times updates the values""" - timer = Timer() - time.sleep(0.01) - - first_result = timer.run_time() - first_end_time = timer.get_end_time() - - time.sleep(0.01) - - second_result = timer.run_time() - second_end_time = timer.get_end_time() - - # Second call should have longer runtime - assert second_result > first_result - assert second_end_time is not None - assert first_end_time is not None - # End time should be updated - assert second_end_time > first_end_time - - def test_run_time_consistency(self): - """Test that get_run_time returns the same value as run_time""" - timer = Timer() - time.sleep(0.01) - - calculated_time = timer.run_time() - retrieved_time = timer.get_run_time() - - assert calculated_time == retrieved_time - - -class TestResetRunTime: - """Test reset_run_time functionality""" - - def test_reset_run_time_resets_start_time(self): - """Test that reset_run_time updates the start time""" - timer = Timer() - original_start = timer.get_start_time() - - time.sleep(0.02) - timer.reset_run_time() - - new_start = timer.get_start_time() - - assert new_start > original_start - - def test_reset_run_time_clears_end_time(self): - """Test that reset_run_time clears the end time""" - timer = Timer() - timer.run_time() - - assert timer.get_end_time() is not None - - 
timer.reset_run_time() - - assert timer.get_end_time() is None - - def test_reset_run_time_clears_run_time(self): - """Test that reset_run_time clears the run time""" - timer = Timer() - timer.run_time() - - assert timer.get_run_time() is not None - - timer.reset_run_time() - - assert timer.get_run_time() is None - - def test_reset_run_time_does_not_affect_overall_times(self): - """Test that reset_run_time does not affect overall times""" - timer = Timer() - - overall_start = timer.get_overall_start_time() - timer.overall_run_time() - overall_end = timer.get_overall_end_time() - overall_run = timer.get_overall_run_time() - - timer.reset_run_time() - - # Overall times should remain unchanged - assert timer.get_overall_start_time() == overall_start - assert timer.get_overall_end_time() == overall_end - assert timer.get_overall_run_time() == overall_run - - def test_reset_run_time_allows_new_measurement(self): - """Test that reset_run_time allows for new time measurements""" - timer = Timer() - time.sleep(0.02) - timer.run_time() - - first_run_time = timer.get_run_time() - - timer.reset_run_time() - time.sleep(0.01) - timer.run_time() - - second_run_time = timer.get_run_time() - - assert second_run_time is not None - assert first_run_time is not None - # Second measurement should be shorter since we reset - assert second_run_time < first_run_time - - -class TestTimerIntegration: - """Integration tests for Timer class""" - - def test_independent_timers(self): - """Test that multiple Timer instances are independent""" - timer1 = Timer() - time.sleep(0.01) - timer2 = Timer() - - # timer1 should have earlier start time - assert timer1.get_start_time() < timer2.get_start_time() - assert timer1.get_overall_start_time() < timer2.get_overall_start_time() - - def test_overall_and_run_time_independence(self): - """Test that overall time and run time are independent""" - timer = Timer() - time.sleep(0.02) - - # Reset run time but not overall - timer.reset_run_time() - time.sleep(0.01) - - run_time = timer.run_time() - overall_time = timer.overall_run_time() - - # Overall time should be longer than run time - assert overall_time > run_time - - def test_typical_usage_pattern(self): - """Test a typical usage pattern of the Timer class""" - timer = Timer() - - # Measure first operation - time.sleep(0.01) - first_operation = timer.run_time() - assert first_operation.total_seconds() > 0 - - # Reset and measure second operation - timer.reset_run_time() - time.sleep(0.01) - second_operation = timer.run_time() - assert second_operation.total_seconds() > 0 - - # Get overall time - overall = timer.overall_run_time() - - # Overall should be greater than individual operations - assert overall > first_operation - assert overall > second_operation - - def test_zero_sleep_timer(self): - """Test timer with minimal sleep (edge case)""" - timer = Timer() - - # Call run_time immediately - result = timer.run_time() - - # Should still return a valid timedelta (very small) - assert isinstance(result, timedelta) - assert result.total_seconds() >= 0 - - def test_getter_methods_before_calculation(self): - """Test that getter methods return None before calculation methods are called""" - timer = Timer() - - # Before calling run_time() - assert timer.get_end_time() is None - assert timer.get_run_time() is None - - # Before calling overall_run_time() - assert timer.get_overall_end_time() is None - assert timer.get_overall_run_time() is None - - # But start times should always be set - assert timer.get_start_time() is not None - 
assert timer.get_overall_start_time() is not None
-
-
-class TestTimerEdgeCases:
    """Test edge cases and boundary conditions"""
-
-    def test_rapid_consecutive_calls(self):
-        """Test rapid consecutive calls to run_time"""
-        timer = Timer()
-
-        results: list[timedelta] = []
-        for _ in range(5):
-            results.append(timer.run_time())
-
-        # Each result should be greater than or equal to the previous
-        for i in range(1, len(results)):
-            assert results[i] >= results[i - 1]
-
-    def test_very_short_duration(self):
-        """Test timer with very short duration"""
-        timer = Timer()
-        result = timer.run_time()
-
-        # Should be a very small positive timedelta
-        assert isinstance(result, timedelta)
-        assert result.total_seconds() >= 0
-        assert result.total_seconds() < 0.1 # Less than 100ms
-
-    def test_reset_multiple_times(self):
-        """Test resetting the timer multiple times"""
-        timer = Timer()
-
-        for _ in range(3):
-            timer.reset_run_time()
-            time.sleep(0.01)
-            result = timer.run_time()
-
-            assert isinstance(result, timedelta)
-            assert result.total_seconds() > 0
-
-    def test_overall_time_persists_through_resets(self):
-        """Test that overall time continues even when run_time is reset"""
-        timer = Timer()
-
-        time.sleep(0.01)
-        timer.reset_run_time()
-
-        time.sleep(0.01)
-        timer.reset_run_time()
-
-        overall = timer.overall_run_time()
-
-        # Overall time should reflect total elapsed time
-        assert overall.total_seconds() >= 0.02
-
-# __END__
diff --git a/tests/unit/debug_handling/test_writeline.py b/tests/unit/debug_handling/test_writeline.py
deleted file mode 100644
index 52e23d8..0000000
--- a/tests/unit/debug_handling/test_writeline.py
+++ /dev/null
@@ -1,975 +0,0 @@
-"""
-Unit tests for debug_handling.writeline module
-"""
-
-import io
-import pytest
-from pytest import CaptureFixture
-
-from corelibs.debug_handling.writeline import (
-    write_l,
-    pr_header,
-    pr_title,
-    pr_open,
-    pr_close,
-    pr_act
-)
-
-
-class TestWriteL:
-    """Test cases for write_l function"""
-
-    def test_write_l_print_only(self, capsys: CaptureFixture[str]):
-        """Test write_l with print_line=True and no file"""
-        write_l("Test line", print_line=True)
-        captured = capsys.readouterr()
-        assert captured.out == "Test line\n"
-
-    def test_write_l_no_print_no_file(self, capsys: CaptureFixture[str]):
-        """Test write_l with print_line=False and no file (should do nothing)"""
-        write_l("Test line", print_line=False)
-        captured = capsys.readouterr()
-        assert captured.out == ""
-
-    def test_write_l_file_only(self, capsys: CaptureFixture[str]):
-        """Test write_l with file handler only (no print)"""
-        fpl = io.StringIO()
-        write_l("Test line", fpl=fpl, print_line=False)
-        captured = capsys.readouterr()
-        assert captured.out == ""
-        assert fpl.getvalue() == "Test line\n"
-        fpl.close()
-
-    def test_write_l_both_print_and_file(self, capsys: CaptureFixture[str]):
-        """Test write_l with both print and file output"""
-        fpl = io.StringIO()
-        write_l("Test line", fpl=fpl, print_line=True)
-        captured = capsys.readouterr()
-        assert captured.out == "Test line\n"
-        assert fpl.getvalue() == "Test line\n"
-        fpl.close()
-
-    def test_write_l_multiple_lines_to_file(self):
-        """Test write_l writing multiple lines to file"""
-        fpl = io.StringIO()
-        write_l("Line 1", fpl=fpl, print_line=False)
-        write_l("Line 2", fpl=fpl, print_line=False)
-        write_l("Line 3", fpl=fpl, print_line=False)
-        assert fpl.getvalue() == "Line 1\nLine 2\nLine 3\n"
-        fpl.close()
-
-    def test_write_l_empty_string(self, capsys: CaptureFixture[str]):
-        """Test write_l with empty string"""
-        fpl =
io.StringIO() - write_l("", fpl=fpl, print_line=True) - captured = capsys.readouterr() - assert captured.out == "\n" - assert fpl.getvalue() == "\n" - fpl.close() - - def test_write_l_special_characters(self): - """Test write_l with special characters""" - fpl = io.StringIO() - special_line = "Special: \t\n\r\\ 特殊文字 €" - write_l(special_line, fpl=fpl, print_line=False) - assert special_line + "\n" in fpl.getvalue() - fpl.close() - - def test_write_l_long_string(self): - """Test write_l with long string""" - fpl = io.StringIO() - long_line = "A" * 1000 - write_l(long_line, fpl=fpl, print_line=False) - assert fpl.getvalue() == long_line + "\n" - fpl.close() - - def test_write_l_unicode_content(self): - """Test write_l with unicode content""" - fpl = io.StringIO() - unicode_line = "Hello 世界 🌍 Привет" - write_l(unicode_line, fpl=fpl, print_line=False) - assert fpl.getvalue() == unicode_line + "\n" - fpl.close() - - def test_write_l_default_parameters(self, capsys: CaptureFixture[str]): - """Test write_l with default parameters""" - write_l("Test") - captured = capsys.readouterr() - # Default print_line is False - assert captured.out == "" - - def test_write_l_with_newline_in_string(self): - """Test write_l with newline characters in the string""" - fpl = io.StringIO() - write_l("Line with\nnewline", fpl=fpl, print_line=False) - assert fpl.getvalue() == "Line with\nnewline\n" - fpl.close() - - -class TestPrHeader: - """Test cases for pr_header function""" - - def test_pr_header_default(self, capsys: CaptureFixture[str]): - """Test pr_header with default parameters""" - pr_header("TEST") - captured = capsys.readouterr() - assert "#" in captured.out - assert "TEST" in captured.out - - def test_pr_header_custom_marker(self, capsys: CaptureFixture[str]): - """Test pr_header with custom marker string""" - pr_header("TEST", marker_string="*") - captured = capsys.readouterr() - assert "*" in captured.out - assert "TEST" in captured.out - assert "#" not in captured.out - - def test_pr_header_custom_width(self, capsys: CaptureFixture[str]): - """Test pr_header with custom width""" - pr_header("TEST", width=50) - captured = capsys.readouterr() - # Check that output is formatted - assert "TEST" in captured.out - - def test_pr_header_short_tag(self, capsys: CaptureFixture[str]): - """Test pr_header with short tag""" - pr_header("X") - captured = capsys.readouterr() - assert "X" in captured.out - assert "#" in captured.out - - def test_pr_header_long_tag(self, capsys: CaptureFixture[str]): - """Test pr_header with long tag""" - pr_header("This is a very long header tag") - captured = capsys.readouterr() - assert "This is a very long header tag" in captured.out - - def test_pr_header_empty_tag(self, capsys: CaptureFixture[str]): - """Test pr_header with empty tag""" - pr_header("") - captured = capsys.readouterr() - assert "#" in captured.out - - def test_pr_header_special_characters(self, capsys: CaptureFixture[str]): - """Test pr_header with special characters in tag""" - pr_header("TEST: 123! @#$") - captured = capsys.readouterr() - assert "TEST: 123! 
@#$" in captured.out - - def test_pr_header_unicode(self, capsys: CaptureFixture[str]): - """Test pr_header with unicode characters""" - pr_header("テスト 🎉") - captured = capsys.readouterr() - assert "テスト 🎉" in captured.out - - def test_pr_header_various_markers(self, capsys: CaptureFixture[str]): - """Test pr_header with various marker strings""" - markers = ["*", "=", "-", "+", "~", "@"] - for marker in markers: - pr_header("TEST", marker_string=marker) - captured = capsys.readouterr() - assert marker in captured.out - assert "TEST" in captured.out - - def test_pr_header_zero_width(self, capsys: CaptureFixture[str]): - """Test pr_header with width of 0""" - pr_header("TEST", width=0) - captured = capsys.readouterr() - assert "TEST" in captured.out - - def test_pr_header_large_width(self, capsys: CaptureFixture[str]): - """Test pr_header with large width""" - pr_header("TEST", width=100) - captured = capsys.readouterr() - assert "TEST" in captured.out - assert "#" in captured.out - - def test_pr_header_format(self, capsys: CaptureFixture[str]): - """Test pr_header output format""" - pr_header("CENTER", marker_string="#", width=20) - captured = capsys.readouterr() - # Should have spaces around centered text - assert " CENTER " in captured.out or "CENTER" in captured.out - - -class TestPrTitle: - """Test cases for pr_title function""" - - def test_pr_title_default(self, capsys: CaptureFixture[str]): - """Test pr_title with default parameters""" - pr_title("Test Title") - captured = capsys.readouterr() - assert "Test Title" in captured.out - assert "|" in captured.out - assert "." in captured.out - assert ":" in captured.out - - def test_pr_title_custom_prefix(self, capsys: CaptureFixture[str]): - """Test pr_title with custom prefix string""" - pr_title("Test", prefix_string=">") - captured = capsys.readouterr() - assert ">" in captured.out - assert "Test" in captured.out - assert "|" not in captured.out - - def test_pr_title_custom_space_filler(self, capsys: CaptureFixture[str]): - """Test pr_title with custom space filler""" - pr_title("Test", space_filler="-") - captured = capsys.readouterr() - assert "Test" in captured.out - assert "-" in captured.out - assert "." not in captured.out - - def test_pr_title_custom_width(self, capsys: CaptureFixture[str]): - """Test pr_title with custom width""" - pr_title("Test", width=50) - captured = capsys.readouterr() - assert "Test" in captured.out - - def test_pr_title_short_tag(self, capsys: CaptureFixture[str]): - """Test pr_title with short tag""" - pr_title("X") - captured = capsys.readouterr() - assert "X" in captured.out - assert "." in captured.out - - def test_pr_title_long_tag(self, capsys: CaptureFixture[str]): - """Test pr_title with long tag""" - pr_title("This is a very long title tag") - captured = capsys.readouterr() - assert "This is a very long title tag" in captured.out - - def test_pr_title_empty_tag(self, capsys: CaptureFixture[str]): - """Test pr_title with empty tag""" - pr_title("") - captured = capsys.readouterr() - assert "|" in captured.out - assert ":" in captured.out - - def test_pr_title_special_characters(self, capsys: CaptureFixture[str]): - """Test pr_title with special characters""" - pr_title("Task #123!") - captured = capsys.readouterr() - assert "Task #123!" 
in captured.out - - def test_pr_title_unicode(self, capsys: CaptureFixture[str]): - """Test pr_title with unicode characters""" - pr_title("タイトル 📝") - captured = capsys.readouterr() - assert "タイトル 📝" in captured.out - - def test_pr_title_various_fillers(self, capsys: CaptureFixture[str]): - """Test pr_title with various space fillers""" - fillers = [".", "-", "_", "*", " ", "~"] - for filler in fillers: - pr_title("Test", space_filler=filler) - captured = capsys.readouterr() - assert "Test" in captured.out - - def test_pr_title_zero_width(self, capsys: CaptureFixture[str]): - """Test pr_title with width of 0""" - pr_title("Test", width=0) - captured = capsys.readouterr() - assert "Test" in captured.out - - def test_pr_title_large_width(self, capsys: CaptureFixture[str]): - """Test pr_title with large width""" - pr_title("Test", width=100) - captured = capsys.readouterr() - assert "Test" in captured.out - - def test_pr_title_format_left_align(self, capsys: CaptureFixture[str]): - """Test pr_title output format (should be left-aligned with filler)""" - pr_title("Start", space_filler=".", width=10) - captured = capsys.readouterr() - # Should have the tag followed by dots - assert "Start" in captured.out - assert ":" in captured.out - - -class TestPrOpen: - """Test cases for pr_open function""" - - def test_pr_open_default(self, capsys: CaptureFixture[str]): - """Test pr_open with default parameters""" - pr_open("Processing") - captured = capsys.readouterr() - assert "Processing" in captured.out - assert "|" in captured.out - assert "." in captured.out - assert "[" in captured.out - # Should not have newline at the end - assert not captured.out.endswith("\n") - - def test_pr_open_custom_prefix(self, capsys: CaptureFixture[str]): - """Test pr_open with custom prefix string""" - pr_open("Task", prefix_string=">") - captured = capsys.readouterr() - assert ">" in captured.out - assert "Task" in captured.out - assert "|" not in captured.out - - def test_pr_open_custom_space_filler(self, capsys: CaptureFixture[str]): - """Test pr_open with custom space filler""" - pr_open("Task", space_filler="-") - captured = capsys.readouterr() - assert "Task" in captured.out - assert "-" in captured.out - assert "." 
not in captured.out - - def test_pr_open_custom_width(self, capsys: CaptureFixture[str]): - """Test pr_open with custom width""" - pr_open("Task", width=50) - captured = capsys.readouterr() - assert "Task" in captured.out - assert "[" in captured.out - - def test_pr_open_short_tag(self, capsys: CaptureFixture[str]): - """Test pr_open with short tag""" - pr_open("X") - captured = capsys.readouterr() - assert "X" in captured.out - assert "[" in captured.out - - def test_pr_open_long_tag(self, capsys: CaptureFixture[str]): - """Test pr_open with long tag""" - pr_open("This is a very long task tag") - captured = capsys.readouterr() - assert "This is a very long task tag" in captured.out - - def test_pr_open_empty_tag(self, capsys: CaptureFixture[str]): - """Test pr_open with empty tag""" - pr_open("") - captured = capsys.readouterr() - assert "[" in captured.out - assert "|" in captured.out - - def test_pr_open_no_newline(self, capsys: CaptureFixture[str]): - """Test pr_open doesn't end with newline""" - pr_open("Test") - captured = capsys.readouterr() - # Output should not end with newline (uses end="") - assert not captured.out.endswith("\n") - - def test_pr_open_special_characters(self, capsys: CaptureFixture[str]): - """Test pr_open with special characters""" - pr_open("Loading: 50%") - captured = capsys.readouterr() - assert "Loading: 50%" in captured.out - - def test_pr_open_unicode(self, capsys: CaptureFixture[str]): - """Test pr_open with unicode characters""" - pr_open("処理中 ⏳") - captured = capsys.readouterr() - assert "処理中 ⏳" in captured.out - - def test_pr_open_format(self, capsys: CaptureFixture[str]): - """Test pr_open output format""" - pr_open("Task", prefix_string="|", space_filler=".", width=20) - captured = capsys.readouterr() - assert "|" in captured.out - assert "Task" in captured.out - assert "[" in captured.out - - -class TestPrClose: - """Test cases for pr_close function""" - - def test_pr_close_default(self, capsys: CaptureFixture[str]): - """Test pr_close with default (empty) tag""" - pr_close() - captured = capsys.readouterr() - assert captured.out == "]\n" - - def test_pr_close_with_tag(self, capsys: CaptureFixture[str]): - """Test pr_close with custom tag""" - pr_close("DONE") - captured = capsys.readouterr() - assert "DONE" in captured.out - assert "]" in captured.out - assert captured.out.endswith("\n") - - def test_pr_close_with_space(self, capsys: CaptureFixture[str]): - """Test pr_close with space in tag""" - pr_close(" OK ") - captured = capsys.readouterr() - assert " OK " in captured.out - assert "]" in captured.out - - def test_pr_close_empty_string(self, capsys: CaptureFixture[str]): - """Test pr_close with empty string (same as default)""" - pr_close("") - captured = capsys.readouterr() - assert captured.out == "]\n" - - def test_pr_close_special_characters(self, capsys: CaptureFixture[str]): - """Test pr_close with special characters""" - pr_close("✓") - captured = capsys.readouterr() - assert "✓" in captured.out - assert "]" in captured.out - - def test_pr_close_unicode(self, capsys: CaptureFixture[str]): - """Test pr_close with unicode characters""" - pr_close("完了") - captured = capsys.readouterr() - assert "完了" in captured.out - assert "]" in captured.out - - def test_pr_close_newline(self, capsys: CaptureFixture[str]): - """Test pr_close ends with newline""" - pr_close("OK") - captured = capsys.readouterr() - assert captured.out.endswith("\n") - - def test_pr_close_various_tags(self, capsys: CaptureFixture[str]): - """Test pr_close with various 
tags""" - tags = ["OK", "DONE", "✓", "✗", "SKIP", "PASS", "FAIL"] - for tag in tags: - pr_close(tag) - captured = capsys.readouterr() - assert tag in captured.out - assert "]" in captured.out - - -class TestPrAct: - """Test cases for pr_act function""" - - def test_pr_act_default(self, capsys: CaptureFixture[str]): - """Test pr_act with default dot""" - pr_act() - captured = capsys.readouterr() - assert captured.out == "." - assert not captured.out.endswith("\n") - - def test_pr_act_custom_character(self, capsys: CaptureFixture[str]): - """Test pr_act with custom character""" - pr_act("#") - captured = capsys.readouterr() - assert captured.out == "#" - - def test_pr_act_multiple_calls(self, capsys: CaptureFixture[str]): - """Test pr_act with multiple calls""" - pr_act(".") - pr_act(".") - pr_act(".") - captured = capsys.readouterr() - assert captured.out == "..." - - def test_pr_act_various_characters(self, capsys: CaptureFixture[str]): - """Test pr_act with various characters""" - characters = [".", "#", "*", "+", "-", "=", ">", "~"] - for char in characters: - pr_act(char) - captured = capsys.readouterr() - assert "".join(characters) in captured.out - - def test_pr_act_empty_string(self, capsys: CaptureFixture[str]): - """Test pr_act with empty string""" - pr_act("") - captured = capsys.readouterr() - assert captured.out == "" - - def test_pr_act_special_character(self, capsys: CaptureFixture[str]): - """Test pr_act with special characters""" - pr_act("✓") - captured = capsys.readouterr() - assert captured.out == "✓" - - def test_pr_act_unicode(self, capsys: CaptureFixture[str]): - """Test pr_act with unicode character""" - pr_act("●") - captured = capsys.readouterr() - assert captured.out == "●" - - def test_pr_act_no_newline(self, capsys: CaptureFixture[str]): - """Test pr_act doesn't add newline""" - pr_act("x") - captured = capsys.readouterr() - assert not captured.out.endswith("\n") - - def test_pr_act_multiple_characters(self, capsys: CaptureFixture[str]): - """Test pr_act with multiple characters in string""" - pr_act("...") - captured = capsys.readouterr() - assert captured.out == "..." - - def test_pr_act_whitespace(self, capsys: CaptureFixture[str]): - """Test pr_act with whitespace""" - pr_act(" ") - captured = capsys.readouterr() - assert captured.out == " " - - -class TestProgressCombinations: - """Test combinations of progress printer functions""" - - def test_complete_progress_flow(self, capsys: CaptureFixture[str]): - """Test complete progress output flow""" - pr_header("PROCESS") - pr_title("Task 1") - pr_open("Subtask") - pr_act(".") - pr_act(".") - pr_act(".") - pr_close(" OK") - captured = capsys.readouterr() - - assert "PROCESS" in captured.out - assert "Task 1" in captured.out - assert "Subtask" in captured.out - assert "..." 
in captured.out - assert " OK]" in captured.out - - def test_multiple_tasks_progress(self, capsys: CaptureFixture[str]): - """Test multiple tasks with progress""" - pr_header("BATCH PROCESS") - for i in range(3): - pr_open(f"Task {i + 1}") - for _ in range(5): - pr_act(".") - pr_close(" DONE") - captured = capsys.readouterr() - - assert "BATCH PROCESS" in captured.out - assert "Task 1" in captured.out - assert "Task 2" in captured.out - assert "Task 3" in captured.out - assert " DONE]" in captured.out - - def test_nested_progress(self, capsys: CaptureFixture[str]): - """Test nested progress indicators""" - pr_header("MAIN TASK", marker_string="=") - pr_title("Subtask A", prefix_string=">") - pr_open("Processing") - pr_act("#") - pr_act("#") - pr_close() - pr_title("Subtask B", prefix_string=">") - pr_open("Processing") - pr_act("*") - pr_act("*") - pr_close(" OK") - captured = capsys.readouterr() - - assert "MAIN TASK" in captured.out - assert "Subtask A" in captured.out - assert "Subtask B" in captured.out - assert "##" in captured.out - assert "**" in captured.out - - def test_progress_with_different_markers(self, capsys: CaptureFixture[str]): - """Test progress with different marker styles""" - pr_header("Process", marker_string="*") - pr_title("Step 1", prefix_string=">>", space_filler="-") - pr_open("Work", prefix_string=">>", space_filler="-") - pr_act("+") - pr_close(" ✓") - captured = capsys.readouterr() - - assert "*" in captured.out - assert ">>" in captured.out - assert "-" in captured.out - assert "+" in captured.out - assert "✓" in captured.out - - def test_empty_progress_sequence(self, capsys: CaptureFixture[str]): - """Test progress sequence with no actual progress""" - pr_open("Quick task") - pr_close(" SKIP") - captured = capsys.readouterr() - - assert "Quick task" in captured.out - assert " SKIP]" in captured.out - - -class TestIntegration: - """Integration tests combining multiple scenarios""" - - def test_file_and_console_logging(self, capsys: CaptureFixture[str]): - """Test logging to both file and console""" - fpl = io.StringIO() - - write_l("Starting process", fpl=fpl, print_line=True) - write_l("Processing item 1", fpl=fpl, print_line=True) - write_l("Processing item 2", fpl=fpl, print_line=True) - write_l("Complete", fpl=fpl, print_line=True) - - captured = capsys.readouterr() - file_content = fpl.getvalue() - - # Check console output - assert "Starting process\n" in captured.out - assert "Processing item 1\n" in captured.out - assert "Processing item 2\n" in captured.out - assert "Complete\n" in captured.out - - # Check file output - assert "Starting process\n" in file_content - assert "Processing item 1\n" in file_content - assert "Processing item 2\n" in file_content - assert "Complete\n" in file_content - - fpl.close() - - def test_progress_with_logging(self, capsys: CaptureFixture[str]): - """Test combining progress output with file logging""" - fpl = io.StringIO() - - write_l("=== Process Start ===", fpl=fpl, print_line=True) - pr_header("MAIN PROCESS") - write_l("Header shown", fpl=fpl, print_line=False) - - pr_open("Task 1") - pr_act(".") - pr_act(".") - pr_close(" OK") - write_l("Task 1 completed", fpl=fpl, print_line=False) - - write_l("=== Process End ===", fpl=fpl, print_line=True) - - captured = capsys.readouterr() - file_content = fpl.getvalue() - - assert "=== Process Start ===" in captured.out - assert "MAIN PROCESS" in captured.out - assert "Task 1" in captured.out - assert "=== Process End ===" in captured.out - - assert "=== Process Start ===\n" 
in file_content - assert "Header shown\n" in file_content - assert "Task 1 completed\n" in file_content - assert "=== Process End ===\n" in file_content - - fpl.close() - - def test_complex_workflow(self, capsys: CaptureFixture[str]): - """Test complex workflow with all functions""" - fpl = io.StringIO() - - write_l("Log: Starting batch process", fpl=fpl, print_line=False) - pr_header("BATCH PROCESSOR", marker_string="=", width=40) - - for i in range(2): - write_l(f"Log: Processing batch {i + 1}", fpl=fpl, print_line=False) - pr_title(f"Batch {i + 1}", prefix_string="|", space_filler=".") - - pr_open(f"Item {i + 1}", prefix_string="|", space_filler=".") - for j in range(3): - pr_act("*") - write_l(f"Log: Progress {j + 1}/3", fpl=fpl, print_line=False) - pr_close(" ✓") - - write_l(f"Log: Batch {i + 1} complete", fpl=fpl, print_line=False) - - write_l("Log: All batches complete", fpl=fpl, print_line=False) - - captured = capsys.readouterr() - file_content = fpl.getvalue() - - # Check console has progress indicators - assert "BATCH PROCESSOR" in captured.out - assert "Batch 1" in captured.out - assert "Batch 2" in captured.out - assert "***" in captured.out - assert "✓" in captured.out - - # Check file has all log entries - assert "Log: Starting batch process\n" in file_content - assert "Log: Processing batch 1\n" in file_content - assert "Log: Processing batch 2\n" in file_content - assert "Log: Progress 1/3\n" in file_content - assert "Log: Batch 1 complete\n" in file_content - assert "Log: All batches complete\n" in file_content - - fpl.close() - - -class TestEdgeCases: - """Test edge cases and boundary conditions""" - - def test_write_l_none_file_handler(self, capsys: CaptureFixture[str]): - """Test write_l explicitly with None file handler""" - write_l("Test", fpl=None, print_line=True) - captured = capsys.readouterr() - assert captured.out == "Test\n" - - def test_pr_header_negative_width(self): - """Test pr_header with negative width raises ValueError""" - with pytest.raises(ValueError): - pr_header("Test", width=-10) - - def test_pr_title_negative_width(self): - """Test pr_title with negative width raises ValueError""" - with pytest.raises(ValueError): - pr_title("Test", width=-10) - - def test_pr_open_negative_width(self): - """Test pr_open with negative width raises ValueError""" - with pytest.raises(ValueError): - pr_open("Test", width=-10) - - def test_multiple_pr_act_no_close(self, capsys: CaptureFixture[str]): - """Test multiple pr_act calls without pr_close""" - pr_act(".") - pr_act(".") - pr_act(".") - captured = capsys.readouterr() - assert captured.out == "..." 
- - def test_pr_close_without_pr_open(self, capsys: CaptureFixture[str]): - """Test pr_close without prior pr_open (should still work)""" - pr_close(" OK") - captured = capsys.readouterr() - assert " OK]" in captured.out - - def test_very_long_strings(self): - """Test with very long strings""" - fpl = io.StringIO() - long_str = "A" * 10000 - write_l(long_str, fpl=fpl, print_line=False) - assert len(fpl.getvalue()) == 10001 # string + newline - fpl.close() - - def test_pr_header_very_long_tag(self, capsys: CaptureFixture[str]): - """Test pr_header with tag longer than width""" - pr_header("This is a very long tag that exceeds the width", width=10) - captured = capsys.readouterr() - assert "This is a very long tag that exceeds the width" in captured.out - - def test_pr_title_very_long_tag(self, capsys: CaptureFixture[str]): - """Test pr_title with tag longer than width""" - pr_title("This is a very long tag that exceeds the width", width=10) - captured = capsys.readouterr() - assert "This is a very long tag that exceeds the width" in captured.out - - def test_write_l_closed_file(self): - """Test write_l with closed file should raise error""" - fpl = io.StringIO() - fpl.close() - - with pytest.raises(ValueError): - write_l("Test", fpl=fpl, print_line=False) - - -class TestParametrized: - """Parametrized tests for comprehensive coverage""" - - @pytest.mark.parametrize("print_line", [True, False]) - def test_write_l_print_line_variations(self, print_line: bool, capsys: CaptureFixture[str]): - """Test write_l with different print_line values""" - write_l("Test", print_line=print_line) - captured = capsys.readouterr() - if print_line: - assert captured.out == "Test\n" - else: - assert captured.out == "" - - @pytest.mark.parametrize("marker", ["#", "*", "=", "-", "+", "~", "@", "^"]) - def test_pr_header_various_markers_param(self, marker: str, capsys: CaptureFixture[str]): - """Test pr_header with various markers""" - pr_header("TEST", marker_string=marker) - captured = capsys.readouterr() - assert marker in captured.out - assert "TEST" in captured.out - - @pytest.mark.parametrize("width", [0, 5, 10, 20, 35, 50, 100]) - def test_pr_header_various_widths(self, width: int, capsys: CaptureFixture[str]): - """Test pr_header with various widths""" - pr_header("TEST", width=width) - captured = capsys.readouterr() - assert "TEST" in captured.out - - @pytest.mark.parametrize("filler", [".", "-", "_", "*", " ", "~", "="]) - def test_pr_title_various_fillers_param(self, filler: str, capsys: CaptureFixture[str]): - """Test pr_title with various space fillers""" - pr_title("Test", space_filler=filler) - captured = capsys.readouterr() - assert "Test" in captured.out - - @pytest.mark.parametrize("prefix", ["|", ">", ">>", "*", "-", "+"]) - def test_pr_title_various_prefixes(self, prefix: str, capsys: CaptureFixture[str]): - """Test pr_title with various prefix strings""" - pr_title("Test", prefix_string=prefix) - captured = capsys.readouterr() - assert prefix in captured.out - assert "Test" in captured.out - - @pytest.mark.parametrize("act_char", [".", "#", "*", "+", "-", "=", ">", "~", "✓", "●"]) - def test_pr_act_various_characters_param(self, act_char: str, capsys: CaptureFixture[str]): - """Test pr_act with various characters""" - pr_act(act_char) - captured = capsys.readouterr() - assert captured.out == act_char - - @pytest.mark.parametrize("close_tag", ["", " OK", " DONE", " ✓", " ✗", " SKIP", " PASS"]) - def test_pr_close_various_tags_param(self, close_tag: str, capsys: CaptureFixture[str]): - """Test 
pr_close with various tags""" - pr_close(close_tag) - captured = capsys.readouterr() - assert f"{close_tag}]" in captured.out - - @pytest.mark.parametrize("content", [ - "Simple text", - "Text with 特殊文字", - "Text with emoji 🎉", - "Text\twith\ttabs", - "Multiple\n\nNewlines", - "", - "A" * 100, - ]) - def test_write_l_various_content(self, content: str, capsys: CaptureFixture[str]): - """Test write_l with various content types""" - fpl = io.StringIO() - write_l(content, fpl=fpl, print_line=True) - captured = capsys.readouterr() - assert content in captured.out - assert content + "\n" in fpl.getvalue() - fpl.close() - - -class TestRealWorldScenarios: - """Test real-world usage scenarios""" - - def test_batch_processing_output(self, capsys: CaptureFixture[str]): - """Test typical batch processing output""" - pr_header("BATCH PROCESSOR", marker_string="=", width=50) - - items = ["file1.txt", "file2.txt", "file3.txt"] - for item in items: - pr_open(f"Processing {item}") - for _ in range(10): - pr_act(".") - pr_close(" ✓") - - captured = capsys.readouterr() - assert "BATCH PROCESSOR" in captured.out - for item in items: - assert item in captured.out - assert "✓" in captured.out - - def test_logging_workflow(self, capsys: CaptureFixture[str]): - """Test typical logging workflow""" - log_file = io.StringIO() - - # Simulate a workflow with logging - write_l("[INFO] Starting process", fpl=log_file, print_line=True) - write_l("[INFO] Initializing components", fpl=log_file, print_line=True) - write_l("[DEBUG] Component A loaded", fpl=log_file, print_line=False) - write_l("[DEBUG] Component B loaded", fpl=log_file, print_line=False) - write_l("[INFO] Processing data", fpl=log_file, print_line=True) - write_l("[INFO] Process complete", fpl=log_file, print_line=True) - - captured = capsys.readouterr() - log_content = log_file.getvalue() - - # Console should only have INFO messages - assert "[INFO] Starting process" in captured.out - assert "[DEBUG] Component A loaded" not in captured.out - - # Log file should have all messages - assert "[INFO] Starting process\n" in log_content - assert "[DEBUG] Component A loaded\n" in log_content - assert "[DEBUG] Component B loaded\n" in log_content - - log_file.close() - - def test_progress_indicator_for_long_task(self, capsys: CaptureFixture[str]): - """Test progress indicator for a long-running task""" - pr_header("DATA PROCESSING") - pr_open("Loading data", width=50) - - # Simulate progress - for i in range(20): - if i % 5 == 0: - pr_act(str(i // 5)) - else: - pr_act(".") - - pr_close(" COMPLETE") - - captured = capsys.readouterr() - assert "DATA PROCESSING" in captured.out - assert "Loading data" in captured.out - assert "COMPLETE" in captured.out - - def test_multi_stage_process(self, capsys: CaptureFixture[str]): - """Test multi-stage process with titles and progress""" - pr_header("DEPLOYMENT PIPELINE", marker_string="=") - - stages = ["Build", "Test", "Deploy"] - for stage in stages: - pr_title(stage) - pr_open(f"Running {stage.lower()}") - pr_act("#") - pr_act("#") - pr_act("#") - pr_close(" OK") - - captured = capsys.readouterr() - assert "DEPLOYMENT PIPELINE" in captured.out - for stage in stages: - assert stage in captured.out - assert "###" in captured.out - - def test_error_reporting_with_logging(self, capsys: CaptureFixture[str]): - """Test error reporting workflow""" - error_log = io.StringIO() - - pr_header("VALIDATION", marker_string="!") - pr_open("Checking files") - - write_l("[ERROR] File not found: data.csv", fpl=error_log, print_line=False) - 
pr_act("✗") - - write_l("[ERROR] Permission denied: output.txt", fpl=error_log, print_line=False) - pr_act("✗") - - pr_close(" FAILED") - - captured = capsys.readouterr() - log_content = error_log.getvalue() - - assert "VALIDATION" in captured.out - assert "Checking files" in captured.out - assert "✗✗" in captured.out - assert "FAILED" in captured.out - - assert "[ERROR] File not found: data.csv\n" in log_content - assert "[ERROR] Permission denied: output.txt\n" in log_content - - error_log.close() - - def test_detailed_reporting(self, capsys: CaptureFixture[str]): - """Test detailed reporting with mixed output""" - report_file = io.StringIO() - - pr_header("SYSTEM REPORT", marker_string="#", width=60) - write_l("=== System Report Generated ===", fpl=report_file, print_line=False) - - pr_title("Database Status", prefix_string=">>") - write_l("Database: Connected", fpl=report_file, print_line=False) - write_l("Tables: 15", fpl=report_file, print_line=False) - write_l("Records: 1,234,567", fpl=report_file, print_line=False) - - pr_title("API Status", prefix_string=">>") - write_l("API: Online", fpl=report_file, print_line=False) - write_l("Requests/min: 1,500", fpl=report_file, print_line=False) - - write_l("=== Report Complete ===", fpl=report_file, print_line=False) - - captured = capsys.readouterr() - report_content = report_file.getvalue() - - assert "SYSTEM REPORT" in captured.out - assert "Database Status" in captured.out - assert "API Status" in captured.out - - assert "=== System Report Generated ===\n" in report_content - assert "Database: Connected\n" in report_content - assert "API: Online\n" in report_content - assert "=== Report Complete ===\n" in report_content - - report_file.close() - -# __END__