From 28527990e9071ee5a067aeaa474b95926cb925ac Mon Sep 17 00:00:00 2001 From: Clemens Schwaighofer Date: Fri, 6 Feb 2026 15:55:26 +0900 Subject: [PATCH] Move progres and script helpers to corelibs-progress and corelibs-script --- README.md | 6 + pyproject.toml | 2 + src/corelibs/script_handling/progress.py | 441 +-------- .../script_handling/script_helpers.py | 66 +- test-run/progress/progress_test.py | 8 +- tests/unit/script_handling/__init__.py | 3 - .../script_handling/_test_script_helpers.py | 821 ----------------- tests/unit/script_handling/test_progress.py | 840 ------------------ 8 files changed, 28 insertions(+), 2159 deletions(-) delete mode 100644 tests/unit/script_handling/__init__.py delete mode 100644 tests/unit/script_handling/_test_script_helpers.py delete mode 100644 tests/unit/script_handling/test_progress.py diff --git a/README.md b/README.md index 20e2d76..63c897e 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,12 @@ All content in this module will move to stand alone libraries, as of now the fol - math_handling.math_helpers: python.math - requests_handling.auth_helpers: corelibs-requests - requests_handling.caller: corelibs-requests +- script_handling.progress: corelibs-progress +- script_handling.script_helpers: corelibs-script +- string_handling.byte_helpers: corelibs-strings +- string_handling.double_byte_string_format: corelibs-double-byte-format +- string_handling.hash_helpers: corelibs-hash +- string_handling.string_helpers: corelibs-strings - string_handling.text_colors: corelibs-text-colors - var_handling.enum_base: corelibs-enum-base - var_handling.var_helpers: corelibs-var diff --git a/pyproject.toml b/pyproject.toml index 77b3156..bba586d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,8 +17,10 @@ dependencies = [ "corelibs-hash>=1.0.0", "corelibs-iterator>=1.0.0", "corelibs-json>=1.0.0", + "corelibs-progress>=1.0.0", "corelibs-regex-checks>=1.0.0", "corelibs-requests>=1.0.0", + "corelibs-script>=1.0.0", "corelibs-search>=1.0.0", "corelibs-stack-trace>=1.0.0", "corelibs-strings>=1.0.0", diff --git a/src/corelibs/script_handling/progress.py b/src/corelibs/script_handling/progress.py index be48c47..1154e37 100644 --- a/src/corelibs/script_handling/progress.py +++ b/src/corelibs/script_handling/progress.py @@ -29,449 +29,16 @@ set_end_time(time optional) show_position(file pos optional) """ -import time -from typing import Literal -from math import floor -from corelibs_datetime.timestamp_convert import convert_timestamp -from corelibs.string_handling.byte_helpers import format_bytes +from warnings import warn +from corelibs_progress.progress import Progress as CoreProgress # for type checking only -class Progress(): +class Progress(CoreProgress): """ file progress output information """ - def __init__( - self, - verbose: int = 0, - precision: int = 1, - microtime: Literal[-1] | Literal[1] | Literal[0] = 0, - wide_time: bool = False, - prefix_lb: bool = False - ): - # set default var stuff - # max lines in input - self.linecount: int = 0 - # max file size - self.filesize: int = 0 - # * comma after percent - self.precision: int = 0 - # * if flagged 1, then wthe wide 15 char left bound format is used - self.wide_time: bool = False - # * verbose status from outside - self.verbose: bool = False - # * microtime output for last run time (1 for enable 0 for auto -1 for disable) - self.microtime: Literal[-1] | Literal[1] | Literal[0] = 0 - # micro time flag for last group - self.lg_microtime: bool = False - # = flag if output was given - self.change = 0 - # = global start for the full script running time - self.start: float | None = None - # = for the eta time, can be set after a query or long read in, to not create a wrong ETA time - self.start_run: float | None = None - # loop start - self.start_time: float | None = None - # global end - self.end: float | None = None - # loop end - self.end_time: float | None = None - # run time in seconds, set when end time method is called - self.run_time: float | None = None - # = filesize current - self.count_size: int | None = None - # position current - self.count: int = 0 - # last count (position) - self.current_count: int = 0 - # the current file post - self.file_pos: int | None = None - # lines processed in the last run - self.lines_processed: int = 0 - # time in th seconds for the last group run (until percent change) - self.last_group: float = 0 - # float value, lines processed per second to the last group run - self.lines_in_last_group: float = 0 - # float values, lines processed per second to complete run - self.lines_in_global: float = 0 - # flaot value, bytes processes per second in the last group run - self.bytes_in_last_group: float = 0 - # float value, bytes processed per second to complete run - self.bytes_in_global: float = 0 - # bytes processed in last run (in bytes) - self.size_in_last_group: int = 0 - # current file position 8size) - self.current_size: int = 0 - # last percent position - self.last_percent: int | float = 0 - # if we have normal % or in steps of 10 - self.precision_ten_step: int = 0 - # the default size this is precision + 4 - self.percent_print: int = 5 - # this is 1 if it is 1 or 0 for precision or precision size - self.percent_precision: int = 1 - # prefix line with a line break - self.prefix_lb: bool = False - # estimated time to finish - self.eta: float | None = None - # run time since start - self.full_time_needed: float | None = None - # the actual output - self.string: str = '' - # initialize the class - self.set_precision(precision) - self.set_verbose(verbose) - self.set_micro_time(microtime) - self.set_wide_time(wide_time) - self.set_prefix_lb(prefix_lb) - self.set_start_time() - def reset(self): - """ - resets the current progress to 0, but keeps the overall start variables set - """ - # reset what always gets reset - self.count = 0 - self.count_size = None - self.current_count = 0 - self.linecount = 0 - self.lines_processed = 0 - self.last_group = 0 - self.lines_in_last_group = 0 - self.lines_in_global = 0 - self.bytes_in_last_group = 0 - self.bytes_in_global = 0 - self.size_in_last_group = 0 - self.filesize = 0 - self.current_size = 0 - self.last_percent = 0 - self.eta = 0 - self.full_time_needed = 0 - self.start_run = None - self.start_time = None - self.end_time = None - - def set_wide_time(self, wide_time: bool) -> bool: - """ - sets the show wide time flag - - Arguments: - wide_time {bool} -- _description_ - - Returns: - bool -- _description_ - """ - self.wide_time = wide_time - return self.wide_time - - def set_micro_time(self, microtime: Literal[-1] | Literal[1] | Literal[0]) -> Literal[-1] | Literal[1] | Literal[0]: - """sets the show microtime -1 OFF, 0 AUTO, 1 ON - - Returns: - _type_ -- _description_ - """ - self.microtime = microtime - return self.microtime - - def set_prefix_lb(self, prefix_lb: bool) -> bool: - """ - set prefix line break flag - - Arguments: - prefix_lb {bool} -- _description_ - - Returns: - bool -- _description_ - """ - self.prefix_lb = prefix_lb - return self.prefix_lb - - def set_verbose(self, verbose: int) -> bool: - """ - set the internal verbose flag to 1 if any value higher than 1 is given, else sets it to 0 - - Arguments: - verbose {int} -- _description_ - - Returns: - bool -- _description_ - """ - if verbose > 0: - self.verbose = True - else: - self.verbose = False - return self.verbose - - def set_precision(self, precision: int) -> int: - """ - sets the output precision size. If -2 for five step, -1 for ten step - else sets the precision normally, for 0, no precision is set, maximum precision is 10 - - Arguments: - precision {int} -- _description_ - - Returns: - int -- _description_ - """ - # if not a valid number, we set it to 0 - if precision < -2 or precision > 10: - precision = 0 - if precision < 0: - if precision < -1: - self.precision_ten_step = 5 - else: - self.precision_ten_step = 10 - self.precision = 0 # no comma - self.percent_precision = 0 # no print precision - self.percent_print = 3 # max 3 length - else: - # comma values visible - self.precision = 10 if precision < 0 or precision > 10 else precision - # for calcualtion of precision - self.percent_precision = 10 if precision < 0 or precision > 10 else precision - # for the format output base is 4, plsut he percent precision length - self.percent_print = (3 if precision == 0 else 4) + self.percent_precision - # return the set precision - return self.precision - - def set_linecount(self, linecount: int) -> int: - """ - set the maximum lines in this file, if value is smaller than 0 or 0, then it is set to 1 - - Arguments: - linecount {int} -- _description_ - - Returns: - int -- _description_ - """ - if linecount > 0: - self.linecount = linecount - else: - self.linecount = 1 - return self.linecount - - def set_filesize(self, filesize: int) -> int: - """ - set the maximum filesize for this file, if value is smaller than 0 or 0, then it is set to 1 - - Arguments: - filesize {int} -- _description_ - - Returns: - int -- _description_ - """ - if filesize > 0: - self.filesize = filesize - else: - self.filesize = 1 - return self.filesize - - def set_start_time(self, time_value: float = time.time()) -> None: - """ - initial set of the start times, auto set - - Keyword Arguments: - time_value {float} -- _description_ (default: {time.time()}) - """ - # avoid possible double set of the original start time - if not self.start: - self.start = time_value - self.start_time = time_value - self.start_run = time_value - - def set_eta_start_time(self, time_value: float = time.time()) -> None: - """ - sets the loop % run time, for correct ETA calculation - calls set start time, as the main start time is only set once - - Keyword Arguments: - time_value {float} -- _description_ (default: {time.time()}) - """ - self.set_start_time(time_value) - - def set_end_time(self, time_value: float = time.time()) -> None: - """ - set the end time - - Keyword Arguments: - time_value {float} -- _description_ (default: {time.time()}) - """ - self.end = time_value - self.end_time = time_value - if self.start is None: - self.start = 0 - # the overall run time in micro seconds - self.run_time = self.end - self.start - - def show_position(self, filepos: int = 0) -> str: - """ - processes the current position. either based on read the file size pos, or the line count - - Keyword Arguments: - filepos {int} -- _description_ (default: {0}) - - Returns: - str -- _description_ - """ - show_filesize = True # if we print from file size or line count - # microtime flags - eta_microtime = False - ftn_microtime = False - lg_microtime = False - # percent precision calc - # _p_spf = "{:." + str(self.precision) + "f}" - # output format for percent - _pr_p_spf = "{:>" + str(self.percent_print) + "." + str(self.percent_precision) + "f}" - # set the linecount precision based on the final linecount, if not, leave it empty - _pr_lc = "{}" - if self.linecount: - _pr_lc = "{:>" + str(len(str(f"{self.linecount:,}"))) + ",}" - # time format, if flag is set, the wide format is used - _pr_tf = "{}" - if self.wide_time: - _pr_tf = "{:>15}" - - # count up - self.count += 1 - # if we have file pos from parameter - if filepos != 0: - self.file_pos = filepos - else: - # we did not, so we set internal value - self.file_pos = self.count - # we also check if the filesize was set now - if self.filesize == 0: - self.filesize = self.linecount - # set ignore filesize output (no data) - show_filesize = False - # set the count size based on the file pos, is only used if we have filesize - self.count_size = self.file_pos - # do normal or down to 10 (0, 10, ...) % - if self.precision_ten_step: - _percent = int((self.file_pos / float(self.filesize)) * 100) - mod = _percent % self.precision_ten_step - percent = _percent if mod == 0 else self.last_percent - else: - # calc percent - percent = round(((self.file_pos / float(self.filesize)) * 100), self.precision) - - # output - if percent != self.last_percent: - self.end_time = time.time() # current time (for loop time) - if self.start is None: - self.start = 0 - if self.start_time is None: - self.start_time = 0 - # for from the beginning - full_time_needed = self.end_time - self.start # how long from the start - self.last_group = self.end_time - self.start_time # how long for last loop - self.lines_processed = self.count - self.current_count # how many lines processed - # lines in last group - self.lines_in_last_group = (self.lines_processed / self.last_group) if self.last_group else 0 - # lines in global - self.lines_in_global = (self.count / full_time_needed) if full_time_needed else 0 - # if we have linecount or not - if self.linecount == 0: - full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count_size # how long for all - # estimate for the rest - eta = full_time_per_line * (self.filesize - self.count_size) - else: - # how long for all - full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count - # estimate for the rest - eta = full_time_per_line * (self.linecount - self.count) - # just in case ... - if eta < 0: - eta = 0 - # check if to show microtime - # ON - if self.microtime == 1: - eta_microtime = ftn_microtime = lg_microtime = True - # AUTO - if self.microtime == 0: - if eta > 0 and eta < 1: - eta_microtime = True - if full_time_needed > 0 and full_time_needed < 1: - ftn_microtime = True - # pre check last group: if pre comma part is same add microtime anyway - if self.last_group > 0 and self.last_group < 1: - lg_microtime = True - if self.last_group == floor(self.last_group): - lg_microtime = True - self.last_group = floor(self.last_group) - # if with filesize or without - if show_filesize: - # last group size - self.size_in_last_group = self.count_size - self.current_size - # calc kb/s if there is any filesize data - # last group - self.bytes_in_last_group = (self.size_in_last_group / self.last_group) if self.last_group else 0 - # global - self.bytes_in_global = (self.count_size / full_time_needed) if full_time_needed else 0 - # only used if we run with file size for the next check - self.current_size = self.count_size - - if self.verbose >= 1: - self.string = ( - f"Processed {_pr_p_spf}% " - "[{} / {}] | " - f"{_pr_lc} / {_pr_lc} Lines | ETA: {_pr_tf} / TR: {_pr_tf} / " - "LR: {:,} " - "lines ({:,}) in {}, {:,.2f} ({:,.2f}) lines/s, {} ({}) b/s" - ).format( - float(percent), - format_bytes(self.count_size), - format_bytes(self.filesize), - self.count, - self.linecount, - convert_timestamp(eta, eta_microtime), - convert_timestamp(full_time_needed, ftn_microtime), - self.lines_processed, - self.size_in_last_group, - convert_timestamp(self.last_group, lg_microtime), - self.lines_in_global, - self.lines_in_last_group, - format_bytes(self.bytes_in_global), - format_bytes(self.bytes_in_last_group) - ) - else: - if self.verbose >= 1: - self.string = ( - f"Processed {_pr_p_spf}% | {_pr_lc} / {_pr_lc} Lines " - f"| ETA: {_pr_tf} / TR: {_pr_tf} / " - "LR: {:,} lines in {}, {:,.2f} ({:,.2f}) lines/s" - ).format( - float(percent), - self.count, - self.linecount, - convert_timestamp(eta, eta_microtime), - convert_timestamp(full_time_needed, ftn_microtime), - self.lines_processed, - convert_timestamp(self.last_group, lg_microtime), - self.lines_in_global, - self.lines_in_last_group - ) - # prefix return string with line break if flagged - self.string = ("\n" if self.prefix_lb else '') + self.string - # print the string if verbose is turned on - if self.verbose >= 1: - print(self.string) - - # write back vars - self.last_percent = percent - self.eta = eta - self.full_time_needed = full_time_needed - self.lg_microtime = lg_microtime - # for the next run, check data - self.start_time = time.time() - self.current_count = self.count - # trigger if this is a change - self.change = 1 - else: - # trigger if this is a change - self.change = 0 - # return string - return self.string - # } END OF ShowPosition +warn("Use 'corelibs_progress.progress.Progress'", DeprecationWarning, stacklevel=2) # __END__ diff --git a/src/corelibs/script_handling/script_helpers.py b/src/corelibs/script_handling/script_helpers.py index 4c37022..733bd06 100644 --- a/src/corelibs/script_handling/script_helpers.py +++ b/src/corelibs/script_handling/script_helpers.py @@ -2,13 +2,16 @@ Helper methods for scripts """ -import time -import os -import sys +from warnings import deprecated from pathlib import Path -import psutil +from corelibs_script.script_support import ( + wait_abort as corelibs_wait_abort, + lock_run as corelibs_lock_run, + unlock_run as corelibs_unlock_run, +) +@deprecated("use corelibs_script.script_support.wait_abort instead") def wait_abort(sleep: int = 5) -> None: """ wait a certain time for an abort command @@ -16,18 +19,10 @@ def wait_abort(sleep: int = 5) -> None: Keyword Arguments: sleep {int} -- _description_ (default: {5}) """ - try: - print(f"Waiting {sleep} seconds (Press CTRL +C to abort) [", end="", flush=True) - for _ in range(1, sleep): - print(".", end="", flush=True) - time.sleep(1) - print("]", flush=True) - except KeyboardInterrupt: - print("\nInterrupted by user") - sys.exit(0) - print("\n\n") + corelibs_wait_abort(sleep) +@deprecated("use corelibs_script.script_support.lock_run instead") def lock_run(lock_file: Path) -> None: """ lock a script run @@ -41,44 +36,10 @@ def lock_run(lock_file: Path) -> None: Exception: _description_ IOError: _description_ """ - no_file = False - run_pid = os.getpid() - # or os.path.isfile() - try: - with open(lock_file, "r", encoding="UTF-8") as fp: - exists = False - pid = fp.read() - fp.close() - if pid: - # check if this pid exists - for proc in psutil.process_iter(['pid', 'name', 'cmdline']): - try: - if pid == proc.info['pid']: - exists = True - break - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): - # in case we cannot access - continue - if not exists: - # no pid but lock file, unlink - try: - lock_file.unlink() - no_file = True - except IOError as e: - raise IOError(f"Cannot remove lock_file: {lock_file}: {e}") from e - else: - raise IOError(f"Script is already running with PID {pid}") - except IOError: - no_file = True - if no_file: - try: - with open(lock_file, "w", encoding="UTF-8") as fp: - fp.write(str(run_pid)) - fp.close() - except IOError as e: - raise IOError(f"Cannot open run lock file '{lock_file}' for writing: {e}") from e + corelibs_lock_run(lock_file) +@deprecated("use corelibs_script.script_support.unlock_run instead") def unlock_run(lock_file: Path) -> None: """ removes the lock file @@ -89,9 +50,6 @@ def unlock_run(lock_file: Path) -> None: Raises: Exception: _description_ """ - try: - lock_file.unlink() - except IOError as e: - raise IOError(f"Cannot remove lock_file: {lock_file}: {e}") from e + corelibs_unlock_run(lock_file) # __END__ diff --git a/test-run/progress/progress_test.py b/test-run/progress/progress_test.py index 2e3794b..3d68007 100755 --- a/test-run/progress/progress_test.py +++ b/test-run/progress/progress_test.py @@ -37,8 +37,8 @@ def main(): # prg.SetStartTime(time.time()) prg.set_start_time() print( - f"PRECISION: {prg.precision} | TEN STEP: {prg.precision_ten_step} | " - f"WIDE TEME: {prg.wide_time} | MICROTIME: {prg.microtime} | VERBOSE: {prg.verbose}" + f"PRECISION: {prg.__precision} | TEN STEP: {prg.precision_ten_step} | " + f"WIDE TEME: {prg.__wide_time} | MICROTIME: {prg.microtime} | VERBOSE: {prg.verbose}" ) if use_file: @@ -56,7 +56,7 @@ def main(): print( f"Buffer size: {io.DEFAULT_BUFFER_SIZE} | " f"Do Buffering: {fh.line_buffering} | " - f"File size: {prg.filesize}" + f"File size: {prg.__filesize}" ) data = fh.readline() while data: @@ -72,7 +72,7 @@ def main(): print(f"Starting: {create_time(prg.start if prg.start is not None else 0)}") prg.set_linecount(256) i = 1 - while i <= prg.linecount: + while i <= prg.__linecount: sleep = randint(1, 9) sleep /= 7 time.sleep(sleep) diff --git a/tests/unit/script_handling/__init__.py b/tests/unit/script_handling/__init__.py deleted file mode 100644 index 90ff722..0000000 --- a/tests/unit/script_handling/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -""" -Unit tests for script_handling module -""" diff --git a/tests/unit/script_handling/_test_script_helpers.py b/tests/unit/script_handling/_test_script_helpers.py deleted file mode 100644 index 6f713bb..0000000 --- a/tests/unit/script_handling/_test_script_helpers.py +++ /dev/null @@ -1,821 +0,0 @@ -""" -PyTest: script_handling/script_helpers -""" - -# pylint: disable=use-implicit-booleaness-not-comparison - -import time -import os -from pathlib import Path -from unittest.mock import patch, MagicMock, mock_open, PropertyMock -import pytest -from pytest import CaptureFixture -import psutil - -from corelibs.script_handling.script_helpers import ( - wait_abort, - lock_run, - unlock_run, -) - - -class TestWaitAbort: - """Test suite for wait_abort function""" - - def test_wait_abort_default_sleep(self, capsys: CaptureFixture[str]): - """Test wait_abort with default sleep duration""" - with patch('time.sleep'): - wait_abort() - - captured = capsys.readouterr() - assert "Waiting 5 seconds" in captured.out - assert "(Press CTRL +C to abort)" in captured.out - assert "[" in captured.out - assert "]" in captured.out - # Should have 4 dots (sleep - 1) - assert captured.out.count(".") == 4 - - def test_wait_abort_custom_sleep(self, capsys: CaptureFixture[str]): - """Test wait_abort with custom sleep duration""" - with patch('time.sleep'): - wait_abort(sleep=3) - - captured = capsys.readouterr() - assert "Waiting 3 seconds" in captured.out - # Should have 2 dots (3 - 1) - assert captured.out.count(".") == 2 - - def test_wait_abort_sleep_one_second(self, capsys: CaptureFixture[str]): - """Test wait_abort with sleep duration of 1 second""" - with patch('time.sleep'): - wait_abort(sleep=1) - - captured = capsys.readouterr() - assert "Waiting 1 seconds" in captured.out - # Should have 0 dots (1 - 1) - assert captured.out.count(".") == 0 - - def test_wait_abort_sleep_zero(self, capsys: CaptureFixture[str]): - """Test wait_abort with sleep duration of 0""" - with patch('time.sleep'): - wait_abort(sleep=0) - - captured = capsys.readouterr() - assert "Waiting 0 seconds" in captured.out - # Should have 0 dots since range(1, 0) is empty - assert captured.out.count(".") == 0 - - def test_wait_abort_keyboard_interrupt(self, capsys: CaptureFixture[str]): - """Test wait_abort handles KeyboardInterrupt and exits""" - with patch('time.sleep', side_effect=KeyboardInterrupt): - with pytest.raises(SystemExit) as exc_info: - wait_abort(sleep=5) - - assert exc_info.value.code == 0 - captured = capsys.readouterr() - assert "Interrupted by user" in captured.out - - def test_wait_abort_keyboard_interrupt_immediate(self, capsys: CaptureFixture[str]): - """Test wait_abort handles KeyboardInterrupt on first iteration""" - def sleep_side_effect(_duration: int) -> None: - raise KeyboardInterrupt() - - with patch('time.sleep', side_effect=sleep_side_effect): - with pytest.raises(SystemExit) as exc_info: - wait_abort(sleep=10) - - assert exc_info.value.code == 0 - captured = capsys.readouterr() - assert "Interrupted by user" in captured.out - - def test_wait_abort_completes_normally(self, capsys: CaptureFixture[str]): - """Test wait_abort completes without interruption""" - with patch('time.sleep') as mock_sleep: - wait_abort(sleep=3) - - # time.sleep should be called (sleep - 1) times - assert mock_sleep.call_count == 2 - - captured = capsys.readouterr() - assert "Waiting 3 seconds" in captured.out - assert "]" in captured.out - # Should have newlines at the end - assert captured.out.endswith("\n\n") - - def test_wait_abort_actual_timing(self): - """Test wait_abort actually waits (integration test)""" - start_time = time.time() - wait_abort(sleep=1) - elapsed_time = time.time() - start_time - - # Should take at least close to 0 seconds (1-1) - # With mocking disabled in this test, it would take actual time - # but we've been mocking it, so this tests the unmocked behavior - # For this test, we'll check it runs without error - assert elapsed_time >= 0 - - def test_wait_abort_large_sleep_value(self, capsys: CaptureFixture[str]): - """Test wait_abort with large sleep value""" - with patch('time.sleep'): - wait_abort(sleep=100) - - captured = capsys.readouterr() - assert "Waiting 100 seconds" in captured.out - # Should have 99 dots - assert captured.out.count(".") == 99 - - def test_wait_abort_output_format(self, capsys: CaptureFixture[str]): - """Test wait_abort output formatting""" - with patch('time.sleep'): - wait_abort(sleep=3) - - captured = capsys.readouterr() - # Check the exact format - assert "Waiting 3 seconds (Press CTRL +C to abort) [" in captured.out - assert captured.out.count("[") == 1 - assert captured.out.count("]") == 1 - - def test_wait_abort_flush_behavior(self): - """Test that wait_abort flushes output correctly""" - with patch('time.sleep'): - with patch('builtins.print') as mock_print: - wait_abort(sleep=3) - - # Check that print was called with flush=True - # First call: "Waiting X seconds..." - # Intermediate calls: dots with flush=True - # Last calls: "]" and final newlines - flush_calls = [ - call for call in mock_print.call_args_list - if 'flush' in call.kwargs and call.kwargs['flush'] is True - ] - assert len(flush_calls) > 0 - - -class TestLockRun: - """Test suite for lock_run function""" - - def test_lock_run_creates_lock_file(self, tmp_path: Path): - """Test lock_run creates a lock file with current PID""" - lock_file = tmp_path / "test.lock" - - lock_run(lock_file) - - assert lock_file.exists() - content = lock_file.read_text() - assert content == str(os.getpid()) - - def test_lock_run_raises_when_process_exists(self, tmp_path: Path): - """Test lock_run raises IOError when process with PID exists - - Note: The actual code has a bug where it compares string PID from file - with integer PID from psutil, which will never match. This test demonstrates - the intended behavior if the bug were fixed. - """ - lock_file = tmp_path / "test.lock" - current_pid = os.getpid() - - # Create lock file with current PID - lock_file.write_text(str(current_pid)) - - # Patch at module level to ensure correct comparison - with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter: - def mock_process_iter(attrs=None): # type: ignore - mock_proc = MagicMock() - # Make PID a string to match the file content for comparison - mock_proc.info = {'pid': str(current_pid)} - return [mock_proc] - - mock_proc_iter.side_effect = mock_process_iter - - with pytest.raises(IOError) as exc_info: - lock_run(lock_file) - - assert f"Script is already running with PID {current_pid}" in str(exc_info.value) - - def test_lock_run_removes_stale_lock_file(self, tmp_path: Path): - """Test lock_run removes lock file when PID doesn't exist""" - lock_file = tmp_path / "test.lock" - # Use a PID that definitely doesn't exist - stale_pid = "99999999" - lock_file.write_text(stale_pid) - - # Mock psutil to return no matching processes - with patch('psutil.process_iter') as mock_proc_iter: - mock_process = MagicMock() - mock_process.info = {'pid': 12345} # Different PID - mock_proc_iter.return_value = [mock_process] - - lock_run(lock_file) - - # Lock file should be recreated with current PID - assert lock_file.exists() - assert lock_file.read_text() == str(os.getpid()) - - def test_lock_run_creates_lock_when_no_file_exists(self, tmp_path: Path): - """Test lock_run creates lock file when none exists""" - lock_file = tmp_path / "new.lock" - - assert not lock_file.exists() - lock_run(lock_file) - assert lock_file.exists() - - def test_lock_run_handles_empty_lock_file(self, tmp_path: Path): - """Test lock_run handles empty lock file""" - lock_file = tmp_path / "empty.lock" - lock_file.write_text("") - - lock_run(lock_file) - - assert lock_file.exists() - assert lock_file.read_text() == str(os.getpid()) - - def test_lock_run_handles_psutil_no_such_process(self, tmp_path: Path): - """Test lock_run handles psutil.NoSuchProcess exception""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("12345") - - with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter: - # Create a mock that raises NoSuchProcess inside the try block - def mock_iter(attrs=None): # type: ignore - mock_proc = MagicMock() - mock_proc.info = {'pid': "12345"} - # Configure to raise exception when accessed - type(mock_proc).info = PropertyMock(side_effect=psutil.NoSuchProcess(12345)) - return [mock_proc] - - mock_proc_iter.side_effect = mock_iter - - # Since the exception is caught, lock should be acquired - lock_run(lock_file) - - assert lock_file.exists() - assert lock_file.read_text() == str(os.getpid()) - - def test_lock_run_handles_psutil_access_denied(self, tmp_path: Path): - """Test lock_run handles psutil.AccessDenied exception""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("12345") - - with patch('psutil.process_iter') as mock_proc_iter: - mock_proc_iter.return_value = [] - - lock_run(lock_file) - - assert lock_file.exists() - - def test_lock_run_handles_psutil_zombie_process(self, tmp_path: Path): - """Test lock_run handles psutil.ZombieProcess exception""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("12345") - - with patch('psutil.process_iter') as mock_proc_iter: - mock_proc_iter.return_value = [] - - lock_run(lock_file) - - assert lock_file.exists() - - def test_lock_run_raises_on_unlink_error(self, tmp_path: Path): - """Test lock_run raises IOError when cannot remove stale lock file""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("99999999") - - with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter: - mock_proc_iter.return_value = [] - - # Mock pathlib.Path.unlink to raise IOError on the specific lock_file - original_unlink = Path.unlink - - def mock_unlink(self, *args, **kwargs): # type: ignore - if self == lock_file: - raise IOError("Permission denied") - return original_unlink(self, *args, **kwargs) - - with patch.object(Path, 'unlink', mock_unlink): - with pytest.raises(IOError) as exc_info: - lock_run(lock_file) - - assert "Cannot remove lock_file" in str(exc_info.value) - assert "Permission denied" in str(exc_info.value) - - def test_lock_run_raises_on_write_error(self, tmp_path: Path): - """Test lock_run raises IOError when cannot write lock file""" - lock_file = tmp_path / "test.lock" - - # Mock open to raise IOError on write - with patch('builtins.open', side_effect=IOError("Disk full")): - with pytest.raises(IOError) as exc_info: - lock_run(lock_file) - - assert "Cannot open run lock file" in str(exc_info.value) - assert "Disk full" in str(exc_info.value) - - def test_lock_run_uses_current_pid(self, tmp_path: Path): - """Test lock_run uses current process PID""" - lock_file = tmp_path / "test.lock" - expected_pid = os.getpid() - - lock_run(lock_file) - - actual_pid = lock_file.read_text() - assert actual_pid == str(expected_pid) - - def test_lock_run_with_subdirectory(self, tmp_path: Path): - """Test lock_run creates lock file in subdirectory""" - subdir = tmp_path / "locks" - subdir.mkdir() - lock_file = subdir / "test.lock" - - lock_run(lock_file) - - assert lock_file.exists() - assert lock_file.read_text() == str(os.getpid()) - - def test_lock_run_overwrites_invalid_pid(self, tmp_path: Path): - """Test lock_run overwrites lock file with invalid PID format""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("not_a_number") - - # When PID is not a valid number, psutil won't find it - with patch('psutil.process_iter') as mock_proc_iter: - mock_proc_iter.return_value = [] - - lock_run(lock_file) - - assert lock_file.read_text() == str(os.getpid()) - - def test_lock_run_multiple_times_same_process(self, tmp_path: Path): - """Test lock_run called multiple times by same process""" - lock_file = tmp_path / "test.lock" - current_pid = os.getpid() - - # First call - lock_run(lock_file) - assert lock_file.read_text() == str(current_pid) - - # Second call - should raise since process exists - with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter: - def mock_iter(attrs=None): # type: ignore - mock_proc = MagicMock() - mock_proc.info = {'pid': str(current_pid)} - return [mock_proc] - - mock_proc_iter.side_effect = mock_iter - - with pytest.raises(IOError) as exc_info: - lock_run(lock_file) - - assert f"Script is already running with PID {current_pid}" in str(exc_info.value) - - def test_lock_run_checks_all_processes(self, tmp_path: Path): - """Test lock_run iterates through all processes""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("12345") - - with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter: - # Create multiple mock processes - def mock_iter(attrs=None): # type: ignore - mock_processes = [] - for pid in ["1000", "2000", "12345", "4000"]: # PIDs as strings - mock_proc = MagicMock() - mock_proc.info = {'pid': pid} - mock_processes.append(mock_proc) - return mock_processes - - mock_proc_iter.side_effect = mock_iter - - # Should find PID 12345 and raise - with pytest.raises(IOError) as exc_info: - lock_run(lock_file) - - assert "Script is already running with PID 12345" in str(exc_info.value) - - def test_lock_run_file_encoding_utf8(self, tmp_path: Path): - """Test lock_run uses UTF-8 encoding""" - lock_file = tmp_path / "test.lock" - - with patch('builtins.open', mock_open()) as mock_file: - try: - lock_run(lock_file) - except (IOError, FileNotFoundError): - pass # We're just checking the encoding parameter - - # Check that open was called with UTF-8 encoding - calls = mock_file.call_args_list - for call in calls: - if 'encoding' in call.kwargs: - assert call.kwargs['encoding'] == 'UTF-8' - - -class TestUnlockRun: - """Test suite for unlock_run function""" - - def test_unlock_run_removes_lock_file(self, tmp_path: Path): - """Test unlock_run removes existing lock file""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("12345") - - assert lock_file.exists() - unlock_run(lock_file) - assert not lock_file.exists() - - def test_unlock_run_raises_on_error(self, tmp_path: Path): - """Test unlock_run raises IOError when cannot remove file""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("12345") - - with patch.object(Path, 'unlink', side_effect=IOError("Permission denied")): - with pytest.raises(IOError) as exc_info: - unlock_run(lock_file) - - assert "Cannot remove lock_file" in str(exc_info.value) - assert "Permission denied" in str(exc_info.value) - - def test_unlock_run_on_nonexistent_file(self, tmp_path: Path): - """Test unlock_run on non-existent file raises IOError""" - lock_file = tmp_path / "nonexistent.lock" - - with pytest.raises(IOError) as exc_info: - unlock_run(lock_file) - - assert "Cannot remove lock_file" in str(exc_info.value) - - def test_unlock_run_with_subdirectory(self, tmp_path: Path): - """Test unlock_run removes file from subdirectory""" - subdir = tmp_path / "locks" - subdir.mkdir() - lock_file = subdir / "test.lock" - lock_file.write_text("12345") - - unlock_run(lock_file) - assert not lock_file.exists() - - def test_unlock_run_multiple_times(self, tmp_path: Path): - """Test unlock_run called multiple times raises error""" - lock_file = tmp_path / "test.lock" - lock_file.write_text("12345") - - # First call should succeed - unlock_run(lock_file) - assert not lock_file.exists() - - # Second call should raise IOError - with pytest.raises(IOError): - unlock_run(lock_file) - - def test_unlock_run_readonly_file(self, tmp_path: Path): - """Test unlock_run on read-only file""" - lock_file = tmp_path / "readonly.lock" - lock_file.write_text("12345") - lock_file.chmod(0o444) - - try: - unlock_run(lock_file) - # On some systems, unlink may still work on readonly files - assert not lock_file.exists() - except IOError as exc_info: - # On other systems, it may raise an error - assert "Cannot remove lock_file" in str(exc_info) - - def test_unlock_run_preserves_other_files(self, tmp_path: Path): - """Test unlock_run only removes specified file""" - lock_file1 = tmp_path / "test1.lock" - lock_file2 = tmp_path / "test2.lock" - lock_file1.write_text("12345") - lock_file2.write_text("67890") - - unlock_run(lock_file1) - - assert not lock_file1.exists() - assert lock_file2.exists() - - -class TestLockUnlockIntegration: - """Integration tests for lock_run and unlock_run""" - - def test_lock_unlock_workflow(self, tmp_path: Path): - """Test complete lock and unlock workflow""" - lock_file = tmp_path / "workflow.lock" - - # Lock - lock_run(lock_file) - assert lock_file.exists() - assert lock_file.read_text() == str(os.getpid()) - - # Unlock - unlock_run(lock_file) - assert not lock_file.exists() - - def test_lock_unlock_relock(self, tmp_path: Path): - """Test locking, unlocking, and locking again""" - lock_file = tmp_path / "relock.lock" - - # First lock - lock_run(lock_file) - first_content = lock_file.read_text() - - # Unlock - unlock_run(lock_file) - - # Second lock - lock_run(lock_file) - second_content = lock_file.read_text() - - assert first_content == second_content == str(os.getpid()) - - def test_lock_prevents_duplicate_run(self, tmp_path: Path): - """Test lock prevents duplicate process simulation""" - lock_file = tmp_path / "duplicate.lock" - current_pid = os.getpid() - - # First lock - lock_run(lock_file) - - # Simulate another process trying to acquire lock - with patch('psutil.process_iter') as mock_proc_iter: - mock_process = MagicMock() - mock_process.info = {'pid': current_pid} - mock_proc_iter.return_value = [mock_process] - - with pytest.raises(IOError) as exc_info: - lock_run(lock_file) - - assert "already running" in str(exc_info.value) - - # Cleanup - unlock_run(lock_file) - - def test_stale_lock_cleanup_and_reacquire(self, tmp_path: Path): - """Test cleaning up stale lock and acquiring new one""" - lock_file = tmp_path / "stale.lock" - - # Create stale lock - stale_pid = "99999999" - lock_file.write_text(stale_pid) - - # Mock psutil to indicate process doesn't exist - with patch('psutil.process_iter') as mock_proc_iter: - mock_proc_iter.return_value = [] - - lock_run(lock_file) - - # Should have our PID now - assert lock_file.read_text() == str(os.getpid()) - - # Cleanup - unlock_run(lock_file) - assert not lock_file.exists() - - def test_multiple_locks_different_files(self, tmp_path: Path): - """Test multiple locks with different files""" - lock_file1 = tmp_path / "lock1.lock" - lock_file2 = tmp_path / "lock2.lock" - - # Acquire multiple locks - lock_run(lock_file1) - lock_run(lock_file2) - - assert lock_file1.exists() - assert lock_file2.exists() - - # Release them - unlock_run(lock_file1) - unlock_run(lock_file2) - - assert not lock_file1.exists() - assert not lock_file2.exists() - - def test_lock_in_context_manager_pattern(self, tmp_path: Path): - """Test lock/unlock in a context manager pattern""" - lock_file = tmp_path / "context.lock" - - class LockContext: - def __init__(self, lock_path: Path): - self.lock_path = lock_path - - def __enter__(self) -> 'LockContext': - lock_run(self.lock_path) - return self - - def __exit__(self, exc_type: type, exc_val: Exception, exc_tb: object) -> bool: - unlock_run(self.lock_path) - return False - - # Use in context - with LockContext(lock_file): - assert lock_file.exists() - - # After context, should be unlocked - assert not lock_file.exists() - - def test_lock_survives_process_in_loop(self, tmp_path: Path): - """Test lock file persists across multiple operations""" - lock_file = tmp_path / "persistent.lock" - - lock_run(lock_file) - - # Simulate some operations - for _ in range(10): - assert lock_file.exists() - content = lock_file.read_text() - assert content == str(os.getpid()) - - unlock_run(lock_file) - assert not lock_file.exists() - - def test_exception_during_locked_execution(self, tmp_path: Path): - """Test lock cleanup when exception occurs during execution""" - lock_file = tmp_path / "exception.lock" - - lock_run(lock_file) - - try: - # Simulate some work that raises exception - raise ValueError("Something went wrong") - except ValueError: - pass - finally: - # Lock should still exist until explicitly unlocked - assert lock_file.exists() - unlock_run(lock_file) - - assert not lock_file.exists() - - def test_lock_file_permissions(self, tmp_path: Path): - """Test lock file has appropriate permissions""" - lock_file = tmp_path / "permissions.lock" - - lock_run(lock_file) - - # File should be readable and writable by owner - assert lock_file.exists() - # We can read it - content = lock_file.read_text() - assert content == str(os.getpid()) - - unlock_run(lock_file) - - -class TestEdgeCases: - """Test edge cases and error conditions""" - - def test_wait_abort_negative_sleep(self, capsys: CaptureFixture[str]): - """Test wait_abort with negative sleep value""" - with patch('time.sleep'): - wait_abort(sleep=-5) - - captured = capsys.readouterr() - assert "Waiting -5 seconds" in captured.out - - def test_lock_run_with_whitespace_pid(self, tmp_path: Path): - """Test lock_run handles lock file with whitespace""" - lock_file = tmp_path / "whitespace.lock" - lock_file.write_text(" 12345 \n") - - with patch('psutil.process_iter') as mock_proc_iter: - mock_proc_iter.return_value = [] - - lock_run(lock_file) - - # Should create new lock with clean PID - assert lock_file.read_text() == str(os.getpid()) - - def test_lock_run_with_special_characters_in_path(self, tmp_path: Path): - """Test lock_run with special characters in file path""" - special_dir = tmp_path / "special dir with spaces" - special_dir.mkdir() - lock_file = special_dir / "lock-file.lock" - - lock_run(lock_file) - assert lock_file.exists() - unlock_run(lock_file) - - def test_lock_run_with_very_long_path(self, tmp_path: Path): - """Test lock_run with very long file path""" - # Create nested directories - deep_path = tmp_path - for i in range(10): - deep_path = deep_path / f"level{i}" - deep_path.mkdir(parents=True) - - lock_file = deep_path / "deep.lock" - - lock_run(lock_file) - assert lock_file.exists() - unlock_run(lock_file) - - def test_unlock_run_on_directory(self, tmp_path: Path): - """Test unlock_run on a directory raises appropriate error""" - test_dir = tmp_path / "test_dir" - test_dir.mkdir() - - with pytest.raises(IOError): - unlock_run(test_dir) - - def test_lock_run_race_condition_simulation(self, tmp_path: Path): - """Test lock_run handles simulated race condition""" - lock_file = tmp_path / "race.lock" - - # This is hard to test reliably, but we can at least verify - # the function handles existing files - lock_file.write_text("88888") - - with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter: - def mock_iter(attrs=None): # type: ignore - mock_proc = MagicMock() - mock_proc.info = {'pid': "88888"} - return [mock_proc] - - mock_proc_iter.side_effect = mock_iter - - with pytest.raises(IOError): - lock_run(lock_file) - - -class TestScriptHelpersIntegration: - """Integration tests combining multiple functions""" - - def test_typical_script_pattern(self, tmp_path: Path, capsys: CaptureFixture[str]): - """Test typical script execution pattern with all helpers""" - lock_file = tmp_path / "script.lock" - - # Wait before starting (with mocked sleep) - with patch('time.sleep'): - wait_abort(sleep=2) - - captured = capsys.readouterr() - assert "Waiting 2 seconds" in captured.out - - # Acquire lock - lock_run(lock_file) - assert lock_file.exists() - - # Simulate work - time.sleep(0.01) - - # Release lock - unlock_run(lock_file) - assert not lock_file.exists() - - def test_script_with_error_handling(self, tmp_path: Path): - """Test script pattern with error handling""" - lock_file = tmp_path / "error_script.lock" - - try: - lock_run(lock_file) - # Simulate error during execution - raise RuntimeError("Simulated error") - except RuntimeError: - pass - finally: - # Ensure cleanup happens - if lock_file.exists(): - unlock_run(lock_file) - - assert not lock_file.exists() - - def test_concurrent_script_protection(self, tmp_path: Path): - """Test protection against concurrent script execution""" - lock_file = tmp_path / "concurrent.lock" - - # First instance acquires lock - lock_run(lock_file) - - # Second instance should fail - with patch('corelibs.script_handling.script_helpers.psutil.process_iter') as mock_proc_iter: - def mock_iter(attrs=None): # type: ignore - mock_proc = MagicMock() - mock_proc.info = {'pid': str(os.getpid())} - return [mock_proc] - - mock_proc_iter.side_effect = mock_iter - - with pytest.raises(IOError) as exc_info: - lock_run(lock_file) - - assert "already running" in str(exc_info.value).lower() - - # Cleanup - unlock_run(lock_file) - - def test_graceful_shutdown_pattern(self, tmp_path: Path, capsys: CaptureFixture[str]): - """Test graceful shutdown with wait and cleanup""" - lock_file = tmp_path / "graceful.lock" - - lock_run(lock_file) - - # Simulate interrupt during wait - with patch('time.sleep', side_effect=KeyboardInterrupt): - with pytest.raises(SystemExit): - wait_abort(sleep=5) - - captured = capsys.readouterr() - assert "Interrupted by user" in captured.out - - # Cleanup should still happen - unlock_run(lock_file) - assert not lock_file.exists() - - -# __END__ diff --git a/tests/unit/script_handling/test_progress.py b/tests/unit/script_handling/test_progress.py deleted file mode 100644 index e6ce81c..0000000 --- a/tests/unit/script_handling/test_progress.py +++ /dev/null @@ -1,840 +0,0 @@ -""" -PyTest: script_handling/progress -""" - -import time -from unittest.mock import patch -from pytest import CaptureFixture - -from corelibs.script_handling.progress import Progress - - -class TestProgressInit: - """Test suite for Progress initialization""" - - def test_default_initialization(self): - """Test Progress initialization with default parameters""" - prg = Progress() - assert prg.verbose is False - assert prg.precision == 1 - assert prg.microtime == 0 - assert prg.wide_time is False - assert prg.prefix_lb is False - assert prg.linecount == 0 - assert prg.filesize == 0 - assert prg.count == 0 - assert prg.start is not None - - def test_initialization_with_verbose(self): - """Test Progress initialization with verbose enabled""" - prg = Progress(verbose=1) - assert prg.verbose is True - - prg = Progress(verbose=5) - assert prg.verbose is True - - prg = Progress(verbose=0) - assert prg.verbose is False - - def test_initialization_with_precision(self): - """Test Progress initialization with different precision values""" - # Normal precision - prg = Progress(precision=0) - assert prg.precision == 0 - assert prg.percent_print == 3 - - prg = Progress(precision=2) - assert prg.precision == 2 - assert prg.percent_print == 6 - - prg = Progress(precision=10) - assert prg.precision == 10 - assert prg.percent_print == 14 - - # Ten step precision - prg = Progress(precision=-1) - assert prg.precision == 0 - assert prg.precision_ten_step == 10 - assert prg.percent_print == 3 - - # Five step precision - prg = Progress(precision=-2) - assert prg.precision == 0 - assert prg.precision_ten_step == 5 - assert prg.percent_print == 3 - - def test_initialization_with_microtime(self): - """Test Progress initialization with microtime settings""" - prg = Progress(microtime=-1) - assert prg.microtime == -1 - - prg = Progress(microtime=0) - assert prg.microtime == 0 - - prg = Progress(microtime=1) - assert prg.microtime == 1 - - def test_initialization_with_wide_time(self): - """Test Progress initialization with wide_time flag""" - prg = Progress(wide_time=True) - assert prg.wide_time is True - - prg = Progress(wide_time=False) - assert prg.wide_time is False - - def test_initialization_with_prefix_lb(self): - """Test Progress initialization with prefix line break""" - prg = Progress(prefix_lb=True) - assert prg.prefix_lb is True - - prg = Progress(prefix_lb=False) - assert prg.prefix_lb is False - - def test_initialization_combined_parameters(self): - """Test Progress initialization with multiple parameters""" - prg = Progress(verbose=1, precision=2, microtime=1, wide_time=True, prefix_lb=True) - assert prg.verbose is True - assert prg.precision == 2 - assert prg.microtime == 1 - assert prg.wide_time is True - assert prg.prefix_lb is True - - -class TestProgressSetters: - """Test suite for Progress setter methods""" - - def test_set_verbose(self): - """Test set_verbose method""" - prg = Progress() - - assert prg.set_verbose(1) is True - assert prg.verbose is True - - assert prg.set_verbose(10) is True - assert prg.verbose is True - - assert prg.set_verbose(0) is False - assert prg.verbose is False - - def test_set_precision(self): - """Test set_precision method""" - prg = Progress() - - # Valid precision values - assert prg.set_precision(0) == 0 - assert prg.precision == 0 - - assert prg.set_precision(5) == 5 - assert prg.precision == 5 - - assert prg.set_precision(10) == 10 - assert prg.precision == 10 - - # Ten step precision - prg.set_precision(-1) - assert prg.precision == 0 - assert prg.precision_ten_step == 10 - - # Five step precision - prg.set_precision(-2) - assert prg.precision == 0 - assert prg.precision_ten_step == 5 - - # Invalid precision (too low) - assert prg.set_precision(-3) == 0 - assert prg.precision == 0 - - # Invalid precision (too high) - assert prg.set_precision(11) == 0 - assert prg.precision == 0 - - def test_set_linecount(self): - """Test set_linecount method""" - prg = Progress() - - assert prg.set_linecount(100) == 100 - assert prg.linecount == 100 - - assert prg.set_linecount(1000) == 1000 - assert prg.linecount == 1000 - - # Zero or negative should set to 1 - assert prg.set_linecount(0) == 1 - assert prg.linecount == 1 - - assert prg.set_linecount(-10) == 1 - assert prg.linecount == 1 - - def test_set_filesize(self): - """Test set_filesize method""" - prg = Progress() - - assert prg.set_filesize(1024) == 1024 - assert prg.filesize == 1024 - - assert prg.set_filesize(1048576) == 1048576 - assert prg.filesize == 1048576 - - # Zero or negative should set to 1 - assert prg.set_filesize(0) == 1 - assert prg.filesize == 1 - - assert prg.set_filesize(-100) == 1 - assert prg.filesize == 1 - - def test_set_wide_time(self): - """Test set_wide_time method""" - prg = Progress() - - assert prg.set_wide_time(True) is True - assert prg.wide_time is True - - assert prg.set_wide_time(False) is False - assert prg.wide_time is False - - def test_set_micro_time(self): - """Test set_micro_time method""" - prg = Progress() - - assert prg.set_micro_time(-1) == -1 - assert prg.microtime == -1 - - assert prg.set_micro_time(0) == 0 - assert prg.microtime == 0 - - assert prg.set_micro_time(1) == 1 - assert prg.microtime == 1 - - def test_set_prefix_lb(self): - """Test set_prefix_lb method""" - prg = Progress() - - assert prg.set_prefix_lb(True) is True - assert prg.prefix_lb is True - - assert prg.set_prefix_lb(False) is False - assert prg.prefix_lb is False - - def test_set_start_time(self): - """Test set_start_time method""" - prg = Progress() - initial_start = prg.start - - # Wait a bit and set new start time - time.sleep(0.01) - new_time = time.time() - prg.set_start_time(new_time) - - # Original start should not change - assert prg.start == initial_start - # But start_time and start_run should update - assert prg.start_time == new_time - assert prg.start_run == new_time - - def test_set_start_time_custom_value(self): - """Test set_start_time with custom time value""" - prg = Progress() - custom_time = 1234567890.0 - prg.start = None # Reset start to test first-time setting - prg.set_start_time(custom_time) - - assert prg.start == custom_time - assert prg.start_time == custom_time - assert prg.start_run == custom_time - - def test_set_eta_start_time(self): - """Test set_eta_start_time method""" - prg = Progress() - custom_time = time.time() + 100 - prg.set_eta_start_time(custom_time) - - assert prg.start_time == custom_time - assert prg.start_run == custom_time - - def test_set_end_time(self): - """Test set_end_time method""" - prg = Progress() - start_time = time.time() - prg.set_start_time(start_time) - - time.sleep(0.01) - end_time = time.time() - prg.set_end_time(end_time) - - assert prg.end == end_time - assert prg.end_time == end_time - assert prg.run_time is not None - assert prg.run_time > 0 - - def test_set_end_time_with_none_start(self): - """Test set_end_time when start is None""" - prg = Progress() - prg.start = None - end_time = time.time() - prg.set_end_time(end_time) - - assert prg.end == end_time - assert prg.run_time == end_time - - -class TestProgressReset: - """Test suite for Progress reset method""" - - def test_reset_basic(self): - """Test reset method resets counter variables""" - prg = Progress() - prg.set_linecount(1000) - prg.set_filesize(10240) - prg.count = 500 - prg.current_count = 500 - prg.lines_processed = 100 - - prg.reset() - - assert prg.count == 0 - assert prg.current_count == 0 - assert prg.linecount == 0 - assert prg.lines_processed == 0 - assert prg.filesize == 0 - assert prg.last_percent == 0 - - def test_reset_preserves_start(self): - """Test reset preserves the original start time""" - prg = Progress() - original_start = prg.start - - prg.reset() - - # Original start should still be set from initialization - assert prg.start == original_start - - def test_reset_clears_runtime_data(self): - """Test reset clears runtime calculation data""" - prg = Progress() - prg.eta = 100.5 - prg.full_time_needed = 50.2 - prg.last_group = 10.1 - prg.lines_in_last_group = 5.5 - prg.lines_in_global = 3.3 - - prg.reset() - - assert prg.eta == 0 - assert prg.full_time_needed == 0 - assert prg.last_group == 0 - assert prg.lines_in_last_group == 0 - assert prg.lines_in_global == 0 - - -class TestProgressShowPosition: - """Test suite for Progress show_position method""" - - def test_show_position_basic_linecount(self): - """Test show_position with basic line count""" - prg = Progress(verbose=0) - prg.set_linecount(100) - - # Process some lines - for _ in range(10): - prg.show_position() - - assert prg.count == 10 - assert prg.file_pos == 10 - - def test_show_position_with_filesize(self): - """Test show_position with file size parameter""" - prg = Progress(verbose=0) - prg.set_filesize(1024) - - prg.show_position(512) - - assert prg.count == 1 - assert prg.file_pos == 512 - assert prg.count_size == 512 - - def test_show_position_percent_calculation(self): - """Test show_position calculates percentage correctly""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - # Process 50 lines - for _ in range(50): - prg.show_position() - - assert prg.last_percent == 50.0 - - def test_show_position_ten_step_precision(self): - """Test show_position with ten step precision""" - prg = Progress(verbose=0, precision=-1) - prg.set_linecount(100) - - # Process lines, should only update at 10% intervals - for _ in range(15): - prg.show_position() - - # Should be at 10% (not 15%) - assert prg.last_percent == 10 - - def test_show_position_five_step_precision(self): - """Test show_position with five step precision""" - prg = Progress(verbose=0, precision=-2) - prg.set_linecount(100) - - # Process lines, should only update at 5% intervals - for _ in range(7): - prg.show_position() - - # Should be at 5% (not 7%) - assert prg.last_percent == 5 - - def test_show_position_change_flag(self): - """Test show_position sets change flag correctly""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - # First call should trigger change (at 1%) - prg.show_position() - assert prg.change == 1 - last_percent = prg.last_percent - - # Keep calling - each percent increment triggers change - prg.show_position() - # At precision=0, each 1% is a new change - if prg.last_percent != last_percent: - assert prg.change == 1 - else: - assert prg.change == 0 - - def test_show_position_with_verbose_output(self, capsys: CaptureFixture[str]): - """Test show_position produces output when verbose is enabled""" - prg = Progress(verbose=1, precision=0) - prg.set_linecount(100) - - # Process until percent changes - for _ in range(10): - prg.show_position() - - captured = capsys.readouterr() - assert "Processed" in captured.out - assert "Lines" in captured.out - - def test_show_position_with_prefix_lb(self): - """Test show_position with prefix line break""" - prg = Progress(verbose=1, precision=0, prefix_lb=True) - prg.set_linecount(100) - - # Process until percent changes - for _ in range(10): - prg.show_position() - - assert prg.string.startswith("\n") - - def test_show_position_lines_processed_calculation(self): - """Test show_position calculates lines processed correctly""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - # First call at 1% - prg.show_position() - first_lines_processed = prg.lines_processed - assert first_lines_processed == 1 - - # Process to 2% (need to process 1 more line) - prg.show_position() - # lines_processed should be 1 (from 1 to 2) - assert prg.lines_processed == 1 - - def test_show_position_eta_calculation(self): - """Test show_position calculates ETA""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(1000) - - # We need to actually process lines for percent to change - # Process 100 lines to get to ~10% - for _ in range(100): - prg.show_position() - - # ETA should be set after percent changes - assert prg.eta is not None - assert prg.eta >= 0 - - def test_show_position_with_filesize_output(self, capsys: CaptureFixture[str]): - """Test show_position output with filesize information""" - prg = Progress(verbose=1, precision=0) - prg.set_filesize(10240) - - # Process with filesize - for i in range(1, 1025): - prg.show_position(i) - - captured = capsys.readouterr() - # Should contain byte information - assert "B" in captured.out or "KB" in captured.out - - def test_show_position_bytes_calculation(self): - """Test show_position calculates bytes per second""" - prg = Progress(verbose=0, precision=0) - prg.set_filesize(10240) - - # Process enough bytes to trigger a percent change - # Need to process ~102 bytes for 1% of 10240 - prg.show_position(102) - - # After percent change, bytes stats should be set - assert prg.bytes_in_last_group >= 0 - assert prg.bytes_in_global >= 0 - - def test_show_position_current_count_tracking(self): - """Test show_position tracks current count correctly""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - for _ in range(10): - prg.show_position() - - # Current count should be updated to last change point - assert prg.current_count == 10 - assert prg.count == 10 - - def test_show_position_full_time_calculation(self): - """Test show_position calculates full time needed""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - # Process enough to trigger percent change - for _ in range(10): - prg.show_position() - - assert prg.full_time_needed is not None - assert prg.full_time_needed >= 0 - - def test_show_position_last_group_time(self): - """Test show_position tracks last group time""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - # Process enough to trigger percent change - for _ in range(10): - prg.show_position() - - # last_group should be set after percent change - assert prg.last_group >= 0 - - def test_show_position_zero_eta_edge_case(self): - """Test show_position handles negative ETA gracefully""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - # Process all lines - for _ in range(100): - prg.show_position() - - # ETA should not be negative - assert prg.eta is not None - assert prg.eta >= 0 - - def test_show_position_no_filesize_string_format(self): - """Test show_position string format without filesize""" - prg = Progress(verbose=1, precision=0) - prg.set_linecount(100) - - for _ in range(10): - prg.show_position() - - # String should not contain byte information - assert "b/s" not in prg.string - assert "Lines" in prg.string - - def test_show_position_wide_time_format(self): - """Test show_position with wide time formatting""" - prg = Progress(verbose=1, precision=0, wide_time=True) - prg.set_linecount(100) - - for _ in range(10): - prg.show_position() - - # With wide_time, time fields should be formatted with specific width - assert prg.string != "" - - def test_show_position_microtime_on(self): - """Test show_position with microtime enabled""" - prg = Progress(verbose=0, precision=0, microtime=1) - prg.set_linecount(100) - - with patch('time.time') as mock_time: - mock_time.return_value = 1000.0 - prg.set_start_time(1000.0) - - mock_time.return_value = 1000.5 - for _ in range(10): - prg.show_position() - - # Microtime should be enabled - assert prg.microtime == 1 - - def test_show_position_microtime_off(self): - """Test show_position with microtime disabled""" - prg = Progress(verbose=0, precision=0, microtime=-1) - prg.set_linecount(100) - - for _ in range(10): - prg.show_position() - - assert prg.microtime == -1 - - def test_show_position_lines_per_second_global(self): - """Test show_position calculates global lines per second""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(1000) - - # Process 100 lines to trigger percent changes - for _ in range(100): - prg.show_position() - - # After processing, lines_in_global should be calculated - assert prg.lines_in_global >= 0 - - def test_show_position_lines_per_second_last_group(self): - """Test show_position calculates last group lines per second""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(1000) - - # Process lines to trigger percent changes - for _ in range(100): - prg.show_position() - - # After processing, lines_in_last_group should be calculated - assert prg.lines_in_last_group >= 0 - - def test_show_position_returns_string(self): - """Test show_position returns the progress string""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - result = "" - for _ in range(10): - result = prg.show_position() - - # Should return string on percent change - assert isinstance(result, str) - - -class TestProgressEdgeCases: - """Test suite for edge cases and error conditions""" - - def test_zero_linecount_protection(self): - """Test Progress handles zero linecount gracefully""" - prg = Progress(verbose=0) - prg.set_filesize(1024) - - # Should not crash with zero linecount - prg.show_position(512) - assert prg.file_pos == 512 - - def test_zero_filesize_protection(self): - """Test Progress handles zero filesize gracefully""" - prg = Progress(verbose=0) - prg.set_linecount(100) - - # Should not crash with zero filesize - prg.show_position() - assert isinstance(prg.string, str) - - def test_division_by_zero_protection_last_group(self): - """Test Progress protects against division by zero in last_group""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - with patch('time.time') as mock_time: - # Same time for start and end - mock_time.return_value = 1000.0 - prg.set_start_time(1000.0) - - for _ in range(10): - prg.show_position() - - # Should handle zero time difference - assert prg.lines_in_last_group >= 0 - - def test_division_by_zero_protection_full_time(self): - """Test Progress protects against division by zero in full_time_needed""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - # Process lines very quickly - for _ in range(10): - prg.show_position() - - # Should handle very small time differences without crashing - # lines_in_global should be a valid number (>= 0) - assert isinstance(prg.lines_in_global, (int, float)) - - def test_none_start_protection(self): - """Test Progress handles None start time""" - prg = Progress(verbose=0, precision=0) - prg.start = None - prg.set_linecount(100) - - # Should not crash - prg.show_position() - - assert prg.start == 0 - - def test_none_start_time_protection(self): - """Test Progress handles None start_time""" - prg = Progress(verbose=0, precision=0) - prg.start_time = None - prg.set_linecount(100) - - # Should not crash and should set start_time during processing - prg.show_position() - - # start_time will be set to 0 internally when None is encountered - # But during percent calculation, it may be reset to current time - assert prg.start_time is not None - - def test_precision_boundary_values(self): - """Test precision at boundary values""" - prg = Progress() - - # Minimum valid - assert prg.set_precision(-2) == 0 - - # Maximum valid - assert prg.set_precision(10) == 10 - - # Below minimum - assert prg.set_precision(-3) == 0 - - # Above maximum - assert prg.set_precision(11) == 0 - - def test_large_linecount_handling(self): - """Test Progress handles large linecount values""" - prg = Progress(verbose=0) - large_count = 10_000_000 - prg.set_linecount(large_count) - - assert prg.linecount == large_count - - # Should handle calculations without overflow - prg.show_position() - assert prg.count == 1 - - def test_large_filesize_handling(self): - """Test Progress handles large filesize values""" - prg = Progress(verbose=0) - large_size = 10_737_418_240 # 10 GB - prg.set_filesize(large_size) - - assert prg.filesize == large_size - - # Should handle calculations without overflow - prg.show_position(1024) - assert prg.file_pos == 1024 - - -class TestProgressIntegration: - """Integration tests for Progress class""" - - def test_complete_progress_workflow(self, capsys: CaptureFixture[str]): - """Test complete progress workflow from start to finish""" - prg = Progress(verbose=1, precision=0) - prg.set_linecount(100) - - # Simulate processing - for _ in range(100): - prg.show_position() - - prg.set_end_time() - - assert prg.count == 100 - assert prg.last_percent == 100.0 - assert prg.run_time is not None - - captured = capsys.readouterr() - assert "Processed" in captured.out - - def test_progress_with_filesize_workflow(self): - """Test progress workflow with file size tracking""" - prg = Progress(verbose=0, precision=0) - prg.set_filesize(10240) - - # Simulate reading file in chunks - for pos in range(0, 10240, 1024): - prg.show_position(pos + 1024) - - assert prg.count == 10 - assert prg.count_size == 10240 - - def test_reset_and_reuse(self): - """Test resetting and reusing Progress instance""" - prg = Progress(verbose=0, precision=0) - - # First run - prg.set_linecount(100) - for _ in range(100): - prg.show_position() - assert prg.count == 100 - - # Reset - prg.reset() - assert prg.count == 0 - - # Second run - prg.set_linecount(50) - for _ in range(50): - prg.show_position() - assert prg.count == 50 - - def test_multiple_precision_changes(self): - """Test changing precision multiple times""" - prg = Progress(verbose=0) - - prg.set_precision(0) - assert prg.precision == 0 - - prg.set_precision(2) - assert prg.precision == 2 - - prg.set_precision(-1) - assert prg.precision == 0 - assert prg.precision_ten_step == 10 - - def test_eta_start_time_adjustment(self): - """Test adjusting ETA start time mid-processing""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(1000) - - # Process some lines - for _ in range(100): - prg.show_position() - - # Adjust ETA start time (simulating delay like DB query) - new_time = time.time() - prg.set_eta_start_time(new_time) - - # Continue processing - for _ in range(100): - prg.show_position() - - assert prg.start_run == new_time - - def test_verbose_toggle_during_processing(self): - """Test toggling verbose flag during processing""" - prg = Progress(verbose=0, precision=0) - prg.set_linecount(100) - - # Process without output - for _ in range(50): - prg.show_position() - - # Enable verbose - prg.set_verbose(1) - assert prg.verbose is True - - # Continue with output - for _ in range(50): - prg.show_position() - - assert prg.count == 100