561 lines
15 KiB
Python
561 lines
15 KiB
Python
"""
|
|
Unit tests for corelibs.debug_handling.profiling module
|
|
"""
|
|
|
|
import time
|
|
import tracemalloc
|
|
|
|
from corelibs.debug_handling.profiling import display_top, Profiling
|
|
|
|
|
|
class TestDisplayTop:
    """Test display_top function"""

    @staticmethod
    def _snapshot_after_alloc(element_count: int = 0) -> tracemalloc.Snapshot:
        """
        Take a tracemalloc snapshot while an optional list allocation is alive.

        Tracing is stopped again in a ``finally`` clause, so an exception
        raised by the allocation or by taking the snapshot cannot leave
        tracemalloc enabled for the tests that run afterwards.

        Args:
            element_count: length of the list allocated before the snapshot
                is taken; 0 skips the allocation entirely.

        Returns:
            The snapshot captured while tracing was active.
        """
        tracemalloc.start()
        try:
            # Keep the list referenced until the snapshot has been taken
            payload = [0] * element_count if element_count else None
            snapshot = tracemalloc.take_snapshot()
            del payload
            return snapshot
        finally:
            tracemalloc.stop()

    def test_display_top_basic(self):
        """Test that display_top returns a string with basic stats"""
        result = display_top(self._snapshot_after_alloc(10000))

        assert isinstance(result, str)
        assert "Top 10 lines" in result
        assert "KiB" in result
        assert "Total allocated size:" in result

    def test_display_top_with_custom_limit(self):
        """Test display_top with custom limit parameter"""
        result = display_top(self._snapshot_after_alloc(10000), limit=5)

        assert isinstance(result, str)
        assert "Top 5 lines" in result

    def test_display_top_with_different_key_type(self):
        """Test display_top with different key_type parameter"""
        result = display_top(
            self._snapshot_after_alloc(10000), key_type='filename'
        )

        assert isinstance(result, str)
        assert "Top 10 lines" in result

    def test_display_top_filters_traces(self):
        """Test that display_top filters out bootstrap and unknown traces"""
        result = display_top(self._snapshot_after_alloc(10000))

        # Should not contain filtered traces
        assert "<frozen importlib._bootstrap>" not in result
        assert "<unknown>" not in result

    def test_display_top_with_limit_larger_than_stats(self):
        """Test display_top when limit is larger than available stats"""
        result = display_top(self._snapshot_after_alloc(100), limit=1000)

        assert isinstance(result, str)
        assert "Top 1000 lines" in result
        assert "Total allocated size:" in result

    def test_display_top_empty_snapshot(self):
        """Test display_top with a snapshot that has minimal traces"""
        # No allocation is requested, so the snapshot is taken right after start
        result = display_top(self._snapshot_after_alloc(), limit=1)

        assert isinstance(result, str)
        assert "Top 1 lines" in result
|
|
|
|
|
|
class TestProfilingInitialization:
    """Construction and untouched-state checks for the Profiling class"""

    def test_profiling_initialization(self):
        """Constructing Profiling yields a Profiling instance"""
        instance = Profiling()

        # Construction alone must succeed and produce the right type
        assert isinstance(instance, Profiling)

    def test_profiling_initial_state(self):
        """end_profiling and print_profiling are usable before any start"""
        fresh = Profiling()

        # start_profiling was never called; ending must still not raise
        fresh.end_profiling()

        report = fresh.print_profiling()
        assert isinstance(report, str)
|
|
|
|
|
|
class TestProfilingStartEnd:
    """Test start_profiling and end_profiling functionality"""

    def test_start_profiling(self):
        """Test that start_profiling can be called"""
        profiler = Profiling()

        # Should not raise an error
        profiler.start_profiling("test_operation")

    def test_end_profiling(self):
        """Test that end_profiling can be called"""
        profiler = Profiling()
        profiler.start_profiling("test_operation")

        # Should not raise an error
        profiler.end_profiling()

    def test_start_profiling_with_different_idents(self):
        """Test start_profiling with different identifier strings"""
        profiler = Profiling()

        identifiers = ["short", "longer_identifier", "very_long_identifier_with_many_chars"]

        for ident in identifiers:
            profiler.start_profiling(ident)
            profiler.end_profiling()
            result = profiler.print_profiling()

            assert ident in result

    def test_end_profiling_without_start(self):
        """Test that end_profiling can be called without start_profiling"""
        profiler = Profiling()

        # Should not raise an error but internal state should indicate warning
        profiler.end_profiling()

        result = profiler.print_profiling()
        assert isinstance(result, str)

    def test_profiling_measures_time(self):
        """Test that profiling measures elapsed time"""
        profiler = Profiling()
        # The identifier is expected to show up in the report (see
        # test_start_profiling_with_different_idents), so it must not contain
        # "s" itself: with an identifier such as "time_test" the unit
        # assertion below would pass no matter how the time is rendered.
        profiler.start_profiling("timing_probe")

        sleep_duration = 0.05  # 50ms
        time.sleep(sleep_duration)

        profiler.end_profiling()
        result = profiler.print_profiling()

        assert isinstance(result, str)
        assert "time:" in result
        # Should have some time measurement
        # NOTE(review): assumes the elapsed time is rendered with an "ms" or
        # "s" unit suffix - confirm against print_profiling's format
        assert "ms" in result or "s" in result

    def test_profiling_measures_memory(self):
        """Test that profiling measures memory usage"""
        profiler = Profiling()
        profiler.start_profiling("memory_test")

        # Allocate some memory
        data = [0] * 100000

        profiler.end_profiling()
        result = profiler.print_profiling()

        assert isinstance(result, str)
        assert "RSS:" in result
        assert "VMS:" in result
        assert "time:" in result

        # Clean up
        del data
