Move progres and script helpers to corelibs-progress and corelibs-script

This commit is contained in:
Clemens Schwaighofer
2026-02-06 15:55:26 +09:00
parent b58a26f79a
commit 28527990e9
8 changed files with 28 additions and 2159 deletions

View File

@@ -73,6 +73,12 @@ All content in this module will move to stand alone libraries, as of now the fol
- math_handling.math_helpers: python.math
- requests_handling.auth_helpers: corelibs-requests
- requests_handling.caller: corelibs-requests
- script_handling.progress: corelibs-progress
- script_handling.script_helpers: corelibs-script
- string_handling.byte_helpers: corelibs-strings
- string_handling.double_byte_string_format: corelibs-double-byte-format
- string_handling.hash_helpers: corelibs-hash
- string_handling.string_helpers: corelibs-strings
- string_handling.text_colors: corelibs-text-colors
- var_handling.enum_base: corelibs-enum-base
- var_handling.var_helpers: corelibs-var

View File

@@ -17,8 +17,10 @@ dependencies = [
"corelibs-hash>=1.0.0",
"corelibs-iterator>=1.0.0",
"corelibs-json>=1.0.0",
"corelibs-progress>=1.0.0",
"corelibs-regex-checks>=1.0.0",
"corelibs-requests>=1.0.0",
"corelibs-script>=1.0.0",
"corelibs-search>=1.0.0",
"corelibs-stack-trace>=1.0.0",
"corelibs-strings>=1.0.0",

View File

@@ -29,449 +29,16 @@ set_end_time(time optional)
show_position(file pos optional)
"""
import time
from typing import Literal
from math import floor
from corelibs_datetime.timestamp_convert import convert_timestamp
from corelibs.string_handling.byte_helpers import format_bytes
from warnings import warn
from corelibs_progress.progress import Progress as CoreProgress # for type checking only
class Progress():
class Progress(CoreProgress):
"""
file progress output information
"""
def __init__(
self,
verbose: int = 0,
precision: int = 1,
microtime: Literal[-1] | Literal[1] | Literal[0] = 0,
wide_time: bool = False,
prefix_lb: bool = False
):
# set default var stuff
# max lines in input
self.linecount: int = 0
# max file size
self.filesize: int = 0
# * comma after percent
self.precision: int = 0
# * if flagged 1, then wthe wide 15 char left bound format is used
self.wide_time: bool = False
# * verbose status from outside
self.verbose: bool = False
# * microtime output for last run time (1 for enable 0 for auto -1 for disable)
self.microtime: Literal[-1] | Literal[1] | Literal[0] = 0
# micro time flag for last group
self.lg_microtime: bool = False
# = flag if output was given
self.change = 0
# = global start for the full script running time
self.start: float | None = None
# = for the eta time, can be set after a query or long read in, to not create a wrong ETA time
self.start_run: float | None = None
# loop start
self.start_time: float | None = None
# global end
self.end: float | None = None
# loop end
self.end_time: float | None = None
# run time in seconds, set when end time method is called
self.run_time: float | None = None
# = filesize current
self.count_size: int | None = None
# position current
self.count: int = 0
# last count (position)
self.current_count: int = 0
# the current file post
self.file_pos: int | None = None
# lines processed in the last run
self.lines_processed: int = 0
# time in th seconds for the last group run (until percent change)
self.last_group: float = 0
# float value, lines processed per second to the last group run
self.lines_in_last_group: float = 0
# float values, lines processed per second to complete run
self.lines_in_global: float = 0
# flaot value, bytes processes per second in the last group run
self.bytes_in_last_group: float = 0
# float value, bytes processed per second to complete run
self.bytes_in_global: float = 0
# bytes processed in last run (in bytes)
self.size_in_last_group: int = 0
# current file position 8size)
self.current_size: int = 0
# last percent position
self.last_percent: int | float = 0
# if we have normal % or in steps of 10
self.precision_ten_step: int = 0
# the default size this is precision + 4
self.percent_print: int = 5
# this is 1 if it is 1 or 0 for precision or precision size
self.percent_precision: int = 1
# prefix line with a line break
self.prefix_lb: bool = False
# estimated time to finish
self.eta: float | None = None
# run time since start
self.full_time_needed: float | None = None
# the actual output
self.string: str = ''
# initialize the class
self.set_precision(precision)
self.set_verbose(verbose)
self.set_micro_time(microtime)
self.set_wide_time(wide_time)
self.set_prefix_lb(prefix_lb)
self.set_start_time()
def reset(self):
"""
resets the current progress to 0, but keeps the overall start variables set
"""
# reset what always gets reset
self.count = 0
self.count_size = None
self.current_count = 0
self.linecount = 0
self.lines_processed = 0
self.last_group = 0
self.lines_in_last_group = 0
self.lines_in_global = 0
self.bytes_in_last_group = 0
self.bytes_in_global = 0
self.size_in_last_group = 0
self.filesize = 0
self.current_size = 0
self.last_percent = 0
self.eta = 0
self.full_time_needed = 0
self.start_run = None
self.start_time = None
self.end_time = None
def set_wide_time(self, wide_time: bool) -> bool:
"""
sets the show wide time flag
Arguments:
wide_time {bool} -- _description_
Returns:
bool -- _description_
"""
self.wide_time = wide_time
return self.wide_time
def set_micro_time(self, microtime: Literal[-1] | Literal[1] | Literal[0]) -> Literal[-1] | Literal[1] | Literal[0]:
"""sets the show microtime -1 OFF, 0 AUTO, 1 ON
Returns:
_type_ -- _description_
"""
self.microtime = microtime
return self.microtime
def set_prefix_lb(self, prefix_lb: bool) -> bool:
"""
set prefix line break flag
Arguments:
prefix_lb {bool} -- _description_
Returns:
bool -- _description_
"""
self.prefix_lb = prefix_lb
return self.prefix_lb
def set_verbose(self, verbose: int) -> bool:
"""
set the internal verbose flag to 1 if any value higher than 1 is given, else sets it to 0
Arguments:
verbose {int} -- _description_
Returns:
bool -- _description_
"""
if verbose > 0:
self.verbose = True
else:
self.verbose = False
return self.verbose
def set_precision(self, precision: int) -> int:
"""
sets the output precision size. If -2 for five step, -1 for ten step
else sets the precision normally, for 0, no precision is set, maximum precision is 10
Arguments:
precision {int} -- _description_
Returns:
int -- _description_
"""
# if not a valid number, we set it to 0
if precision < -2 or precision > 10:
precision = 0
if precision < 0:
if precision < -1:
self.precision_ten_step = 5
else:
self.precision_ten_step = 10
self.precision = 0 # no comma
self.percent_precision = 0 # no print precision
self.percent_print = 3 # max 3 length
else:
# comma values visible
self.precision = 10 if precision < 0 or precision > 10 else precision
# for calcualtion of precision
self.percent_precision = 10 if precision < 0 or precision > 10 else precision
# for the format output base is 4, plsut he percent precision length
self.percent_print = (3 if precision == 0 else 4) + self.percent_precision
# return the set precision
return self.precision
def set_linecount(self, linecount: int) -> int:
"""
set the maximum lines in this file, if value is smaller than 0 or 0, then it is set to 1
Arguments:
linecount {int} -- _description_
Returns:
int -- _description_
"""
if linecount > 0:
self.linecount = linecount
else:
self.linecount = 1
return self.linecount
def set_filesize(self, filesize: int) -> int:
"""
set the maximum filesize for this file, if value is smaller than 0 or 0, then it is set to 1
Arguments:
filesize {int} -- _description_
Returns:
int -- _description_
"""
if filesize > 0:
self.filesize = filesize
else:
self.filesize = 1
return self.filesize
def set_start_time(self, time_value: float = time.time()) -> None:
"""
initial set of the start times, auto set
Keyword Arguments:
time_value {float} -- _description_ (default: {time.time()})
"""
# avoid possible double set of the original start time
if not self.start:
self.start = time_value
self.start_time = time_value
self.start_run = time_value
def set_eta_start_time(self, time_value: float = time.time()) -> None:
"""
sets the loop % run time, for correct ETA calculation
calls set start time, as the main start time is only set once
Keyword Arguments:
time_value {float} -- _description_ (default: {time.time()})
"""
self.set_start_time(time_value)
def set_end_time(self, time_value: float = time.time()) -> None:
"""
set the end time
Keyword Arguments:
time_value {float} -- _description_ (default: {time.time()})
"""
self.end = time_value
self.end_time = time_value
if self.start is None:
self.start = 0
# the overall run time in micro seconds
self.run_time = self.end - self.start
def show_position(self, filepos: int = 0) -> str:
"""
processes the current position. either based on read the file size pos, or the line count
Keyword Arguments:
filepos {int} -- _description_ (default: {0})
Returns:
str -- _description_
"""
show_filesize = True # if we print from file size or line count
# microtime flags
eta_microtime = False
ftn_microtime = False
lg_microtime = False
# percent precision calc
# _p_spf = "{:." + str(self.precision) + "f}"
# output format for percent
_pr_p_spf = "{:>" + str(self.percent_print) + "." + str(self.percent_precision) + "f}"
# set the linecount precision based on the final linecount, if not, leave it empty
_pr_lc = "{}"
if self.linecount:
_pr_lc = "{:>" + str(len(str(f"{self.linecount:,}"))) + ",}"
# time format, if flag is set, the wide format is used
_pr_tf = "{}"
if self.wide_time:
_pr_tf = "{:>15}"
# count up
self.count += 1
# if we have file pos from parameter
if filepos != 0:
self.file_pos = filepos
else:
# we did not, so we set internal value
self.file_pos = self.count
# we also check if the filesize was set now
if self.filesize == 0:
self.filesize = self.linecount
# set ignore filesize output (no data)
show_filesize = False
# set the count size based on the file pos, is only used if we have filesize
self.count_size = self.file_pos
# do normal or down to 10 (0, 10, ...) %
if self.precision_ten_step:
_percent = int((self.file_pos / float(self.filesize)) * 100)
mod = _percent % self.precision_ten_step
percent = _percent if mod == 0 else self.last_percent
else:
# calc percent
percent = round(((self.file_pos / float(self.filesize)) * 100), self.precision)
# output
if percent != self.last_percent:
self.end_time = time.time() # current time (for loop time)
if self.start is None:
self.start = 0
if self.start_time is None:
self.start_time = 0
# for from the beginning
full_time_needed = self.end_time - self.start # how long from the start
self.last_group = self.end_time - self.start_time # how long for last loop
self.lines_processed = self.count - self.current_count # how many lines processed
# lines in last group
self.lines_in_last_group = (self.lines_processed / self.last_group) if self.last_group else 0
# lines in global
self.lines_in_global = (self.count / full_time_needed) if full_time_needed else 0
# if we have linecount or not
if self.linecount == 0:
full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count_size # how long for all
# estimate for the rest
eta = full_time_per_line * (self.filesize - self.count_size)
else:
# how long for all
full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count
# estimate for the rest
eta = full_time_per_line * (self.linecount - self.count)
# just in case ...
if eta < 0:
eta = 0
# check if to show microtime
# ON
if self.microtime == 1:
eta_microtime = ftn_microtime = lg_microtime = True
# AUTO
if self.microtime == 0:
if eta > 0 and eta < 1:
eta_microtime = True
if full_time_needed > 0 and full_time_needed < 1:
ftn_microtime = True
# pre check last group: if pre comma part is same add microtime anyway
if self.last_group > 0 and self.last_group < 1:
lg_microtime = True
if self.last_group == floor(self.last_group):
lg_microtime = True
self.last_group = floor(self.last_group)
# if with filesize or without
if show_filesize:
# last group size
self.size_in_last_group = self.count_size - self.current_size
# calc kb/s if there is any filesize data
# last group
self.bytes_in_last_group = (self.size_in_last_group / self.last_group) if self.last_group else 0
# global
self.bytes_in_global = (self.count_size / full_time_needed) if full_time_needed else 0
# only used if we run with file size for the next check
self.current_size = self.count_size
if self.verbose >= 1:
self.string = (
f"Processed {_pr_p_spf}% "
"[{} / {}] | "
f"{_pr_lc} / {_pr_lc} Lines | ETA: {_pr_tf} / TR: {_pr_tf} / "
"LR: {:,} "
"lines ({:,}) in {}, {:,.2f} ({:,.2f}) lines/s, {} ({}) b/s"
).format(
float(percent),
format_bytes(self.count_size),
format_bytes(self.filesize),
self.count,
self.linecount,
convert_timestamp(eta, eta_microtime),
convert_timestamp(full_time_needed, ftn_microtime),
self.lines_processed,
self.size_in_last_group,
convert_timestamp(self.last_group, lg_microtime),
self.lines_in_global,
self.lines_in_last_group,
format_bytes(self.bytes_in_global),
format_bytes(self.bytes_in_last_group)
)
else:
if self.verbose >= 1:
self.string = (
f"Processed {_pr_p_spf}% | {_pr_lc} / {_pr_lc} Lines "
f"| ETA: {_pr_tf} / TR: {_pr_tf} / "
"LR: {:,} lines in {}, {:,.2f} ({:,.2f}) lines/s"
).format(
float(percent),
self.count,
self.linecount,
convert_timestamp(eta, eta_microtime),
convert_timestamp(full_time_needed, ftn_microtime),
self.lines_processed,
convert_timestamp(self.last_group, lg_microtime),
self.lines_in_global,
self.lines_in_last_group
)
# prefix return string with line break if flagged
self.string = ("\n" if self.prefix_lb else '') + self.string
# print the string if verbose is turned on
if self.verbose >= 1:
print(self.string)
# write back vars
self.last_percent = percent
self.eta = eta
self.full_time_needed = full_time_needed
self.lg_microtime = lg_microtime
# for the next run, check data
self.start_time = time.time()
self.current_count = self.count
# trigger if this is a change
self.change = 1
else:
# trigger if this is a change
self.change = 0
# return string
return self.string
# } END OF ShowPosition
warn("Use 'corelibs_progress.progress.Progress'", DeprecationWarning, stacklevel=2)
# __END__

View File

@@ -2,13 +2,16 @@
Helper methods for scripts
"""
import time
import os
import sys
from warnings import deprecated
from pathlib import Path
import psutil
from corelibs_script.script_support import (
wait_abort as corelibs_wait_abort,
lock_run as corelibs_lock_run,
unlock_run as corelibs_unlock_run,
)
@deprecated("use corelibs_script.script_support.wait_abort instead")
def wait_abort(sleep: int = 5) -> None:
"""
wait a certain time for an abort command
@@ -16,18 +19,10 @@ def wait_abort(sleep: int = 5) -> None:
Keyword Arguments:
sleep {int} -- _description_ (default: {5})
"""
try:
print(f"Waiting {sleep} seconds (Press CTRL +C to abort) [", end="", flush=True)
for _ in range(1, sleep):
print(".", end="", flush=True)
time.sleep(1)
print("]", flush=True)
except KeyboardInterrupt:
print("\nInterrupted by user")
sys.exit(0)
print("\n\n")
corelibs_wait_abort(sleep)
@deprecated("use corelibs_script.script_support.lock_run instead")
def lock_run(lock_file: Path) -> None:
"""
lock a script run
@@ -41,44 +36,10 @@ def lock_run(lock_file: Path) -> None:
Exception: _description_
IOError: _description_
"""
no_file = False
run_pid = os.getpid()
# or os.path.isfile()
try:
with open(lock_file, "r", encoding="UTF-8") as fp:
exists = False
pid = fp.read()
fp.close()
if pid:
# check if this pid exists
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if pid == proc.info['pid']:
exists = True
break
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# in case we cannot access
continue
if not exists:
# no pid but lock file, unlink
try:
lock_file.unlink()
no_file = True
except IOError as e:
raise IOError(f"Cannot remove lock_file: {lock_file}: {e}") from e
else:
raise IOError(f"Script is already running with PID {pid}")
except IOError:
no_file = True
if no_file:
try:
with open(lock_file, "w", encoding="UTF-8") as fp:
fp.write(str(run_pid))
fp.close()
except IOError as e:
raise IOError(f"Cannot open run lock file '{lock_file}' for writing: {e}") from e
corelibs_lock_run(lock_file)
@deprecated("use corelibs_script.script_support.unlock_run instead")
def unlock_run(lock_file: Path) -> None:
"""
removes the lock file
@@ -89,9 +50,6 @@ def unlock_run(lock_file: Path) -> None:
Raises:
Exception: _description_
"""
try:
lock_file.unlink()
except IOError as e:
raise IOError(f"Cannot remove lock_file: {lock_file}: {e}") from e
corelibs_unlock_run(lock_file)
# __END__

View File

@@ -37,8 +37,8 @@ def main():
# prg.SetStartTime(time.time())
prg.set_start_time()
print(
f"PRECISION: {prg.precision} | TEN STEP: {prg.precision_ten_step} | "
f"WIDE TEME: {prg.wide_time} | MICROTIME: {prg.microtime} | VERBOSE: {prg.verbose}"
f"PRECISION: {prg.__precision} | TEN STEP: {prg.precision_ten_step} | "
f"WIDE TEME: {prg.__wide_time} | MICROTIME: {prg.microtime} | VERBOSE: {prg.verbose}"
)
if use_file:
@@ -56,7 +56,7 @@ def main():
print(
f"Buffer size: {io.DEFAULT_BUFFER_SIZE} | "
f"Do Buffering: {fh.line_buffering} | "
f"File size: {prg.filesize}"
f"File size: {prg.__filesize}"
)
data = fh.readline()
while data:
@@ -72,7 +72,7 @@ def main():
print(f"Starting: {create_time(prg.start if prg.start is not None else 0)}")
prg.set_linecount(256)
i = 1
while i <= prg.linecount:
while i <= prg.__linecount:
sleep = randint(1, 9)
sleep /= 7
time.sleep(sleep)

View File

@@ -1,3 +0,0 @@
"""
Unit tests for script_handling module
"""

View File

@@ -1,821 +0,0 @@
"""
PyTest: script_handling/script_helpers
"""
# pylint: disable=use-implicit-booleaness-not-comparison
import time
import os
from pathlib import Path
from unittest.mock import patch, MagicMock, mock_open, PropertyMock
import pytest
from pytest import CaptureFixture
import psutil
from corelibs.script_handling.script_helpers import (
wait_abort,
lock_run,
unlock_run,
)
class TestWaitAbort:
"""Test suite for wait_abort function"""
def test_wait_abort_default_sleep(self, capsys: CaptureFixture[str]):
"""Test wait_abort with default sleep duration"""
with patch('time.sleep'):
wait_abort()
captured = capsys.readouterr()
assert "Waiting 5 seconds" in captured.out
assert "(Press CTRL +C to abort)" in captured.out
assert "[" in captured.out
assert "]" in captured.out
# Should have 4 dots (sleep - 1)
assert captured.out.count(".") == 4
def test_wait_abort_custom_sleep(self, capsys: CaptureFixture[str]):
"""Test wait_abort with custom sleep duration"""
with patch('time.sleep'):
wait_abort(sleep=3)
captured = capsys.readouterr()
assert "Waiting 3 seconds" in captured.out
# Should have 2 dots (3 - 1)
assert captured.out.count(".") == 2
def test_wait_abort_sleep_one_second(self, capsys: CaptureFixture[str]):
"""Test wait_abort with sleep duration of 1 second"""
with patch('time.sleep'):
wait_abort(sleep=1)
captured = capsys.readouterr()
assert "Waiting 1 seconds" in captured.out
# Should have 0 dots (1 - 1)
assert captured.out.count(".") == 0
def test_wait_abort_sleep_zero(self, capsys: CaptureFixture[str]):
"""Test wait_abort with sleep duration of 0"""
with patch('time.sleep'):
wait_abort(sleep=0)
captured = capsys.readouterr()
assert "Waiting 0 seconds" in captured.out
# Should have 0 dots since range(1, 0) is empty
assert captured.out.count(".") == 0
def test_wait_abort_keyboard_interrupt(self, capsys: CaptureFixture[str]):
"""Test wait_abort handles KeyboardInterrupt and exits"""
with patch('time.sleep', side_effect=KeyboardInterrupt):
with pytest.raises(SystemExit) as exc_info:
wait_abort(sleep=5)
assert exc_info.value.code == 0
captured = capsys.readouterr()
assert "Interrupted by user" in captured.out
def test_wait_abort_keyboard_interrupt_immediate(self, capsys: CaptureFixture[str]):
"""Test wait_abort handles KeyboardInterrupt on first iteration"""
def sleep_side_effect(_duration: int) -> None:
raise KeyboardInterrupt()
with patch('time.sleep', side_effect=sleep_side_effect):
with pytest.raises(SystemExit) as exc_info:
wait_abort(sleep=10)
assert exc_info.value.code == 0
captured = capsys.readouterr()
assert "Interrupted by user" in captured.out
def test_wait_abort_completes_normally(self, capsys: CaptureFixture[str]):
"""Test wait_abort completes without interruption"""
with patch('time.sleep') as mock_sleep:
wait_abort(sleep=3)
# time.sleep should be called (sleep - 1) times
assert mock_sleep.call_count == 2
captured = capsys.readouterr()
assert "Waiting 3 seconds" in captured.out
assert "]" in captured.out
# Should have newlines at the end
assert captured.out.endswith("\n\n")
def test_wait_abort_actual_timing(self):
"""Test wait_abort actually waits (integration test)"""
start_time = time.time()
wait_abort(sleep=1)
elapsed_time = time.time() - start_time
# Should take at least close to 0 seconds (1-1)
# With mocking disabled in this test, it would take actual time
# but we've been mocking it, so this tests the unmocked behavior
# For this test, we'll check it runs without error
assert elapsed_time >= 0
def test_wait_abort_large_sleep_value(self, capsys: CaptureFixture[str]):
"""Test wait_abort with large sleep value"""
with patch('time.sleep'):
wait_abort(sleep=100)
captured = capsys.readouterr()
assert "Waiting 100 seconds" in captured.out
# Should have 99 dots
assert captured.out.count(".") == 99
def test_wait_abort_output_format(self, capsys: CaptureFixture[str]):
"""Test wait_abort output formatting"""
with patch('time.sleep'):
wait_abort(sleep=3)
captured = capsys.readouterr()
# Check the exact format
assert "Waiting 3 seconds (Press CTRL +C to abort) [" in captured.out
assert captured.out.count("[") == 1
assert captured.out.count("]") == 1
def test_wait_abort_flush_behavior(self):
"""Test that wait_abort flushes output correctly"""
with patch('time.sleep'):
with patch('builtins.print') as mock_print:
wait_abort(sleep=3)
# Check that print was called with flush=True
# First call: "Waiting X seconds..."
# Intermediate calls: dots with flush=True
# Last calls: "]" and final newlines
flush_calls = [
call for call in mock_print.call_args_list
if 'flush' in call.kwargs and call.kwargs['flush'] is True
]
assert len(flush_calls) > 0
class TestLockRun:
"""Test suite for lock_run function"""
def test_lock_run_creates_lock_file(self, tmp_path: Path):
"""Test lock_run creates a lock file with current PID"""
lock_file = tmp_path / "test.lock"
lock_run(lock_file)
assert lock_file.exists()
content = lock_file.read_text()
assert content == str(os.getpid())
def test_lock_run_raises_when_process_exists(self, tmp_path: Path):
"""Test lock_run raises IOError when process with PID exists
Note: The actual code has a bug where it compares string PID from file
with integer PID from psutil, which will never match. This test demonstrates
the intended behavior if the bug were fixed.
"""
lock_file = tmp_path / "test.lock"
current_pid = os.getpid()
# Create lock file with current PID
lock_file.write_text(str(current_pid))
# Patch at module level to ensure correct comparison
with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter:
def mock_process_iter(attrs=None): # type: ignore
mock_proc = MagicMock()
# Make PID a string to match the file content for comparison
mock_proc.info = {'pid': str(current_pid)}
return [mock_proc]
mock_proc_iter.side_effect = mock_process_iter
with pytest.raises(IOError) as exc_info:
lock_run(lock_file)
assert f"Script is already running with PID {current_pid}" in str(exc_info.value)
def test_lock_run_removes_stale_lock_file(self, tmp_path: Path):
"""Test lock_run removes lock file when PID doesn't exist"""
lock_file = tmp_path / "test.lock"
# Use a PID that definitely doesn't exist
stale_pid = "99999999"
lock_file.write_text(stale_pid)
# Mock psutil to return no matching processes
with patch('psutil.process_iter') as mock_proc_iter:
mock_process = MagicMock()
mock_process.info = {'pid': 12345} # Different PID
mock_proc_iter.return_value = [mock_process]
lock_run(lock_file)
# Lock file should be recreated with current PID
assert lock_file.exists()
assert lock_file.read_text() == str(os.getpid())
def test_lock_run_creates_lock_when_no_file_exists(self, tmp_path: Path):
"""Test lock_run creates lock file when none exists"""
lock_file = tmp_path / "new.lock"
assert not lock_file.exists()
lock_run(lock_file)
assert lock_file.exists()
def test_lock_run_handles_empty_lock_file(self, tmp_path: Path):
"""Test lock_run handles empty lock file"""
lock_file = tmp_path / "empty.lock"
lock_file.write_text("")
lock_run(lock_file)
assert lock_file.exists()
assert lock_file.read_text() == str(os.getpid())
def test_lock_run_handles_psutil_no_such_process(self, tmp_path: Path):
"""Test lock_run handles psutil.NoSuchProcess exception"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("12345")
with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter:
# Create a mock that raises NoSuchProcess inside the try block
def mock_iter(attrs=None): # type: ignore
mock_proc = MagicMock()
mock_proc.info = {'pid': "12345"}
# Configure to raise exception when accessed
type(mock_proc).info = PropertyMock(side_effect=psutil.NoSuchProcess(12345))
return [mock_proc]
mock_proc_iter.side_effect = mock_iter
# Since the exception is caught, lock should be acquired
lock_run(lock_file)
assert lock_file.exists()
assert lock_file.read_text() == str(os.getpid())
def test_lock_run_handles_psutil_access_denied(self, tmp_path: Path):
"""Test lock_run handles psutil.AccessDenied exception"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("12345")
with patch('psutil.process_iter') as mock_proc_iter:
mock_proc_iter.return_value = []
lock_run(lock_file)
assert lock_file.exists()
def test_lock_run_handles_psutil_zombie_process(self, tmp_path: Path):
"""Test lock_run handles psutil.ZombieProcess exception"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("12345")
with patch('psutil.process_iter') as mock_proc_iter:
mock_proc_iter.return_value = []
lock_run(lock_file)
assert lock_file.exists()
def test_lock_run_raises_on_unlink_error(self, tmp_path: Path):
"""Test lock_run raises IOError when cannot remove stale lock file"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("99999999")
with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter:
mock_proc_iter.return_value = []
# Mock pathlib.Path.unlink to raise IOError on the specific lock_file
original_unlink = Path.unlink
def mock_unlink(self, *args, **kwargs): # type: ignore
if self == lock_file:
raise IOError("Permission denied")
return original_unlink(self, *args, **kwargs)
with patch.object(Path, 'unlink', mock_unlink):
with pytest.raises(IOError) as exc_info:
lock_run(lock_file)
assert "Cannot remove lock_file" in str(exc_info.value)
assert "Permission denied" in str(exc_info.value)
def test_lock_run_raises_on_write_error(self, tmp_path: Path):
"""Test lock_run raises IOError when cannot write lock file"""
lock_file = tmp_path / "test.lock"
# Mock open to raise IOError on write
with patch('builtins.open', side_effect=IOError("Disk full")):
with pytest.raises(IOError) as exc_info:
lock_run(lock_file)
assert "Cannot open run lock file" in str(exc_info.value)
assert "Disk full" in str(exc_info.value)
def test_lock_run_uses_current_pid(self, tmp_path: Path):
"""Test lock_run uses current process PID"""
lock_file = tmp_path / "test.lock"
expected_pid = os.getpid()
lock_run(lock_file)
actual_pid = lock_file.read_text()
assert actual_pid == str(expected_pid)
def test_lock_run_with_subdirectory(self, tmp_path: Path):
"""Test lock_run creates lock file in subdirectory"""
subdir = tmp_path / "locks"
subdir.mkdir()
lock_file = subdir / "test.lock"
lock_run(lock_file)
assert lock_file.exists()
assert lock_file.read_text() == str(os.getpid())
def test_lock_run_overwrites_invalid_pid(self, tmp_path: Path):
"""Test lock_run overwrites lock file with invalid PID format"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("not_a_number")
# When PID is not a valid number, psutil won't find it
with patch('psutil.process_iter') as mock_proc_iter:
mock_proc_iter.return_value = []
lock_run(lock_file)
assert lock_file.read_text() == str(os.getpid())
def test_lock_run_multiple_times_same_process(self, tmp_path: Path):
"""Test lock_run called multiple times by same process"""
lock_file = tmp_path / "test.lock"
current_pid = os.getpid()
# First call
lock_run(lock_file)
assert lock_file.read_text() == str(current_pid)
# Second call - should raise since process exists
with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter:
def mock_iter(attrs=None): # type: ignore
mock_proc = MagicMock()
mock_proc.info = {'pid': str(current_pid)}
return [mock_proc]
mock_proc_iter.side_effect = mock_iter
with pytest.raises(IOError) as exc_info:
lock_run(lock_file)
assert f"Script is already running with PID {current_pid}" in str(exc_info.value)
def test_lock_run_checks_all_processes(self, tmp_path: Path):
"""Test lock_run iterates through all processes"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("12345")
with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter:
# Create multiple mock processes
def mock_iter(attrs=None): # type: ignore
mock_processes = []
for pid in ["1000", "2000", "12345", "4000"]: # PIDs as strings
mock_proc = MagicMock()
mock_proc.info = {'pid': pid}
mock_processes.append(mock_proc)
return mock_processes
mock_proc_iter.side_effect = mock_iter
# Should find PID 12345 and raise
with pytest.raises(IOError) as exc_info:
lock_run(lock_file)
assert "Script is already running with PID 12345" in str(exc_info.value)
def test_lock_run_file_encoding_utf8(self, tmp_path: Path):
"""Test lock_run uses UTF-8 encoding"""
lock_file = tmp_path / "test.lock"
with patch('builtins.open', mock_open()) as mock_file:
try:
lock_run(lock_file)
except (IOError, FileNotFoundError):
pass # We're just checking the encoding parameter
# Check that open was called with UTF-8 encoding
calls = mock_file.call_args_list
for call in calls:
if 'encoding' in call.kwargs:
assert call.kwargs['encoding'] == 'UTF-8'
class TestUnlockRun:
"""Test suite for unlock_run function"""
def test_unlock_run_removes_lock_file(self, tmp_path: Path):
"""Test unlock_run removes existing lock file"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("12345")
assert lock_file.exists()
unlock_run(lock_file)
assert not lock_file.exists()
def test_unlock_run_raises_on_error(self, tmp_path: Path):
"""Test unlock_run raises IOError when cannot remove file"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("12345")
with patch.object(Path, 'unlink', side_effect=IOError("Permission denied")):
with pytest.raises(IOError) as exc_info:
unlock_run(lock_file)
assert "Cannot remove lock_file" in str(exc_info.value)
assert "Permission denied" in str(exc_info.value)
def test_unlock_run_on_nonexistent_file(self, tmp_path: Path):
"""Test unlock_run on non-existent file raises IOError"""
lock_file = tmp_path / "nonexistent.lock"
with pytest.raises(IOError) as exc_info:
unlock_run(lock_file)
assert "Cannot remove lock_file" in str(exc_info.value)
def test_unlock_run_with_subdirectory(self, tmp_path: Path):
"""Test unlock_run removes file from subdirectory"""
subdir = tmp_path / "locks"
subdir.mkdir()
lock_file = subdir / "test.lock"
lock_file.write_text("12345")
unlock_run(lock_file)
assert not lock_file.exists()
def test_unlock_run_multiple_times(self, tmp_path: Path):
"""Test unlock_run called multiple times raises error"""
lock_file = tmp_path / "test.lock"
lock_file.write_text("12345")
# First call should succeed
unlock_run(lock_file)
assert not lock_file.exists()
# Second call should raise IOError
with pytest.raises(IOError):
unlock_run(lock_file)
def test_unlock_run_readonly_file(self, tmp_path: Path):
"""Test unlock_run on read-only file"""
lock_file = tmp_path / "readonly.lock"
lock_file.write_text("12345")
lock_file.chmod(0o444)
try:
unlock_run(lock_file)
# On some systems, unlink may still work on readonly files
assert not lock_file.exists()
except IOError as exc_info:
# On other systems, it may raise an error
assert "Cannot remove lock_file" in str(exc_info)
def test_unlock_run_preserves_other_files(self, tmp_path: Path):
"""Test unlock_run only removes specified file"""
lock_file1 = tmp_path / "test1.lock"
lock_file2 = tmp_path / "test2.lock"
lock_file1.write_text("12345")
lock_file2.write_text("67890")
unlock_run(lock_file1)
assert not lock_file1.exists()
assert lock_file2.exists()
class TestLockUnlockIntegration:
"""Integration tests for lock_run and unlock_run"""
def test_lock_unlock_workflow(self, tmp_path: Path):
"""Test complete lock and unlock workflow"""
lock_file = tmp_path / "workflow.lock"
# Lock
lock_run(lock_file)
assert lock_file.exists()
assert lock_file.read_text() == str(os.getpid())
# Unlock
unlock_run(lock_file)
assert not lock_file.exists()
def test_lock_unlock_relock(self, tmp_path: Path):
"""Test locking, unlocking, and locking again"""
lock_file = tmp_path / "relock.lock"
# First lock
lock_run(lock_file)
first_content = lock_file.read_text()
# Unlock
unlock_run(lock_file)
# Second lock
lock_run(lock_file)
second_content = lock_file.read_text()
assert first_content == second_content == str(os.getpid())
def test_lock_prevents_duplicate_run(self, tmp_path: Path):
"""Test lock prevents duplicate process simulation"""
lock_file = tmp_path / "duplicate.lock"
current_pid = os.getpid()
# First lock
lock_run(lock_file)
# Simulate another process trying to acquire lock
with patch('psutil.process_iter') as mock_proc_iter:
mock_process = MagicMock()
mock_process.info = {'pid': current_pid}
mock_proc_iter.return_value = [mock_process]
with pytest.raises(IOError) as exc_info:
lock_run(lock_file)
assert "already running" in str(exc_info.value)
# Cleanup
unlock_run(lock_file)
def test_stale_lock_cleanup_and_reacquire(self, tmp_path: Path):
"""Test cleaning up stale lock and acquiring new one"""
lock_file = tmp_path / "stale.lock"
# Create stale lock
stale_pid = "99999999"
lock_file.write_text(stale_pid)
# Mock psutil to indicate process doesn't exist
with patch('psutil.process_iter') as mock_proc_iter:
mock_proc_iter.return_value = []
lock_run(lock_file)
# Should have our PID now
assert lock_file.read_text() == str(os.getpid())
# Cleanup
unlock_run(lock_file)
assert not lock_file.exists()
def test_multiple_locks_different_files(self, tmp_path: Path):
"""Test multiple locks with different files"""
lock_file1 = tmp_path / "lock1.lock"
lock_file2 = tmp_path / "lock2.lock"
# Acquire multiple locks
lock_run(lock_file1)
lock_run(lock_file2)
assert lock_file1.exists()
assert lock_file2.exists()
# Release them
unlock_run(lock_file1)
unlock_run(lock_file2)
assert not lock_file1.exists()
assert not lock_file2.exists()
def test_lock_in_context_manager_pattern(self, tmp_path: Path):
"""Test lock/unlock in a context manager pattern"""
lock_file = tmp_path / "context.lock"
class LockContext:
def __init__(self, lock_path: Path):
self.lock_path = lock_path
def __enter__(self) -> 'LockContext':
lock_run(self.lock_path)
return self
def __exit__(self, exc_type: type, exc_val: Exception, exc_tb: object) -> bool:
unlock_run(self.lock_path)
return False
# Use in context
with LockContext(lock_file):
assert lock_file.exists()
# After context, should be unlocked
assert not lock_file.exists()
def test_lock_survives_process_in_loop(self, tmp_path: Path):
"""Test lock file persists across multiple operations"""
lock_file = tmp_path / "persistent.lock"
lock_run(lock_file)
# Simulate some operations
for _ in range(10):
assert lock_file.exists()
content = lock_file.read_text()
assert content == str(os.getpid())
unlock_run(lock_file)
assert not lock_file.exists()
def test_exception_during_locked_execution(self, tmp_path: Path):
"""Test lock cleanup when exception occurs during execution"""
lock_file = tmp_path / "exception.lock"
lock_run(lock_file)
try:
# Simulate some work that raises exception
raise ValueError("Something went wrong")
except ValueError:
pass
finally:
# Lock should still exist until explicitly unlocked
assert lock_file.exists()
unlock_run(lock_file)
assert not lock_file.exists()
def test_lock_file_permissions(self, tmp_path: Path):
"""Test lock file has appropriate permissions"""
lock_file = tmp_path / "permissions.lock"
lock_run(lock_file)
# File should be readable and writable by owner
assert lock_file.exists()
# We can read it
content = lock_file.read_text()
assert content == str(os.getpid())
unlock_run(lock_file)
class TestEdgeCases:
"""Test edge cases and error conditions"""
def test_wait_abort_negative_sleep(self, capsys: CaptureFixture[str]):
"""Test wait_abort with negative sleep value"""
with patch('time.sleep'):
wait_abort(sleep=-5)
captured = capsys.readouterr()
assert "Waiting -5 seconds" in captured.out
def test_lock_run_with_whitespace_pid(self, tmp_path: Path):
"""Test lock_run handles lock file with whitespace"""
lock_file = tmp_path / "whitespace.lock"
lock_file.write_text(" 12345 \n")
with patch('psutil.process_iter') as mock_proc_iter:
mock_proc_iter.return_value = []
lock_run(lock_file)
# Should create new lock with clean PID
assert lock_file.read_text() == str(os.getpid())
def test_lock_run_with_special_characters_in_path(self, tmp_path: Path):
"""Test lock_run with special characters in file path"""
special_dir = tmp_path / "special dir with spaces"
special_dir.mkdir()
lock_file = special_dir / "lock-file.lock"
lock_run(lock_file)
assert lock_file.exists()
unlock_run(lock_file)
def test_lock_run_with_very_long_path(self, tmp_path: Path):
"""Test lock_run with very long file path"""
# Create nested directories
deep_path = tmp_path
for i in range(10):
deep_path = deep_path / f"level{i}"
deep_path.mkdir(parents=True)
lock_file = deep_path / "deep.lock"
lock_run(lock_file)
assert lock_file.exists()
unlock_run(lock_file)
def test_unlock_run_on_directory(self, tmp_path: Path):
"""Test unlock_run on a directory raises appropriate error"""
test_dir = tmp_path / "test_dir"
test_dir.mkdir()
with pytest.raises(IOError):
unlock_run(test_dir)
def test_lock_run_race_condition_simulation(self, tmp_path: Path):
"""Test lock_run handles simulated race condition"""
lock_file = tmp_path / "race.lock"
# This is hard to test reliably, but we can at least verify
# the function handles existing files
lock_file.write_text("88888")
with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter:
def mock_iter(attrs=None): # type: ignore
mock_proc = MagicMock()
mock_proc.info = {'pid': "88888"}
return [mock_proc]
mock_proc_iter.side_effect = mock_iter
with pytest.raises(IOError):
lock_run(lock_file)
class TestScriptHelpersIntegration:
"""Integration tests combining multiple functions"""
def test_typical_script_pattern(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test typical script execution pattern with all helpers"""
lock_file = tmp_path / "script.lock"
# Wait before starting (with mocked sleep)
with patch('time.sleep'):
wait_abort(sleep=2)
captured = capsys.readouterr()
assert "Waiting 2 seconds" in captured.out
# Acquire lock
lock_run(lock_file)
assert lock_file.exists()
# Simulate work
time.sleep(0.01)
# Release lock
unlock_run(lock_file)
assert not lock_file.exists()
def test_script_with_error_handling(self, tmp_path: Path):
"""Test script pattern with error handling"""
lock_file = tmp_path / "error_script.lock"
try:
lock_run(lock_file)
# Simulate error during execution
raise RuntimeError("Simulated error")
except RuntimeError:
pass
finally:
# Ensure cleanup happens
if lock_file.exists():
unlock_run(lock_file)
assert not lock_file.exists()
def test_concurrent_script_protection(self, tmp_path: Path):
"""Test protection against concurrent script execution"""
lock_file = tmp_path / "concurrent.lock"
# First instance acquires lock
lock_run(lock_file)
# Second instance should fail
with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter:
def mock_iter(attrs=None): # type: ignore
mock_proc = MagicMock()
mock_proc.info = {'pid': str(os.getpid())}
return [mock_proc]
mock_proc_iter.side_effect = mock_iter
with pytest.raises(IOError) as exc_info:
lock_run(lock_file)
assert "already running" in str(exc_info.value).lower()
# Cleanup
unlock_run(lock_file)
def test_graceful_shutdown_pattern(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test graceful shutdown with wait and cleanup"""
lock_file = tmp_path / "graceful.lock"
lock_run(lock_file)
# Simulate interrupt during wait
with patch('time.sleep', side_effect=KeyboardInterrupt):
with pytest.raises(SystemExit):
wait_abort(sleep=5)
captured = capsys.readouterr()
assert "Interrupted by user" in captured.out
# Cleanup should still happen
unlock_run(lock_file)
assert not lock_file.exists()
# __END__

View File

@@ -1,840 +0,0 @@
"""
PyTest: script_handling/progress
"""
import time
from unittest.mock import patch
from pytest import CaptureFixture
from corelibs.script_handling.progress import Progress
class TestProgressInit:
"""Test suite for Progress initialization"""
def test_default_initialization(self):
"""Test Progress initialization with default parameters"""
prg = Progress()
assert prg.verbose is False
assert prg.precision == 1
assert prg.microtime == 0
assert prg.wide_time is False
assert prg.prefix_lb is False
assert prg.linecount == 0
assert prg.filesize == 0
assert prg.count == 0
assert prg.start is not None
def test_initialization_with_verbose(self):
"""Test Progress initialization with verbose enabled"""
prg = Progress(verbose=1)
assert prg.verbose is True
prg = Progress(verbose=5)
assert prg.verbose is True
prg = Progress(verbose=0)
assert prg.verbose is False
def test_initialization_with_precision(self):
"""Test Progress initialization with different precision values"""
# Normal precision
prg = Progress(precision=0)
assert prg.precision == 0
assert prg.percent_print == 3
prg = Progress(precision=2)
assert prg.precision == 2
assert prg.percent_print == 6
prg = Progress(precision=10)
assert prg.precision == 10
assert prg.percent_print == 14
# Ten step precision
prg = Progress(precision=-1)
assert prg.precision == 0
assert prg.precision_ten_step == 10
assert prg.percent_print == 3
# Five step precision
prg = Progress(precision=-2)
assert prg.precision == 0
assert prg.precision_ten_step == 5
assert prg.percent_print == 3
def test_initialization_with_microtime(self):
"""Test Progress initialization with microtime settings"""
prg = Progress(microtime=-1)
assert prg.microtime == -1
prg = Progress(microtime=0)
assert prg.microtime == 0
prg = Progress(microtime=1)
assert prg.microtime == 1
def test_initialization_with_wide_time(self):
"""Test Progress initialization with wide_time flag"""
prg = Progress(wide_time=True)
assert prg.wide_time is True
prg = Progress(wide_time=False)
assert prg.wide_time is False
def test_initialization_with_prefix_lb(self):
"""Test Progress initialization with prefix line break"""
prg = Progress(prefix_lb=True)
assert prg.prefix_lb is True
prg = Progress(prefix_lb=False)
assert prg.prefix_lb is False
def test_initialization_combined_parameters(self):
"""Test Progress initialization with multiple parameters"""
prg = Progress(verbose=1, precision=2, microtime=1, wide_time=True, prefix_lb=True)
assert prg.verbose is True
assert prg.precision == 2
assert prg.microtime == 1
assert prg.wide_time is True
assert prg.prefix_lb is True
class TestProgressSetters:
"""Test suite for Progress setter methods"""
def test_set_verbose(self):
"""Test set_verbose method"""
prg = Progress()
assert prg.set_verbose(1) is True
assert prg.verbose is True
assert prg.set_verbose(10) is True
assert prg.verbose is True
assert prg.set_verbose(0) is False
assert prg.verbose is False
def test_set_precision(self):
"""Test set_precision method"""
prg = Progress()
# Valid precision values
assert prg.set_precision(0) == 0
assert prg.precision == 0
assert prg.set_precision(5) == 5
assert prg.precision == 5
assert prg.set_precision(10) == 10
assert prg.precision == 10
# Ten step precision
prg.set_precision(-1)
assert prg.precision == 0
assert prg.precision_ten_step == 10
# Five step precision
prg.set_precision(-2)
assert prg.precision == 0
assert prg.precision_ten_step == 5
# Invalid precision (too low)
assert prg.set_precision(-3) == 0
assert prg.precision == 0
# Invalid precision (too high)
assert prg.set_precision(11) == 0
assert prg.precision == 0
def test_set_linecount(self):
"""Test set_linecount method"""
prg = Progress()
assert prg.set_linecount(100) == 100
assert prg.linecount == 100
assert prg.set_linecount(1000) == 1000
assert prg.linecount == 1000
# Zero or negative should set to 1
assert prg.set_linecount(0) == 1
assert prg.linecount == 1
assert prg.set_linecount(-10) == 1
assert prg.linecount == 1
def test_set_filesize(self):
"""Test set_filesize method"""
prg = Progress()
assert prg.set_filesize(1024) == 1024
assert prg.filesize == 1024
assert prg.set_filesize(1048576) == 1048576
assert prg.filesize == 1048576
# Zero or negative should set to 1
assert prg.set_filesize(0) == 1
assert prg.filesize == 1
assert prg.set_filesize(-100) == 1
assert prg.filesize == 1
def test_set_wide_time(self):
"""Test set_wide_time method"""
prg = Progress()
assert prg.set_wide_time(True) is True
assert prg.wide_time is True
assert prg.set_wide_time(False) is False
assert prg.wide_time is False
def test_set_micro_time(self):
"""Test set_micro_time method"""
prg = Progress()
assert prg.set_micro_time(-1) == -1
assert prg.microtime == -1
assert prg.set_micro_time(0) == 0
assert prg.microtime == 0
assert prg.set_micro_time(1) == 1
assert prg.microtime == 1
def test_set_prefix_lb(self):
"""Test set_prefix_lb method"""
prg = Progress()
assert prg.set_prefix_lb(True) is True
assert prg.prefix_lb is True
assert prg.set_prefix_lb(False) is False
assert prg.prefix_lb is False
def test_set_start_time(self):
"""Test set_start_time method"""
prg = Progress()
initial_start = prg.start
# Wait a bit and set new start time
time.sleep(0.01)
new_time = time.time()
prg.set_start_time(new_time)
# Original start should not change
assert prg.start == initial_start
# But start_time and start_run should update
assert prg.start_time == new_time
assert prg.start_run == new_time
def test_set_start_time_custom_value(self):
"""Test set_start_time with custom time value"""
prg = Progress()
custom_time = 1234567890.0
prg.start = None # Reset start to test first-time setting
prg.set_start_time(custom_time)
assert prg.start == custom_time
assert prg.start_time == custom_time
assert prg.start_run == custom_time
def test_set_eta_start_time(self):
"""Test set_eta_start_time method"""
prg = Progress()
custom_time = time.time() + 100
prg.set_eta_start_time(custom_time)
assert prg.start_time == custom_time
assert prg.start_run == custom_time
def test_set_end_time(self):
"""Test set_end_time method"""
prg = Progress()
start_time = time.time()
prg.set_start_time(start_time)
time.sleep(0.01)
end_time = time.time()
prg.set_end_time(end_time)
assert prg.end == end_time
assert prg.end_time == end_time
assert prg.run_time is not None
assert prg.run_time > 0
def test_set_end_time_with_none_start(self):
"""Test set_end_time when start is None"""
prg = Progress()
prg.start = None
end_time = time.time()
prg.set_end_time(end_time)
assert prg.end == end_time
assert prg.run_time == end_time
class TestProgressReset:
"""Test suite for Progress reset method"""
def test_reset_basic(self):
"""Test reset method resets counter variables"""
prg = Progress()
prg.set_linecount(1000)
prg.set_filesize(10240)
prg.count = 500
prg.current_count = 500
prg.lines_processed = 100
prg.reset()
assert prg.count == 0
assert prg.current_count == 0
assert prg.linecount == 0
assert prg.lines_processed == 0
assert prg.filesize == 0
assert prg.last_percent == 0
def test_reset_preserves_start(self):
"""Test reset preserves the original start time"""
prg = Progress()
original_start = prg.start
prg.reset()
# Original start should still be set from initialization
assert prg.start == original_start
def test_reset_clears_runtime_data(self):
"""Test reset clears runtime calculation data"""
prg = Progress()
prg.eta = 100.5
prg.full_time_needed = 50.2
prg.last_group = 10.1
prg.lines_in_last_group = 5.5
prg.lines_in_global = 3.3
prg.reset()
assert prg.eta == 0
assert prg.full_time_needed == 0
assert prg.last_group == 0
assert prg.lines_in_last_group == 0
assert prg.lines_in_global == 0
class TestProgressShowPosition:
"""Test suite for Progress show_position method"""
def test_show_position_basic_linecount(self):
"""Test show_position with basic line count"""
prg = Progress(verbose=0)
prg.set_linecount(100)
# Process some lines
for _ in range(10):
prg.show_position()
assert prg.count == 10
assert prg.file_pos == 10
def test_show_position_with_filesize(self):
"""Test show_position with file size parameter"""
prg = Progress(verbose=0)
prg.set_filesize(1024)
prg.show_position(512)
assert prg.count == 1
assert prg.file_pos == 512
assert prg.count_size == 512
def test_show_position_percent_calculation(self):
"""Test show_position calculates percentage correctly"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
# Process 50 lines
for _ in range(50):
prg.show_position()
assert prg.last_percent == 50.0
def test_show_position_ten_step_precision(self):
"""Test show_position with ten step precision"""
prg = Progress(verbose=0, precision=-1)
prg.set_linecount(100)
# Process lines, should only update at 10% intervals
for _ in range(15):
prg.show_position()
# Should be at 10% (not 15%)
assert prg.last_percent == 10
def test_show_position_five_step_precision(self):
"""Test show_position with five step precision"""
prg = Progress(verbose=0, precision=-2)
prg.set_linecount(100)
# Process lines, should only update at 5% intervals
for _ in range(7):
prg.show_position()
# Should be at 5% (not 7%)
assert prg.last_percent == 5
def test_show_position_change_flag(self):
"""Test show_position sets change flag correctly"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
# First call should trigger change (at 1%)
prg.show_position()
assert prg.change == 1
last_percent = prg.last_percent
# Keep calling - each percent increment triggers change
prg.show_position()
# At precision=0, each 1% is a new change
if prg.last_percent != last_percent:
assert prg.change == 1
else:
assert prg.change == 0
def test_show_position_with_verbose_output(self, capsys: CaptureFixture[str]):
"""Test show_position produces output when verbose is enabled"""
prg = Progress(verbose=1, precision=0)
prg.set_linecount(100)
# Process until percent changes
for _ in range(10):
prg.show_position()
captured = capsys.readouterr()
assert "Processed" in captured.out
assert "Lines" in captured.out
def test_show_position_with_prefix_lb(self):
"""Test show_position with prefix line break"""
prg = Progress(verbose=1, precision=0, prefix_lb=True)
prg.set_linecount(100)
# Process until percent changes
for _ in range(10):
prg.show_position()
assert prg.string.startswith("\n")
def test_show_position_lines_processed_calculation(self):
"""Test show_position calculates lines processed correctly"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
# First call at 1%
prg.show_position()
first_lines_processed = prg.lines_processed
assert first_lines_processed == 1
# Process to 2% (need to process 1 more line)
prg.show_position()
# lines_processed should be 1 (from 1 to 2)
assert prg.lines_processed == 1
def test_show_position_eta_calculation(self):
"""Test show_position calculates ETA"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(1000)
# We need to actually process lines for percent to change
# Process 100 lines to get to ~10%
for _ in range(100):
prg.show_position()
# ETA should be set after percent changes
assert prg.eta is not None
assert prg.eta >= 0
def test_show_position_with_filesize_output(self, capsys: CaptureFixture[str]):
"""Test show_position output with filesize information"""
prg = Progress(verbose=1, precision=0)
prg.set_filesize(10240)
# Process with filesize
for i in range(1, 1025):
prg.show_position(i)
captured = capsys.readouterr()
# Should contain byte information
assert "B" in captured.out or "KB" in captured.out
def test_show_position_bytes_calculation(self):
"""Test show_position calculates bytes per second"""
prg = Progress(verbose=0, precision=0)
prg.set_filesize(10240)
# Process enough bytes to trigger a percent change
# Need to process ~102 bytes for 1% of 10240
prg.show_position(102)
# After percent change, bytes stats should be set
assert prg.bytes_in_last_group >= 0
assert prg.bytes_in_global >= 0
def test_show_position_current_count_tracking(self):
"""Test show_position tracks current count correctly"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
for _ in range(10):
prg.show_position()
# Current count should be updated to last change point
assert prg.current_count == 10
assert prg.count == 10
def test_show_position_full_time_calculation(self):
"""Test show_position calculates full time needed"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
# Process enough to trigger percent change
for _ in range(10):
prg.show_position()
assert prg.full_time_needed is not None
assert prg.full_time_needed >= 0
def test_show_position_last_group_time(self):
"""Test show_position tracks last group time"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
# Process enough to trigger percent change
for _ in range(10):
prg.show_position()
# last_group should be set after percent change
assert prg.last_group >= 0
def test_show_position_zero_eta_edge_case(self):
"""Test show_position handles negative ETA gracefully"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
# Process all lines
for _ in range(100):
prg.show_position()
# ETA should not be negative
assert prg.eta is not None
assert prg.eta >= 0
def test_show_position_no_filesize_string_format(self):
"""Test show_position string format without filesize"""
prg = Progress(verbose=1, precision=0)
prg.set_linecount(100)
for _ in range(10):
prg.show_position()
# String should not contain byte information
assert "b/s" not in prg.string
assert "Lines" in prg.string
def test_show_position_wide_time_format(self):
"""Test show_position with wide time formatting"""
prg = Progress(verbose=1, precision=0, wide_time=True)
prg.set_linecount(100)
for _ in range(10):
prg.show_position()
# With wide_time, time fields should be formatted with specific width
assert prg.string != ""
def test_show_position_microtime_on(self):
"""Test show_position with microtime enabled"""
prg = Progress(verbose=0, precision=0, microtime=1)
prg.set_linecount(100)
with patch('time.time') as mock_time:
mock_time.return_value = 1000.0
prg.set_start_time(1000.0)
mock_time.return_value = 1000.5
for _ in range(10):
prg.show_position()
# Microtime should be enabled
assert prg.microtime == 1
def test_show_position_microtime_off(self):
"""Test show_position with microtime disabled"""
prg = Progress(verbose=0, precision=0, microtime=-1)
prg.set_linecount(100)
for _ in range(10):
prg.show_position()
assert prg.microtime == -1
def test_show_position_lines_per_second_global(self):
"""Test show_position calculates global lines per second"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(1000)
# Process 100 lines to trigger percent changes
for _ in range(100):
prg.show_position()
# After processing, lines_in_global should be calculated
assert prg.lines_in_global >= 0
def test_show_position_lines_per_second_last_group(self):
"""Test show_position calculates last group lines per second"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(1000)
# Process lines to trigger percent changes
for _ in range(100):
prg.show_position()
# After processing, lines_in_last_group should be calculated
assert prg.lines_in_last_group >= 0
def test_show_position_returns_string(self):
"""Test show_position returns the progress string"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
result = ""
for _ in range(10):
result = prg.show_position()
# Should return string on percent change
assert isinstance(result, str)
class TestProgressEdgeCases:
"""Test suite for edge cases and error conditions"""
def test_zero_linecount_protection(self):
"""Test Progress handles zero linecount gracefully"""
prg = Progress(verbose=0)
prg.set_filesize(1024)
# Should not crash with zero linecount
prg.show_position(512)
assert prg.file_pos == 512
def test_zero_filesize_protection(self):
"""Test Progress handles zero filesize gracefully"""
prg = Progress(verbose=0)
prg.set_linecount(100)
# Should not crash with zero filesize
prg.show_position()
assert isinstance(prg.string, str)
def test_division_by_zero_protection_last_group(self):
"""Test Progress protects against division by zero in last_group"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
with patch('time.time') as mock_time:
# Same time for start and end
mock_time.return_value = 1000.0
prg.set_start_time(1000.0)
for _ in range(10):
prg.show_position()
# Should handle zero time difference
assert prg.lines_in_last_group >= 0
def test_division_by_zero_protection_full_time(self):
"""Test Progress protects against division by zero in full_time_needed"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
# Process lines very quickly
for _ in range(10):
prg.show_position()
# Should handle very small time differences without crashing
# lines_in_global should be a valid number (>= 0)
assert isinstance(prg.lines_in_global, (int, float))
def test_none_start_protection(self):
"""Test Progress handles None start time"""
prg = Progress(verbose=0, precision=0)
prg.start = None
prg.set_linecount(100)
# Should not crash
prg.show_position()
assert prg.start == 0
def test_none_start_time_protection(self):
"""Test Progress handles None start_time"""
prg = Progress(verbose=0, precision=0)
prg.start_time = None
prg.set_linecount(100)
# Should not crash and should set start_time during processing
prg.show_position()
# start_time will be set to 0 internally when None is encountered
# But during percent calculation, it may be reset to current time
assert prg.start_time is not None
def test_precision_boundary_values(self):
"""Test precision at boundary values"""
prg = Progress()
# Minimum valid
assert prg.set_precision(-2) == 0
# Maximum valid
assert prg.set_precision(10) == 10
# Below minimum
assert prg.set_precision(-3) == 0
# Above maximum
assert prg.set_precision(11) == 0
def test_large_linecount_handling(self):
"""Test Progress handles large linecount values"""
prg = Progress(verbose=0)
large_count = 10_000_000
prg.set_linecount(large_count)
assert prg.linecount == large_count
# Should handle calculations without overflow
prg.show_position()
assert prg.count == 1
def test_large_filesize_handling(self):
"""Test Progress handles large filesize values"""
prg = Progress(verbose=0)
large_size = 10_737_418_240 # 10 GB
prg.set_filesize(large_size)
assert prg.filesize == large_size
# Should handle calculations without overflow
prg.show_position(1024)
assert prg.file_pos == 1024
class TestProgressIntegration:
"""Integration tests for Progress class"""
def test_complete_progress_workflow(self, capsys: CaptureFixture[str]):
"""Test complete progress workflow from start to finish"""
prg = Progress(verbose=1, precision=0)
prg.set_linecount(100)
# Simulate processing
for _ in range(100):
prg.show_position()
prg.set_end_time()
assert prg.count == 100
assert prg.last_percent == 100.0
assert prg.run_time is not None
captured = capsys.readouterr()
assert "Processed" in captured.out
def test_progress_with_filesize_workflow(self):
"""Test progress workflow with file size tracking"""
prg = Progress(verbose=0, precision=0)
prg.set_filesize(10240)
# Simulate reading file in chunks
for pos in range(0, 10240, 1024):
prg.show_position(pos + 1024)
assert prg.count == 10
assert prg.count_size == 10240
def test_reset_and_reuse(self):
"""Test resetting and reusing Progress instance"""
prg = Progress(verbose=0, precision=0)
# First run
prg.set_linecount(100)
for _ in range(100):
prg.show_position()
assert prg.count == 100
# Reset
prg.reset()
assert prg.count == 0
# Second run
prg.set_linecount(50)
for _ in range(50):
prg.show_position()
assert prg.count == 50
def test_multiple_precision_changes(self):
"""Test changing precision multiple times"""
prg = Progress(verbose=0)
prg.set_precision(0)
assert prg.precision == 0
prg.set_precision(2)
assert prg.precision == 2
prg.set_precision(-1)
assert prg.precision == 0
assert prg.precision_ten_step == 10
def test_eta_start_time_adjustment(self):
"""Test adjusting ETA start time mid-processing"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(1000)
# Process some lines
for _ in range(100):
prg.show_position()
# Adjust ETA start time (simulating delay like DB query)
new_time = time.time()
prg.set_eta_start_time(new_time)
# Continue processing
for _ in range(100):
prg.show_position()
assert prg.start_run == new_time
def test_verbose_toggle_during_processing(self):
"""Test toggling verbose flag during processing"""
prg = Progress(verbose=0, precision=0)
prg.set_linecount(100)
# Process without output
for _ in range(50):
prg.show_position()
# Enable verbose
prg.set_verbose(1)
assert prg.verbose is True
# Continue with output
for _ in range(50):
prg.show_position()
assert prg.count == 100