file handling tests, move progress to script handling
Progress is not only file, but process progress in a script
This commit is contained in:
@@ -20,7 +20,7 @@ This is a pip package that can be installed into any project and covers the foll
|
||||
## Current list
|
||||
|
||||
- config_handling: simple INI config file data loader with check/convert/etc
|
||||
- csv_handling: csv dict writer helper
|
||||
- csv_interface: csv dict writer/reader helper
|
||||
- debug_handling: various debug helpers like data dumper, timer, utilization, etc
|
||||
- db_handling: SQLite interface class
|
||||
- encyption_handling: symmetric encryption
|
||||
@@ -33,6 +33,10 @@ This is a pip package that can be installed into any project and covers the foll
|
||||
- string_handling: byte format, datetime format, datetime compare, hashing, string formats for numbers, double byte string format, etc
|
||||
- var_handling: var type checkers, enum base class
|
||||
|
||||
## Unfinished
|
||||
|
||||
- csv_interface: The CSV DictWriter interface is just in a very basic way implemented
|
||||
|
||||
## UV setup
|
||||
|
||||
uv must be [installed](https://docs.astral.sh/uv/getting-started/installation/)
|
||||
|
||||
@@ -7,7 +7,12 @@ import shutil
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def remove_all_in_directory(directory: Path, ignore_files: list[str] | None = None, verbose: bool = False) -> bool:
|
||||
def remove_all_in_directory(
|
||||
directory: Path,
|
||||
ignore_files: list[str] | None = None,
|
||||
verbose: bool = False,
|
||||
dry_run: bool = False
|
||||
) -> bool:
|
||||
"""
|
||||
remove all files and folders in a directory
|
||||
can exclude files or folders
|
||||
@@ -24,7 +29,10 @@ def remove_all_in_directory(directory: Path, ignore_files: list[str] | None = No
|
||||
if ignore_files is None:
|
||||
ignore_files = []
|
||||
if verbose:
|
||||
print(f"Remove old files in: {directory.name} [", end="", flush=True)
|
||||
print(
|
||||
f"{'[DRY RUN] ' if dry_run else ''}Remove old files in: {directory.name} [",
|
||||
end="", flush=True
|
||||
)
|
||||
# remove all files and folders in given directory by recursive globbing
|
||||
for file in directory.rglob("*"):
|
||||
# skip if in ignore files
|
||||
@@ -32,10 +40,12 @@ def remove_all_in_directory(directory: Path, ignore_files: list[str] | None = No
|
||||
continue
|
||||
# remove one file, or a whole directory
|
||||
if file.is_file():
|
||||
if not dry_run:
|
||||
os.remove(file)
|
||||
if verbose:
|
||||
print(".", end="", flush=True)
|
||||
elif file.is_dir():
|
||||
if not dry_run:
|
||||
shutil.rmtree(file)
|
||||
if verbose:
|
||||
print("/", end="", flush=True)
|
||||
|
||||
@@ -32,7 +32,7 @@ show_position(file pos optional)
|
||||
import time
|
||||
from typing import Literal
|
||||
from math import floor
|
||||
from corelibs.datetime_handling.datetime_helpers import convert_timestamp
|
||||
from corelibs.datetime_handling.timestamp_convert import convert_timestamp
|
||||
from corelibs.string_handling.byte_helpers import format_bytes
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from random import randint
|
||||
import sys
|
||||
import io
|
||||
from pathlib import Path
|
||||
from corelibs.file_handling.progress import Progress
|
||||
from corelibs.script_handling.progress import Progress
|
||||
from corelibs.datetime_handling.datetime_helpers import create_time
|
||||
from corelibs.datetime_handling.timestamp_convert import convert_timestamp
|
||||
|
||||
|
||||
389
tests/unit/file_handling/test_file_crc.py
Normal file
389
tests/unit/file_handling/test_file_crc.py
Normal file
@@ -0,0 +1,389 @@
|
||||
"""
|
||||
PyTest: file_handling/file_crc
|
||||
"""
|
||||
|
||||
import zlib
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
|
||||
from corelibs.file_handling.file_crc import (
|
||||
file_crc,
|
||||
file_name_crc,
|
||||
)
|
||||
|
||||
|
||||
class TestFileCrc:
|
||||
"""Test suite for file_crc function"""
|
||||
|
||||
def test_file_crc_small_file(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a small file"""
|
||||
test_file = tmp_path / "test_small.txt"
|
||||
content = b"Hello, World!"
|
||||
test_file.write_bytes(content)
|
||||
|
||||
# Calculate expected CRC
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 8 # CRC32 is 8 hex digits
|
||||
|
||||
def test_file_crc_large_file(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a file larger than buffer size (65536 bytes)"""
|
||||
test_file = tmp_path / "test_large.bin"
|
||||
|
||||
# Create a file larger than the buffer (65536 bytes)
|
||||
content = b"A" * 100000
|
||||
test_file.write_bytes(content)
|
||||
|
||||
# Calculate expected CRC
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_empty_file(self, tmp_path: Path):
|
||||
"""Test CRC calculation for an empty file"""
|
||||
test_file = tmp_path / "test_empty.txt"
|
||||
test_file.write_bytes(b"")
|
||||
|
||||
# CRC of empty data
|
||||
expected_crc = f"{zlib.crc32(b"") & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
assert result == "00000000"
|
||||
|
||||
def test_file_crc_binary_file(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a binary file"""
|
||||
test_file = tmp_path / "test_binary.bin"
|
||||
content = bytes(range(256)) # All possible byte values
|
||||
test_file.write_bytes(content)
|
||||
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_exact_buffer_size(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a file exactly the buffer size"""
|
||||
test_file = tmp_path / "test_exact_buffer.bin"
|
||||
content = b"X" * 65536
|
||||
test_file.write_bytes(content)
|
||||
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_multiple_buffers(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a file requiring multiple buffer reads"""
|
||||
test_file = tmp_path / "test_multi_buffer.bin"
|
||||
content = b"TestData" * 20000 # ~160KB
|
||||
test_file.write_bytes(content)
|
||||
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_unicode_content(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a file with unicode content"""
|
||||
test_file = tmp_path / "test_unicode.txt"
|
||||
content = "Hello 世界! 🌍".encode('utf-8')
|
||||
test_file.write_bytes(content)
|
||||
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_deterministic(self, tmp_path: Path):
|
||||
"""Test that CRC calculation is deterministic"""
|
||||
test_file = tmp_path / "test_deterministic.txt"
|
||||
content = b"Deterministic test content"
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result1 = file_crc(test_file)
|
||||
result2 = file_crc(test_file)
|
||||
|
||||
assert result1 == result2
|
||||
|
||||
def test_file_crc_different_files(self, tmp_path: Path):
|
||||
"""Test that different files produce different CRCs"""
|
||||
file1 = tmp_path / "file1.txt"
|
||||
file2 = tmp_path / "file2.txt"
|
||||
|
||||
file1.write_bytes(b"Content 1")
|
||||
file2.write_bytes(b"Content 2")
|
||||
|
||||
crc1 = file_crc(file1)
|
||||
crc2 = file_crc(file2)
|
||||
|
||||
assert crc1 != crc2
|
||||
|
||||
def test_file_crc_same_content_different_names(self, tmp_path: Path):
|
||||
"""Test that files with same content produce same CRC regardless of name"""
|
||||
file1 = tmp_path / "name1.txt"
|
||||
file2 = tmp_path / "name2.txt"
|
||||
|
||||
content = b"Same content"
|
||||
file1.write_bytes(content)
|
||||
file2.write_bytes(content)
|
||||
|
||||
crc1 = file_crc(file1)
|
||||
crc2 = file_crc(file2)
|
||||
|
||||
assert crc1 == crc2
|
||||
|
||||
def test_file_crc_nonexistent_file(self, tmp_path: Path):
|
||||
"""Test that file_crc raises error for non-existent file"""
|
||||
test_file = tmp_path / "nonexistent.txt"
|
||||
|
||||
with pytest.raises(FileNotFoundError):
|
||||
file_crc(test_file)
|
||||
|
||||
def test_file_crc_with_path_object(self, tmp_path: Path):
|
||||
"""Test file_crc works with Path object"""
|
||||
test_file = tmp_path / "test_path.txt"
|
||||
test_file.write_bytes(b"Test with Path")
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 8
|
||||
|
||||
|
||||
class TestFileNameCrc:
|
||||
"""Test suite for file_name_crc function"""
|
||||
|
||||
def test_file_name_crc_simple_filename(self, tmp_path: Path):
|
||||
"""Test extracting simple filename without parent folder"""
|
||||
test_file = tmp_path / "testfile.csv"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "testfile.csv"
|
||||
|
||||
def test_file_name_crc_with_parent_folder(self, tmp_path: Path):
|
||||
"""Test extracting filename with parent folder"""
|
||||
parent = tmp_path / "parent_folder"
|
||||
parent.mkdir()
|
||||
test_file = parent / "testfile.csv"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert result == "parent_folder/testfile.csv"
|
||||
|
||||
def test_file_name_crc_nested_path_without_parent(self):
|
||||
"""Test filename extraction from deeply nested path without parent"""
|
||||
test_path = Path("/foo/bar/baz/file.csv")
|
||||
|
||||
result = file_name_crc(test_path, add_parent_folder=False)
|
||||
assert result == "file.csv"
|
||||
|
||||
def test_file_name_crc_nested_path_with_parent(self):
|
||||
"""Test filename extraction from deeply nested path with parent"""
|
||||
test_path = Path("/foo/bar/baz/file.csv")
|
||||
|
||||
result = file_name_crc(test_path, add_parent_folder=True)
|
||||
assert result == "baz/file.csv"
|
||||
|
||||
def test_file_name_crc_default_parameter(self, tmp_path: Path):
|
||||
"""Test that add_parent_folder defaults to False"""
|
||||
test_file = tmp_path / "subdir" / "testfile.txt"
|
||||
test_file.parent.mkdir(parents=True)
|
||||
|
||||
result = file_name_crc(test_file)
|
||||
assert result == "testfile.txt"
|
||||
|
||||
def test_file_name_crc_different_extensions(self, tmp_path: Path):
|
||||
"""Test with different file extensions"""
|
||||
extensions = [".txt", ".csv", ".json", ".xml", ".py"]
|
||||
|
||||
for ext in extensions:
|
||||
test_file = tmp_path / f"testfile{ext}"
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == f"testfile{ext}"
|
||||
|
||||
def test_file_name_crc_no_extension(self, tmp_path: Path):
|
||||
"""Test with filename without extension"""
|
||||
test_file = tmp_path / "testfile"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "testfile"
|
||||
|
||||
def test_file_name_crc_multiple_dots(self, tmp_path: Path):
|
||||
"""Test with filename containing multiple dots"""
|
||||
test_file = tmp_path / "test.file.name.tar.gz"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "test.file.name.tar.gz"
|
||||
|
||||
def test_file_name_crc_with_spaces(self, tmp_path: Path):
|
||||
"""Test with filename containing spaces"""
|
||||
test_file = tmp_path / "test file name.txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "test file name.txt"
|
||||
|
||||
def test_file_name_crc_with_special_chars(self, tmp_path: Path):
|
||||
"""Test with filename containing special characters"""
|
||||
test_file = tmp_path / "test_file-name (1).txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "test_file-name (1).txt"
|
||||
|
||||
def test_file_name_crc_unicode_filename(self, tmp_path: Path):
|
||||
"""Test with unicode characters in filename"""
|
||||
test_file = tmp_path / "テストファイル.txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "テストファイル.txt"
|
||||
|
||||
def test_file_name_crc_unicode_parent(self, tmp_path: Path):
|
||||
"""Test with unicode characters in parent folder name"""
|
||||
parent = tmp_path / "親フォルダ"
|
||||
parent.mkdir()
|
||||
test_file = parent / "file.txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert result == "親フォルダ/file.txt"
|
||||
|
||||
def test_file_name_crc_path_separator(self, tmp_path: Path):
|
||||
"""Test that result uses forward slash separator"""
|
||||
parent = tmp_path / "parent"
|
||||
parent.mkdir()
|
||||
test_file = parent / "file.txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert "/" in result
|
||||
assert result == "parent/file.txt"
|
||||
|
||||
def test_file_name_crc_return_type(self, tmp_path: Path):
|
||||
"""Test that return type is always string"""
|
||||
test_file = tmp_path / "test.txt"
|
||||
|
||||
result1 = file_name_crc(test_file, add_parent_folder=False)
|
||||
result2 = file_name_crc(test_file, add_parent_folder=True)
|
||||
|
||||
assert isinstance(result1, str)
|
||||
assert isinstance(result2, str)
|
||||
|
||||
def test_file_name_crc_root_level_file(self):
|
||||
"""Test with file at root level"""
|
||||
test_path = Path("/file.txt")
|
||||
|
||||
result_without_parent = file_name_crc(test_path, add_parent_folder=False)
|
||||
assert result_without_parent == "file.txt"
|
||||
|
||||
result_with_parent = file_name_crc(test_path, add_parent_folder=True)
|
||||
# Parent of root-level file would be empty string or root
|
||||
assert "file.txt" in result_with_parent
|
||||
|
||||
def test_file_name_crc_relative_path(self):
|
||||
"""Test with relative path"""
|
||||
test_path = Path("folder/subfolder/file.txt")
|
||||
|
||||
result = file_name_crc(test_path, add_parent_folder=True)
|
||||
assert result == "subfolder/file.txt"
|
||||
|
||||
def test_file_name_crc_current_dir(self):
|
||||
"""Test with file in current directory"""
|
||||
test_path = Path("file.txt")
|
||||
|
||||
result = file_name_crc(test_path, add_parent_folder=False)
|
||||
assert result == "file.txt"
|
||||
|
||||
def test_file_name_crc_nonexistent_file(self, tmp_path: Path):
|
||||
"""Test that file_name_crc works even if file doesn't exist"""
|
||||
test_file = tmp_path / "parent" / "nonexistent.txt"
|
||||
|
||||
# Should work without file existing
|
||||
result1 = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result1 == "nonexistent.txt"
|
||||
|
||||
result2 = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert result2 == "parent/nonexistent.txt"
|
||||
|
||||
def test_file_name_crc_explicit_true(self, tmp_path: Path):
|
||||
"""Test explicitly setting add_parent_folder to True"""
|
||||
parent = tmp_path / "mydir"
|
||||
parent.mkdir()
|
||||
test_file = parent / "myfile.dat"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert result == "mydir/myfile.dat"
|
||||
|
||||
def test_file_name_crc_explicit_false(self, tmp_path: Path):
|
||||
"""Test explicitly setting add_parent_folder to False"""
|
||||
parent = tmp_path / "mydir"
|
||||
parent.mkdir()
|
||||
test_file = parent / "myfile.dat"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "myfile.dat"
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests combining both functions"""
|
||||
|
||||
def test_crc_and_naming_together(self, tmp_path: Path):
|
||||
"""Test using both functions on the same file"""
|
||||
parent = tmp_path / "data"
|
||||
parent.mkdir()
|
||||
test_file = parent / "testfile.csv"
|
||||
test_file.write_bytes(b"Sample data for integration test")
|
||||
|
||||
# Get CRC
|
||||
crc = file_crc(test_file)
|
||||
assert len(crc) == 8
|
||||
|
||||
# Get filename
|
||||
name_simple = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert name_simple == "testfile.csv"
|
||||
|
||||
name_with_parent = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert name_with_parent == "data/testfile.csv"
|
||||
|
||||
def test_multiple_files_crc_comparison(self, tmp_path: Path):
|
||||
"""Test CRC comparison across multiple files"""
|
||||
files: dict[str, str] = {}
|
||||
for i in range(3):
|
||||
file_path = tmp_path / f"file{i}.txt"
|
||||
file_path.write_bytes(f"Content {i}".encode())
|
||||
files[f"file{i}.txt"] = file_crc(file_path)
|
||||
|
||||
# All CRCs should be different
|
||||
assert len(set(files.values())) == 3
|
||||
|
||||
def test_workflow_file_identification(self, tmp_path: Path):
|
||||
"""Test a workflow of identifying files by name and verifying by CRC"""
|
||||
# Create directory structure
|
||||
dir1 = tmp_path / "dir1"
|
||||
dir2 = tmp_path / "dir2"
|
||||
dir1.mkdir()
|
||||
dir2.mkdir()
|
||||
|
||||
# Create same-named files with different content
|
||||
file1 = dir1 / "data.csv"
|
||||
file2 = dir2 / "data.csv"
|
||||
|
||||
file1.write_bytes(b"Data set 1")
|
||||
file2.write_bytes(b"Data set 2")
|
||||
|
||||
# Get names (should be the same)
|
||||
name1 = file_name_crc(file1, add_parent_folder=False)
|
||||
name2 = file_name_crc(file2, add_parent_folder=False)
|
||||
assert name1 == name2 == "data.csv"
|
||||
|
||||
# Get names with parent (should be different)
|
||||
full_name1 = file_name_crc(file1, add_parent_folder=True)
|
||||
full_name2 = file_name_crc(file2, add_parent_folder=True)
|
||||
assert full_name1 == "dir1/data.csv"
|
||||
assert full_name2 == "dir2/data.csv"
|
||||
|
||||
# Get CRCs (should be different)
|
||||
crc1 = file_crc(file1)
|
||||
crc2 = file_crc(file2)
|
||||
assert crc1 != crc2
|
||||
|
||||
# __END__
|
||||
522
tests/unit/file_handling/test_file_handling.py
Normal file
522
tests/unit/file_handling/test_file_handling.py
Normal file
@@ -0,0 +1,522 @@
|
||||
"""
|
||||
PyTest: file_handling/file_handling
|
||||
"""
|
||||
|
||||
# pylint: disable=use-implicit-booleaness-not-comparison
|
||||
|
||||
from pathlib import Path
|
||||
from pytest import CaptureFixture
|
||||
|
||||
from corelibs.file_handling.file_handling import (
|
||||
remove_all_in_directory,
|
||||
)
|
||||
|
||||
|
||||
class TestRemoveAllInDirectory:
|
||||
"""Test suite for remove_all_in_directory function"""
|
||||
|
||||
def test_remove_all_files_in_empty_directory(self, tmp_path: Path):
|
||||
"""Test removing all files from an empty directory"""
|
||||
test_dir = tmp_path / "empty_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert test_dir.exists() # Directory itself should still exist
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_all_files_in_directory(self, tmp_path: Path):
|
||||
"""Test removing all files from a directory with files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create test files
|
||||
(test_dir / "file1.txt").write_text("content 1")
|
||||
(test_dir / "file2.txt").write_text("content 2")
|
||||
(test_dir / "file3.csv").write_text("csv,data")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_all_subdirectories(self, tmp_path: Path):
|
||||
"""Test removing subdirectories within a directory"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create subdirectories
|
||||
subdir1 = test_dir / "subdir1"
|
||||
subdir2 = test_dir / "subdir2"
|
||||
subdir1.mkdir()
|
||||
subdir2.mkdir()
|
||||
|
||||
# Add files to subdirectories
|
||||
(subdir1 / "file.txt").write_text("content")
|
||||
(subdir2 / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_nested_structure(self, tmp_path: Path):
|
||||
"""Test removing deeply nested directory structure"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create nested structure
|
||||
nested = test_dir / "level1" / "level2" / "level3"
|
||||
nested.mkdir(parents=True)
|
||||
(nested / "deep_file.txt").write_text("deep content")
|
||||
(test_dir / "level1" / "mid_file.txt").write_text("mid content")
|
||||
(test_dir / "top_file.txt").write_text("top content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_with_ignore_files_single(self, tmp_path: Path):
|
||||
"""Test removing files while ignoring specific files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files
|
||||
(test_dir / "keep.txt").write_text("keep me")
|
||||
(test_dir / "remove1.txt").write_text("remove me")
|
||||
(test_dir / "remove2.txt").write_text("remove me too")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=["keep.txt"])
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
remaining = list(test_dir.iterdir())
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].name == "keep.txt"
|
||||
|
||||
def test_remove_with_ignore_files_multiple(self, tmp_path: Path):
|
||||
"""Test removing files while ignoring multiple specific files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files
|
||||
(test_dir / "keep1.txt").write_text("keep me")
|
||||
(test_dir / "keep2.log").write_text("keep me too")
|
||||
(test_dir / "remove.txt").write_text("remove me")
|
||||
|
||||
result = remove_all_in_directory(
|
||||
test_dir,
|
||||
ignore_files=["keep1.txt", "keep2.log"]
|
||||
)
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
remaining = {f.name for f in test_dir.iterdir()}
|
||||
assert remaining == {"keep1.txt", "keep2.log"}
|
||||
|
||||
def test_remove_with_ignore_directory(self, tmp_path: Path):
|
||||
"""Test removing with ignored directory"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create directories
|
||||
keep_dir = test_dir / "keep_dir"
|
||||
remove_dir = test_dir / "remove_dir"
|
||||
keep_dir.mkdir()
|
||||
remove_dir.mkdir()
|
||||
|
||||
(keep_dir / "file.txt").write_text("keep")
|
||||
(remove_dir / "file.txt").write_text("remove")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=["keep_dir"])
|
||||
assert result is True
|
||||
assert keep_dir.exists()
|
||||
assert not remove_dir.exists()
|
||||
|
||||
def test_remove_with_ignore_nested_files(self, tmp_path: Path):
|
||||
"""Test that ignore_files matches by name at any level"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files with same name at different levels
|
||||
(test_dir / "keep.txt").write_text("top level keep")
|
||||
(test_dir / "remove.txt").write_text("remove")
|
||||
subdir = test_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "file.txt").write_text("nested")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=["keep.txt"])
|
||||
assert result is True
|
||||
# keep.txt should be preserved at top level
|
||||
assert (test_dir / "keep.txt").exists()
|
||||
# Other files should be removed
|
||||
assert not (test_dir / "remove.txt").exists()
|
||||
# Subdirectory not in ignore list should be removed
|
||||
assert not subdir.exists()
|
||||
|
||||
def test_remove_nonexistent_directory(self, tmp_path: Path):
|
||||
"""Test removing from a non-existent directory returns False"""
|
||||
test_dir = tmp_path / "nonexistent"
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is False
|
||||
|
||||
def test_remove_from_file_not_directory(self, tmp_path: Path):
|
||||
"""Test that function returns False when given a file instead of directory"""
|
||||
test_file = tmp_path / "file.txt"
|
||||
test_file.write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_file)
|
||||
assert result is False
|
||||
assert test_file.exists() # File should not be affected
|
||||
|
||||
def test_remove_with_verbose_mode(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test verbose mode produces output"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files and directories
|
||||
(test_dir / "file1.txt").write_text("content")
|
||||
(test_dir / "file2.txt").write_text("content")
|
||||
subdir = test_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "nested.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir, verbose=True)
|
||||
assert result is True
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "Remove old files in: test_dir [" in captured.out
|
||||
assert "]" in captured.out
|
||||
assert "." in captured.out # Files are marked with .
|
||||
assert "/" in captured.out # Directories are marked with /
|
||||
|
||||
def test_remove_with_dry_run_mode(self, tmp_path: Path):
|
||||
"""Test dry run mode doesn't actually remove files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create test files
|
||||
file1 = test_dir / "file1.txt"
|
||||
file2 = test_dir / "file2.txt"
|
||||
file1.write_text("content 1")
|
||||
file2.write_text("content 2")
|
||||
|
||||
result = remove_all_in_directory(test_dir, dry_run=True)
|
||||
assert result is True
|
||||
# Files should still exist
|
||||
assert file1.exists()
|
||||
assert file2.exists()
|
||||
assert len(list(test_dir.iterdir())) == 2
|
||||
|
||||
def test_remove_with_dry_run_and_verbose(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test dry run with verbose mode shows [DRY RUN] prefix"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir, dry_run=True, verbose=True)
|
||||
assert result is True
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "[DRY RUN]" in captured.out
|
||||
|
||||
def test_remove_mixed_content(self, tmp_path: Path):
|
||||
"""Test removing mixed files and directories"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create mixed content
|
||||
(test_dir / "file1.txt").write_text("content")
|
||||
(test_dir / "file2.csv").write_text("csv")
|
||||
subdir1 = test_dir / "subdir1"
|
||||
subdir2 = test_dir / "subdir2"
|
||||
subdir1.mkdir()
|
||||
subdir2.mkdir()
|
||||
(subdir1 / "nested_file.txt").write_text("nested")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_with_none_ignore_files(self, tmp_path: Path):
|
||||
"""Test that None as ignore_files works correctly"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=None)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_with_empty_ignore_list(self, tmp_path: Path):
|
||||
"""Test that empty ignore_files list works correctly"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=[])
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_special_characters_in_filenames(self, tmp_path: Path):
|
||||
"""Test removing files with special characters in names"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files with special characters
|
||||
(test_dir / "file with spaces.txt").write_text("content")
|
||||
(test_dir / "file-with-dashes.txt").write_text("content")
|
||||
(test_dir / "file_with_underscores.txt").write_text("content")
|
||||
(test_dir / "file.multiple.dots.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_unicode_filenames(self, tmp_path: Path):
|
||||
"""Test removing files with unicode characters in names"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files with unicode names
|
||||
(test_dir / "ファイル.txt").write_text("content")
|
||||
(test_dir / "文件.txt").write_text("content")
|
||||
(test_dir / "αρχείο.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_hidden_files(self, tmp_path: Path):
|
||||
"""Test removing hidden files (dotfiles)"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create hidden files
|
||||
(test_dir / ".hidden").write_text("content")
|
||||
(test_dir / ".gitignore").write_text("content")
|
||||
(test_dir / "normal.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_preserves_ignored_hidden_files(self, tmp_path: Path):
|
||||
"""Test that ignored hidden files are preserved"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
(test_dir / ".gitkeep").write_text("keep")
|
||||
(test_dir / "file.txt").write_text("remove")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=[".gitkeep"])
|
||||
assert result is True
|
||||
remaining = list(test_dir.iterdir())
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].name == ".gitkeep"
|
||||
|
||||
def test_remove_large_number_of_files(self, tmp_path: Path):
|
||||
"""Test removing a large number of files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create 100 files
|
||||
for i in range(100):
|
||||
(test_dir / f"file_{i:03d}.txt").write_text(f"content {i}")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_deeply_nested_with_ignore(self, tmp_path: Path):
|
||||
"""Test removing structure while preserving ignored items
|
||||
|
||||
Note: rglob processes files depth-first, so files inside an ignored
|
||||
directory will be processed (and potentially removed) before the directory
|
||||
itself is checked. Only items at the same level or that share the same name
|
||||
as ignored items will be preserved.
|
||||
"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create structure
|
||||
level1 = test_dir / "level1"
|
||||
level1.mkdir()
|
||||
keep_file = test_dir / "keep.txt"
|
||||
(level1 / "file.txt").write_text("remove")
|
||||
keep_file.write_text("keep this file")
|
||||
(test_dir / "top.txt").write_text("remove")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=["keep.txt"])
|
||||
assert result is True
|
||||
# Check that keep.txt is preserved
|
||||
assert keep_file.exists()
|
||||
assert keep_file.read_text() == "keep this file"
|
||||
# Other items should be removed
|
||||
assert not (test_dir / "top.txt").exists()
|
||||
assert not level1.exists()
|
||||
|
||||
def test_remove_binary_files(self, tmp_path: Path):
|
||||
"""Test removing binary files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create binary files
|
||||
(test_dir / "binary1.bin").write_bytes(bytes(range(256)))
|
||||
(test_dir / "binary2.dat").write_bytes(b"\x00\x01\x02\xff")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_symlinks(self, tmp_path: Path):
|
||||
"""Test removing symbolic links"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create a file and a symlink to it
|
||||
original = tmp_path / "original.txt"
|
||||
original.write_text("original content")
|
||||
symlink = test_dir / "link.txt"
|
||||
symlink.symlink_to(original)
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
# Original file should still exist
|
||||
assert original.exists()
|
||||
|
||||
def test_remove_with_permissions_variations(self, tmp_path: Path):
|
||||
"""Test removing files with different permissions"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files
|
||||
file1 = test_dir / "readonly.txt"
|
||||
file2 = test_dir / "normal.txt"
|
||||
file1.write_text("readonly")
|
||||
file2.write_text("normal")
|
||||
|
||||
# Make file1 read-only
|
||||
file1.chmod(0o444)
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_default_parameters(self, tmp_path: Path):
|
||||
"""Test function with only required parameter"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_return_value_true_when_successful(self, tmp_path: Path):
|
||||
"""Test that function returns True on successful removal"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_remove_return_value_false_when_not_directory(self, tmp_path: Path):
|
||||
"""Test that function returns False when path is not a directory"""
|
||||
test_file = tmp_path / "file.txt"
|
||||
test_file.write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_file)
|
||||
assert result is False
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_remove_directory_becomes_empty(self, tmp_path: Path):
|
||||
"""Test that directory is empty after removal"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create various items
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
subdir = test_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "nested.txt").write_text("nested")
|
||||
|
||||
# Verify directory is not empty before
|
||||
assert len(list(test_dir.iterdir())) > 0
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
|
||||
# Verify directory is empty after
|
||||
assert len(list(test_dir.iterdir())) == 0
|
||||
assert test_dir.exists()
|
||||
assert test_dir.is_dir()
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests for file_handling module"""
|
||||
|
||||
def test_multiple_remove_operations(self, tmp_path: Path):
|
||||
"""Test multiple consecutive remove operations"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# First batch of files
|
||||
(test_dir / "batch1_file1.txt").write_text("content")
|
||||
(test_dir / "batch1_file2.txt").write_text("content")
|
||||
|
||||
result1 = remove_all_in_directory(test_dir)
|
||||
assert result1 is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
# Second batch of files
|
||||
(test_dir / "batch2_file1.txt").write_text("content")
|
||||
(test_dir / "batch2_file2.txt").write_text("content")
|
||||
|
||||
result2 = remove_all_in_directory(test_dir)
|
||||
assert result2 is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_then_recreate(self, tmp_path: Path):
|
||||
"""Test removing files then recreating them"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create and remove
|
||||
original_file = test_dir / "file.txt"
|
||||
original_file.write_text("original")
|
||||
remove_all_in_directory(test_dir)
|
||||
assert not original_file.exists()
|
||||
|
||||
# Recreate
|
||||
new_file = test_dir / "file.txt"
|
||||
new_file.write_text("new content")
|
||||
assert new_file.exists()
|
||||
assert new_file.read_text() == "new content"
|
||||
|
||||
def test_cleanup_workflow(self, tmp_path: Path):
|
||||
"""Test a typical cleanup workflow"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Simulate work directory
|
||||
(test_dir / "temp1.tmp").write_text("temp")
|
||||
(test_dir / "temp2.tmp").write_text("temp")
|
||||
(test_dir / "result.txt").write_text("important")
|
||||
|
||||
# Clean up temp files, keep result
|
||||
result = remove_all_in_directory(
|
||||
test_dir,
|
||||
ignore_files=["result.txt"]
|
||||
)
|
||||
assert result is True
|
||||
|
||||
remaining = list(test_dir.iterdir())
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].name == "result.txt"
|
||||
assert remaining[0].read_text() == "important"
|
||||
|
||||
# __END__
|
||||
Reference in New Issue
Block a user