|
|
|
|
|
|
class TestProfilingPrintProfiling:
    """Test print_profiling functionality"""

    def test_print_profiling_returns_string(self):
        """Test that print_profiling returns a string"""
        profiler = Profiling()
        profiler.start_profiling("test")
        profiler.end_profiling()

        result = profiler.print_profiling()

        assert isinstance(result, str)

    def test_print_profiling_contains_identifier(self):
        """Test that print_profiling includes the identifier"""
        profiler = Profiling()
        identifier = "my_test_operation"

        profiler.start_profiling(identifier)
        profiler.end_profiling()

        result = profiler.print_profiling()

        assert identifier in result

    def test_print_profiling_format(self):
        """Test that print_profiling has expected format"""
        profiler = Profiling()
        profiler.start_profiling("test")
        profiler.end_profiling()

        result = profiler.print_profiling()

        # Check for expected components
        assert "Profiling:" in result
        assert "RSS:" in result
        assert "VMS:" in result
        assert "time:" in result

    def test_print_profiling_multiple_calls(self):
        """Test that print_profiling can be called multiple times"""
        profiler = Profiling()
        profiler.start_profiling("test")
        profiler.end_profiling()

        result1 = profiler.print_profiling()
        result2 = profiler.print_profiling()

        # Should return the same result
        assert result1 == result2

    def test_print_profiling_time_formats(self):
        """Test different time format outputs"""
        profiler = Profiling()

        # The identifier is expected to appear in the report (see
        # test_print_profiling_contains_identifier). The identifiers used here
        # therefore contain neither "ms" nor "s", so the unit assertions can
        # only be satisfied by the rest of the report and not by the
        # identifier itself (as happened with "ms_test" / "s_test").

        # Very short duration (milliseconds)
        profiler.start_profiling("quick_op")
        time.sleep(0.001)
        profiler.end_profiling()
        result = profiler.print_profiling()
        assert "ms" in result

        # Slightly longer duration (seconds)
        profiler.start_profiling("long_op")
        time.sleep(0.1)
        profiler.end_profiling()
        result = profiler.print_profiling()
        # Could be ms or s depending on timing
        assert ("ms" in result or "s" in result)

    def test_print_profiling_memory_formats(self):
        """Test different memory format outputs"""
        profiler = Profiling()
        profiler.start_profiling("memory_format_test")

        # Allocate some memory
        data = [0] * 50000

        profiler.end_profiling()
        result = profiler.print_profiling()

        # Should have some memory unit (B, kB, MB, GB)
        assert any(unit in result for unit in ["B", "kB", "MB", "GB"])

        # Clean up
        del data
