127 lines
4.0 KiB
Python
127 lines
4.0 KiB
Python
"""
|
|
Profile memory usage in Python
|
|
"""
|
|
|
|
# https://docs.python.org/3/library/tracemalloc.html
|
|
|
|
import os
|
|
import time
|
|
import tracemalloc
|
|
import linecache
|
|
from typing import Tuple
|
|
from tracemalloc import Snapshot
|
|
import psutil
|
|
|
|
|
|
def display_top(snapshot: Snapshot, key_type: str = 'lineno', limit: int = 10) -> str:
|
|
"""
|
|
Print tracmalloc stats
|
|
https://docs.python.org/3/library/tracemalloc.html#pretty-top
|
|
|
|
Args:
|
|
snapshot (Snapshot): _description_
|
|
key_type (str, optional): _description_. Defaults to 'lineno'.
|
|
limit (int, optional): _description_. Defaults to 10.
|
|
"""
|
|
snapshot = snapshot.filter_traces((
|
|
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
|
|
tracemalloc.Filter(False, "<unknown>"),
|
|
))
|
|
top_stats = snapshot.statistics(key_type)
|
|
|
|
profiler_msg = f"Top {limit} lines"
|
|
for index, stat in enumerate(top_stats[:limit], 1):
|
|
frame = stat.traceback[0]
|
|
# replace "/path/to/module/file.py" with "module/file.py"
|
|
filename = os.sep.join(frame.filename.split(os.sep)[-2:])
|
|
profiler_msg += f"#{index}: {filename}:{frame.lineno}: {(stat.size / 1024):.1f} KiB"
|
|
line = linecache.getline(frame.filename, frame.lineno).strip()
|
|
if line:
|
|
profiler_msg += f" {line}"
|
|
|
|
other = top_stats[limit:]
|
|
if other:
|
|
size = sum(stat.size for stat in other)
|
|
profiler_msg += f"{len(other)} other: {(size / 1024):.1f} KiB"
|
|
total = sum(stat.size for stat in top_stats)
|
|
profiler_msg += f"Total allocated size: {(total / 1024):.1f} KiB"
|
|
return profiler_msg
|
|
|
|
|
|
class Profiling:
|
|
"""
|
|
Profile memory usage and elapsed time for some block
|
|
Based on: https://stackoverflow.com/a/53301648
|
|
"""
|
|
|
|
def __init__(self):
|
|
# profiling id
|
|
self.__ident: str = ''
|
|
# memory
|
|
self.__rss_before: int = 0
|
|
self.__vms_before: int = 0
|
|
# self.shared_before: int = 0
|
|
self.__rss_used: int = 0
|
|
self.__vms_used: int = 0
|
|
# self.shared_used: int = 0
|
|
# time
|
|
self.__call_start: float = 0
|
|
self.__elapsed = 0
|
|
|
|
def __get_process_memory(self) -> Tuple[int, int]:
|
|
process = psutil.Process(os.getpid())
|
|
mi = process.memory_info()
|
|
# macos does not have mi.shared
|
|
return mi.rss, mi.vms
|
|
|
|
def __elapsed_since(self) -> str:
|
|
elapsed = time.time() - self.__call_start
|
|
if elapsed < 1:
|
|
return str(round(elapsed * 1000, 2)) + "ms"
|
|
if elapsed < 60:
|
|
return str(round(elapsed, 2)) + "s"
|
|
if elapsed < 3600:
|
|
return str(round(elapsed / 60, 2)) + "min"
|
|
return str(round(elapsed / 3600, 2)) + "hrs"
|
|
|
|
def __format_bytes(self, bytes_data: int) -> str:
|
|
if abs(bytes_data) < 1000:
|
|
return str(bytes_data) + "B"
|
|
if abs(bytes_data) < 1e6:
|
|
return str(round(bytes_data / 1e3, 2)) + "kB"
|
|
if abs(bytes_data) < 1e9:
|
|
return str(round(bytes_data / 1e6, 2)) + "MB"
|
|
return str(round(bytes_data / 1e9, 2)) + "GB"
|
|
|
|
def start_profiling(self, ident: str) -> None:
|
|
"""
|
|
start the profiling
|
|
"""
|
|
self.__ident = ident
|
|
self.__rss_before, self.__vms_before = self.__get_process_memory()
|
|
self.__call_start = time.time()
|
|
|
|
def end_profiling(self) -> None:
|
|
"""
|
|
end the profiling
|
|
"""
|
|
if self.__rss_before == 0 and self.__vms_before == 0:
|
|
print("start_profile() was not called, output will be negative")
|
|
self.__elapsed = self.__elapsed_since()
|
|
__rss_after, __vms_after = self.__get_process_memory()
|
|
self.__rss_used = __rss_after - self.__rss_before
|
|
self.__vms_used = __vms_after - self.__vms_before
|
|
|
|
def print_profiling(self) -> str:
|
|
"""
|
|
print the profiling time
|
|
"""
|
|
return (
|
|
f"Profiling: {self.__ident:>20} "
|
|
f"RSS: {self.__format_bytes(self.__rss_used):>8} | "
|
|
f"VMS: {self.__format_bytes(self.__vms_used):>8} | "
|
|
f"time: {self.__elapsed:>8}"
|
|
)
|
|
|
|
# __END__
|