Move file handling to corelibs_file module
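The original implementations move to the external corelibs-file package; the modules below become thin wrappers marked with warnings.deprecated. A minimal migration sketch, using only the names imported in this diff (the sample paths are illustrative, and it assumes corelibs-file >= 1.0.0 is installed):

from pathlib import Path

# New home of the implementations, as imported by the wrappers in this commit
from corelibs_file.file_bom_encoding import is_bom_encoded, get_bom_encoding_info
from corelibs_file.file_crc import file_crc
from corelibs_file.file_handling import get_file_name, remove_all_in_directory

csv_file = Path("data.csv")  # illustrative file

if is_bom_encoded(csv_file):
    info = get_bom_encoding_info(csv_file)  # same fields as the old BomEncodingInfo TypedDict
    print(info["encoding"], info["bom_length"])

print(file_crc(csv_file))  # uppercase 8-digit hex CRC32 string
print(get_file_name(csv_file, add_parent_folder=True))
remove_all_in_directory(Path("tmp"), ignore_files=[".gitkeep"], dry_run=True)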
@@ -10,6 +10,7 @@ dependencies = [
    "corelibs-debug>=1.0.0",
    "corelibs-dump-data>=1.0.0",
    "corelibs-enum-base>=1.0.0",
    "corelibs-file>=1.0.0",
    "corelibs-regex-checks>=1.0.0",
    "corelibs-stack-trace>=1.0.0",
    "corelibs-text-colors>=1.0.0",
@@ -2,19 +2,16 @@
File check if BOM encoded, needed for CSV load
"""

from warnings import deprecated
from pathlib import Path
from typing import TypedDict


class BomEncodingInfo(TypedDict):
    """BOM encoding info"""
    has_bom: bool
    bom_type: str | None
    encoding: str | None
    bom_length: int
    bom_pattern: bytes | None
from corelibs_file.file_bom_encoding import (
    is_bom_encoded as is_bom_encoding_ng,
    get_bom_encoding_info,
    BomEncodingInfo
)


@deprecated("Use corelibs_file.file_bom_encoding.is_bom_encoded instead")
def is_bom_encoded(file_path: Path) -> bool:
    """
    Detect if a file is BOM encoded
@@ -25,9 +22,10 @@ def is_bom_encoded(file_path: Path) -> bool:
    Returns:
        bool: True if file has BOM, False otherwise
    """
    return is_bom_encoded_info(file_path)['has_bom']
    return is_bom_encoding_ng(file_path)


@deprecated("Use corelibs_file.file_bom_encoding.is_bom_encoded_info instead")
def is_bom_encoded_info(file_path: Path) -> BomEncodingInfo:
    """
    Enhanced BOM detection with additional file analysis
@@ -38,38 +36,7 @@ def is_bom_encoded_info(file_path: Path) -> BomEncodingInfo:
    Returns:
        dict: Comprehensive BOM and encoding information
    """
    try:
        # Read first 1024 bytes for analysis
        with open(file_path, 'rb') as f:
            header = f.read(4)

        bom_patterns = {
            b'\xef\xbb\xbf': ('UTF-8', 'utf-8', 3),
            b'\xff\xfe\x00\x00': ('UTF-32 LE', 'utf-32-le', 4),
            b'\x00\x00\xfe\xff': ('UTF-32 BE', 'utf-32-be', 4),
            b'\xff\xfe': ('UTF-16 LE', 'utf-16-le', 2),
            b'\xfe\xff': ('UTF-16 BE', 'utf-16-be', 2),
        }

        for bom_pattern, (encoding_name, encoding, length) in bom_patterns.items():
            if header.startswith(bom_pattern):
                return {
                    'has_bom': True,
                    'bom_type': encoding_name,
                    'encoding': encoding,
                    'bom_length': length,
                    'bom_pattern': bom_pattern
                }

        return {
            'has_bom': False,
            'bom_type': None,
            'encoding': None,
            'bom_length': 0,
            'bom_pattern': None
        }
    except Exception as e:
        raise ValueError(f"Error checking BOM encoding: {e}") from e
    return get_bom_encoding_info(file_path)


# __END__
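The wrappers keep their old signatures and simply delegate, so existing callers continue to work while the call is flagged through warnings.deprecated (Python 3.13). A short equivalence sketch, assuming both the old corelibs package and the new corelibs_file package are importable:

from pathlib import Path

from corelibs.file_handling.file_bom_encoding import is_bom_encoded, is_bom_encoded_info
from corelibs_file.file_bom_encoding import get_bom_encoding_info

sample = Path("utf8_bom.txt")
sample.write_bytes(b'\xef\xbb\xbfHello, World!')  # UTF-8 BOM: EF BB BF

# Deprecated wrapper and new implementation return the same info dict
assert is_bom_encoded_info(sample) == get_bom_encoding_info(sample)
assert is_bom_encoded(sample) is True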
@@ -2,10 +2,13 @@
crc handlers for file CRC
"""

import zlib
from warnings import deprecated
from pathlib import Path
from corelibs_file.file_crc import file_crc as file_crc_ng
from corelibs_file.file_handling import get_file_name


@deprecated("Use corelibs_file.file_crc.file_crc instead")
def file_crc(file_path: Path) -> str:
    """
    With for loop and buffer, create file crc32
@@ -16,13 +19,10 @@ def file_crc(file_path: Path) -> str:
    Returns:
        str: file crc32
    """
    crc = 0
    with open(file_path, 'rb', 65536) as ins:
        for _ in range(int((file_path.stat().st_size / 65536)) + 1):
            crc = zlib.crc32(ins.read(65536), crc)
    return f"{crc & 0xFFFFFFFF:08X}"
    return file_crc_ng(file_path)


@deprecated("Use corelibs_file.file_handling.get_file_name instead")
def file_name_crc(file_path: Path, add_parent_folder: bool = False) -> str:
    """
    either returns file name only from path
@@ -38,9 +38,6 @@ def file_name_crc(file_path: Path, add_parent_folder: bool = False) -> str:
    Returns:
        str: file name as string
    """
    if add_parent_folder:
        return str(Path(file_path.parent.name).joinpath(file_path.name))
    else:
        return file_path.name
    return get_file_name(file_path, add_parent_folder=add_parent_folder)

# __END__
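file_crc now delegates to corelibs_file.file_crc.file_crc but still returns an uppercase 8-digit hex CRC32 string, and file_name_crc keeps its add_parent_folder behaviour. A quick check against zlib, mirroring the removed implementation (the file name below is illustrative):

import zlib
from pathlib import Path

from corelibs.file_handling.file_crc import file_crc, file_name_crc

data_file = Path("sample.bin")
data_file.write_bytes(b"TestData" * 20000)  # spans more than one 64 KiB read buffer

expected = f"{zlib.crc32(data_file.read_bytes()) & 0xFFFFFFFF:08X}"
assert file_crc(data_file) == expected  # deprecated wrapper, same result as before
assert file_name_crc(data_file, add_parent_folder=False) == "sample.bin"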
@@ -2,11 +2,12 @@
File handling utilities
"""

import os
import shutil
from warnings import deprecated
from pathlib import Path
from corelibs_file.file_handling import remove_all_in_directory as remove_all_in_directory_ng


@deprecated("Use corelibs_file.file_handling.remove_all_in_directory instead")
def remove_all_in_directory(
    directory: Path,
    ignore_files: list[str] | None = None,
@@ -14,43 +15,24 @@ def remove_all_in_directory(
    dry_run: bool = False
) -> bool:
    """
    remove all files and folders in a directory
    can exclude files or folders
    deprecated

    Args:
        directory (Path): _description_
        ignore_files (list[str], optional): _description_. Defaults to None.
    Arguments:
        directory {Path} -- _description_

    Keyword Arguments:
        ignore_files {list[str] | None} -- _description_ (default: {None})
        verbose {bool} -- _description_ (default: {False})
        dry_run {bool} -- _description_ (default: {False})

    Returns:
        bool: _description_
        bool -- _description_
    """
    if not directory.is_dir():
        return False
    if ignore_files is None:
        ignore_files = []
    if verbose:
        print(
            f"{'[DRY RUN] ' if dry_run else ''}Remove old files in: {directory.name} [",
            end="", flush=True
        )
    # remove all files and folders in given directory by recursive globbing
    for file in directory.rglob("*"):
        # skip if in ignore files
        if file.name in ignore_files:
            continue
        # remove one file, or a whole directory
        if file.is_file():
            if not dry_run:
                os.remove(file)
            if verbose:
                print(".", end="", flush=True)
        elif file.is_dir():
            if not dry_run:
                shutil.rmtree(file)
            if verbose:
                print("/", end="", flush=True)
    if verbose:
        print("]", flush=True)
    return True
    return remove_all_in_directory_ng(
        directory,
        ignore_files=ignore_files,
        verbose=verbose,
        dry_run=dry_run
    )

# __END__
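remove_all_in_directory keeps its keyword arguments (ignore_files, verbose, dry_run) and forwards them to the corelibs_file implementation. A usage sketch based on the behaviour exercised by the removed tests (directory and file names are illustrative):

from pathlib import Path

from corelibs.file_handling.file_handling import remove_all_in_directory

work_dir = Path("build_tmp")
work_dir.mkdir(exist_ok=True)
(work_dir / "old.log").write_text("stale")
(work_dir / ".gitkeep").write_text("")

# Dry run: prints progress when verbose, but deletes nothing
remove_all_in_directory(work_dir, ignore_files=[".gitkeep"], verbose=True, dry_run=True)
assert (work_dir / "old.log").exists()

# Real cleanup keeps only the ignored file and returns True on success
assert remove_all_in_directory(work_dir, ignore_files=[".gitkeep"]) is True
assert [p.name for p in work_dir.iterdir()] == [".gitkeep"]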
@@ -5,8 +5,8 @@ BOM check for files
"""

from pathlib import Path
from corelibs.file_handling.file_bom_encoding import is_bom_encoded, is_bom_encoded_info
from corelibs_dump_data.dump_data import dump_data
from corelibs.file_handling.file_bom_encoding import is_bom_encoded, is_bom_encoded_info


def main() -> None:
@@ -1,538 +0,0 @@
"""
PyTest: file_handling/file_bom_encoding
"""

from pathlib import Path
import pytest

from corelibs.file_handling.file_bom_encoding import (
    is_bom_encoded,
    is_bom_encoded_info,
    BomEncodingInfo,
)


class TestIsBomEncoded:
    """Test suite for is_bom_encoded function"""
|
||||
def test_utf8_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-8 BOM encoded file"""
|
||||
test_file = tmp_path / "utf8_bom.txt"
|
||||
# UTF-8 BOM: EF BB BF
|
||||
content = b'\xef\xbb\xbfHello, World!'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_utf16_le_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-16 LE BOM encoded file"""
|
||||
test_file = tmp_path / "utf16_le_bom.txt"
|
||||
# UTF-16 LE BOM: FF FE
|
||||
content = b'\xff\xfeH\x00e\x00l\x00l\x00o\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_utf16_be_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-16 BE BOM encoded file"""
|
||||
test_file = tmp_path / "utf16_be_bom.txt"
|
||||
# UTF-16 BE BOM: FE FF
|
||||
content = b'\xfe\xff\x00H\x00e\x00l\x00l\x00o'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_utf32_le_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-32 LE BOM encoded file"""
|
||||
test_file = tmp_path / "utf32_le_bom.txt"
|
||||
# UTF-32 LE BOM: FF FE 00 00
|
||||
content = b'\xff\xfe\x00\x00H\x00\x00\x00e\x00\x00\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_utf32_be_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-32 BE BOM encoded file"""
|
||||
test_file = tmp_path / "utf32_be_bom.txt"
|
||||
# UTF-32 BE BOM: 00 00 FE FF
|
||||
content = b'\x00\x00\xfe\xff\x00\x00\x00H\x00\x00\x00e'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_no_bom_ascii_file(self, tmp_path: Path):
|
||||
"""Test detection of ASCII file without BOM"""
|
||||
test_file = tmp_path / "ascii.txt"
|
||||
content = b'Hello, World!'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_no_bom_utf8_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-8 file without BOM"""
|
||||
test_file = tmp_path / "utf8_no_bom.txt"
|
||||
content = 'Hello, 世界!'.encode('utf-8')
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_empty_file(self, tmp_path: Path):
|
||||
"""Test detection on empty file"""
|
||||
test_file = tmp_path / "empty.txt"
|
||||
test_file.write_bytes(b'')
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_binary_file_no_bom(self, tmp_path: Path):
|
||||
"""Test detection on binary file without BOM"""
|
||||
test_file = tmp_path / "binary.bin"
|
||||
content = bytes(range(256))
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_partial_bom_pattern(self, tmp_path: Path):
|
||||
"""Test file with partial BOM pattern that shouldn't match"""
|
||||
test_file = tmp_path / "partial_bom.txt"
|
||||
# Only first two bytes of UTF-8 BOM
|
||||
content = b'\xef\xbbHello'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_false_positive_bom_pattern(self, tmp_path: Path):
|
||||
"""Test file that contains BOM-like bytes but not at the start"""
|
||||
test_file = tmp_path / "false_positive.txt"
|
||||
content = b'Hello\xef\xbb\xbfWorld'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_nonexistent_file(self, tmp_path: Path):
|
||||
"""Test that function raises error for non-existent file"""
|
||||
test_file = tmp_path / "nonexistent.txt"
|
||||
|
||||
with pytest.raises(ValueError, match="Error checking BOM encoding"):
|
||||
is_bom_encoded(test_file)
|
||||
|
||||
def test_very_small_file(self, tmp_path: Path):
|
||||
"""Test file smaller than largest BOM pattern (4 bytes)"""
|
||||
test_file = tmp_path / "small.txt"
|
||||
content = b'Hi'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_exactly_bom_size_utf8(self, tmp_path: Path):
|
||||
"""Test file that is exactly the size of UTF-8 BOM"""
|
||||
test_file = tmp_path / "exact_bom.txt"
|
||||
content = b'\xef\xbb\xbf'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_exactly_bom_size_utf32(self, tmp_path: Path):
|
||||
"""Test file that is exactly the size of UTF-32 BOM"""
|
||||
test_file = tmp_path / "exact_bom_utf32.txt"
|
||||
content = b'\xff\xfe\x00\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
|
||||
class TestIsBomEncodedInfo:
|
||||
"""Test suite for is_bom_encoded_info function"""
|
||||
|
||||
def test_utf8_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-8 BOM encoded file"""
|
||||
test_file = tmp_path / "utf8_bom.txt"
|
||||
content = b'\xef\xbb\xbfHello, UTF-8!'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert isinstance(result, dict)
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-8'
|
||||
assert result['encoding'] == 'utf-8'
|
||||
assert result['bom_length'] == 3
|
||||
assert result['bom_pattern'] == b'\xef\xbb\xbf'
|
||||
|
||||
def test_utf16_le_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-16 LE BOM encoded file"""
|
||||
test_file = tmp_path / "utf16_le_bom.txt"
|
||||
content = b'\xff\xfeH\x00e\x00l\x00l\x00o\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-16 LE'
|
||||
assert result['encoding'] == 'utf-16-le'
|
||||
assert result['bom_length'] == 2
|
||||
assert result['bom_pattern'] == b'\xff\xfe'
|
||||
|
||||
def test_utf16_be_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-16 BE BOM encoded file"""
|
||||
test_file = tmp_path / "utf16_be_bom.txt"
|
||||
content = b'\xfe\xff\x00H\x00e\x00l\x00l\x00o'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-16 BE'
|
||||
assert result['encoding'] == 'utf-16-be'
|
||||
assert result['bom_length'] == 2
|
||||
assert result['bom_pattern'] == b'\xfe\xff'
|
||||
|
||||
def test_utf32_le_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-32 LE BOM encoded file"""
|
||||
test_file = tmp_path / "utf32_le_bom.txt"
|
||||
content = b'\xff\xfe\x00\x00H\x00\x00\x00e\x00\x00\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-32 LE'
|
||||
assert result['encoding'] == 'utf-32-le'
|
||||
assert result['bom_length'] == 4
|
||||
assert result['bom_pattern'] == b'\xff\xfe\x00\x00'
|
||||
|
||||
def test_utf32_be_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-32 BE BOM encoded file"""
|
||||
test_file = tmp_path / "utf32_be_bom.txt"
|
||||
content = b'\x00\x00\xfe\xff\x00\x00\x00H\x00\x00\x00e'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-32 BE'
|
||||
assert result['encoding'] == 'utf-32-be'
|
||||
assert result['bom_length'] == 4
|
||||
assert result['bom_pattern'] == b'\x00\x00\xfe\xff'
|
||||
|
||||
def test_no_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for file without BOM"""
|
||||
test_file = tmp_path / "no_bom.txt"
|
||||
content = b'Hello, World!'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is False
|
||||
assert result['bom_type'] is None
|
||||
assert result['encoding'] is None
|
||||
assert result['bom_length'] == 0
|
||||
assert result['bom_pattern'] is None
|
||||
|
||||
def test_empty_file_info(self, tmp_path: Path):
|
||||
"""Test detailed info for empty file"""
|
||||
test_file = tmp_path / "empty.txt"
|
||||
test_file.write_bytes(b'')
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is False
|
||||
assert result['bom_type'] is None
|
||||
assert result['encoding'] is None
|
||||
assert result['bom_length'] == 0
|
||||
assert result['bom_pattern'] is None
|
||||
|
||||
def test_bom_precedence_utf32_vs_utf16(self, tmp_path: Path):
|
||||
"""Test that UTF-32 LE BOM takes precedence over UTF-16 LE when both match"""
|
||||
test_file = tmp_path / "precedence.txt"
|
||||
# UTF-32 LE BOM starts with UTF-16 LE BOM pattern
|
||||
content = b'\xff\xfe\x00\x00Additional content'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
# Should detect UTF-32 LE, not UTF-16 LE
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-32 LE'
|
||||
assert result['encoding'] == 'utf-32-le'
|
||||
assert result['bom_length'] == 4
|
||||
assert result['bom_pattern'] == b'\xff\xfe\x00\x00'
|
||||
|
||||
def test_return_type_validation(self, tmp_path: Path):
|
||||
"""Test that return type matches BomEncodingInfo TypedDict"""
|
||||
test_file = tmp_path / "test.txt"
|
||||
test_file.write_bytes(b'Test content')
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
# Check all required keys are present
|
||||
required_keys = {'has_bom', 'bom_type', 'encoding', 'bom_length', 'bom_pattern'}
|
||||
assert set(result.keys()) == required_keys
|
||||
|
||||
# Check types
|
||||
assert isinstance(result['has_bom'], bool)
|
||||
assert result['bom_type'] is None or isinstance(result['bom_type'], str)
|
||||
assert result['encoding'] is None or isinstance(result['encoding'], str)
|
||||
assert isinstance(result['bom_length'], int)
|
||||
assert result['bom_pattern'] is None or isinstance(result['bom_pattern'], bytes)
|
||||
|
||||
def test_nonexistent_file_error(self, tmp_path: Path):
|
||||
"""Test that function raises ValueError for non-existent file"""
|
||||
test_file = tmp_path / "nonexistent.txt"
|
||||
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
is_bom_encoded_info(test_file)
|
||||
|
||||
assert "Error checking BOM encoding" in str(exc_info.value)
|
||||
|
||||
def test_directory_instead_of_file(self, tmp_path: Path):
|
||||
"""Test that function raises error when given a directory"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
with pytest.raises(ValueError, match="Error checking BOM encoding"):
|
||||
is_bom_encoded_info(test_dir)
|
||||
|
||||
def test_large_file_with_bom(self, tmp_path: Path):
|
||||
"""Test BOM detection on large file (only first 4 bytes matter)"""
|
||||
test_file = tmp_path / "large_bom.txt"
|
||||
# UTF-8 BOM followed by large content
|
||||
content = b'\xef\xbb\xbf' + b'A' * 100000
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-8'
|
||||
assert result['encoding'] == 'utf-8'
|
||||
|
||||
def test_bom_detection_priority_order(self, tmp_path: Path):
|
||||
"""Test that BOM patterns are checked in the correct priority order"""
|
||||
# The function should check longer patterns first to avoid false matches
|
||||
test_cases = [
|
||||
(b'\xff\xfe\x00\x00', 'UTF-32 LE'), # 4 bytes
|
||||
(b'\x00\x00\xfe\xff', 'UTF-32 BE'), # 4 bytes
|
||||
(b'\xff\xfe', 'UTF-16 LE'), # 2 bytes
|
||||
(b'\xfe\xff', 'UTF-16 BE'), # 2 bytes
|
||||
(b'\xef\xbb\xbf', 'UTF-8'), # 3 bytes
|
||||
]
|
||||
|
||||
for i, (bom_bytes, expected_type) in enumerate(test_cases):
|
||||
test_file = tmp_path / f"priority_test_{i}.txt"
|
||||
content = bom_bytes + b'Content'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
assert result['bom_type'] == expected_type
|
||||
assert result['bom_pattern'] == bom_bytes
|
||||
|
||||
def test_csv_file_with_utf8_bom(self, tmp_path: Path):
|
||||
"""Test CSV file with UTF-8 BOM (common use case mentioned in docstring)"""
|
||||
test_file = tmp_path / "data.csv"
|
||||
content = b'\xef\xbb\xbf"Name","Age","City"\n"John",30,"New York"\n"Jane",25,"Tokyo"'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-8'
|
||||
assert result['encoding'] == 'utf-8'
|
||||
assert result['bom_length'] == 3
|
||||
|
||||
def test_csv_file_without_bom(self, tmp_path: Path):
|
||||
"""Test CSV file without BOM"""
|
||||
test_file = tmp_path / "data_no_bom.csv"
|
||||
content = b'"Name","Age","City"\n"John",30,"New York"\n"Jane",25,"Tokyo"'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is False
|
||||
assert result['bom_type'] is None
|
||||
assert result['encoding'] is None
|
||||
assert result['bom_length'] == 0
|
||||
|
||||
|
||||
class TestBomEncodingInfo:
|
||||
"""Test suite for BomEncodingInfo TypedDict"""
|
||||
|
||||
def test_typed_dict_structure(self):
|
||||
"""Test that BomEncodingInfo has correct structure"""
|
||||
# This is a type check - in actual usage, mypy would validate this
|
||||
sample_info: BomEncodingInfo = {
|
||||
'has_bom': True,
|
||||
'bom_type': 'UTF-8',
|
||||
'encoding': 'utf-8',
|
||||
'bom_length': 3,
|
||||
'bom_pattern': b'\xef\xbb\xbf'
|
||||
}
|
||||
|
||||
assert sample_info['has_bom'] is True
|
||||
assert sample_info['bom_type'] == 'UTF-8'
|
||||
assert sample_info['encoding'] == 'utf-8'
|
||||
assert sample_info['bom_length'] == 3
|
||||
assert sample_info['bom_pattern'] == b'\xef\xbb\xbf'
|
||||
|
||||
def test_typed_dict_none_values(self):
|
||||
"""Test TypedDict with None values"""
|
||||
sample_info: BomEncodingInfo = {
|
||||
'has_bom': False,
|
||||
'bom_type': None,
|
||||
'encoding': None,
|
||||
'bom_length': 0,
|
||||
'bom_pattern': None
|
||||
}
|
||||
|
||||
assert sample_info['has_bom'] is False
|
||||
assert sample_info['bom_type'] is None
|
||||
assert sample_info['encoding'] is None
|
||||
assert sample_info['bom_length'] == 0
|
||||
assert sample_info['bom_pattern'] is None
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests for BOM encoding detection"""
|
||||
|
||||
def test_is_bom_encoded_uses_info_function(self, tmp_path: Path):
|
||||
"""Test that is_bom_encoded uses is_bom_encoded_info internally"""
|
||||
test_file = tmp_path / "integration.txt"
|
||||
content = b'\xef\xbb\xbfIntegration test'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
# Both functions should return consistent results
|
||||
simple_result = is_bom_encoded(test_file)
|
||||
detailed_result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert simple_result == detailed_result['has_bom']
|
||||
assert simple_result is True
|
||||
|
||||
def test_multiple_file_bom_detection_workflow(self, tmp_path: Path):
|
||||
"""Test a workflow of detecting BOM across multiple files"""
|
||||
files = {
|
||||
'utf8_bom.csv': b'\xef\xbb\xbf"data","value"\n"test",123',
|
||||
'utf16_le.txt': b'\xff\xfeH\x00e\x00l\x00l\x00o\x00',
|
||||
'no_bom.txt': b'Plain ASCII text',
|
||||
'empty.txt': b'',
|
||||
}
|
||||
|
||||
results = {}
|
||||
detailed_results = {}
|
||||
|
||||
for filename, content in files.items():
|
||||
file_path = tmp_path / filename
|
||||
file_path.write_bytes(content)
|
||||
|
||||
results[filename] = is_bom_encoded(file_path)
|
||||
detailed_results[filename] = is_bom_encoded_info(file_path)
|
||||
|
||||
# Verify results
|
||||
assert results['utf8_bom.csv'] is True
|
||||
assert results['utf16_le.txt'] is True
|
||||
assert results['no_bom.txt'] is False
|
||||
assert results['empty.txt'] is False
|
||||
|
||||
# Verify detailed results match simple results
|
||||
for filename in files:
|
||||
assert results[filename] == detailed_results[filename]['has_bom']
|
||||
|
||||
# Verify specific encoding details
|
||||
assert detailed_results['utf8_bom.csv']['encoding'] == 'utf-8'
|
||||
assert detailed_results['utf16_le.txt']['encoding'] == 'utf-16-le'
|
||||
assert detailed_results['no_bom.txt']['encoding'] is None
|
||||
|
||||
def test_csv_loading_workflow(self, tmp_path: Path):
|
||||
"""Test BOM detection workflow for CSV loading (main use case)"""
|
||||
# Create CSV files with and without BOM
|
||||
csv_with_bom = tmp_path / "data_with_bom.csv"
|
||||
csv_without_bom = tmp_path / "data_without_bom.csv"
|
||||
|
||||
# CSV with UTF-8 BOM
|
||||
bom_content = b'\xef\xbb\xbf"Name","Age"\n"Alice",30\n"Bob",25'
|
||||
csv_with_bom.write_bytes(bom_content)
|
||||
|
||||
# CSV without BOM
|
||||
no_bom_content = b'"Name","Age"\n"Charlie",35\n"Diana",28'
|
||||
csv_without_bom.write_bytes(no_bom_content)
|
||||
|
||||
# Simulate CSV loading workflow
|
||||
files_to_process = [csv_with_bom, csv_without_bom]
|
||||
processing_info: list[dict[str, str | bool | int]] = []
|
||||
|
||||
for csv_file in files_to_process:
|
||||
bom_info = is_bom_encoded_info(csv_file)
|
||||
|
||||
file_info: dict[str, str | bool | int] = {
|
||||
'file': csv_file.name,
|
||||
'has_bom': bom_info['has_bom'],
|
||||
'encoding': bom_info['encoding'] or 'default',
|
||||
'skip_bytes': bom_info['bom_length']
|
||||
}
|
||||
processing_info.append(file_info)
|
||||
|
||||
# Verify workflow results
|
||||
assert len(processing_info) == 2
|
||||
|
||||
bom_file_info = next(info for info in processing_info if info['file'] == 'data_with_bom.csv')
|
||||
no_bom_file_info = next(info for info in processing_info if info['file'] == 'data_without_bom.csv')
|
||||
|
||||
assert bom_file_info['has_bom'] is True
|
||||
assert bom_file_info['encoding'] == 'utf-8'
|
||||
assert bom_file_info['skip_bytes'] == 3
|
||||
|
||||
assert no_bom_file_info['has_bom'] is False
|
||||
assert no_bom_file_info['encoding'] == 'default'
|
||||
assert no_bom_file_info['skip_bytes'] == 0
|
||||
|
||||
def test_error_handling_consistency(self, tmp_path: Path):
|
||||
"""Test that both functions handle errors consistently"""
|
||||
nonexistent_file = tmp_path / "does_not_exist.txt"
|
||||
|
||||
# Both functions should raise ValueError for non-existent files
|
||||
with pytest.raises(ValueError):
|
||||
is_bom_encoded(nonexistent_file)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
is_bom_encoded_info(nonexistent_file)
|
||||
|
||||
def test_all_supported_bom_types(self, tmp_path: Path):
|
||||
"""Test detection of all supported BOM types"""
|
||||
bom_test_cases = [
|
||||
('utf8', b'\xef\xbb\xbf', 'UTF-8', 'utf-8', 3),
|
||||
('utf16_le', b'\xff\xfe', 'UTF-16 LE', 'utf-16-le', 2),
|
||||
('utf16_be', b'\xfe\xff', 'UTF-16 BE', 'utf-16-be', 2),
|
||||
('utf32_le', b'\xff\xfe\x00\x00', 'UTF-32 LE', 'utf-32-le', 4),
|
||||
('utf32_be', b'\x00\x00\xfe\xff', 'UTF-32 BE', 'utf-32-be', 4),
|
||||
]
|
||||
|
||||
for name, bom_bytes, expected_type, expected_encoding, expected_length in bom_test_cases:
|
||||
test_file = tmp_path / f"{name}_test.txt"
|
||||
content = bom_bytes + b'Test content'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
# Test simple function
|
||||
assert is_bom_encoded(test_file) is True
|
||||
|
||||
# Test detailed function
|
||||
info = is_bom_encoded_info(test_file)
|
||||
assert info['has_bom'] is True
|
||||
assert info['bom_type'] == expected_type
|
||||
assert info['encoding'] == expected_encoding
|
||||
assert info['bom_length'] == expected_length
|
||||
assert info['bom_pattern'] == bom_bytes
|
||||
|
||||
|
||||
# __END__
|
||||
@@ -1,389 +0,0 @@
"""
PyTest: file_handling/file_crc
"""

import zlib
from pathlib import Path
import pytest

from corelibs.file_handling.file_crc import (
    file_crc,
    file_name_crc,
)


class TestFileCrc:
    """Test suite for file_crc function"""
|
||||
def test_file_crc_small_file(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a small file"""
|
||||
test_file = tmp_path / "test_small.txt"
|
||||
content = b"Hello, World!"
|
||||
test_file.write_bytes(content)
|
||||
|
||||
# Calculate expected CRC
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 8 # CRC32 is 8 hex digits
|
||||
|
||||
def test_file_crc_large_file(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a file larger than buffer size (65536 bytes)"""
|
||||
test_file = tmp_path / "test_large.bin"
|
||||
|
||||
# Create a file larger than the buffer (65536 bytes)
|
||||
content = b"A" * 100000
|
||||
test_file.write_bytes(content)
|
||||
|
||||
# Calculate expected CRC
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_empty_file(self, tmp_path: Path):
|
||||
"""Test CRC calculation for an empty file"""
|
||||
test_file = tmp_path / "test_empty.txt"
|
||||
test_file.write_bytes(b"")
|
||||
|
||||
# CRC of empty data
|
||||
expected_crc = f"{zlib.crc32(b"") & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
assert result == "00000000"
|
||||
|
||||
def test_file_crc_binary_file(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a binary file"""
|
||||
test_file = tmp_path / "test_binary.bin"
|
||||
content = bytes(range(256)) # All possible byte values
|
||||
test_file.write_bytes(content)
|
||||
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_exact_buffer_size(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a file exactly the buffer size"""
|
||||
test_file = tmp_path / "test_exact_buffer.bin"
|
||||
content = b"X" * 65536
|
||||
test_file.write_bytes(content)
|
||||
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_multiple_buffers(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a file requiring multiple buffer reads"""
|
||||
test_file = tmp_path / "test_multi_buffer.bin"
|
||||
content = b"TestData" * 20000 # ~160KB
|
||||
test_file.write_bytes(content)
|
||||
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_unicode_content(self, tmp_path: Path):
|
||||
"""Test CRC calculation for a file with unicode content"""
|
||||
test_file = tmp_path / "test_unicode.txt"
|
||||
content = "Hello 世界! 🌍".encode('utf-8')
|
||||
test_file.write_bytes(content)
|
||||
|
||||
expected_crc = f"{zlib.crc32(content) & 0xFFFFFFFF:08X}"
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert result == expected_crc
|
||||
|
||||
def test_file_crc_deterministic(self, tmp_path: Path):
|
||||
"""Test that CRC calculation is deterministic"""
|
||||
test_file = tmp_path / "test_deterministic.txt"
|
||||
content = b"Deterministic test content"
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result1 = file_crc(test_file)
|
||||
result2 = file_crc(test_file)
|
||||
|
||||
assert result1 == result2
|
||||
|
||||
def test_file_crc_different_files(self, tmp_path: Path):
|
||||
"""Test that different files produce different CRCs"""
|
||||
file1 = tmp_path / "file1.txt"
|
||||
file2 = tmp_path / "file2.txt"
|
||||
|
||||
file1.write_bytes(b"Content 1")
|
||||
file2.write_bytes(b"Content 2")
|
||||
|
||||
crc1 = file_crc(file1)
|
||||
crc2 = file_crc(file2)
|
||||
|
||||
assert crc1 != crc2
|
||||
|
||||
def test_file_crc_same_content_different_names(self, tmp_path: Path):
|
||||
"""Test that files with same content produce same CRC regardless of name"""
|
||||
file1 = tmp_path / "name1.txt"
|
||||
file2 = tmp_path / "name2.txt"
|
||||
|
||||
content = b"Same content"
|
||||
file1.write_bytes(content)
|
||||
file2.write_bytes(content)
|
||||
|
||||
crc1 = file_crc(file1)
|
||||
crc2 = file_crc(file2)
|
||||
|
||||
assert crc1 == crc2
|
||||
|
||||
def test_file_crc_nonexistent_file(self, tmp_path: Path):
|
||||
"""Test that file_crc raises error for non-existent file"""
|
||||
test_file = tmp_path / "nonexistent.txt"
|
||||
|
||||
with pytest.raises(FileNotFoundError):
|
||||
file_crc(test_file)
|
||||
|
||||
def test_file_crc_with_path_object(self, tmp_path: Path):
|
||||
"""Test file_crc works with Path object"""
|
||||
test_file = tmp_path / "test_path.txt"
|
||||
test_file.write_bytes(b"Test with Path")
|
||||
|
||||
result = file_crc(test_file)
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 8
|
||||
|
||||
|
||||
class TestFileNameCrc:
|
||||
"""Test suite for file_name_crc function"""
|
||||
|
||||
def test_file_name_crc_simple_filename(self, tmp_path: Path):
|
||||
"""Test extracting simple filename without parent folder"""
|
||||
test_file = tmp_path / "testfile.csv"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "testfile.csv"
|
||||
|
||||
def test_file_name_crc_with_parent_folder(self, tmp_path: Path):
|
||||
"""Test extracting filename with parent folder"""
|
||||
parent = tmp_path / "parent_folder"
|
||||
parent.mkdir()
|
||||
test_file = parent / "testfile.csv"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert result == "parent_folder/testfile.csv"
|
||||
|
||||
def test_file_name_crc_nested_path_without_parent(self):
|
||||
"""Test filename extraction from deeply nested path without parent"""
|
||||
test_path = Path("/foo/bar/baz/file.csv")
|
||||
|
||||
result = file_name_crc(test_path, add_parent_folder=False)
|
||||
assert result == "file.csv"
|
||||
|
||||
def test_file_name_crc_nested_path_with_parent(self):
|
||||
"""Test filename extraction from deeply nested path with parent"""
|
||||
test_path = Path("/foo/bar/baz/file.csv")
|
||||
|
||||
result = file_name_crc(test_path, add_parent_folder=True)
|
||||
assert result == "baz/file.csv"
|
||||
|
||||
def test_file_name_crc_default_parameter(self, tmp_path: Path):
|
||||
"""Test that add_parent_folder defaults to False"""
|
||||
test_file = tmp_path / "subdir" / "testfile.txt"
|
||||
test_file.parent.mkdir(parents=True)
|
||||
|
||||
result = file_name_crc(test_file)
|
||||
assert result == "testfile.txt"
|
||||
|
||||
def test_file_name_crc_different_extensions(self, tmp_path: Path):
|
||||
"""Test with different file extensions"""
|
||||
extensions = [".txt", ".csv", ".json", ".xml", ".py"]
|
||||
|
||||
for ext in extensions:
|
||||
test_file = tmp_path / f"testfile{ext}"
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == f"testfile{ext}"
|
||||
|
||||
def test_file_name_crc_no_extension(self, tmp_path: Path):
|
||||
"""Test with filename without extension"""
|
||||
test_file = tmp_path / "testfile"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "testfile"
|
||||
|
||||
def test_file_name_crc_multiple_dots(self, tmp_path: Path):
|
||||
"""Test with filename containing multiple dots"""
|
||||
test_file = tmp_path / "test.file.name.tar.gz"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "test.file.name.tar.gz"
|
||||
|
||||
def test_file_name_crc_with_spaces(self, tmp_path: Path):
|
||||
"""Test with filename containing spaces"""
|
||||
test_file = tmp_path / "test file name.txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "test file name.txt"
|
||||
|
||||
def test_file_name_crc_with_special_chars(self, tmp_path: Path):
|
||||
"""Test with filename containing special characters"""
|
||||
test_file = tmp_path / "test_file-name (1).txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "test_file-name (1).txt"
|
||||
|
||||
def test_file_name_crc_unicode_filename(self, tmp_path: Path):
|
||||
"""Test with unicode characters in filename"""
|
||||
test_file = tmp_path / "テストファイル.txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "テストファイル.txt"
|
||||
|
||||
def test_file_name_crc_unicode_parent(self, tmp_path: Path):
|
||||
"""Test with unicode characters in parent folder name"""
|
||||
parent = tmp_path / "親フォルダ"
|
||||
parent.mkdir()
|
||||
test_file = parent / "file.txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert result == "親フォルダ/file.txt"
|
||||
|
||||
def test_file_name_crc_path_separator(self, tmp_path: Path):
|
||||
"""Test that result uses forward slash separator"""
|
||||
parent = tmp_path / "parent"
|
||||
parent.mkdir()
|
||||
test_file = parent / "file.txt"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert "/" in result
|
||||
assert result == "parent/file.txt"
|
||||
|
||||
def test_file_name_crc_return_type(self, tmp_path: Path):
|
||||
"""Test that return type is always string"""
|
||||
test_file = tmp_path / "test.txt"
|
||||
|
||||
result1 = file_name_crc(test_file, add_parent_folder=False)
|
||||
result2 = file_name_crc(test_file, add_parent_folder=True)
|
||||
|
||||
assert isinstance(result1, str)
|
||||
assert isinstance(result2, str)
|
||||
|
||||
def test_file_name_crc_root_level_file(self):
|
||||
"""Test with file at root level"""
|
||||
test_path = Path("/file.txt")
|
||||
|
||||
result_without_parent = file_name_crc(test_path, add_parent_folder=False)
|
||||
assert result_without_parent == "file.txt"
|
||||
|
||||
result_with_parent = file_name_crc(test_path, add_parent_folder=True)
|
||||
# Parent of root-level file would be empty string or root
|
||||
assert "file.txt" in result_with_parent
|
||||
|
||||
def test_file_name_crc_relative_path(self):
|
||||
"""Test with relative path"""
|
||||
test_path = Path("folder/subfolder/file.txt")
|
||||
|
||||
result = file_name_crc(test_path, add_parent_folder=True)
|
||||
assert result == "subfolder/file.txt"
|
||||
|
||||
def test_file_name_crc_current_dir(self):
|
||||
"""Test with file in current directory"""
|
||||
test_path = Path("file.txt")
|
||||
|
||||
result = file_name_crc(test_path, add_parent_folder=False)
|
||||
assert result == "file.txt"
|
||||
|
||||
def test_file_name_crc_nonexistent_file(self, tmp_path: Path):
|
||||
"""Test that file_name_crc works even if file doesn't exist"""
|
||||
test_file = tmp_path / "parent" / "nonexistent.txt"
|
||||
|
||||
# Should work without file existing
|
||||
result1 = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result1 == "nonexistent.txt"
|
||||
|
||||
result2 = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert result2 == "parent/nonexistent.txt"
|
||||
|
||||
def test_file_name_crc_explicit_true(self, tmp_path: Path):
|
||||
"""Test explicitly setting add_parent_folder to True"""
|
||||
parent = tmp_path / "mydir"
|
||||
parent.mkdir()
|
||||
test_file = parent / "myfile.dat"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert result == "mydir/myfile.dat"
|
||||
|
||||
def test_file_name_crc_explicit_false(self, tmp_path: Path):
|
||||
"""Test explicitly setting add_parent_folder to False"""
|
||||
parent = tmp_path / "mydir"
|
||||
parent.mkdir()
|
||||
test_file = parent / "myfile.dat"
|
||||
|
||||
result = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert result == "myfile.dat"
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests combining both functions"""
|
||||
|
||||
def test_crc_and_naming_together(self, tmp_path: Path):
|
||||
"""Test using both functions on the same file"""
|
||||
parent = tmp_path / "data"
|
||||
parent.mkdir()
|
||||
test_file = parent / "testfile.csv"
|
||||
test_file.write_bytes(b"Sample data for integration test")
|
||||
|
||||
# Get CRC
|
||||
crc = file_crc(test_file)
|
||||
assert len(crc) == 8
|
||||
|
||||
# Get filename
|
||||
name_simple = file_name_crc(test_file, add_parent_folder=False)
|
||||
assert name_simple == "testfile.csv"
|
||||
|
||||
name_with_parent = file_name_crc(test_file, add_parent_folder=True)
|
||||
assert name_with_parent == "data/testfile.csv"
|
||||
|
||||
def test_multiple_files_crc_comparison(self, tmp_path: Path):
|
||||
"""Test CRC comparison across multiple files"""
|
||||
files: dict[str, str] = {}
|
||||
for i in range(3):
|
||||
file_path = tmp_path / f"file{i}.txt"
|
||||
file_path.write_bytes(f"Content {i}".encode())
|
||||
files[f"file{i}.txt"] = file_crc(file_path)
|
||||
|
||||
# All CRCs should be different
|
||||
assert len(set(files.values())) == 3
|
||||
|
||||
def test_workflow_file_identification(self, tmp_path: Path):
|
||||
"""Test a workflow of identifying files by name and verifying by CRC"""
|
||||
# Create directory structure
|
||||
dir1 = tmp_path / "dir1"
|
||||
dir2 = tmp_path / "dir2"
|
||||
dir1.mkdir()
|
||||
dir2.mkdir()
|
||||
|
||||
# Create same-named files with different content
|
||||
file1 = dir1 / "data.csv"
|
||||
file2 = dir2 / "data.csv"
|
||||
|
||||
file1.write_bytes(b"Data set 1")
|
||||
file2.write_bytes(b"Data set 2")
|
||||
|
||||
# Get names (should be the same)
|
||||
name1 = file_name_crc(file1, add_parent_folder=False)
|
||||
name2 = file_name_crc(file2, add_parent_folder=False)
|
||||
assert name1 == name2 == "data.csv"
|
||||
|
||||
# Get names with parent (should be different)
|
||||
full_name1 = file_name_crc(file1, add_parent_folder=True)
|
||||
full_name2 = file_name_crc(file2, add_parent_folder=True)
|
||||
assert full_name1 == "dir1/data.csv"
|
||||
assert full_name2 == "dir2/data.csv"
|
||||
|
||||
# Get CRCs (should be different)
|
||||
crc1 = file_crc(file1)
|
||||
crc2 = file_crc(file2)
|
||||
assert crc1 != crc2
|
||||
|
||||
# __END__
|
||||
@@ -1,522 +0,0 @@
"""
PyTest: file_handling/file_handling
"""

# pylint: disable=use-implicit-booleaness-not-comparison

from pathlib import Path
from pytest import CaptureFixture

from corelibs.file_handling.file_handling import (
    remove_all_in_directory,
)


class TestRemoveAllInDirectory:
    """Test suite for remove_all_in_directory function"""
|
||||
def test_remove_all_files_in_empty_directory(self, tmp_path: Path):
|
||||
"""Test removing all files from an empty directory"""
|
||||
test_dir = tmp_path / "empty_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert test_dir.exists() # Directory itself should still exist
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_all_files_in_directory(self, tmp_path: Path):
|
||||
"""Test removing all files from a directory with files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create test files
|
||||
(test_dir / "file1.txt").write_text("content 1")
|
||||
(test_dir / "file2.txt").write_text("content 2")
|
||||
(test_dir / "file3.csv").write_text("csv,data")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_all_subdirectories(self, tmp_path: Path):
|
||||
"""Test removing subdirectories within a directory"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create subdirectories
|
||||
subdir1 = test_dir / "subdir1"
|
||||
subdir2 = test_dir / "subdir2"
|
||||
subdir1.mkdir()
|
||||
subdir2.mkdir()
|
||||
|
||||
# Add files to subdirectories
|
||||
(subdir1 / "file.txt").write_text("content")
|
||||
(subdir2 / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_nested_structure(self, tmp_path: Path):
|
||||
"""Test removing deeply nested directory structure"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create nested structure
|
||||
nested = test_dir / "level1" / "level2" / "level3"
|
||||
nested.mkdir(parents=True)
|
||||
(nested / "deep_file.txt").write_text("deep content")
|
||||
(test_dir / "level1" / "mid_file.txt").write_text("mid content")
|
||||
(test_dir / "top_file.txt").write_text("top content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_with_ignore_files_single(self, tmp_path: Path):
|
||||
"""Test removing files while ignoring specific files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files
|
||||
(test_dir / "keep.txt").write_text("keep me")
|
||||
(test_dir / "remove1.txt").write_text("remove me")
|
||||
(test_dir / "remove2.txt").write_text("remove me too")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=["keep.txt"])
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
remaining = list(test_dir.iterdir())
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].name == "keep.txt"
|
||||
|
||||
def test_remove_with_ignore_files_multiple(self, tmp_path: Path):
|
||||
"""Test removing files while ignoring multiple specific files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files
|
||||
(test_dir / "keep1.txt").write_text("keep me")
|
||||
(test_dir / "keep2.log").write_text("keep me too")
|
||||
(test_dir / "remove.txt").write_text("remove me")
|
||||
|
||||
result = remove_all_in_directory(
|
||||
test_dir,
|
||||
ignore_files=["keep1.txt", "keep2.log"]
|
||||
)
|
||||
assert result is True
|
||||
assert test_dir.exists()
|
||||
remaining = {f.name for f in test_dir.iterdir()}
|
||||
assert remaining == {"keep1.txt", "keep2.log"}
|
||||
|
||||
def test_remove_with_ignore_directory(self, tmp_path: Path):
|
||||
"""Test removing with ignored directory"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create directories
|
||||
keep_dir = test_dir / "keep_dir"
|
||||
remove_dir = test_dir / "remove_dir"
|
||||
keep_dir.mkdir()
|
||||
remove_dir.mkdir()
|
||||
|
||||
(keep_dir / "file.txt").write_text("keep")
|
||||
(remove_dir / "file.txt").write_text("remove")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=["keep_dir"])
|
||||
assert result is True
|
||||
assert keep_dir.exists()
|
||||
assert not remove_dir.exists()
|
||||
|
||||
def test_remove_with_ignore_nested_files(self, tmp_path: Path):
|
||||
"""Test that ignore_files matches by name at any level"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files with same name at different levels
|
||||
(test_dir / "keep.txt").write_text("top level keep")
|
||||
(test_dir / "remove.txt").write_text("remove")
|
||||
subdir = test_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "file.txt").write_text("nested")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=["keep.txt"])
|
||||
assert result is True
|
||||
# keep.txt should be preserved at top level
|
||||
assert (test_dir / "keep.txt").exists()
|
||||
# Other files should be removed
|
||||
assert not (test_dir / "remove.txt").exists()
|
||||
# Subdirectory not in ignore list should be removed
|
||||
assert not subdir.exists()
|
||||
|
||||
def test_remove_nonexistent_directory(self, tmp_path: Path):
|
||||
"""Test removing from a non-existent directory returns False"""
|
||||
test_dir = tmp_path / "nonexistent"
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is False
|
||||
|
||||
def test_remove_from_file_not_directory(self, tmp_path: Path):
|
||||
"""Test that function returns False when given a file instead of directory"""
|
||||
test_file = tmp_path / "file.txt"
|
||||
test_file.write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_file)
|
||||
assert result is False
|
||||
assert test_file.exists() # File should not be affected
|
||||
|
||||
def test_remove_with_verbose_mode(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test verbose mode produces output"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files and directories
|
||||
(test_dir / "file1.txt").write_text("content")
|
||||
(test_dir / "file2.txt").write_text("content")
|
||||
subdir = test_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "nested.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir, verbose=True)
|
||||
assert result is True
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "Remove old files in: test_dir [" in captured.out
|
||||
assert "]" in captured.out
|
||||
assert "." in captured.out # Files are marked with .
|
||||
assert "/" in captured.out # Directories are marked with /
|
||||
|
||||
def test_remove_with_dry_run_mode(self, tmp_path: Path):
|
||||
"""Test dry run mode doesn't actually remove files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create test files
|
||||
file1 = test_dir / "file1.txt"
|
||||
file2 = test_dir / "file2.txt"
|
||||
file1.write_text("content 1")
|
||||
file2.write_text("content 2")
|
||||
|
||||
result = remove_all_in_directory(test_dir, dry_run=True)
|
||||
assert result is True
|
||||
# Files should still exist
|
||||
assert file1.exists()
|
||||
assert file2.exists()
|
||||
assert len(list(test_dir.iterdir())) == 2
|
||||
|
||||
def test_remove_with_dry_run_and_verbose(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test dry run with verbose mode shows [DRY RUN] prefix"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir, dry_run=True, verbose=True)
|
||||
assert result is True
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert "[DRY RUN]" in captured.out
|
||||
|
||||
def test_remove_mixed_content(self, tmp_path: Path):
|
||||
"""Test removing mixed files and directories"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create mixed content
|
||||
(test_dir / "file1.txt").write_text("content")
|
||||
(test_dir / "file2.csv").write_text("csv")
|
||||
subdir1 = test_dir / "subdir1"
|
||||
subdir2 = test_dir / "subdir2"
|
||||
subdir1.mkdir()
|
||||
subdir2.mkdir()
|
||||
(subdir1 / "nested_file.txt").write_text("nested")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_with_none_ignore_files(self, tmp_path: Path):
|
||||
"""Test that None as ignore_files works correctly"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=None)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_with_empty_ignore_list(self, tmp_path: Path):
|
||||
"""Test that empty ignore_files list works correctly"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=[])
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_special_characters_in_filenames(self, tmp_path: Path):
|
||||
"""Test removing files with special characters in names"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files with special characters
|
||||
(test_dir / "file with spaces.txt").write_text("content")
|
||||
(test_dir / "file-with-dashes.txt").write_text("content")
|
||||
(test_dir / "file_with_underscores.txt").write_text("content")
|
||||
(test_dir / "file.multiple.dots.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_unicode_filenames(self, tmp_path: Path):
|
||||
"""Test removing files with unicode characters in names"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files with unicode names
|
||||
(test_dir / "ファイル.txt").write_text("content")
|
||||
(test_dir / "文件.txt").write_text("content")
|
||||
(test_dir / "αρχείο.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_hidden_files(self, tmp_path: Path):
|
||||
"""Test removing hidden files (dotfiles)"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create hidden files
|
||||
(test_dir / ".hidden").write_text("content")
|
||||
(test_dir / ".gitignore").write_text("content")
|
||||
(test_dir / "normal.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_preserves_ignored_hidden_files(self, tmp_path: Path):
|
||||
"""Test that ignored hidden files are preserved"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
(test_dir / ".gitkeep").write_text("keep")
|
||||
(test_dir / "file.txt").write_text("remove")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=[".gitkeep"])
|
||||
assert result is True
|
||||
remaining = list(test_dir.iterdir())
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].name == ".gitkeep"
|
||||
|
||||
def test_remove_large_number_of_files(self, tmp_path: Path):
|
||||
"""Test removing a large number of files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create 100 files
|
||||
for i in range(100):
|
||||
(test_dir / f"file_{i:03d}.txt").write_text(f"content {i}")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_deeply_nested_with_ignore(self, tmp_path: Path):
|
||||
"""Test removing structure while preserving ignored items
|
||||
|
||||
Note: rglob processes files depth-first, so files inside an ignored
|
||||
directory will be processed (and potentially removed) before the directory
|
||||
itself is checked. Only items at the same level or that share the same name
|
||||
as ignored items will be preserved.
|
||||
"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create structure
|
||||
level1 = test_dir / "level1"
|
||||
level1.mkdir()
|
||||
keep_file = test_dir / "keep.txt"
|
||||
(level1 / "file.txt").write_text("remove")
|
||||
keep_file.write_text("keep this file")
|
||||
(test_dir / "top.txt").write_text("remove")
|
||||
|
||||
result = remove_all_in_directory(test_dir, ignore_files=["keep.txt"])
|
||||
assert result is True
|
||||
# Check that keep.txt is preserved
|
||||
assert keep_file.exists()
|
||||
assert keep_file.read_text() == "keep this file"
|
||||
# Other items should be removed
|
||||
assert not (test_dir / "top.txt").exists()
|
||||
assert not level1.exists()
|
||||
|
||||
def test_remove_binary_files(self, tmp_path: Path):
|
||||
"""Test removing binary files"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create binary files
|
||||
(test_dir / "binary1.bin").write_bytes(bytes(range(256)))
|
||||
(test_dir / "binary2.dat").write_bytes(b"\x00\x01\x02\xff")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_symlinks(self, tmp_path: Path):
|
||||
"""Test removing symbolic links"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create a file and a symlink to it
|
||||
original = tmp_path / "original.txt"
|
||||
original.write_text("original content")
|
||||
symlink = test_dir / "link.txt"
|
||||
symlink.symlink_to(original)
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
# Original file should still exist
|
||||
assert original.exists()
|
||||
|
||||
def test_remove_with_permissions_variations(self, tmp_path: Path):
|
||||
"""Test removing files with different permissions"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create files
|
||||
file1 = test_dir / "readonly.txt"
|
||||
file2 = test_dir / "normal.txt"
|
||||
file1.write_text("readonly")
|
||||
file2.write_text("normal")
|
||||
|
||||
# Make file1 read-only
|
||||
file1.chmod(0o444)
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_default_parameters(self, tmp_path: Path):
|
||||
"""Test function with only required parameter"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_return_value_true_when_successful(self, tmp_path: Path):
|
||||
"""Test that function returns True on successful removal"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_remove_return_value_false_when_not_directory(self, tmp_path: Path):
|
||||
"""Test that function returns False when path is not a directory"""
|
||||
test_file = tmp_path / "file.txt"
|
||||
test_file.write_text("content")
|
||||
|
||||
result = remove_all_in_directory(test_file)
|
||||
assert result is False
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_remove_directory_becomes_empty(self, tmp_path: Path):
|
||||
"""Test that directory is empty after removal"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create various items
|
||||
(test_dir / "file.txt").write_text("content")
|
||||
subdir = test_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "nested.txt").write_text("nested")
|
||||
|
||||
# Verify directory is not empty before
|
||||
assert len(list(test_dir.iterdir())) > 0
|
||||
|
||||
result = remove_all_in_directory(test_dir)
|
||||
assert result is True
|
||||
|
||||
# Verify directory is empty after
|
||||
assert len(list(test_dir.iterdir())) == 0
|
||||
assert test_dir.exists()
|
||||
assert test_dir.is_dir()
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests for file_handling module"""
|
||||
|
||||
def test_multiple_remove_operations(self, tmp_path: Path):
|
||||
"""Test multiple consecutive remove operations"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# First batch of files
|
||||
(test_dir / "batch1_file1.txt").write_text("content")
|
||||
(test_dir / "batch1_file2.txt").write_text("content")
|
||||
|
||||
result1 = remove_all_in_directory(test_dir)
|
||||
assert result1 is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
# Second batch of files
|
||||
(test_dir / "batch2_file1.txt").write_text("content")
|
||||
(test_dir / "batch2_file2.txt").write_text("content")
|
||||
|
||||
result2 = remove_all_in_directory(test_dir)
|
||||
assert result2 is True
|
||||
assert list(test_dir.iterdir()) == []
|
||||
|
||||
def test_remove_then_recreate(self, tmp_path: Path):
|
||||
"""Test removing files then recreating them"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Create and remove
|
||||
original_file = test_dir / "file.txt"
|
||||
original_file.write_text("original")
|
||||
remove_all_in_directory(test_dir)
|
||||
assert not original_file.exists()
|
||||
|
||||
# Recreate
|
||||
new_file = test_dir / "file.txt"
|
||||
new_file.write_text("new content")
|
||||
assert new_file.exists()
|
||||
assert new_file.read_text() == "new content"
|
||||
|
||||
def test_cleanup_workflow(self, tmp_path: Path):
|
||||
"""Test a typical cleanup workflow"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
# Simulate work directory
|
||||
(test_dir / "temp1.tmp").write_text("temp")
|
||||
(test_dir / "temp2.tmp").write_text("temp")
|
||||
(test_dir / "result.txt").write_text("important")
|
||||
|
||||
# Clean up temp files, keep result
|
||||
result = remove_all_in_directory(
|
||||
test_dir,
|
||||
ignore_files=["result.txt"]
|
||||
)
|
||||
assert result is True
|
||||
|
||||
remaining = list(test_dir.iterdir())
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].name == "result.txt"
|
||||
assert remaining[0].read_text() == "important"
|
||||
|
||||
# __END__
|
||||