|
|
|
|
|
|
class TestProfilingIntegration:
    """End-to-end scenarios that drive Profiling through full cycles"""

    def test_complete_profiling_cycle(self):
        """Run start, some work, end and print, then check every report label"""
        prof = Profiling()
        prof.start_profiling("complete_cycle")

        # Workload: build a list and pause briefly
        data = list(range(10000))
        time.sleep(0.01)

        prof.end_profiling()
        report = prof.print_profiling()

        assert isinstance(report, str)
        for expected in ("complete_cycle", "RSS:", "VMS:", "time:"):
            assert expected in report

        # Release the workload
        del data

    def test_multiple_profiling_sessions(self):
        """Reuse one profiler for two sessions and compare both reports"""
        prof = Profiling()

        # Session one: only a short pause
        prof.start_profiling("session_1")
        time.sleep(0.01)
        prof.end_profiling()
        first_report = prof.print_profiling()

        # Session two on the same instance: allocation plus a short pause
        prof.start_profiling("session_2")
        data = [0] * 100000
        time.sleep(0.01)
        prof.end_profiling()
        second_report = prof.print_profiling()

        # Each report names its own session and the two must differ
        assert "session_1" in first_report
        assert "session_2" in second_report
        assert first_report != second_report

        # Release the workload
        del data

    def test_profiling_with_zero_work(self):
        """Start and end back to back with nothing in between"""
        prof = Profiling()
        prof.start_profiling("zero_work")
        prof.end_profiling()

        report = prof.print_profiling()

        assert isinstance(report, str)
        assert "zero_work" in report

    def test_profiling_with_heavy_computation(self):
        """Profile a nested list build followed by a pause"""
        prof = Profiling()
        prof.start_profiling("heavy_computation")

        # Workload: 1000 rows of 100 doubled integers each
        result_data: list[list[int]] = [
            [j * 2 for j in range(100)] for _ in range(1000)
        ]
        time.sleep(0.05)

        prof.end_profiling()
        report = prof.print_profiling()

        assert isinstance(report, str)
        assert "heavy_computation" in report
        # The report is expected to carry a time entry
        assert "time:" in report

        # Release the workload
        del result_data

    def test_independent_profilers(self):
        """Two overlapping Profiling instances each report their own identifier"""
        first = Profiling()
        second = Profiling()

        first.start_profiling("profiler_1")
        time.sleep(0.01)

        second.start_profiling("profiler_2")
        data = [0] * 100000
        time.sleep(0.01)

        first.end_profiling()
        second.end_profiling()

        first_report = first.print_profiling()
        second_report = second.print_profiling()

        # Each report carries its own identifier
        assert "profiler_1" in first_report
        assert "profiler_2" in second_report

        # The two reports must not be identical
        assert first_report != second_report

        # Release the workload
        del data
|
|
|
|
|
|
class TestProfilingEdgeCases:
    """Boundary conditions: unusual identifiers, rapid cycles, shrinking memory"""

    def test_empty_identifier(self):
        """Profile with an empty string as identifier"""
        prof = Profiling()
        prof.start_profiling("")
        prof.end_profiling()

        report = prof.print_profiling()

        assert isinstance(report, str)
        assert "Profiling:" in report

    def test_very_long_identifier(self):
        """Profile with a 100-character identifier and expect it in the report"""
        long_ident = "a" * 100
        prof = Profiling()

        prof.start_profiling(long_ident)
        prof.end_profiling()
        report = prof.print_profiling()

        assert isinstance(report, str)
        assert long_ident in report

    def test_special_characters_in_identifier(self):
        """Profile with punctuation characters inside the identifier"""
        special_ident = "test_@#$%_operation"
        prof = Profiling()

        prof.start_profiling(special_ident)
        prof.end_profiling()
        report = prof.print_profiling()

        assert isinstance(report, str)
        assert special_ident in report

    def test_rapid_consecutive_profiling(self):
        """Run five start/end/print cycles back to back on one profiler"""
        prof = Profiling()

        for i in range(5):
            label = f"rapid_{i}"
            prof.start_profiling(label)
            prof.end_profiling()
            report = prof.print_profiling()

            assert isinstance(report, str)
            assert label in report

    def test_profiling_negative_memory_change(self):
        """Profile a region in which a large list is released"""
        prof = Profiling()

        # Allocated before profiling starts so it can be freed inside the region
        pre_data = [0] * 1000000

        prof.start_profiling("memory_decrease")

        # Drop the list while profiling is active
        del pre_data

        prof.end_profiling()
        report = prof.print_profiling()

        # A shrinking footprint must still yield a normal report
        assert isinstance(report, str)
        assert "memory_decrease" in report

    def test_very_short_duration(self):
        """Profile an empty region and expect a millisecond unit in the report"""
        prof = Profiling()
        prof.start_profiling("instant")
        prof.end_profiling()

        report = prof.print_profiling()

        assert isinstance(report, str)
        assert "instant" in report
        # Near-zero elapsed time is expected to be shown in milliseconds
        assert "ms" in report
|
|
|
|
|
|
class TestProfilingContextManager:
    """Usage patterns that resemble wrapping work in a context manager"""

    def test_typical_usage_pattern(self):
        """Start, do some work, end, then fetch the report"""
        prof = Profiling()

        # Open the profiled region
        prof.start_profiling("typical_operation")

        # Workload: the first 1000 even numbers
        result_list: list[int] = [value * 2 for value in range(1000)]

        prof.end_profiling()

        # Fetch the report
        output = prof.print_profiling()

        assert isinstance(output, str)
        assert "typical_operation" in output

        # Release the workload
        del result_list

    def test_profiling_without_end(self):
        """Ask for the report after start_profiling but without end_profiling"""
        prof = Profiling()
        prof.start_profiling("no_end")

        # end_profiling is intentionally skipped here
        report = prof.print_profiling()

        # A string must still come back (its contents may be incomplete)
        assert isinstance(report, str)

    def test_profiling_end_without_start(self):
        """Call end_profiling twice with no preceding start_profiling"""
        prof = Profiling()

        prof.end_profiling()
        prof.end_profiling()

        report = prof.print_profiling()

        assert isinstance(report, str)
|
|
|
|
# __END__
|