Compare commits

..

8 Commits

Author SHA1 Message Date
Clemens Schwaighofer
4e78d83092 Add checks for BOM encoding in files 2025-11-06 18:21:32 +09:00
Clemens Schwaighofer
0e6331fa6a v0.33.0: datetime parsing update 2025-11-06 13:26:07 +09:00
Clemens Schwaighofer
c98c5df63c Update datetime parse helper
Allow non T in isotime format, add non T normal datetime parsing
2025-11-06 13:24:27 +09:00
Clemens Schwaighofer
0981c74da9 v0.32.0: add email sending 2025-10-27 11:22:11 +09:00
Clemens Schwaighofer
31518799f6 README update 2025-10-27 11:20:46 +09:00
Clemens Schwaighofer
e8b4b9b48e Add send email class 2025-10-27 11:19:38 +09:00
Clemens Schwaighofer
cd06272b38 v0.31.1: fix dict_helper file name to dict_helpers 2025-10-27 10:42:45 +09:00
Clemens Schwaighofer
c5ab4352e3 Fix name dict_helper to dict_helpers
So we have the same name for everything
2025-10-27 10:40:12 +09:00
17 changed files with 2170 additions and 12 deletions

View File

@@ -10,6 +10,7 @@ This is a pip package that can be installed into any project and covers the foll
- logging update with exception logs
- requests wrapper for easier auth pass on access
- dict fingerprinting
- sending email
- jmespath search
- json helpers for content replace and output
- dump outputs for data for debugging
@@ -26,6 +27,7 @@ This is a pip package that can be installed into any project and covers the foll
- debug_handling: various debug helpers like data dumper, timer, utilization, etc
- db_handling: SQLite interface class
- encyption_handling: symmetric encryption
- email_handling: simple email sending
- file_handling: crc handling for file content and file names, progress bar
- json_handling: jmespath support and json date support, replace content in dict with json paths
- iterator_handling: list and dictionary handling support (search, fingerprinting, etc)
@@ -50,7 +52,7 @@ Have the following setup in `project.toml`
```toml
[[tool.uv.index]]
name = "egra-gitea"
name = "opj-pypi"
url = "https://git.egplusww.jp/api/packages/PyPI/pypi/simple/"
publish-url = "https://git.egplusww.jp/api/packages/PyPI/pypi"
explicit = true
@@ -58,15 +60,15 @@ explicit = true
```sh
uv build
uv publish --index egra-gitea --token <gitea token>
uv publish --index opj-pypi --token <gitea token>
```
## Test package
## Use package
We must set the full index URL here because we run with "--no-project"
```sh
uv run --with corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/ --no-project -- python -c "import corelibs"
uv run --with corelibs --index opj-pypi=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/ --no-project -- python -c "import corelibs"
```
### Python tests
@@ -99,7 +101,7 @@ uv run test-run/<script>
This will also add the index entry
```sh
uv add corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/
uv add corelibs --index opj-pypi=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/
```
## Python venv setup

View File

@@ -1,7 +1,7 @@
# MARK: Project info
[project]
name = "corelibs"
version = "0.31.0"
version = "0.33.0"
description = "Collection of utils for Python scripts"
readme = "README.md"
requires-python = ">=3.13"
@@ -17,7 +17,7 @@ dependencies = [
# MARK: build target
[[tool.uv.index]]
name = "egra-gitea"
name = "opj-pypi"
url = "https://git.egplusww.jp/api/packages/PyPI/pypi/simple/"
publish-url = "https://git.egplusww.jp/api/packages/PyPI/pypi"
explicit = true
@@ -63,12 +63,13 @@ ignore = [
[tool.pylint.MASTER]
# this is for the tests/etc folders
init-hook='import sys; sys.path.append("src/")'
# MARK: Testing
[tool.pytest.ini_options]
testpaths = [
"tests",
]
[tool.coverage.run]
omit = [
"*/tests/*",

View File

@@ -159,10 +159,14 @@ def parse_flexible_date(
# Try different parsing methods
parsers: list[Callable[[str], datetime]] = [
# ISO 8601 format
# ISO 8601 format, also with missing "T"
lambda x: datetime.fromisoformat(x), # pylint: disable=W0108
lambda x: datetime.fromisoformat(x.replace(' ', 'T')), # pylint: disable=W0108
# Simple date format
lambda x: datetime.strptime(x, "%Y-%m-%d"),
# datetime without T
lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"),
lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"),
# Alternative ISO formats (fallback)
lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S"),
lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f"),

View File

View File

@@ -0,0 +1,199 @@
"""
Send email wrapper
"""
import smtplib
from email.message import EmailMessage
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from corelibs.logging_handling.log import Logger
class SendEmail:
"""
send emails based on a template to a list of receivers
"""
def __init__(
self,
log: "Logger",
settings: dict[str, Any],
template: dict[str, str],
from_email: str,
combined_send: bool = True,
receivers: list[str] | None = None,
data: list[dict[str, str]] | None = None,
):
"""
init send email class
Args:
template (dict): Dictionary with body and subject
from_email (str): from email as "Name" <email>
combined_send (bool): True for sending as one set for all receivers
receivers (list): list of emails to send to
data (dict): data to replace in template
args (Namespace): _description_
"""
self.log = log
self.settings = settings
# internal settings
self.template = template
self.from_email = from_email
self.combined_send = combined_send
self.receivers = receivers
self.data = data
def send_email(
self,
data: list[dict[str, str]] | None,
receivers: list[str] | None,
template: dict[str, str] | None = None,
from_email: str | None = None,
combined_send: bool | None = None,
test_only: bool | None = None
):
"""
build email and send
Arguments:
data {list[dict[str, str]] | None} -- _description_
receivers {list[str] | None} -- _description_
combined_send {bool | None} -- _description_
Keyword Arguments:
template {dict[str, str] | None} -- _description_ (default: {None})
from_email {str | None} -- _description_ (default: {None})
Raises:
ValueError: _description_
ValueError: _description_
"""
if data is None and self.data is not None:
data = self.data
if data is None:
raise ValueError("No replace data set, cannot send email")
if receivers is None and self.receivers is not None:
receivers = self.receivers
if receivers is None:
raise ValueError("No receivers list set, cannot send email")
if combined_send is None:
combined_send = self.combined_send
if test_only is not None:
self.settings['test'] = test_only
if template is None:
template = self.template
if from_email is None:
from_email = self.from_email
if not template['subject'] or not template['body']:
raise ValueError("Both Subject and Body must be set")
self.log.debug(
"[EMAIL]:\n"
f"Subject: {template['subject']}\n"
f"Body: {template['body']}\n"
f"From: {from_email}\n"
f"Combined send: {combined_send}\n"
f"Receivers: {receivers}\n"
f"Replace data: {data}"
)
# send email
self.send_email_list(
self.prepare_email_content(
from_email, template, data
),
receivers,
combined_send,
test_only
)
def prepare_email_content(
self,
from_email: str,
template: dict[str, str],
data: list[dict[str, str]],
) -> list[EmailMessage]:
"""
prepare email for sending
Args:
template (dict): template data for this email
data (dict): data to replace in email
Returns:
list: Email Message Objects as list
"""
_subject = ""
_body = ""
msg: list[EmailMessage] = []
for replace in data:
_subject = template["subject"]
_body = template["body"]
for key, value in replace.items():
_subject = _subject.replace(f"{{{{{key}}}}}", value)
_body = _body.replace(f"{{{{{key}}}}}", value)
# create a simple email and add subhect, from email
msg_email = EmailMessage()
# msg.set_content(_body, charset='utf-8', cte='quoted-printable')
msg_email.set_content(_body, charset="utf-8")
msg_email["Subject"] = _subject
msg_email["From"] = from_email
# push to array for sening
msg.append(msg_email)
return msg
def send_email_list(
self,
email: list[EmailMessage], receivers: list[str],
combined_send: bool | None = None,
test_only: bool | None = None
):
"""
send email to receivers list
Args:
email (list): Email Message object with set obdy, subject, from as list
receivers (array): email receivers list as array
combined_send (bool): True for sending as one set for all receivers
"""
if test_only is not None:
self.settings['test'] = test_only
# localhost (postfix does the rest)
smtp = None
smtp_host = self.settings.get('smtp_host', "localhost")
try:
smtp = smtplib.SMTP(smtp_host)
except ConnectionRefusedError as e:
self.log.error("Could not open SMTP connection to: %s, %s", smtp_host, e)
# loop over messages and then over recievers
for msg in email:
if combined_send is True:
msg["To"] = ", ".join(receivers)
if not self.settings.get('test'):
if smtp is not None:
smtp.send_message(msg, msg["From"], receivers)
else:
self.log.info(f"[EMAIL] Test, not sending email\n{msg}")
else:
for receiver in receivers:
# send to
self.log.debug(f"===> Send to: {receiver}")
if "To" in msg:
msg.replace_header("To", receiver)
else:
msg["To"] = receiver
if not self.settings.get('test'):
if smtp is not None:
smtp.send_message(msg)
else:
self.log.info(f"[EMAIL] Test, not sending email\n{msg}")
# close smtp
if smtp is not None:
smtp.quit()
# __END__

View File

@@ -0,0 +1,75 @@
"""
File check if BOM encoded, needed for CSV load
"""
from pathlib import Path
from typing import TypedDict
class BomEncodingInfo(TypedDict):
"""BOM encoding info"""
has_bom: bool
bom_type: str | None
encoding: str | None
bom_length: int
bom_pattern: bytes | None
def is_bom_encoded(file_path: Path) -> bool:
"""
Detect if a file is BOM encoded
Args:
file_path (str): Path to the file to check
Returns:
bool: True if file has BOM, False otherwise
"""
return is_bom_encoded_info(file_path)['has_bom']
def is_bom_encoded_info(file_path: Path) -> BomEncodingInfo:
"""
Enhanced BOM detection with additional file analysis
Args:
file_path (str): Path to the file to check
Returns:
dict: Comprehensive BOM and encoding information
"""
try:
# Read first 1024 bytes for analysis
with open(file_path, 'rb') as f:
header = f.read(4)
bom_patterns = {
b'\xef\xbb\xbf': ('UTF-8', 'utf-8', 3),
b'\xff\xfe\x00\x00': ('UTF-32 LE', 'utf-32-le', 4),
b'\x00\x00\xfe\xff': ('UTF-32 BE', 'utf-32-be', 4),
b'\xff\xfe': ('UTF-16 LE', 'utf-16-le', 2),
b'\xfe\xff': ('UTF-16 BE', 'utf-16-be', 2),
}
for bom_pattern, (encoding_name, encoding, length) in bom_patterns.items():
if header.startswith(bom_pattern):
return {
'has_bom': True,
'bom_type': encoding_name,
'encoding': encoding,
'bom_length': length,
'bom_pattern': bom_pattern
}
return {
'has_bom': False,
'bom_type': None,
'encoding': None,
'bom_length': 0,
'bom_pattern': None
}
except Exception as e:
raise ValueError(f"Error checking BOM encoding: {e}") from e
# __END__

View File

@@ -0,0 +1,31 @@
#!/usr/bin/env python3
"""
BOM check for files
"""
from pathlib import Path
from corelibs.file_handling.file_bom_encoding import is_bom_encoded, is_bom_encoded_info
from corelibs.debug_handling.dump_data import dump_data
def main() -> None:
    """
    Check files for BOM encoding

    Runs the BOM detector against the bundled sample CSV files and prints
    both the simple flag and the detailed info for each one.
    """
    base_path = Path(__file__).resolve().parent
    sample_files = (
        'test-data/sample_with_bom.csv',
        'test-data/sample_without_bom.csv',
    )
    for file_path in sample_files:
        # resolve the relative sample path once per file
        target = base_path.joinpath(file_path)
        has_bom = is_bom_encoded(target)
        bom_info = is_bom_encoded_info(target)
        print(f'File: {file_path}')
        print(f' Has BOM: {has_bom}')
        print(f' BOM Info: {dump_data(bom_info)}')


if __name__ == "__main__":
    main()
# __END__

View File

@@ -0,0 +1,6 @@
Name,Age,City,Country
John Doe,25,New York,USA
Jane Smith,30,London,UK
山田太郎,28,東京,Japan
María García,35,Madrid,Spain
François Dupont,42,Paris,France
1 Name Age City Country
2 John Doe 25 New York USA
3 Jane Smith 30 London UK
4 山田太郎 28 東京 Japan
5 María García 35 Madrid Spain
6 François Dupont 42 Paris France

View File

@@ -0,0 +1,6 @@
Name,Age,City,Country
John Doe,25,New York,USA
Jane Smith,30,London,UK
山田太郎,28,東京,Japan
María García,35,Madrid,Spain
François Dupont,42,Paris,France
1 Name Age City Country
2 John Doe 25 New York USA
3 Jane Smith 30 London UK
4 山田太郎 28 東京 Japan
5 María García 35 Madrid Spain
6 François Dupont 42 Paris France

View File

@@ -5,7 +5,7 @@ Iterator helper testing
from typing import Any
from corelibs.debug_handling.dump_data import dump_data
from corelibs.iterator_handling.dict_mask import mask
from corelibs.iterator_handling.dict_helper import set_entry
from corelibs.iterator_handling.dict_helpers import set_entry
def __mask():

View File

View File

@@ -275,6 +275,53 @@ class TestParseFlexibleDate:
assert isinstance(result, datetime)
assert result.tzinfo is not None
def test_parse_flexible_date_missing_t_with_timezone_shift(self):
    """Test parse_flexible_date with space instead of 'T' plus timezone shift"""
    result = parse_flexible_date('2023-12-25 15:30:45+00:00', timezone_tz='Asia/Tokyo', shift_time_zone=True)
    assert isinstance(result, datetime)
    # shifted result must stay timezone-aware
    assert result.tzinfo is not None
def test_parse_flexible_date_space_separated_datetime(self):
    """Test parse_flexible_date with space-separated datetime format"""
    result = parse_flexible_date('2023-12-25 15:30:45')
    assert isinstance(result, datetime)
    # every component must survive the space-separated parse
    assert result.year == 2023
    assert result.month == 12
    assert result.day == 25
    assert result.hour == 15
    assert result.minute == 30
    assert result.second == 45
def test_parse_flexible_date_space_separated_with_microseconds(self):
    """Test parse_flexible_date with space-separated datetime and microseconds"""
    result = parse_flexible_date('2023-12-25 15:30:45.123456')
    assert isinstance(result, datetime)
    assert result.year == 2023
    assert result.month == 12
    assert result.day == 25
    assert result.hour == 15
    assert result.minute == 30
    assert result.second == 45
    # fractional seconds must be preserved
    assert result.microsecond == 123456
def test_parse_flexible_date_t_separated_datetime(self):
    """Test parse_flexible_date with T-separated datetime (alternative ISO format)"""
    result = parse_flexible_date('2023-12-25T15:30:45')
    assert isinstance(result, datetime)
    assert result.year == 2023
    assert result.month == 12
    assert result.day == 25
    assert result.hour == 15
    assert result.minute == 30
    assert result.second == 45
def test_parse_flexible_date_t_separated_with_microseconds(self):
    """Test parse_flexible_date with T-separated datetime and microseconds"""
    result = parse_flexible_date('2023-12-25T15:30:45.123456')
    assert isinstance(result, datetime)
    assert result.year == 2023
    # fractional seconds must be preserved
    assert result.microsecond == 123456
def test_parse_flexible_date_invalid_format(self):
"""Test parse_flexible_date with invalid format returns None"""
result = parse_flexible_date('invalid-date')

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,538 @@
"""
PyTest: file_handling/file_bom_encoding
"""
from pathlib import Path
import pytest
from corelibs.file_handling.file_bom_encoding import (
is_bom_encoded,
is_bom_encoded_info,
BomEncodingInfo,
)
class TestIsBomEncoded:
    """Test suite for is_bom_encoded function (simple bool interface)"""

    def test_utf8_bom_file(self, tmp_path: Path):
        """Test detection of UTF-8 BOM encoded file"""
        test_file = tmp_path / "utf8_bom.txt"
        # UTF-8 BOM: EF BB BF
        content = b'\xef\xbb\xbfHello, World!'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is True
        assert isinstance(result, bool)

    def test_utf16_le_bom_file(self, tmp_path: Path):
        """Test detection of UTF-16 LE BOM encoded file"""
        test_file = tmp_path / "utf16_le_bom.txt"
        # UTF-16 LE BOM: FF FE
        content = b'\xff\xfeH\x00e\x00l\x00l\x00o\x00'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is True

    def test_utf16_be_bom_file(self, tmp_path: Path):
        """Test detection of UTF-16 BE BOM encoded file"""
        test_file = tmp_path / "utf16_be_bom.txt"
        # UTF-16 BE BOM: FE FF
        content = b'\xfe\xff\x00H\x00e\x00l\x00l\x00o'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is True

    def test_utf32_le_bom_file(self, tmp_path: Path):
        """Test detection of UTF-32 LE BOM encoded file"""
        test_file = tmp_path / "utf32_le_bom.txt"
        # UTF-32 LE BOM: FF FE 00 00
        content = b'\xff\xfe\x00\x00H\x00\x00\x00e\x00\x00\x00'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is True

    def test_utf32_be_bom_file(self, tmp_path: Path):
        """Test detection of UTF-32 BE BOM encoded file"""
        test_file = tmp_path / "utf32_be_bom.txt"
        # UTF-32 BE BOM: 00 00 FE FF
        content = b'\x00\x00\xfe\xff\x00\x00\x00H\x00\x00\x00e'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is True

    def test_no_bom_ascii_file(self, tmp_path: Path):
        """Test detection of ASCII file without BOM"""
        test_file = tmp_path / "ascii.txt"
        content = b'Hello, World!'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is False

    def test_no_bom_utf8_file(self, tmp_path: Path):
        """Test detection of UTF-8 file without BOM"""
        test_file = tmp_path / "utf8_no_bom.txt"
        # valid UTF-8 content, but no BOM prefix
        content = 'Hello, 世界!'.encode('utf-8')
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is False

    def test_empty_file(self, tmp_path: Path):
        """Test detection on empty file"""
        test_file = tmp_path / "empty.txt"
        test_file.write_bytes(b'')
        result = is_bom_encoded(test_file)
        assert result is False

    def test_binary_file_no_bom(self, tmp_path: Path):
        """Test detection on binary file without BOM"""
        test_file = tmp_path / "binary.bin"
        content = bytes(range(256))
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is False

    def test_partial_bom_pattern(self, tmp_path: Path):
        """Test file with partial BOM pattern that shouldn't match"""
        test_file = tmp_path / "partial_bom.txt"
        # Only first two bytes of UTF-8 BOM — must not be treated as a BOM
        content = b'\xef\xbbHello'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is False

    def test_false_positive_bom_pattern(self, tmp_path: Path):
        """Test file that contains BOM-like bytes but not at the start"""
        test_file = tmp_path / "false_positive.txt"
        content = b'Hello\xef\xbb\xbfWorld'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is False

    def test_nonexistent_file(self, tmp_path: Path):
        """Test that function raises error for non-existent file"""
        test_file = tmp_path / "nonexistent.txt"
        # I/O errors are wrapped into ValueError by the implementation
        with pytest.raises(ValueError, match="Error checking BOM encoding"):
            is_bom_encoded(test_file)

    def test_very_small_file(self, tmp_path: Path):
        """Test file smaller than largest BOM pattern (4 bytes)"""
        test_file = tmp_path / "small.txt"
        content = b'Hi'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is False

    def test_exactly_bom_size_utf8(self, tmp_path: Path):
        """Test file that is exactly the size of UTF-8 BOM"""
        test_file = tmp_path / "exact_bom.txt"
        content = b'\xef\xbb\xbf'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is True

    def test_exactly_bom_size_utf32(self, tmp_path: Path):
        """Test file that is exactly the size of UTF-32 BOM"""
        test_file = tmp_path / "exact_bom_utf32.txt"
        content = b'\xff\xfe\x00\x00'
        test_file.write_bytes(content)
        result = is_bom_encoded(test_file)
        assert result is True
class TestIsBomEncodedInfo:
    """Test suite for is_bom_encoded_info function (detailed dict interface)"""

    def test_utf8_bom_info(self, tmp_path: Path):
        """Test detailed info for UTF-8 BOM encoded file"""
        test_file = tmp_path / "utf8_bom.txt"
        content = b'\xef\xbb\xbfHello, UTF-8!'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert isinstance(result, dict)
        assert result['has_bom'] is True
        assert result['bom_type'] == 'UTF-8'
        assert result['encoding'] == 'utf-8'
        assert result['bom_length'] == 3
        assert result['bom_pattern'] == b'\xef\xbb\xbf'

    def test_utf16_le_bom_info(self, tmp_path: Path):
        """Test detailed info for UTF-16 LE BOM encoded file"""
        test_file = tmp_path / "utf16_le_bom.txt"
        content = b'\xff\xfeH\x00e\x00l\x00l\x00o\x00'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is True
        assert result['bom_type'] == 'UTF-16 LE'
        assert result['encoding'] == 'utf-16-le'
        assert result['bom_length'] == 2
        assert result['bom_pattern'] == b'\xff\xfe'

    def test_utf16_be_bom_info(self, tmp_path: Path):
        """Test detailed info for UTF-16 BE BOM encoded file"""
        test_file = tmp_path / "utf16_be_bom.txt"
        content = b'\xfe\xff\x00H\x00e\x00l\x00l\x00o'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is True
        assert result['bom_type'] == 'UTF-16 BE'
        assert result['encoding'] == 'utf-16-be'
        assert result['bom_length'] == 2
        assert result['bom_pattern'] == b'\xfe\xff'

    def test_utf32_le_bom_info(self, tmp_path: Path):
        """Test detailed info for UTF-32 LE BOM encoded file"""
        test_file = tmp_path / "utf32_le_bom.txt"
        content = b'\xff\xfe\x00\x00H\x00\x00\x00e\x00\x00\x00'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is True
        assert result['bom_type'] == 'UTF-32 LE'
        assert result['encoding'] == 'utf-32-le'
        assert result['bom_length'] == 4
        assert result['bom_pattern'] == b'\xff\xfe\x00\x00'

    def test_utf32_be_bom_info(self, tmp_path: Path):
        """Test detailed info for UTF-32 BE BOM encoded file"""
        test_file = tmp_path / "utf32_be_bom.txt"
        content = b'\x00\x00\xfe\xff\x00\x00\x00H\x00\x00\x00e'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is True
        assert result['bom_type'] == 'UTF-32 BE'
        assert result['encoding'] == 'utf-32-be'
        assert result['bom_length'] == 4
        assert result['bom_pattern'] == b'\x00\x00\xfe\xff'

    def test_no_bom_info(self, tmp_path: Path):
        """Test detailed info for file without BOM"""
        test_file = tmp_path / "no_bom.txt"
        content = b'Hello, World!'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is False
        assert result['bom_type'] is None
        assert result['encoding'] is None
        assert result['bom_length'] == 0
        assert result['bom_pattern'] is None

    def test_empty_file_info(self, tmp_path: Path):
        """Test detailed info for empty file"""
        test_file = tmp_path / "empty.txt"
        test_file.write_bytes(b'')
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is False
        assert result['bom_type'] is None
        assert result['encoding'] is None
        assert result['bom_length'] == 0
        assert result['bom_pattern'] is None

    def test_bom_precedence_utf32_vs_utf16(self, tmp_path: Path):
        """Test that UTF-32 LE BOM takes precedence over UTF-16 LE when both match"""
        test_file = tmp_path / "precedence.txt"
        # UTF-32 LE BOM starts with the UTF-16 LE BOM pattern (FF FE)
        content = b'\xff\xfe\x00\x00Additional content'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        # Should detect UTF-32 LE, not UTF-16 LE
        assert result['has_bom'] is True
        assert result['bom_type'] == 'UTF-32 LE'
        assert result['encoding'] == 'utf-32-le'
        assert result['bom_length'] == 4
        assert result['bom_pattern'] == b'\xff\xfe\x00\x00'

    def test_return_type_validation(self, tmp_path: Path):
        """Test that return type matches BomEncodingInfo TypedDict"""
        test_file = tmp_path / "test.txt"
        test_file.write_bytes(b'Test content')
        result = is_bom_encoded_info(test_file)
        # Check all required keys are present
        required_keys = {'has_bom', 'bom_type', 'encoding', 'bom_length', 'bom_pattern'}
        assert set(result.keys()) == required_keys
        # Check types (TypedDict is not enforced at runtime)
        assert isinstance(result['has_bom'], bool)
        assert result['bom_type'] is None or isinstance(result['bom_type'], str)
        assert result['encoding'] is None or isinstance(result['encoding'], str)
        assert isinstance(result['bom_length'], int)
        assert result['bom_pattern'] is None or isinstance(result['bom_pattern'], bytes)

    def test_nonexistent_file_error(self, tmp_path: Path):
        """Test that function raises ValueError for non-existent file"""
        test_file = tmp_path / "nonexistent.txt"
        with pytest.raises(ValueError) as exc_info:
            is_bom_encoded_info(test_file)
        assert "Error checking BOM encoding" in str(exc_info.value)

    def test_directory_instead_of_file(self, tmp_path: Path):
        """Test that function raises error when given a directory"""
        test_dir = tmp_path / "test_dir"
        test_dir.mkdir()
        with pytest.raises(ValueError, match="Error checking BOM encoding"):
            is_bom_encoded_info(test_dir)

    def test_large_file_with_bom(self, tmp_path: Path):
        """Test BOM detection on large file (only first 4 bytes matter)"""
        test_file = tmp_path / "large_bom.txt"
        # UTF-8 BOM followed by large content
        content = b'\xef\xbb\xbf' + b'A' * 100000
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is True
        assert result['bom_type'] == 'UTF-8'
        assert result['encoding'] == 'utf-8'

    def test_bom_detection_priority_order(self, tmp_path: Path):
        """Test that BOM patterns are checked in the correct priority order"""
        # The function should check longer patterns first to avoid false matches
        test_cases = [
            (b'\xff\xfe\x00\x00', 'UTF-32 LE'),  # 4 bytes
            (b'\x00\x00\xfe\xff', 'UTF-32 BE'),  # 4 bytes
            (b'\xff\xfe', 'UTF-16 LE'),  # 2 bytes
            (b'\xfe\xff', 'UTF-16 BE'),  # 2 bytes
            (b'\xef\xbb\xbf', 'UTF-8'),  # 3 bytes
        ]
        for i, (bom_bytes, expected_type) in enumerate(test_cases):
            test_file = tmp_path / f"priority_test_{i}.txt"
            content = bom_bytes + b'Content'
            test_file.write_bytes(content)
            result = is_bom_encoded_info(test_file)
            assert result['bom_type'] == expected_type
            assert result['bom_pattern'] == bom_bytes

    def test_csv_file_with_utf8_bom(self, tmp_path: Path):
        """Test CSV file with UTF-8 BOM (common use case mentioned in docstring)"""
        test_file = tmp_path / "data.csv"
        content = b'\xef\xbb\xbf"Name","Age","City"\n"John",30,"New York"\n"Jane",25,"Tokyo"'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is True
        assert result['bom_type'] == 'UTF-8'
        assert result['encoding'] == 'utf-8'
        assert result['bom_length'] == 3

    def test_csv_file_without_bom(self, tmp_path: Path):
        """Test CSV file without BOM"""
        test_file = tmp_path / "data_no_bom.csv"
        content = b'"Name","Age","City"\n"John",30,"New York"\n"Jane",25,"Tokyo"'
        test_file.write_bytes(content)
        result = is_bom_encoded_info(test_file)
        assert result['has_bom'] is False
        assert result['bom_type'] is None
        assert result['encoding'] is None
        assert result['bom_length'] == 0
class TestBomEncodingInfo:
    """Test suite for BomEncodingInfo TypedDict"""

    def test_typed_dict_structure(self):
        """Test that BomEncodingInfo has correct structure"""
        # This is a type check - in actual usage, mypy would validate this;
        # at runtime a TypedDict behaves like a plain dict
        sample_info: BomEncodingInfo = {
            'has_bom': True,
            'bom_type': 'UTF-8',
            'encoding': 'utf-8',
            'bom_length': 3,
            'bom_pattern': b'\xef\xbb\xbf'
        }
        assert sample_info['has_bom'] is True
        assert sample_info['bom_type'] == 'UTF-8'
        assert sample_info['encoding'] == 'utf-8'
        assert sample_info['bom_length'] == 3
        assert sample_info['bom_pattern'] == b'\xef\xbb\xbf'

    def test_typed_dict_none_values(self):
        """Test TypedDict with None values"""
        # the "no BOM found" shape: all optional fields None / zero
        sample_info: BomEncodingInfo = {
            'has_bom': False,
            'bom_type': None,
            'encoding': None,
            'bom_length': 0,
            'bom_pattern': None
        }
        assert sample_info['has_bom'] is False
        assert sample_info['bom_type'] is None
        assert sample_info['encoding'] is None
        assert sample_info['bom_length'] == 0
        assert sample_info['bom_pattern'] is None
class TestIntegration:
    """Integration tests for BOM encoding detection"""

    def test_is_bom_encoded_uses_info_function(self, tmp_path: Path):
        """Test that is_bom_encoded uses is_bom_encoded_info internally"""
        test_file = tmp_path / "integration.txt"
        content = b'\xef\xbb\xbfIntegration test'
        test_file.write_bytes(content)
        # Both functions should return consistent results
        simple_result = is_bom_encoded(test_file)
        detailed_result = is_bom_encoded_info(test_file)
        assert simple_result == detailed_result['has_bom']
        assert simple_result is True

    def test_multiple_file_bom_detection_workflow(self, tmp_path: Path):
        """Test a workflow of detecting BOM across multiple files"""
        files = {
            'utf8_bom.csv': b'\xef\xbb\xbf"data","value"\n"test",123',
            'utf16_le.txt': b'\xff\xfeH\x00e\x00l\x00l\x00o\x00',
            'no_bom.txt': b'Plain ASCII text',
            'empty.txt': b'',
        }
        results = {}
        detailed_results = {}
        for filename, content in files.items():
            file_path = tmp_path / filename
            file_path.write_bytes(content)
            results[filename] = is_bom_encoded(file_path)
            detailed_results[filename] = is_bom_encoded_info(file_path)
        # Verify results
        assert results['utf8_bom.csv'] is True
        assert results['utf16_le.txt'] is True
        assert results['no_bom.txt'] is False
        assert results['empty.txt'] is False
        # Verify detailed results match simple results
        for filename in files:
            assert results[filename] == detailed_results[filename]['has_bom']
        # Verify specific encoding details
        assert detailed_results['utf8_bom.csv']['encoding'] == 'utf-8'
        assert detailed_results['utf16_le.txt']['encoding'] == 'utf-16-le'
        assert detailed_results['no_bom.txt']['encoding'] is None

    def test_csv_loading_workflow(self, tmp_path: Path):
        """Test BOM detection workflow for CSV loading (main use case)"""
        # Create CSV files with and without BOM
        csv_with_bom = tmp_path / "data_with_bom.csv"
        csv_without_bom = tmp_path / "data_without_bom.csv"
        # CSV with UTF-8 BOM
        bom_content = b'\xef\xbb\xbf"Name","Age"\n"Alice",30\n"Bob",25'
        csv_with_bom.write_bytes(bom_content)
        # CSV without BOM
        no_bom_content = b'"Name","Age"\n"Charlie",35\n"Diana",28'
        csv_without_bom.write_bytes(no_bom_content)
        # Simulate CSV loading workflow: derive encoding + bytes to skip
        files_to_process = [csv_with_bom, csv_without_bom]
        processing_info: list[dict[str, str | bool | int]] = []
        for csv_file in files_to_process:
            bom_info = is_bom_encoded_info(csv_file)
            file_info: dict[str, str | bool | int] = {
                'file': csv_file.name,
                'has_bom': bom_info['has_bom'],
                'encoding': bom_info['encoding'] or 'default',
                'skip_bytes': bom_info['bom_length']
            }
            processing_info.append(file_info)
        # Verify workflow results
        assert len(processing_info) == 2
        bom_file_info = next(info for info in processing_info if info['file'] == 'data_with_bom.csv')
        no_bom_file_info = next(info for info in processing_info if info['file'] == 'data_without_bom.csv')
        assert bom_file_info['has_bom'] is True
        assert bom_file_info['encoding'] == 'utf-8'
        assert bom_file_info['skip_bytes'] == 3
        assert no_bom_file_info['has_bom'] is False
        assert no_bom_file_info['encoding'] == 'default'
        assert no_bom_file_info['skip_bytes'] == 0

    def test_error_handling_consistency(self, tmp_path: Path):
        """Test that both functions handle errors consistently"""
        nonexistent_file = tmp_path / "does_not_exist.txt"
        # Both functions should raise ValueError for non-existent files
        with pytest.raises(ValueError):
            is_bom_encoded(nonexistent_file)
        with pytest.raises(ValueError):
            is_bom_encoded_info(nonexistent_file)

    def test_all_supported_bom_types(self, tmp_path: Path):
        """Test detection of all supported BOM types"""
        bom_test_cases = [
            ('utf8', b'\xef\xbb\xbf', 'UTF-8', 'utf-8', 3),
            ('utf16_le', b'\xff\xfe', 'UTF-16 LE', 'utf-16-le', 2),
            ('utf16_be', b'\xfe\xff', 'UTF-16 BE', 'utf-16-be', 2),
            ('utf32_le', b'\xff\xfe\x00\x00', 'UTF-32 LE', 'utf-32-le', 4),
            ('utf32_be', b'\x00\x00\xfe\xff', 'UTF-32 BE', 'utf-32-be', 4),
        ]
        for name, bom_bytes, expected_type, expected_encoding, expected_length in bom_test_cases:
            test_file = tmp_path / f"{name}_test.txt"
            content = bom_bytes + b'Test content'
            test_file.write_bytes(content)
            # Test simple function
            assert is_bom_encoded(test_file) is True
            # Test detailed function
            info = is_bom_encoded_info(test_file)
            assert info['has_bom'] is True
            assert info['bom_type'] == expected_type
            assert info['encoding'] == expected_encoding
            assert info['bom_length'] == expected_length
            assert info['bom_pattern'] == bom_bytes
# __END__

View File

@@ -6,7 +6,7 @@ iterator_handling.dict_helper tests
from typing import Any
import pytest
from corelibs.iterator_handling.dict_helper import (
from corelibs.iterator_handling.dict_helpers import (
delete_keys_from_set,
build_dict,
set_entry,

2
uv.lock generated
View File

@@ -108,7 +108,7 @@ wheels = [
[[package]]
name = "corelibs"
version = "0.31.0"
version = "0.33.0"
source = { editable = "." }
dependencies = [
{ name = "cryptography" },