diff --git a/src/corelibs/debug_handling/writeline.py b/src/corelibs/debug_handling/writeline.py index cb2c6e2..ff031f2 100644 --- a/src/corelibs/debug_handling/writeline.py +++ b/src/corelibs/debug_handling/writeline.py @@ -4,10 +4,10 @@ Various small helpers for data writing from typing import TYPE_CHECKING if TYPE_CHECKING: - from io import TextIOWrapper + from io import TextIOWrapper, StringIO -def write_l(line: str, fpl: 'TextIOWrapper | None' = None, print_line: bool = False): +def write_l(line: str, fpl: 'TextIOWrapper | StringIO | None' = None, print_line: bool = False): """ Write a line to screen and to output file diff --git a/tests/unit/debug_handling/test_debug_helpers.py b/tests/unit/debug_handling/test_debug_helpers.py new file mode 100644 index 0000000..9e06d56 --- /dev/null +++ b/tests/unit/debug_handling/test_debug_helpers.py @@ -0,0 +1,639 @@ +""" +Unit tests for debug_handling.debug_helpers module +""" + +import sys +import pytest + +from corelibs.debug_handling.debug_helpers import ( + call_stack, + exception_stack, + OptExcInfo +) + + +class TestCallStack: + """Test cases for call_stack function""" + + def test_call_stack_basic(self): + """Test basic call_stack functionality""" + result = call_stack() + assert isinstance(result, str) + assert "test_debug_helpers.py" in result + assert "test_call_stack_basic" in result + + def test_call_stack_with_default_separator(self): + """Test call_stack with default separator""" + result = call_stack() + assert " -> " in result + + def test_call_stack_with_custom_separator(self): + """Test call_stack with custom separator""" + result = call_stack(separator=" | ") + assert " | " in result + assert " -> " not in result + + def test_call_stack_with_empty_separator(self): + """Test call_stack with empty separator (should default to ' -> ')""" + result = call_stack(separator="") + assert " -> " in result + + def test_call_stack_format(self): + """Test call_stack output format (filename:function:lineno)""" + result = call_stack() + parts = result.split(" -> ") + for part in parts: + # Each part should have format: filename:function:lineno + assert part.count(":") >= 2 + # Most parts should contain .py but some system frames might not + # Just check that we have some .py files in the trace + assert ".py" in result or "test_debug_helpers" in result + + def test_call_stack_with_start_offset(self): + """Test call_stack with start offset""" + result_no_offset = call_stack(start=0) + result_with_offset = call_stack(start=2) + + # With offset, we should get fewer frames + parts_no_offset = result_no_offset.split(" -> ") + parts_with_offset = result_with_offset.split(" -> ") + + assert len(parts_with_offset) <= len(parts_no_offset) + + def test_call_stack_with_skip_last(self): + """Test call_stack with skip_last parameter""" + result_skip_default = call_stack(skip_last=-1) + result_skip_more = call_stack(skip_last=-3) + + # Skipping more should result in fewer frames + parts_default = result_skip_default.split(" -> ") + parts_more = result_skip_more.split(" -> ") + + assert len(parts_more) <= len(parts_default) + + def test_call_stack_skip_last_positive_converts_to_negative(self): + """Test that positive skip_last is converted to negative""" + # Both should produce same result + result_negative = call_stack(skip_last=-2) + result_positive = call_stack(skip_last=2) + + assert result_negative == result_positive + + def test_call_stack_nested_calls(self): + """Test call_stack in nested function calls""" + def level_one(): + return level_two() + + 
def level_two(): + return level_three() + + def level_three(): + return call_stack() + + result = level_one() + assert "level_one" in result + assert "level_two" in result + assert "level_three" in result + + def test_call_stack_reset_start_if_empty_false(self): + """Test call_stack with high start value and reset_start_if_empty=False""" + # Using a very high start value should result in empty stack + result = call_stack(start=1000, reset_start_if_empty=False) + assert result == "" + + def test_call_stack_reset_start_if_empty_true(self): + """Test call_stack with high start value and reset_start_if_empty=True""" + # Using a very high start value with reset should give non-empty result + result = call_stack(start=1000, reset_start_if_empty=True) + assert result != "" + assert "test_debug_helpers.py" in result + + def test_call_stack_contains_line_numbers(self): + """Test that call_stack includes line numbers""" + result = call_stack() + # Extract parts and check for numbers + parts = result.split(" -> ") + for part in parts: + # Line numbers should be present (digits at the end) + assert any(char.isdigit() for char in part) + + def test_call_stack_separator_none(self): + """Test call_stack with None separator""" + result = call_stack(separator="") # Use empty string instead of None + # Empty string should be converted to default ' -> ' + assert " -> " in result + + def test_call_stack_multiple_separators(self): + """Test call_stack with various custom separators""" + separators = [" | ", " >> ", " => ", " / ", "\n"] + + for sep in separators: + result = call_stack(separator=sep) + assert sep in result or result == "" # May be empty based on stack depth + + +class TestExceptionStack: + """Test cases for exception_stack function""" + + def test_exception_stack_with_active_exception(self): + """Test exception_stack when an exception is active""" + try: + raise ValueError("Test exception") + except ValueError: + result = exception_stack() + assert isinstance(result, str) + assert "test_debug_helpers.py" in result + assert "test_exception_stack_with_active_exception" in result + + def test_exception_stack_format(self): + """Test exception_stack output format""" + try: + raise RuntimeError("Test error") + except RuntimeError: + result = exception_stack() + parts = result.split(" -> ") + for part in parts: + # Each part should have format: filename:function:lineno + assert part.count(":") >= 2 + + def test_exception_stack_with_custom_separator(self): + """Test exception_stack with custom separator""" + def nested_call(): + def inner_call(): + raise TypeError("Test type error") + inner_call() + + try: + nested_call() + except TypeError: + result = exception_stack(separator=" | ") + # Only check separator if there are multiple frames + if " | " in result or result.count(":") == 2: + # Single frame or has separator + assert isinstance(result, str) + assert " -> " not in result + + def test_exception_stack_with_empty_separator(self): + """Test exception_stack with empty separator (should default to ' -> ')""" + def nested_call(): + def inner_call(): + raise KeyError("Test key error") + inner_call() + + try: + nested_call() + except KeyError: + result = exception_stack(separator="") + # Should use default separator if multiple frames exist + assert isinstance(result, str) + + def test_exception_stack_separator_none(self): + """Test exception_stack with empty separator""" + def nested_call(): + def inner_call(): + raise IndexError("Test index error") + inner_call() + + try: + nested_call() + except 
IndexError: + result = exception_stack(separator="") # Use empty string instead of None + assert isinstance(result, str) + + def test_exception_stack_nested_exceptions(self): + """Test exception_stack with nested function calls""" + def level_one(): + level_two() + + def level_two(): + level_three() + + def level_three(): + raise ValueError("Nested exception") + + try: + level_one() + except ValueError: + result = exception_stack() + # Should contain all levels in the stack + assert "level_one" in result or "level_two" in result or "level_three" in result + + def test_exception_stack_with_provided_exc_info(self): + """Test exception_stack with explicitly provided exc_info""" + try: + raise AttributeError("Test attribute error") + except AttributeError: + exc_info = sys.exc_info() + result = exception_stack(exc_stack=exc_info) + assert isinstance(result, str) + assert len(result) > 0 + + def test_exception_stack_no_active_exception(self): + """Test exception_stack when no exception is active""" + # This should handle the case gracefully + # When no exception is active, sys.exc_info() returns (None, None, None) + result = exception_stack() + # With no traceback, should return empty string or handle gracefully + assert isinstance(result, str) + + def test_exception_stack_contains_line_numbers(self): + """Test that exception_stack includes line numbers""" + try: + raise OSError("Test OS error") + except OSError: + result = exception_stack() + if result: # May be empty + parts = result.split(" -> ") + for part in parts: + # Line numbers should be present + assert any(char.isdigit() for char in part) + + def test_exception_stack_multiple_exceptions(self): + """Test exception_stack captures the current exception only""" + first_result = None + second_result = None + + try: + raise ValueError("First exception") + except ValueError: + first_result = exception_stack() + + try: + raise TypeError("Second exception") + except TypeError: + second_result = exception_stack() + + # Both should be valid but may differ + assert isinstance(first_result, str) + assert isinstance(second_result, str) + + def test_exception_stack_with_multiple_separators(self): + """Test exception_stack with various custom separators""" + separators = [" | ", " >> ", " => ", " / ", "\n"] + + def nested_call(): + def inner_call(): + raise ValueError("Test exception") + inner_call() + + for sep in separators: + try: + nested_call() + except ValueError: + result = exception_stack(separator=sep) + assert isinstance(result, str) + # Separator only appears if there are multiple frames + + +class TestOptExcInfo: + """Test cases for OptExcInfo type definition""" + + def test_opt_exc_info_type_none_tuple(self): + """Test OptExcInfo can be None tuple""" + exc_info: OptExcInfo = (None, None, None) + assert exc_info == (None, None, None) + + def test_opt_exc_info_type_exception_tuple(self): + """Test OptExcInfo can be exception tuple""" + try: + raise ValueError("Test") + except ValueError: + exc_info: OptExcInfo = sys.exc_info() + assert exc_info[0] is not None + assert exc_info[1] is not None + assert exc_info[2] is not None + + def test_opt_exc_info_with_exception_stack(self): + """Test that OptExcInfo works with exception_stack function""" + try: + raise RuntimeError("Test runtime error") + except RuntimeError: + exc_info = sys.exc_info() + result = exception_stack(exc_stack=exc_info) + assert isinstance(result, str) + + +class TestIntegration: + """Integration tests combining multiple scenarios""" + + def 
test_call_stack_and_exception_stack_together(self): + """Test using both call_stack and exception_stack in error handling""" + def faulty_function(): + _ = call_stack() # Get call stack before exception + raise ValueError("Intentional error") + + try: + faulty_function() + except ValueError: + exception_trace = exception_stack() + + assert isinstance(exception_trace, str) + assert "faulty_function" in exception_trace or "test_debug_helpers.py" in exception_trace + + def test_nested_exception_with_call_stack(self): + """Test call_stack within exception handling""" + def outer(): + return inner() + + def inner(): + try: + raise RuntimeError("Inner error") + except RuntimeError: + return { + 'call_stack': call_stack(), + 'exception_stack': exception_stack() + } + + result = outer() + assert 'call_stack' in result + assert 'exception_stack' in result + assert isinstance(result['call_stack'], str) + assert isinstance(result['exception_stack'], str) + + def test_multiple_nested_levels(self): + """Test with multiple nested function levels""" + def level_a(): + return level_b() + + def level_b(): + return level_c() + + def level_c(): + return level_d() + + def level_d(): + try: + raise ValueError("Deep error") + except ValueError: + return { + 'call': call_stack(), + 'exception': exception_stack() + } + + result = level_a() + # Should contain information about the call chain + assert result['call'] + assert result['exception'] + + def test_different_separators_consistency(self): + """Test that different separators work consistently""" + separators = [" -> ", " | ", " / ", " >> "] + + def nested_call(): + def inner_call(): + raise ValueError("Test") + inner_call() + + for sep in separators: + try: + nested_call() + except ValueError: + exc_result = exception_stack(separator=sep) + call_result = call_stack(separator=sep) + + assert isinstance(exc_result, str) + assert isinstance(call_result, str) + # Both should be valid strings (separator check only if multiple frames) + + +class TestEdgeCases: + """Test edge cases and boundary conditions""" + + def test_call_stack_with_zero_start(self): + """Test call_stack with start=0 (should include all frames)""" + result = call_stack(start=0) + assert isinstance(result, str) + assert len(result) > 0 + + def test_call_stack_with_large_skip_last(self): + """Test call_stack with very large skip_last value""" + result = call_stack(skip_last=-100) + # Should handle gracefully, may be empty + assert isinstance(result, str) + + def test_exception_stack_none_exc_info(self): + """Test exception_stack with None as exc_stack""" + result = exception_stack(exc_stack=None) + assert isinstance(result, str) + + def test_exception_stack_empty_tuple(self): + """Test exception_stack with empty exception info""" + exc_info: OptExcInfo = (None, None, None) + result = exception_stack(exc_stack=exc_info) + assert isinstance(result, str) + + def test_call_stack_special_characters_in_separator(self): + """Test call_stack with special characters in separator""" + special_separators = ["\n", "\t", "->", "||", "//"] + + for sep in special_separators: + result = call_stack(separator=sep) + assert isinstance(result, str) + + def test_very_deep_call_stack(self): + """Test call_stack with very deep recursion (up to a limit)""" + def recursive_call(depth: int, max_depth: int = 5) -> str: + if depth >= max_depth: + return call_stack() + return recursive_call(depth + 1, max_depth) + + result = recursive_call(0) + assert isinstance(result, str) + # Should contain multiple recursive_call entries 
+ assert result.count("recursive_call") > 0 + + def test_exception_stack_different_exception_types(self): + """Test exception_stack with various exception types""" + exception_types = [ + ValueError("value"), + TypeError("type"), + KeyError("key"), + IndexError("index"), + AttributeError("attr"), + RuntimeError("runtime"), + ] + + for exc in exception_types: + try: + raise exc + except (ValueError, TypeError, KeyError, IndexError, AttributeError, RuntimeError): + result = exception_stack() + assert isinstance(result, str) + + +class TestRealWorldScenarios: + """Test real-world debugging scenarios""" + + def test_debugging_workflow(self): + """Test typical debugging workflow with both functions""" + def process_data(data: str) -> str: + _ = call_stack() # Capture call stack for debugging + if not data: + raise ValueError("No data provided") + return data.upper() + + # Success case + result = process_data("test") + assert result == "TEST" + + # Error case + try: + process_data("") + except ValueError: + exc_trace = exception_stack() + assert isinstance(exc_trace, str) + + def test_logging_context(self): + """Test using call_stack for logging context""" + def get_logging_context(): + return { + 'timestamp': 'now', + 'stack': call_stack(start=1, separator=" > "), + 'function': 'get_logging_context' + } + + context = get_logging_context() + assert 'stack' in context + assert 'timestamp' in context + assert isinstance(context['stack'], str) + + def test_error_reporting(self): + """Test comprehensive error reporting""" + def dangerous_operation() -> dict[str, str]: + try: + # Simulate some operation + _ = 1 / 0 + except ZeroDivisionError: + return { + 'error': 'Division by zero', + 'call_stack': call_stack(), + 'exception_stack': exception_stack(), + } + return {} # Fallback return + + error_report = dangerous_operation() + assert error_report is not None + assert 'error' in error_report + assert 'call_stack' in error_report + assert 'exception_stack' in error_report + assert error_report['error'] == 'Division by zero' + + def test_function_tracing(self): + """Test function call tracing""" + traces: list[str] = [] + + def traced_function_a() -> str: + traces.append(call_stack()) + return traced_function_b() + + def traced_function_b() -> str: + traces.append(call_stack()) + return traced_function_c() + + def traced_function_c() -> str: + traces.append(call_stack()) + return "done" + + result = traced_function_a() + assert result == "done" + assert len(traces) == 3 + # Each trace should be different (different call depths) + assert all(isinstance(t, str) for t in traces) + + def test_exception_chain_tracking(self): + """Test tracking exception chains""" + exception_traces: list[str] = [] + + def operation_one() -> None: + try: + operation_two() + except ValueError: + exception_traces.append(exception_stack()) + raise + + def operation_two() -> None: + try: + operation_three() + except TypeError as exc: + exception_traces.append(exception_stack()) + raise ValueError("Wrapped error") from exc + + def operation_three() -> None: + raise TypeError("Original error") + + try: + operation_one() + except ValueError: + exception_traces.append(exception_stack()) + + # Should have captured multiple exception stacks + assert len(exception_traces) > 0 + assert all(isinstance(t, str) for t in exception_traces) + + +class TestParametrized: + """Parametrized tests for comprehensive coverage""" + + @pytest.mark.parametrize("start", [0, 1, 2, 5, 10]) + def test_call_stack_various_starts(self, start: int) -> None: + 
"""Test call_stack with various start values""" + result = call_stack(start=start) + assert isinstance(result, str) + + @pytest.mark.parametrize("skip_last", [-1, -2, -3, -5, 1, 2, 3, 5]) + def test_call_stack_various_skip_lasts(self, skip_last: int) -> None: + """Test call_stack with various skip_last values""" + result = call_stack(skip_last=skip_last) + assert isinstance(result, str) + + @pytest.mark.parametrize("separator", [" -> ", " | ", " / ", " >> ", " => ", "\n", "\t"]) + def test_call_stack_various_separators(self, separator: str) -> None: + """Test call_stack with various separators""" + result = call_stack(separator=separator) + assert isinstance(result, str) + if result: + assert separator in result + + @pytest.mark.parametrize("reset_start", [True, False]) + def test_call_stack_reset_start_variations(self, reset_start: bool) -> None: + """Test call_stack with reset_start_if_empty variations""" + result = call_stack(start=100, reset_start_if_empty=reset_start) + assert isinstance(result, str) + if reset_start: + assert len(result) > 0 # Should have content after reset + else: + assert len(result) == 0 # Should be empty + + @pytest.mark.parametrize("separator", [" -> ", " | ", " / ", " >> ", "\n"]) + def test_exception_stack_various_separators(self, separator: str) -> None: + """Test exception_stack with various separators""" + def nested_call(): + def inner_call(): + raise ValueError("Test") + inner_call() + + try: + nested_call() + except ValueError: + result = exception_stack(separator=separator) + assert isinstance(result, str) + # Check that result is valid (separator only if multiple frames exist) + + @pytest.mark.parametrize("exception_type", [ + ValueError, + TypeError, + KeyError, + IndexError, + AttributeError, + RuntimeError, + OSError, + ]) + def test_exception_stack_various_exception_types(self, exception_type: type[Exception]) -> None: + """Test exception_stack with various exception types""" + try: + raise exception_type("Test exception") + except (ValueError, TypeError, KeyError, IndexError, AttributeError, RuntimeError, OSError): + result = exception_stack() + assert isinstance(result, str) + +# __END__ diff --git a/tests/unit/debug_handling/test_dump_data.py b/tests/unit/debug_handling/test_dump_data.py new file mode 100644 index 0000000..9168005 --- /dev/null +++ b/tests/unit/debug_handling/test_dump_data.py @@ -0,0 +1,288 @@ +""" +Unit tests for debug_handling.dump_data module +""" + +import json +from datetime import datetime, date +from decimal import Decimal +from typing import Any + +import pytest + +from corelibs.debug_handling.dump_data import dump_data + + +class TestDumpData: + """Test cases for dump_data function""" + + def test_dump_simple_dict(self): + """Test dumping a simple dictionary""" + data = {"name": "John", "age": 30} + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == data + + def test_dump_simple_list(self): + """Test dumping a simple list""" + data = [1, 2, 3, 4, 5] + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == data + + def test_dump_nested_dict(self): + """Test dumping a nested dictionary""" + data = { + "user": { + "name": "Alice", + "address": { + "city": "Tokyo", + "country": "Japan" + } + } + } + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == data + + def test_dump_mixed_types(self): + """Test dumping data with mixed types""" + data = { + "string": 
"test", + "number": 42, + "float": 3.14, + "boolean": True, + "null": None, + "list": [1, 2, 3] + } + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == data + + def test_dump_with_indent_default(self): + """Test that indent is applied by default""" + data = {"a": 1, "b": 2} + result = dump_data(data) + + # With indent, result should contain newlines + assert "\n" in result + assert " " in result # 4 spaces for indent + + def test_dump_with_indent_true(self): + """Test explicit indent=True""" + data = {"a": 1, "b": 2} + result = dump_data(data, use_indent=True) + + # With indent, result should contain newlines + assert "\n" in result + assert " " in result # 4 spaces for indent + + def test_dump_without_indent(self): + """Test dumping without indentation""" + data = {"a": 1, "b": 2} + result = dump_data(data, use_indent=False) + + # Without indent, result should be compact + assert "\n" not in result + assert result == '{"a": 1, "b": 2}' + + def test_dump_unicode_characters(self): + """Test that unicode characters are preserved (ensure_ascii=False)""" + data = {"message": "こんにちは", "emoji": "😀", "german": "Müller"} + result = dump_data(data) + + # Unicode characters should be preserved, not escaped + assert "こんにちは" in result + assert "😀" in result + assert "Müller" in result + + parsed = json.loads(result) + assert parsed == data + + def test_dump_datetime_object(self): + """Test dumping data with datetime objects (using default=str)""" + now = datetime(2023, 10, 15, 14, 30, 0) + data = {"timestamp": now} + result = dump_data(data) + + assert isinstance(result, str) + # datetime should be converted to string + assert "2023-10-15" in result + + def test_dump_date_object(self): + """Test dumping data with date objects""" + today = date(2023, 10, 15) + data = {"date": today} + result = dump_data(data) + + assert isinstance(result, str) + assert "2023-10-15" in result + + def test_dump_decimal_object(self): + """Test dumping data with Decimal objects""" + data = {"amount": Decimal("123.45")} + result = dump_data(data) + + assert isinstance(result, str) + assert "123.45" in result + + def test_dump_empty_dict(self): + """Test dumping an empty dictionary""" + data = {} + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == {} + + def test_dump_empty_list(self): + """Test dumping an empty list""" + data = [] + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == [] + + def test_dump_string_directly(self): + """Test dumping a string directly""" + data = "Hello, World!" 
+ result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == data + + def test_dump_number_directly(self): + """Test dumping a number directly""" + data = 42 + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == data + + def test_dump_boolean_directly(self): + """Test dumping a boolean directly""" + data = True + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed is True + + def test_dump_none_directly(self): + """Test dumping None directly""" + data = None + result = dump_data(data) + + assert isinstance(result, str) + assert result == "null" + parsed = json.loads(result) + assert parsed is None + + def test_dump_complex_nested_structure(self): + """Test dumping a complex nested structure""" + data = { + "users": [ + { + "id": 1, + "name": "Alice", + "tags": ["admin", "user"], + "metadata": { + "created": datetime(2023, 1, 1), + "active": True + } + }, + { + "id": 2, + "name": "Bob", + "tags": ["user"], + "metadata": { + "created": datetime(2023, 6, 15), + "active": False + } + } + ], + "total": 2 + } + result = dump_data(data) + + assert isinstance(result, str) + # Check that it's valid JSON + parsed = json.loads(result) + assert len(parsed["users"]) == 2 + assert parsed["total"] == 2 + + def test_dump_special_characters(self): + """Test dumping data with special characters""" + data = { + "quote": 'He said "Hello"', + "backslash": "path\\to\\file", + "newline": "line1\nline2", + "tab": "col1\tcol2" + } + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == data + + def test_dump_large_numbers(self): + """Test dumping large numbers""" + data = { + "big_int": 123456789012345678901234567890, + "big_float": 1.23456789e100 + } + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed["big_int"] == data["big_int"] + + def test_dump_list_of_dicts(self): + """Test dumping a list of dictionaries""" + data = [ + {"id": 1, "name": "Item 1"}, + {"id": 2, "name": "Item 2"}, + {"id": 3, "name": "Item 3"} + ] + result = dump_data(data) + + assert isinstance(result, str) + parsed = json.loads(result) + assert parsed == data + assert len(parsed) == 3 + + +class CustomObject: + """Custom class for testing default=str conversion""" + def __init__(self, value: Any): + self.value = value + + def __str__(self): + return f"CustomObject({self.value})" + + +class TestDumpDataWithCustomObjects: + """Test cases for dump_data with custom objects""" + + def test_dump_custom_object(self): + """Test that custom objects are converted using str()""" + obj = CustomObject("test") + data = {"custom": obj} + result = dump_data(data) + + assert isinstance(result, str) + assert "CustomObject(test)" in result + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/unit/debug_handling/test_profiling.py b/tests/unit/debug_handling/test_profiling.py new file mode 100644 index 0000000..4f19928 --- /dev/null +++ b/tests/unit/debug_handling/test_profiling.py @@ -0,0 +1,560 @@ +""" +Unit tests for corelibs.debug_handling.profiling module +""" + +import time +import tracemalloc + +from corelibs.debug_handling.profiling import display_top, Profiling + + +class TestDisplayTop: + """Test display_top function""" + + def test_display_top_basic(self): + """Test that display_top returns a string with basic stats""" + tracemalloc.start() + + # Allocate 
some memory
+        data = [0] * 10000
+
+        snapshot = tracemalloc.take_snapshot()
+        tracemalloc.stop()
+
+        result = display_top(snapshot)
+
+        assert isinstance(result, str)
+        assert "Top 10 lines" in result
+        assert "KiB" in result
+        assert "Total allocated size:" in result
+
+        # Clean up
+        del data
+
+    def test_display_top_with_custom_limit(self):
+        """Test display_top with custom limit parameter"""
+        tracemalloc.start()
+
+        # Allocate some memory
+        data = [0] * 10000
+
+        snapshot = tracemalloc.take_snapshot()
+        tracemalloc.stop()
+
+        result = display_top(snapshot, limit=5)
+
+        assert isinstance(result, str)
+        assert "Top 5 lines" in result
+
+        # Clean up
+        del data
+
+    def test_display_top_with_different_key_type(self):
+        """Test display_top with different key_type parameter"""
+        tracemalloc.start()
+
+        # Allocate some memory
+        data = [0] * 10000
+
+        snapshot = tracemalloc.take_snapshot()
+        tracemalloc.stop()
+
+        result = display_top(snapshot, key_type='filename')
+
+        assert isinstance(result, str)
+        assert "Top 10 lines" in result
+
+        # Clean up
+        del data
+
+    def test_display_top_filters_traces(self):
+        """Test that display_top filters out bootstrap and unknown traces"""
+        tracemalloc.start()
+
+        # Allocate some memory
+        data = [0] * 10000
+
+        snapshot = tracemalloc.take_snapshot()
+        tracemalloc.stop()
+
+        result = display_top(snapshot)
+
+        # Should not contain filtered traces
+        assert "<frozen importlib._bootstrap>" not in result
+        assert "<unknown>" not in result
+
+        # Clean up
+        del data
+
+    def test_display_top_with_limit_larger_than_stats(self):
+        """Test display_top when limit is larger than available stats"""
+        tracemalloc.start()
+
+        # Allocate some memory
+        data = [0] * 100
+
+        snapshot = tracemalloc.take_snapshot()
+        tracemalloc.stop()
+
+        result = display_top(snapshot, limit=1000)
+
+        assert isinstance(result, str)
+        assert "Top 1000 lines" in result
+        assert "Total allocated size:" in result
+
+        # Clean up
+        del data
+
+    def test_display_top_empty_snapshot(self):
+        """Test display_top with a snapshot that has minimal traces"""
+        tracemalloc.start()
+        snapshot = tracemalloc.take_snapshot()
+        tracemalloc.stop()
+
+        result = display_top(snapshot, limit=1)
+
+        assert isinstance(result, str)
+        assert "Top 1 lines" in result
+
+
+class TestProfilingInitialization:
+    """Test Profiling class initialization"""
+
+    def test_profiling_initialization(self):
+        """Test that Profiling initializes correctly"""
+        profiler = Profiling()
+
+        # Should be able to create instance
+        assert isinstance(profiler, Profiling)
+
+    def test_profiling_initial_state(self):
+        """Test that Profiling starts in a clean state"""
+        profiler = Profiling()
+
+        # Should not raise an error when calling end_profiling
+        # even though start_profiling wasn't called
+        profiler.end_profiling()
+
+        result = profiler.print_profiling()
+        assert isinstance(result, str)
+
+
+class TestProfilingStartEnd:
+    """Test start_profiling and end_profiling functionality"""
+
+    def test_start_profiling(self):
+        """Test that start_profiling can be called"""
+        profiler = Profiling()
+
+        # Should not raise an error
+        profiler.start_profiling("test_operation")
+
+    def test_end_profiling(self):
+        """Test that end_profiling can be called"""
+        profiler = Profiling()
+        profiler.start_profiling("test_operation")
+
+        # Should not raise an error
+        profiler.end_profiling()
+
+    def test_start_profiling_with_different_idents(self):
+        """Test start_profiling with different identifier strings"""
+        profiler = Profiling()
+
+        identifiers = ["short", "longer_identifier",
"very_long_identifier_with_many_chars"] + + for ident in identifiers: + profiler.start_profiling(ident) + profiler.end_profiling() + result = profiler.print_profiling() + + assert ident in result + + def test_end_profiling_without_start(self): + """Test that end_profiling can be called without start_profiling""" + profiler = Profiling() + + # Should not raise an error but internal state should indicate warning + profiler.end_profiling() + + result = profiler.print_profiling() + assert isinstance(result, str) + + def test_profiling_measures_time(self): + """Test that profiling measures elapsed time""" + profiler = Profiling() + profiler.start_profiling("time_test") + + sleep_duration = 0.05 # 50ms + time.sleep(sleep_duration) + + profiler.end_profiling() + result = profiler.print_profiling() + + assert isinstance(result, str) + assert "time:" in result + # Should have some time measurement + assert "ms" in result or "s" in result + + def test_profiling_measures_memory(self): + """Test that profiling measures memory usage""" + profiler = Profiling() + profiler.start_profiling("memory_test") + + # Allocate some memory + data = [0] * 100000 + + profiler.end_profiling() + result = profiler.print_profiling() + + assert isinstance(result, str) + assert "RSS:" in result + assert "VMS:" in result + assert "time:" in result + + # Clean up + del data + + +class TestProfilingPrintProfiling: + """Test print_profiling functionality""" + + def test_print_profiling_returns_string(self): + """Test that print_profiling returns a string""" + profiler = Profiling() + profiler.start_profiling("test") + profiler.end_profiling() + + result = profiler.print_profiling() + + assert isinstance(result, str) + + def test_print_profiling_contains_identifier(self): + """Test that print_profiling includes the identifier""" + profiler = Profiling() + identifier = "my_test_operation" + + profiler.start_profiling(identifier) + profiler.end_profiling() + + result = profiler.print_profiling() + + assert identifier in result + + def test_print_profiling_format(self): + """Test that print_profiling has expected format""" + profiler = Profiling() + profiler.start_profiling("test") + profiler.end_profiling() + + result = profiler.print_profiling() + + # Check for expected components + assert "Profiling:" in result + assert "RSS:" in result + assert "VMS:" in result + assert "time:" in result + + def test_print_profiling_multiple_calls(self): + """Test that print_profiling can be called multiple times""" + profiler = Profiling() + profiler.start_profiling("test") + profiler.end_profiling() + + result1 = profiler.print_profiling() + result2 = profiler.print_profiling() + + # Should return the same result + assert result1 == result2 + + def test_print_profiling_time_formats(self): + """Test different time format outputs""" + profiler = Profiling() + + # Very short duration (milliseconds) + profiler.start_profiling("ms_test") + time.sleep(0.001) + profiler.end_profiling() + result = profiler.print_profiling() + assert "ms" in result + + # Slightly longer duration (seconds) + profiler.start_profiling("s_test") + time.sleep(0.1) + profiler.end_profiling() + result = profiler.print_profiling() + # Could be ms or s depending on timing + assert ("ms" in result or "s" in result) + + def test_print_profiling_memory_formats(self): + """Test different memory format outputs""" + profiler = Profiling() + profiler.start_profiling("memory_format_test") + + # Allocate some memory + data = [0] * 50000 + + profiler.end_profiling() + result = 
profiler.print_profiling() + + # Should have some memory unit (B, kB, MB, GB) + assert any(unit in result for unit in ["B", "kB", "MB", "GB"]) + + # Clean up + del data + + +class TestProfilingIntegration: + """Integration tests for Profiling class""" + + def test_complete_profiling_cycle(self): + """Test a complete profiling cycle from start to print""" + profiler = Profiling() + + profiler.start_profiling("complete_cycle") + + # Do some work + data = [i for i in range(10000)] + time.sleep(0.01) + + profiler.end_profiling() + result = profiler.print_profiling() + + assert isinstance(result, str) + assert "complete_cycle" in result + assert "RSS:" in result + assert "VMS:" in result + assert "time:" in result + + # Clean up + del data + + def test_multiple_profiling_sessions(self): + """Test running multiple profiling sessions""" + profiler = Profiling() + + # First session + profiler.start_profiling("session_1") + time.sleep(0.01) + profiler.end_profiling() + result1 = profiler.print_profiling() + + # Second session (same profiler instance) + profiler.start_profiling("session_2") + data = [0] * 100000 + time.sleep(0.01) + profiler.end_profiling() + result2 = profiler.print_profiling() + + # Results should be different + assert "session_1" in result1 + assert "session_2" in result2 + assert result1 != result2 + + # Clean up + del data + + def test_profiling_with_zero_work(self): + """Test profiling with minimal work""" + profiler = Profiling() + + profiler.start_profiling("zero_work") + profiler.end_profiling() + + result = profiler.print_profiling() + + assert isinstance(result, str) + assert "zero_work" in result + + def test_profiling_with_heavy_computation(self): + """Test profiling with heavier computation""" + profiler = Profiling() + + profiler.start_profiling("heavy_computation") + + # Do some computation + result_data: list[list[int]] = [] + for _ in range(1000): + result_data.append([j * 2 for j in range(100)]) + + time.sleep(0.05) + + profiler.end_profiling() + result = profiler.print_profiling() + + assert isinstance(result, str) + assert "heavy_computation" in result + # Should show measurable time and memory + assert "time:" in result + + # Clean up + del result_data + + def test_independent_profilers(self): + """Test that multiple Profiling instances are independent""" + profiler1 = Profiling() + profiler2 = Profiling() + + profiler1.start_profiling("profiler_1") + time.sleep(0.01) + + profiler2.start_profiling("profiler_2") + data = [0] * 100000 + time.sleep(0.01) + + profiler1.end_profiling() + profiler2.end_profiling() + + result1 = profiler1.print_profiling() + result2 = profiler2.print_profiling() + + # Should have different identifiers + assert "profiler_1" in result1 + assert "profiler_2" in result2 + + # Results should be different + assert result1 != result2 + + # Clean up + del data + + +class TestProfilingEdgeCases: + """Test edge cases and boundary conditions""" + + def test_empty_identifier(self): + """Test profiling with empty identifier""" + profiler = Profiling() + + profiler.start_profiling("") + profiler.end_profiling() + + result = profiler.print_profiling() + + assert isinstance(result, str) + assert "Profiling:" in result + + def test_very_long_identifier(self): + """Test profiling with very long identifier""" + profiler = Profiling() + + long_ident = "a" * 100 + + profiler.start_profiling(long_ident) + profiler.end_profiling() + + result = profiler.print_profiling() + + assert isinstance(result, str) + assert long_ident in result + + def 
test_special_characters_in_identifier(self): + """Test profiling with special characters in identifier""" + profiler = Profiling() + + special_ident = "test_@#$%_operation" + + profiler.start_profiling(special_ident) + profiler.end_profiling() + + result = profiler.print_profiling() + + assert isinstance(result, str) + assert special_ident in result + + def test_rapid_consecutive_profiling(self): + """Test rapid consecutive profiling cycles""" + profiler = Profiling() + + for i in range(5): + profiler.start_profiling(f"rapid_{i}") + profiler.end_profiling() + result = profiler.print_profiling() + + assert isinstance(result, str) + assert f"rapid_{i}" in result + + def test_profiling_negative_memory_change(self): + """Test profiling when memory usage decreases""" + profiler = Profiling() + + # Allocate some memory before profiling + pre_data = [0] * 1000000 + + profiler.start_profiling("memory_decrease") + + # Free the memory + del pre_data + + profiler.end_profiling() + result = profiler.print_profiling() + + assert isinstance(result, str) + assert "memory_decrease" in result + # Should handle negative memory change gracefully + + def test_very_short_duration(self): + """Test profiling with extremely short duration""" + profiler = Profiling() + + profiler.start_profiling("instant") + profiler.end_profiling() + + result = profiler.print_profiling() + + assert isinstance(result, str) + assert "instant" in result + assert "ms" in result # Should show milliseconds for very short duration + + +class TestProfilingContextManager: + """Test profiling usage patterns similar to context managers""" + + def test_typical_usage_pattern(self): + """Test typical usage pattern for profiling""" + profiler = Profiling() + + # Typical pattern + profiler.start_profiling("typical_operation") + + # Perform operation + result_list: list[int] = [] + for _ in range(1000): + result_list.append(_ * 2) + + profiler.end_profiling() + + # Get results + output = profiler.print_profiling() + + assert isinstance(output, str) + assert "typical_operation" in output + + # Clean up + del result_list + + def test_profiling_without_end(self): + """Test what happens when end_profiling is not called""" + profiler = Profiling() + + profiler.start_profiling("no_end") + + # Don't call end_profiling + + result = profiler.print_profiling() + + # Should still return a string (though data might be incomplete) + assert isinstance(result, str) + + def test_profiling_end_without_start(self): + """Test calling end_profiling multiple times without start""" + profiler = Profiling() + + profiler.end_profiling() + profiler.end_profiling() + + result = profiler.print_profiling() + + assert isinstance(result, str) + +# __END__ diff --git a/tests/unit/debug_handling/test_timer.py b/tests/unit/debug_handling/test_timer.py new file mode 100644 index 0000000..d13384b --- /dev/null +++ b/tests/unit/debug_handling/test_timer.py @@ -0,0 +1,405 @@ +""" +Unit tests for corelibs.debug_handling.timer module +""" + +import time +from datetime import datetime, timedelta + +from corelibs.debug_handling.timer import Timer + + +class TestTimerInitialization: + """Test Timer class initialization""" + + def test_timer_initialization(self): + """Test that Timer initializes with correct default values""" + timer = Timer() + + # Check that start times are set + assert isinstance(timer.get_overall_start_time(), datetime) + assert isinstance(timer.get_start_time(), datetime) + + # Check that end times are None + assert timer.get_overall_end_time() is None + assert 
timer.get_end_time() is None + + # Check that run times are None + assert timer.get_overall_run_time() is None + assert timer.get_run_time() is None + + def test_timer_start_times_are_recent(self): + """Test that start times are set to current time on initialization""" + before_init = datetime.now() + timer = Timer() + after_init = datetime.now() + + overall_start = timer.get_overall_start_time() + start = timer.get_start_time() + + assert before_init <= overall_start <= after_init + assert before_init <= start <= after_init + + def test_timer_start_times_are_same(self): + """Test that overall_start_time and start_time are initialized to the same time""" + timer = Timer() + + overall_start = timer.get_overall_start_time() + start = timer.get_start_time() + + # They should be very close (within a few microseconds) + time_diff = abs((overall_start - start).total_seconds()) + assert time_diff < 0.001 # Less than 1 millisecond + + +class TestOverallRunTime: + """Test overall run time functionality""" + + def test_overall_run_time_returns_timedelta(self): + """Test that overall_run_time returns a timedelta object""" + timer = Timer() + time.sleep(0.01) # Sleep for 10ms + + result = timer.overall_run_time() + + assert isinstance(result, timedelta) + + def test_overall_run_time_sets_end_time(self): + """Test that calling overall_run_time sets the end time""" + timer = Timer() + + assert timer.get_overall_end_time() is None + + timer.overall_run_time() + + assert isinstance(timer.get_overall_end_time(), datetime) + + def test_overall_run_time_sets_run_time(self): + """Test that calling overall_run_time sets the run time""" + timer = Timer() + + assert timer.get_overall_run_time() is None + + timer.overall_run_time() + + assert isinstance(timer.get_overall_run_time(), timedelta) + + def test_overall_run_time_accuracy(self): + """Test that overall_run_time calculates time difference accurately""" + timer = Timer() + sleep_duration = 0.05 # 50ms + time.sleep(sleep_duration) + + result = timer.overall_run_time() + + # Allow for some variance (10ms tolerance) + assert sleep_duration - 0.01 <= result.total_seconds() <= sleep_duration + 0.01 + + def test_overall_run_time_multiple_calls(self): + """Test that calling overall_run_time multiple times updates the values""" + timer = Timer() + time.sleep(0.01) + + first_result = timer.overall_run_time() + first_end_time = timer.get_overall_end_time() + + time.sleep(0.01) + + second_result = timer.overall_run_time() + second_end_time = timer.get_overall_end_time() + + # Second call should have longer runtime + assert second_result > first_result + assert second_end_time is not None + assert first_end_time is not None + # End time should be updated + assert second_end_time > first_end_time + + def test_overall_run_time_consistency(self): + """Test that get_overall_run_time returns the same value as overall_run_time""" + timer = Timer() + time.sleep(0.01) + + calculated_time = timer.overall_run_time() + retrieved_time = timer.get_overall_run_time() + + assert calculated_time == retrieved_time + + +class TestRunTime: + """Test run time functionality""" + + def test_run_time_returns_timedelta(self): + """Test that run_time returns a timedelta object""" + timer = Timer() + time.sleep(0.01) + + result = timer.run_time() + + assert isinstance(result, timedelta) + + def test_run_time_sets_end_time(self): + """Test that calling run_time sets the end time""" + timer = Timer() + + assert timer.get_end_time() is None + + timer.run_time() + + assert 
isinstance(timer.get_end_time(), datetime) + + def test_run_time_sets_run_time(self): + """Test that calling run_time sets the run time""" + timer = Timer() + + assert timer.get_run_time() is None + + timer.run_time() + + assert isinstance(timer.get_run_time(), timedelta) + + def test_run_time_accuracy(self): + """Test that run_time calculates time difference accurately""" + timer = Timer() + sleep_duration = 0.05 # 50ms + time.sleep(sleep_duration) + + result = timer.run_time() + + # Allow for some variance (10ms tolerance) + assert sleep_duration - 0.01 <= result.total_seconds() <= sleep_duration + 0.01 + + def test_run_time_multiple_calls(self): + """Test that calling run_time multiple times updates the values""" + timer = Timer() + time.sleep(0.01) + + first_result = timer.run_time() + first_end_time = timer.get_end_time() + + time.sleep(0.01) + + second_result = timer.run_time() + second_end_time = timer.get_end_time() + + # Second call should have longer runtime + assert second_result > first_result + assert second_end_time is not None + assert first_end_time is not None + # End time should be updated + assert second_end_time > first_end_time + + def test_run_time_consistency(self): + """Test that get_run_time returns the same value as run_time""" + timer = Timer() + time.sleep(0.01) + + calculated_time = timer.run_time() + retrieved_time = timer.get_run_time() + + assert calculated_time == retrieved_time + + +class TestResetRunTime: + """Test reset_run_time functionality""" + + def test_reset_run_time_resets_start_time(self): + """Test that reset_run_time updates the start time""" + timer = Timer() + original_start = timer.get_start_time() + + time.sleep(0.02) + timer.reset_run_time() + + new_start = timer.get_start_time() + + assert new_start > original_start + + def test_reset_run_time_clears_end_time(self): + """Test that reset_run_time clears the end time""" + timer = Timer() + timer.run_time() + + assert timer.get_end_time() is not None + + timer.reset_run_time() + + assert timer.get_end_time() is None + + def test_reset_run_time_clears_run_time(self): + """Test that reset_run_time clears the run time""" + timer = Timer() + timer.run_time() + + assert timer.get_run_time() is not None + + timer.reset_run_time() + + assert timer.get_run_time() is None + + def test_reset_run_time_does_not_affect_overall_times(self): + """Test that reset_run_time does not affect overall times""" + timer = Timer() + + overall_start = timer.get_overall_start_time() + timer.overall_run_time() + overall_end = timer.get_overall_end_time() + overall_run = timer.get_overall_run_time() + + timer.reset_run_time() + + # Overall times should remain unchanged + assert timer.get_overall_start_time() == overall_start + assert timer.get_overall_end_time() == overall_end + assert timer.get_overall_run_time() == overall_run + + def test_reset_run_time_allows_new_measurement(self): + """Test that reset_run_time allows for new time measurements""" + timer = Timer() + time.sleep(0.02) + timer.run_time() + + first_run_time = timer.get_run_time() + + timer.reset_run_time() + time.sleep(0.01) + timer.run_time() + + second_run_time = timer.get_run_time() + + assert second_run_time is not None + assert first_run_time is not None + # Second measurement should be shorter since we reset + assert second_run_time < first_run_time + + +class TestTimerIntegration: + """Integration tests for Timer class""" + + def test_independent_timers(self): + """Test that multiple Timer instances are independent""" + timer1 = Timer() + 
time.sleep(0.01) + timer2 = Timer() + + # timer1 should have earlier start time + assert timer1.get_start_time() < timer2.get_start_time() + assert timer1.get_overall_start_time() < timer2.get_overall_start_time() + + def test_overall_and_run_time_independence(self): + """Test that overall time and run time are independent""" + timer = Timer() + time.sleep(0.02) + + # Reset run time but not overall + timer.reset_run_time() + time.sleep(0.01) + + run_time = timer.run_time() + overall_time = timer.overall_run_time() + + # Overall time should be longer than run time + assert overall_time > run_time + + def test_typical_usage_pattern(self): + """Test a typical usage pattern of the Timer class""" + timer = Timer() + + # Measure first operation + time.sleep(0.01) + first_operation = timer.run_time() + assert first_operation.total_seconds() > 0 + + # Reset and measure second operation + timer.reset_run_time() + time.sleep(0.01) + second_operation = timer.run_time() + assert second_operation.total_seconds() > 0 + + # Get overall time + overall = timer.overall_run_time() + + # Overall should be greater than individual operations + assert overall > first_operation + assert overall > second_operation + + def test_zero_sleep_timer(self): + """Test timer with minimal sleep (edge case)""" + timer = Timer() + + # Call run_time immediately + result = timer.run_time() + + # Should still return a valid timedelta (very small) + assert isinstance(result, timedelta) + assert result.total_seconds() >= 0 + + def test_getter_methods_before_calculation(self): + """Test that getter methods return None before calculation methods are called""" + timer = Timer() + + # Before calling run_time() + assert timer.get_end_time() is None + assert timer.get_run_time() is None + + # Before calling overall_run_time() + assert timer.get_overall_end_time() is None + assert timer.get_overall_run_time() is None + + # But start times should always be set + assert timer.get_start_time() is not None + assert timer.get_overall_start_time() is not None + + +class TestTimerEdgeCases: + """Test edge cases and boundary conditions""" + + def test_rapid_consecutive_calls(self): + """Test rapid consecutive calls to run_time""" + timer = Timer() + + results: list[timedelta] = [] + for _ in range(5): + results.append(timer.run_time()) + + # Each result should be greater than or equal to the previous + for i in range(1, len(results)): + assert results[i] >= results[i - 1] + + def test_very_short_duration(self): + """Test timer with very short duration""" + timer = Timer() + result = timer.run_time() + + # Should be a very small positive timedelta + assert isinstance(result, timedelta) + assert result.total_seconds() >= 0 + assert result.total_seconds() < 0.1 # Less than 100ms + + def test_reset_multiple_times(self): + """Test resetting the timer multiple times""" + timer = Timer() + + for _ in range(3): + timer.reset_run_time() + time.sleep(0.01) + result = timer.run_time() + + assert isinstance(result, timedelta) + assert result.total_seconds() > 0 + + def test_overall_time_persists_through_resets(self): + """Test that overall time continues even when run_time is reset""" + timer = Timer() + + time.sleep(0.01) + timer.reset_run_time() + + time.sleep(0.01) + timer.reset_run_time() + + overall = timer.overall_run_time() + + # Overall time should reflect total elapsed time + assert overall.total_seconds() >= 0.02 + +# __END__ diff --git a/tests/unit/debug_handling/test_writeline.py b/tests/unit/debug_handling/test_writeline.py new file mode 100644 
index 0000000..52e23d8 --- /dev/null +++ b/tests/unit/debug_handling/test_writeline.py @@ -0,0 +1,975 @@ +""" +Unit tests for debug_handling.writeline module +""" + +import io +import pytest +from pytest import CaptureFixture + +from corelibs.debug_handling.writeline import ( + write_l, + pr_header, + pr_title, + pr_open, + pr_close, + pr_act +) + + +class TestWriteL: + """Test cases for write_l function""" + + def test_write_l_print_only(self, capsys: CaptureFixture[str]): + """Test write_l with print_line=True and no file""" + write_l("Test line", print_line=True) + captured = capsys.readouterr() + assert captured.out == "Test line\n" + + def test_write_l_no_print_no_file(self, capsys: CaptureFixture[str]): + """Test write_l with print_line=False and no file (should do nothing)""" + write_l("Test line", print_line=False) + captured = capsys.readouterr() + assert captured.out == "" + + def test_write_l_file_only(self, capsys: CaptureFixture[str]): + """Test write_l with file handler only (no print)""" + fpl = io.StringIO() + write_l("Test line", fpl=fpl, print_line=False) + captured = capsys.readouterr() + assert captured.out == "" + assert fpl.getvalue() == "Test line\n" + fpl.close() + + def test_write_l_both_print_and_file(self, capsys: CaptureFixture[str]): + """Test write_l with both print and file output""" + fpl = io.StringIO() + write_l("Test line", fpl=fpl, print_line=True) + captured = capsys.readouterr() + assert captured.out == "Test line\n" + assert fpl.getvalue() == "Test line\n" + fpl.close() + + def test_write_l_multiple_lines_to_file(self): + """Test write_l writing multiple lines to file""" + fpl = io.StringIO() + write_l("Line 1", fpl=fpl, print_line=False) + write_l("Line 2", fpl=fpl, print_line=False) + write_l("Line 3", fpl=fpl, print_line=False) + assert fpl.getvalue() == "Line 1\nLine 2\nLine 3\n" + fpl.close() + + def test_write_l_empty_string(self, capsys: CaptureFixture[str]): + """Test write_l with empty string""" + fpl = io.StringIO() + write_l("", fpl=fpl, print_line=True) + captured = capsys.readouterr() + assert captured.out == "\n" + assert fpl.getvalue() == "\n" + fpl.close() + + def test_write_l_special_characters(self): + """Test write_l with special characters""" + fpl = io.StringIO() + special_line = "Special: \t\n\r\\ 特殊文字 €" + write_l(special_line, fpl=fpl, print_line=False) + assert special_line + "\n" in fpl.getvalue() + fpl.close() + + def test_write_l_long_string(self): + """Test write_l with long string""" + fpl = io.StringIO() + long_line = "A" * 1000 + write_l(long_line, fpl=fpl, print_line=False) + assert fpl.getvalue() == long_line + "\n" + fpl.close() + + def test_write_l_unicode_content(self): + """Test write_l with unicode content""" + fpl = io.StringIO() + unicode_line = "Hello 世界 🌍 Привет" + write_l(unicode_line, fpl=fpl, print_line=False) + assert fpl.getvalue() == unicode_line + "\n" + fpl.close() + + def test_write_l_default_parameters(self, capsys: CaptureFixture[str]): + """Test write_l with default parameters""" + write_l("Test") + captured = capsys.readouterr() + # Default print_line is False + assert captured.out == "" + + def test_write_l_with_newline_in_string(self): + """Test write_l with newline characters in the string""" + fpl = io.StringIO() + write_l("Line with\nnewline", fpl=fpl, print_line=False) + assert fpl.getvalue() == "Line with\nnewline\n" + fpl.close() + + +class TestPrHeader: + """Test cases for pr_header function""" + + def test_pr_header_default(self, capsys: CaptureFixture[str]): + """Test pr_header with 
default parameters""" + pr_header("TEST") + captured = capsys.readouterr() + assert "#" in captured.out + assert "TEST" in captured.out + + def test_pr_header_custom_marker(self, capsys: CaptureFixture[str]): + """Test pr_header with custom marker string""" + pr_header("TEST", marker_string="*") + captured = capsys.readouterr() + assert "*" in captured.out + assert "TEST" in captured.out + assert "#" not in captured.out + + def test_pr_header_custom_width(self, capsys: CaptureFixture[str]): + """Test pr_header with custom width""" + pr_header("TEST", width=50) + captured = capsys.readouterr() + # Check that output is formatted + assert "TEST" in captured.out + + def test_pr_header_short_tag(self, capsys: CaptureFixture[str]): + """Test pr_header with short tag""" + pr_header("X") + captured = capsys.readouterr() + assert "X" in captured.out + assert "#" in captured.out + + def test_pr_header_long_tag(self, capsys: CaptureFixture[str]): + """Test pr_header with long tag""" + pr_header("This is a very long header tag") + captured = capsys.readouterr() + assert "This is a very long header tag" in captured.out + + def test_pr_header_empty_tag(self, capsys: CaptureFixture[str]): + """Test pr_header with empty tag""" + pr_header("") + captured = capsys.readouterr() + assert "#" in captured.out + + def test_pr_header_special_characters(self, capsys: CaptureFixture[str]): + """Test pr_header with special characters in tag""" + pr_header("TEST: 123! @#$") + captured = capsys.readouterr() + assert "TEST: 123! @#$" in captured.out + + def test_pr_header_unicode(self, capsys: CaptureFixture[str]): + """Test pr_header with unicode characters""" + pr_header("テスト 🎉") + captured = capsys.readouterr() + assert "テスト 🎉" in captured.out + + def test_pr_header_various_markers(self, capsys: CaptureFixture[str]): + """Test pr_header with various marker strings""" + markers = ["*", "=", "-", "+", "~", "@"] + for marker in markers: + pr_header("TEST", marker_string=marker) + captured = capsys.readouterr() + assert marker in captured.out + assert "TEST" in captured.out + + def test_pr_header_zero_width(self, capsys: CaptureFixture[str]): + """Test pr_header with width of 0""" + pr_header("TEST", width=0) + captured = capsys.readouterr() + assert "TEST" in captured.out + + def test_pr_header_large_width(self, capsys: CaptureFixture[str]): + """Test pr_header with large width""" + pr_header("TEST", width=100) + captured = capsys.readouterr() + assert "TEST" in captured.out + assert "#" in captured.out + + def test_pr_header_format(self, capsys: CaptureFixture[str]): + """Test pr_header output format""" + pr_header("CENTER", marker_string="#", width=20) + captured = capsys.readouterr() + # Should have spaces around centered text + assert " CENTER " in captured.out or "CENTER" in captured.out + + +class TestPrTitle: + """Test cases for pr_title function""" + + def test_pr_title_default(self, capsys: CaptureFixture[str]): + """Test pr_title with default parameters""" + pr_title("Test Title") + captured = capsys.readouterr() + assert "Test Title" in captured.out + assert "|" in captured.out + assert "." 
+        assert "." in captured.out
+        assert ":" in captured.out
+
+    def test_pr_title_custom_prefix(self, capsys: CaptureFixture[str]):
+        """Test pr_title with custom prefix string"""
+        pr_title("Test", prefix_string=">")
+        captured = capsys.readouterr()
+        assert ">" in captured.out
+        assert "Test" in captured.out
+        assert "|" not in captured.out
+
+    def test_pr_title_custom_space_filler(self, capsys: CaptureFixture[str]):
+        """Test pr_title with custom space filler"""
+        pr_title("Test", space_filler="-")
+        captured = capsys.readouterr()
+        assert "Test" in captured.out
+        assert "-" in captured.out
+        assert "." not in captured.out
+
+    def test_pr_title_custom_width(self, capsys: CaptureFixture[str]):
+        """Test pr_title with custom width"""
+        pr_title("Test", width=50)
+        captured = capsys.readouterr()
+        assert "Test" in captured.out
+
+    def test_pr_title_short_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_title with short tag"""
+        pr_title("X")
+        captured = capsys.readouterr()
+        assert "X" in captured.out
+        assert "." in captured.out
+
+    def test_pr_title_long_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_title with long tag"""
+        pr_title("This is a very long title tag")
+        captured = capsys.readouterr()
+        assert "This is a very long title tag" in captured.out
+
+    def test_pr_title_empty_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_title with empty tag"""
+        pr_title("")
+        captured = capsys.readouterr()
+        assert "|" in captured.out
+        assert ":" in captured.out
+
+    def test_pr_title_special_characters(self, capsys: CaptureFixture[str]):
+        """Test pr_title with special characters"""
+        pr_title("Task #123!")
+        captured = capsys.readouterr()
+        assert "Task #123!" in captured.out
+
+    def test_pr_title_unicode(self, capsys: CaptureFixture[str]):
+        """Test pr_title with unicode characters"""
+        pr_title("タイトル 📝")
+        captured = capsys.readouterr()
+        assert "タイトル 📝" in captured.out
+
+    def test_pr_title_various_fillers(self, capsys: CaptureFixture[str]):
+        """Test pr_title with various space fillers"""
+        fillers = [".", "-", "_", "*", " ", "~"]
+        for filler in fillers:
+            pr_title("Test", space_filler=filler)
+            captured = capsys.readouterr()
+            assert "Test" in captured.out
+
+    def test_pr_title_zero_width(self, capsys: CaptureFixture[str]):
+        """Test pr_title with width of 0"""
+        pr_title("Test", width=0)
+        captured = capsys.readouterr()
+        assert "Test" in captured.out
+
+    def test_pr_title_large_width(self, capsys: CaptureFixture[str]):
+        """Test pr_title with large width"""
+        pr_title("Test", width=100)
+        captured = capsys.readouterr()
+        assert "Test" in captured.out
+
+    def test_pr_title_format_left_align(self, capsys: CaptureFixture[str]):
+        """Test pr_title output format (should be left-aligned with filler)"""
+        pr_title("Start", space_filler=".", width=10)
+        captured = capsys.readouterr()
+        # The tag should be present and the line should carry the ':' terminator
+        assert "Start" in captured.out
+        assert ":" in captured.out
+
+
+class TestPrOpen:
+    """Test cases for pr_open function"""
+
+    def test_pr_open_default(self, capsys: CaptureFixture[str]):
+        """Test pr_open with default parameters"""
+        pr_open("Processing")
+        captured = capsys.readouterr()
+        assert "Processing" in captured.out
+        assert "|" in captured.out
+        assert "." in captured.out
+        assert "[" in captured.out
+        # Should not have newline at the end
+        assert not captured.out.endswith("\n")
+
+    def test_pr_open_custom_prefix(self, capsys: CaptureFixture[str]):
+        """Test pr_open with custom prefix string"""
+        pr_open("Task", prefix_string=">")
+        captured = capsys.readouterr()
+        assert ">" in captured.out
+        assert "Task" in captured.out
+        assert "|" not in captured.out
+
+    def test_pr_open_custom_space_filler(self, capsys: CaptureFixture[str]):
+        """Test pr_open with custom space filler"""
+        pr_open("Task", space_filler="-")
+        captured = capsys.readouterr()
+        assert "Task" in captured.out
+        assert "-" in captured.out
+        assert "." not in captured.out
+
+    def test_pr_open_custom_width(self, capsys: CaptureFixture[str]):
+        """Test pr_open with custom width"""
+        pr_open("Task", width=50)
+        captured = capsys.readouterr()
+        assert "Task" in captured.out
+        assert "[" in captured.out
+
+    def test_pr_open_short_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_open with short tag"""
+        pr_open("X")
+        captured = capsys.readouterr()
+        assert "X" in captured.out
+        assert "[" in captured.out
+
+    def test_pr_open_long_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_open with long tag"""
+        pr_open("This is a very long task tag")
+        captured = capsys.readouterr()
+        assert "This is a very long task tag" in captured.out
+
+    def test_pr_open_empty_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_open with empty tag"""
+        pr_open("")
+        captured = capsys.readouterr()
+        assert "[" in captured.out
+        assert "|" in captured.out
+
+    def test_pr_open_no_newline(self, capsys: CaptureFixture[str]):
+        """Test pr_open doesn't end with newline"""
+        pr_open("Test")
+        captured = capsys.readouterr()
+        # Output should not end with newline (uses end="")
+        assert not captured.out.endswith("\n")
+
+    def test_pr_open_special_characters(self, capsys: CaptureFixture[str]):
+        """Test pr_open with special characters"""
+        pr_open("Loading: 50%")
+        captured = capsys.readouterr()
+        assert "Loading: 50%" in captured.out
+
+    def test_pr_open_unicode(self, capsys: CaptureFixture[str]):
+        """Test pr_open with unicode characters"""
+        pr_open("処理中 ⏳")
+        captured = capsys.readouterr()
+        assert "処理中 ⏳" in captured.out
+
+    def test_pr_open_format(self, capsys: CaptureFixture[str]):
+        """Test pr_open output format"""
+        pr_open("Task", prefix_string="|", space_filler=".", width=20)
+        captured = capsys.readouterr()
+        assert "|" in captured.out
+        assert "Task" in captured.out
+        assert "[" in captured.out
+
+
+class TestPrClose:
+    """Test cases for pr_close function"""
+
+    def test_pr_close_default(self, capsys: CaptureFixture[str]):
+        """Test pr_close with default (empty) tag"""
+        pr_close()
+        captured = capsys.readouterr()
+        assert captured.out == "]\n"
+
+    def test_pr_close_with_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_close with custom tag"""
+        pr_close("DONE")
+        captured = capsys.readouterr()
+        assert "DONE" in captured.out
+        assert "]" in captured.out
+        assert captured.out.endswith("\n")
+
+    def test_pr_close_with_space(self, capsys: CaptureFixture[str]):
+        """Test pr_close with space in tag"""
+        pr_close(" OK ")
+        captured = capsys.readouterr()
+        assert " OK " in captured.out
+        assert "]" in captured.out
+
+    def test_pr_close_empty_string(self, capsys: CaptureFixture[str]):
+        """Test pr_close with empty string (same as default)"""
+        pr_close("")
+        captured = capsys.readouterr()
+        assert captured.out == "]\n"
+
+    def test_pr_close_special_characters(self, capsys: CaptureFixture[str]):
"""Test pr_close with special characters""" + pr_close("✓") + captured = capsys.readouterr() + assert "✓" in captured.out + assert "]" in captured.out + + def test_pr_close_unicode(self, capsys: CaptureFixture[str]): + """Test pr_close with unicode characters""" + pr_close("完了") + captured = capsys.readouterr() + assert "完了" in captured.out + assert "]" in captured.out + + def test_pr_close_newline(self, capsys: CaptureFixture[str]): + """Test pr_close ends with newline""" + pr_close("OK") + captured = capsys.readouterr() + assert captured.out.endswith("\n") + + def test_pr_close_various_tags(self, capsys: CaptureFixture[str]): + """Test pr_close with various tags""" + tags = ["OK", "DONE", "✓", "✗", "SKIP", "PASS", "FAIL"] + for tag in tags: + pr_close(tag) + captured = capsys.readouterr() + assert tag in captured.out + assert "]" in captured.out + + +class TestPrAct: + """Test cases for pr_act function""" + + def test_pr_act_default(self, capsys: CaptureFixture[str]): + """Test pr_act with default dot""" + pr_act() + captured = capsys.readouterr() + assert captured.out == "." + assert not captured.out.endswith("\n") + + def test_pr_act_custom_character(self, capsys: CaptureFixture[str]): + """Test pr_act with custom character""" + pr_act("#") + captured = capsys.readouterr() + assert captured.out == "#" + + def test_pr_act_multiple_calls(self, capsys: CaptureFixture[str]): + """Test pr_act with multiple calls""" + pr_act(".") + pr_act(".") + pr_act(".") + captured = capsys.readouterr() + assert captured.out == "..." + + def test_pr_act_various_characters(self, capsys: CaptureFixture[str]): + """Test pr_act with various characters""" + characters = [".", "#", "*", "+", "-", "=", ">", "~"] + for char in characters: + pr_act(char) + captured = capsys.readouterr() + assert "".join(characters) in captured.out + + def test_pr_act_empty_string(self, capsys: CaptureFixture[str]): + """Test pr_act with empty string""" + pr_act("") + captured = capsys.readouterr() + assert captured.out == "" + + def test_pr_act_special_character(self, capsys: CaptureFixture[str]): + """Test pr_act with special characters""" + pr_act("✓") + captured = capsys.readouterr() + assert captured.out == "✓" + + def test_pr_act_unicode(self, capsys: CaptureFixture[str]): + """Test pr_act with unicode character""" + pr_act("●") + captured = capsys.readouterr() + assert captured.out == "●" + + def test_pr_act_no_newline(self, capsys: CaptureFixture[str]): + """Test pr_act doesn't add newline""" + pr_act("x") + captured = capsys.readouterr() + assert not captured.out.endswith("\n") + + def test_pr_act_multiple_characters(self, capsys: CaptureFixture[str]): + """Test pr_act with multiple characters in string""" + pr_act("...") + captured = capsys.readouterr() + assert captured.out == "..." + + def test_pr_act_whitespace(self, capsys: CaptureFixture[str]): + """Test pr_act with whitespace""" + pr_act(" ") + captured = capsys.readouterr() + assert captured.out == " " + + +class TestProgressCombinations: + """Test combinations of progress printer functions""" + + def test_complete_progress_flow(self, capsys: CaptureFixture[str]): + """Test complete progress output flow""" + pr_header("PROCESS") + pr_title("Task 1") + pr_open("Subtask") + pr_act(".") + pr_act(".") + pr_act(".") + pr_close(" OK") + captured = capsys.readouterr() + + assert "PROCESS" in captured.out + assert "Task 1" in captured.out + assert "Subtask" in captured.out + assert "..." 
+        assert "..." in captured.out
+        assert " OK]" in captured.out
+
+    def test_multiple_tasks_progress(self, capsys: CaptureFixture[str]):
+        """Test multiple tasks with progress"""
+        pr_header("BATCH PROCESS")
+        for i in range(3):
+            pr_open(f"Task {i + 1}")
+            for _ in range(5):
+                pr_act(".")
+            pr_close(" DONE")
+        captured = capsys.readouterr()
+
+        assert "BATCH PROCESS" in captured.out
+        assert "Task 1" in captured.out
+        assert "Task 2" in captured.out
+        assert "Task 3" in captured.out
+        assert " DONE]" in captured.out
+
+    def test_nested_progress(self, capsys: CaptureFixture[str]):
+        """Test nested progress indicators"""
+        pr_header("MAIN TASK", marker_string="=")
+        pr_title("Subtask A", prefix_string=">")
+        pr_open("Processing")
+        pr_act("#")
+        pr_act("#")
+        pr_close()
+        pr_title("Subtask B", prefix_string=">")
+        pr_open("Processing")
+        pr_act("*")
+        pr_act("*")
+        pr_close(" OK")
+        captured = capsys.readouterr()
+
+        assert "MAIN TASK" in captured.out
+        assert "Subtask A" in captured.out
+        assert "Subtask B" in captured.out
+        assert "##" in captured.out
+        assert "**" in captured.out
+
+    def test_progress_with_different_markers(self, capsys: CaptureFixture[str]):
+        """Test progress with different marker styles"""
+        pr_header("Process", marker_string="*")
+        pr_title("Step 1", prefix_string=">>", space_filler="-")
+        pr_open("Work", prefix_string=">>", space_filler="-")
+        pr_act("+")
+        pr_close(" ✓")
+        captured = capsys.readouterr()
+
+        assert "*" in captured.out
+        assert ">>" in captured.out
+        assert "-" in captured.out
+        assert "+" in captured.out
+        assert "✓" in captured.out
+
+    def test_empty_progress_sequence(self, capsys: CaptureFixture[str]):
+        """Test progress sequence with no actual progress"""
+        pr_open("Quick task")
+        pr_close(" SKIP")
+        captured = capsys.readouterr()
+
+        assert "Quick task" in captured.out
+        assert " SKIP]" in captured.out
+
+
+class TestIntegration:
+    """Integration tests combining multiple scenarios"""
+
+    def test_file_and_console_logging(self, capsys: CaptureFixture[str]):
+        """Test logging to both file and console"""
+        fpl = io.StringIO()
+
+        write_l("Starting process", fpl=fpl, print_line=True)
+        write_l("Processing item 1", fpl=fpl, print_line=True)
+        write_l("Processing item 2", fpl=fpl, print_line=True)
+        write_l("Complete", fpl=fpl, print_line=True)
+
+        captured = capsys.readouterr()
+        file_content = fpl.getvalue()
+
+        # Check console output
+        assert "Starting process\n" in captured.out
+        assert "Processing item 1\n" in captured.out
+        assert "Processing item 2\n" in captured.out
+        assert "Complete\n" in captured.out
+
+        # Check file output
+        assert "Starting process\n" in file_content
+        assert "Processing item 1\n" in file_content
+        assert "Processing item 2\n" in file_content
+        assert "Complete\n" in file_content
+
+        fpl.close()
+
+    def test_progress_with_logging(self, capsys: CaptureFixture[str]):
+        """Test combining progress output with file logging"""
+        fpl = io.StringIO()
+
+        write_l("=== Process Start ===", fpl=fpl, print_line=True)
+        pr_header("MAIN PROCESS")
+        write_l("Header shown", fpl=fpl, print_line=False)
+
+        pr_open("Task 1")
+        pr_act(".")
+        pr_act(".")
+        pr_close(" OK")
+        write_l("Task 1 completed", fpl=fpl, print_line=False)
+
+        write_l("=== Process End ===", fpl=fpl, print_line=True)
+
+        captured = capsys.readouterr()
+        file_content = fpl.getvalue()
+
+        assert "=== Process Start ===" in captured.out
+        assert "MAIN PROCESS" in captured.out
+        assert "Task 1" in captured.out
+        assert "=== Process End ===" in captured.out
+
+        assert "=== Process Start ===\n" in file_content
+        assert "Header shown\n" in file_content
+        assert "Task 1 completed\n" in file_content
+        assert "=== Process End ===\n" in file_content
+
+        fpl.close()
+
+    def test_complex_workflow(self, capsys: CaptureFixture[str]):
+        """Test complex workflow with all functions"""
+        fpl = io.StringIO()
+
+        write_l("Log: Starting batch process", fpl=fpl, print_line=False)
+        pr_header("BATCH PROCESSOR", marker_string="=", width=40)
+
+        for i in range(2):
+            write_l(f"Log: Processing batch {i + 1}", fpl=fpl, print_line=False)
+            pr_title(f"Batch {i + 1}", prefix_string="|", space_filler=".")
+
+            pr_open(f"Item {i + 1}", prefix_string="|", space_filler=".")
+            for j in range(3):
+                pr_act("*")
+                write_l(f"Log: Progress {j + 1}/3", fpl=fpl, print_line=False)
+            pr_close(" ✓")
+
+            write_l(f"Log: Batch {i + 1} complete", fpl=fpl, print_line=False)
+
+        write_l("Log: All batches complete", fpl=fpl, print_line=False)
+
+        captured = capsys.readouterr()
+        file_content = fpl.getvalue()
+
+        # Check console has progress indicators
+        assert "BATCH PROCESSOR" in captured.out
+        assert "Batch 1" in captured.out
+        assert "Batch 2" in captured.out
+        assert "***" in captured.out
+        assert "✓" in captured.out
+
+        # Check file has all log entries
+        assert "Log: Starting batch process\n" in file_content
+        assert "Log: Processing batch 1\n" in file_content
+        assert "Log: Processing batch 2\n" in file_content
+        assert "Log: Progress 1/3\n" in file_content
+        assert "Log: Batch 1 complete\n" in file_content
+        assert "Log: All batches complete\n" in file_content
+
+        fpl.close()
+
+
+class TestEdgeCases:
+    """Test edge cases and boundary conditions"""
+
+    def test_write_l_none_file_handler(self, capsys: CaptureFixture[str]):
+        """Test write_l explicitly with None file handler"""
+        write_l("Test", fpl=None, print_line=True)
+        captured = capsys.readouterr()
+        assert captured.out == "Test\n"
+
+    def test_pr_header_negative_width(self):
+        """Test pr_header with negative width raises ValueError"""
+        with pytest.raises(ValueError):
+            pr_header("Test", width=-10)
+
+    def test_pr_title_negative_width(self):
+        """Test pr_title with negative width raises ValueError"""
+        with pytest.raises(ValueError):
+            pr_title("Test", width=-10)
+
+    def test_pr_open_negative_width(self):
+        """Test pr_open with negative width raises ValueError"""
+        with pytest.raises(ValueError):
+            pr_open("Test", width=-10)
+
+    def test_multiple_pr_act_no_close(self, capsys: CaptureFixture[str]):
+        """Test multiple pr_act calls without pr_close"""
+        pr_act(".")
+        pr_act(".")
+        pr_act(".")
+        captured = capsys.readouterr()
+        assert captured.out == "..."
+
+    def test_pr_close_without_pr_open(self, capsys: CaptureFixture[str]):
+        """Test pr_close without prior pr_open (should still work)"""
+        pr_close(" OK")
+        captured = capsys.readouterr()
+        assert " OK]" in captured.out
+
+    def test_very_long_strings(self):
+        """Test with very long strings"""
+        fpl = io.StringIO()
+        long_str = "A" * 10000
+        write_l(long_str, fpl=fpl, print_line=False)
+        assert len(fpl.getvalue()) == 10001  # string + newline
+        fpl.close()
+
+    def test_pr_header_very_long_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_header with tag longer than width"""
+        pr_header("This is a very long tag that exceeds the width", width=10)
+        captured = capsys.readouterr()
+        assert "This is a very long tag that exceeds the width" in captured.out
+
+    def test_pr_title_very_long_tag(self, capsys: CaptureFixture[str]):
+        """Test pr_title with tag longer than width"""
+        pr_title("This is a very long tag that exceeds the width", width=10)
+        captured = capsys.readouterr()
+        assert "This is a very long tag that exceeds the width" in captured.out
+
+    def test_write_l_closed_file(self):
+        """Test write_l with closed file should raise error"""
+        fpl = io.StringIO()
+        fpl.close()
+
+        with pytest.raises(ValueError):
+            write_l("Test", fpl=fpl, print_line=False)
+
+
+class TestParametrized:
+    """Parametrized tests for comprehensive coverage"""
+
+    @pytest.mark.parametrize("print_line", [True, False])
+    def test_write_l_print_line_variations(self, print_line: bool, capsys: CaptureFixture[str]):
+        """Test write_l with different print_line values"""
+        write_l("Test", print_line=print_line)
+        captured = capsys.readouterr()
+        if print_line:
+            assert captured.out == "Test\n"
+        else:
+            assert captured.out == ""
+
+    @pytest.mark.parametrize("marker", ["#", "*", "=", "-", "+", "~", "@", "^"])
+    def test_pr_header_various_markers_param(self, marker: str, capsys: CaptureFixture[str]):
+        """Test pr_header with various markers"""
+        pr_header("TEST", marker_string=marker)
+        captured = capsys.readouterr()
+        assert marker in captured.out
+        assert "TEST" in captured.out
+
+    @pytest.mark.parametrize("width", [0, 5, 10, 20, 35, 50, 100])
+    def test_pr_header_various_widths(self, width: int, capsys: CaptureFixture[str]):
+        """Test pr_header with various widths"""
+        pr_header("TEST", width=width)
+        captured = capsys.readouterr()
+        assert "TEST" in captured.out
+
+    @pytest.mark.parametrize("filler", [".", "-", "_", "*", " ", "~", "="])
+    def test_pr_title_various_fillers_param(self, filler: str, capsys: CaptureFixture[str]):
+        """Test pr_title with various space fillers"""
+        pr_title("Test", space_filler=filler)
+        captured = capsys.readouterr()
+        assert "Test" in captured.out
+
+    @pytest.mark.parametrize("prefix", ["|", ">", ">>", "*", "-", "+"])
+    def test_pr_title_various_prefixes(self, prefix: str, capsys: CaptureFixture[str]):
+        """Test pr_title with various prefix strings"""
+        pr_title("Test", prefix_string=prefix)
+        captured = capsys.readouterr()
+        assert prefix in captured.out
+        assert "Test" in captured.out
+
+    @pytest.mark.parametrize("act_char", [".", "#", "*", "+", "-", "=", ">", "~", "✓", "●"])
+    def test_pr_act_various_characters_param(self, act_char: str, capsys: CaptureFixture[str]):
+        """Test pr_act with various characters"""
+        pr_act(act_char)
+        captured = capsys.readouterr()
+        assert captured.out == act_char
+
+    @pytest.mark.parametrize("close_tag", ["", " OK", " DONE", " ✓", " ✗", " SKIP", " PASS"])
+    def test_pr_close_various_tags_param(self, close_tag: str, capsys: CaptureFixture[str]):
+        """Test pr_close with various tags"""
+        pr_close(close_tag)
+        captured = capsys.readouterr()
+        assert f"{close_tag}]" in captured.out
+
+    @pytest.mark.parametrize("content", [
+        "Simple text",
+        "Text with 特殊文字",
+        "Text with emoji 🎉",
+        "Text\twith\ttabs",
+        "Multiple\n\nNewlines",
+        "",
+        "A" * 100,
+    ])
+    def test_write_l_various_content(self, content: str, capsys: CaptureFixture[str]):
+        """Test write_l with various content types"""
+        fpl = io.StringIO()
+        write_l(content, fpl=fpl, print_line=True)
+        captured = capsys.readouterr()
+        assert content in captured.out
+        assert content + "\n" in fpl.getvalue()
+        fpl.close()
+
+
+class TestRealWorldScenarios:
+    """Test real-world usage scenarios"""
+
+    def test_batch_processing_output(self, capsys: CaptureFixture[str]):
+        """Test typical batch processing output"""
+        pr_header("BATCH PROCESSOR", marker_string="=", width=50)
+
+        items = ["file1.txt", "file2.txt", "file3.txt"]
+        for item in items:
+            pr_open(f"Processing {item}")
+            for _ in range(10):
+                pr_act(".")
+            pr_close(" ✓")
+
+        captured = capsys.readouterr()
+        assert "BATCH PROCESSOR" in captured.out
+        for item in items:
+            assert item in captured.out
+        assert "✓" in captured.out
+
+    def test_logging_workflow(self, capsys: CaptureFixture[str]):
+        """Test typical logging workflow"""
+        log_file = io.StringIO()
+
+        # Simulate a workflow with logging
+        write_l("[INFO] Starting process", fpl=log_file, print_line=True)
+        write_l("[INFO] Initializing components", fpl=log_file, print_line=True)
+        write_l("[DEBUG] Component A loaded", fpl=log_file, print_line=False)
+        write_l("[DEBUG] Component B loaded", fpl=log_file, print_line=False)
+        write_l("[INFO] Processing data", fpl=log_file, print_line=True)
+        write_l("[INFO] Process complete", fpl=log_file, print_line=True)
+
+        captured = capsys.readouterr()
+        log_content = log_file.getvalue()
+
+        # Console should only have INFO messages
+        assert "[INFO] Starting process" in captured.out
+        assert "[DEBUG] Component A loaded" not in captured.out
+
+        # Log file should have all messages
+        assert "[INFO] Starting process\n" in log_content
+        assert "[DEBUG] Component A loaded\n" in log_content
+        assert "[DEBUG] Component B loaded\n" in log_content
+
+        log_file.close()
+
+    def test_progress_indicator_for_long_task(self, capsys: CaptureFixture[str]):
+        """Test progress indicator for a long-running task"""
+        pr_header("DATA PROCESSING")
+        pr_open("Loading data", width=50)
+
+        # Simulate progress
+        for i in range(20):
+            if i % 5 == 0:
+                pr_act(str(i // 5))
+            else:
+                pr_act(".")
+
+        pr_close(" COMPLETE")
+
+        captured = capsys.readouterr()
+        assert "DATA PROCESSING" in captured.out
+        assert "Loading data" in captured.out
+        assert "COMPLETE" in captured.out
+
+    def test_multi_stage_process(self, capsys: CaptureFixture[str]):
+        """Test multi-stage process with titles and progress"""
+        pr_header("DEPLOYMENT PIPELINE", marker_string="=")
+
+        stages = ["Build", "Test", "Deploy"]
+        for stage in stages:
+            pr_title(stage)
+            pr_open(f"Running {stage.lower()}")
+            pr_act("#")
+            pr_act("#")
+            pr_act("#")
+            pr_close(" OK")
+
+        captured = capsys.readouterr()
+        assert "DEPLOYMENT PIPELINE" in captured.out
+        for stage in stages:
+            assert stage in captured.out
+        assert "###" in captured.out
+
+    def test_error_reporting_with_logging(self, capsys: CaptureFixture[str]):
+        """Test error reporting workflow"""
+        error_log = io.StringIO()
+
+        pr_header("VALIDATION", marker_string="!")
+        pr_open("Checking files")
+
+        write_l("[ERROR] File not found: data.csv", fpl=error_log, print_line=False)
pr_act("✗") + + write_l("[ERROR] Permission denied: output.txt", fpl=error_log, print_line=False) + pr_act("✗") + + pr_close(" FAILED") + + captured = capsys.readouterr() + log_content = error_log.getvalue() + + assert "VALIDATION" in captured.out + assert "Checking files" in captured.out + assert "✗✗" in captured.out + assert "FAILED" in captured.out + + assert "[ERROR] File not found: data.csv\n" in log_content + assert "[ERROR] Permission denied: output.txt\n" in log_content + + error_log.close() + + def test_detailed_reporting(self, capsys: CaptureFixture[str]): + """Test detailed reporting with mixed output""" + report_file = io.StringIO() + + pr_header("SYSTEM REPORT", marker_string="#", width=60) + write_l("=== System Report Generated ===", fpl=report_file, print_line=False) + + pr_title("Database Status", prefix_string=">>") + write_l("Database: Connected", fpl=report_file, print_line=False) + write_l("Tables: 15", fpl=report_file, print_line=False) + write_l("Records: 1,234,567", fpl=report_file, print_line=False) + + pr_title("API Status", prefix_string=">>") + write_l("API: Online", fpl=report_file, print_line=False) + write_l("Requests/min: 1,500", fpl=report_file, print_line=False) + + write_l("=== Report Complete ===", fpl=report_file, print_line=False) + + captured = capsys.readouterr() + report_content = report_file.getvalue() + + assert "SYSTEM REPORT" in captured.out + assert "Database Status" in captured.out + assert "API Status" in captured.out + + assert "=== System Report Generated ===\n" in report_content + assert "Database: Connected\n" in report_content + assert "API: Online\n" in report_content + assert "=== Report Complete ===\n" in report_content + + report_file.close() + +# __END__